MergeTree启动原理

简介: MergeTree启动原理

part结构


我们知道,在clickhouse中,MergeTree表由一个个part组成。每个part对应一个目录,该目录下有两类文件:元数据文件,数据文件和projection part目录(如果该表创建了projection的话)

$ ll ./20210321_310_310_0 
total 36
-rw-r----- 1 root root   28 Feb  9 14:26 primary.idx
-rw-r----- 1 root root    4 Feb  9 14:26 partition.dat
-rw-r----- 1 root root    4 Feb  9 14:26 minmax_day.idx
-rw-r----- 1 root root   10 Feb  9 14:26 default_compression_codec.txt
-rw-r----- 1 root root 1648 Feb  9 14:26 data.mrk3
-rw-r----- 1 root root 2110 Feb  9 14:26 data.bin
-rw-r----- 1 root root    1 Feb  9 14:26 count.txt
-rw-r----- 1 root root  790 Feb  9 14:26 columns.txt
-rw-r----- 1 root root  252 Feb  9 14:26 checksums.txt

其中元数据文件包括:


  • partition.dat: 记录本part所在分区的值


  • primary.idx: 主键索引文件,记录每个Granule起始行的主键值,需配合.mrk3文件使用


  • count.txt: 记录本part的数据行数


  • checksums.txt: 记录本part下所有文件的size和checksum


  • minmax_XX.idx: 分区索引或二级minmax索引文件,记录对应字段的min值和max值
  • default_compression_codec.txt: 记录该part默认的压缩编码,如ZTSD, LZ4等


  • *.mrk, *.mrk2, *.mrk3: 不同版本的mark文件,它记录着每个Granule的起始行数、在对应的数据文件中的偏移位置和解压后的Block中的偏移位置。


  • uuid.txt: 记录本part的唯一id。仅在assign_part_uuids = true时才会出现。


  • ttl.txt: 记录本part的过期时间


其中数据文件包括:


  • XX.bin。part有三种类型,Wide, Compact, InMemory。当part类型为Wide时,每个字段对应一个bin文件和mark文件。当part类型为Compact时,part目录下仅有一个全局的data.bin和data.mrk3文件。当part类型为InMemory时,没有part目录,只有一个全局的wal.bin文件,因此InMemory Part对启动性能的影响很小,不在我们考虑范围之内。



启动流程


clickhouse-server启动之后:


  • 首先加载系统库(包括system/information_schema/INFORMATION_SCHEMA)的元数据,然后attach系统库下的表。


  • 然后调用loadMetadata加载除了系统库之外的database


  • 最后创建HTTP、TCP等server,提供对外查询


系统库中表的数量有限,主要是第二步比较耗时

try
    {
auto & database_catalog = DatabaseCatalog::instance();
/// We load temporary database first, because projections need it.
        database_catalog.initializeAndLoadTemporaryDatabase();
        loadMetadataSystem(global_context);
/// After attaching system databases we can initialize system log.
        global_context->initializeSystemLogs();
        global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
        attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
/// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread,
        /// that may execute DROP before loadMarkedAsDroppedTables() in background,
        /// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap.
        database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
        loadMetadata(global_context, default_database);
        startupSystemTables();
        database_catalog.loadDatabases();
/// After loading validate that default database exists
        database_catalog.assertDatabaseExists(default_database);
    }
  ...
    {
        attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
        {
std::lock_guard lock(servers_lock);
            createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
if (servers.empty())
throw Exception(
"No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                    ErrorCodes::NO_ELEMENTS_IN_CONFIG);
        }
if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                ErrorCodes::NO_ELEMENTS_IN_CONFIG);

loadMetadata中,


  • 首先遍历metadata目录,获取每个database的名称和元数据路径


  • 对于每个database,加载对应的.sql文件,获取create database query并执行, 并将该database加入到catalog中


  • 执行TablesLoader::loadTables,对每个database对象执行



  • loadStoredObjects,即完成该database下每个table的加载。对于Engine为MergeTree Family的表来说,加载的主要工作就是读取所有part下的元数据文件到内存中。


  • 执行TablesLoader::startupTables,对每个database对象执行startupTables。对于非Replicated的MergeTree Family表,启动包括清理过期part、清理过期WAL、清理空part、启动后台disk move任务(如果该表配置了多disk的话)等工作。对于Replicated的MergeTree Family表,启动包括开启副本间同步、副本选主、启用副本、启动后台disk move等工作


void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
    ...
for (fs::directory_iterator it(path); it != dir_end; ++it)
    {
if (it->is_symlink())
continue;
const auto current_file = it->path().filename().string();
if (!it->is_directory())
        {
/// TODO: DETACH DATABASE PERMANENTLY ?
            if (fs::path(current_file).extension() == ".sql")
            {
                String db_name = fs::path(current_file).stem();
if (!isSystemOrInformationSchema(db_name))
                    databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
            }
    ...
    TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)
    {
        loadDatabase(context, name, db_path, has_force_restore_data_flag);
        loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
    }
    TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
    loader.loadTables();
    loader.startupTables();
    ...
}

DatabaseOrdinary::loadStoredObjects


  • 首先遍历database目录是下所有*.sql文件。对于每个文件,读取其中的建表语句并解析成AST


  • 对于该database目录下的每张表,生成对应的Storage对象,并attach到本database中。注意这个过程以表为最小粒度并发的,并发度等于cpu物理核数
void DatabaseOrdinary::loadStoredObjects(
    ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
      * Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
      *  which does not correspond to order tables creation and does not correspond to order of their location on disk.
      */
    ParsedTablesMetadata metadata;
    loadTablesMetadata(local_context, metadata);
    ...
/// Attach tables.
    for (const auto & name_with_path_and_query : metadata.parsed_tables)
    {
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
        {
            pool.scheduleOrThrowOnError([&]()
            {
                loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
                logAboutProgress(log, ++tables_processed, total_tables, watch);
            });
        }
    }
    pool.wait();
    ...
}

那么MergeTree表加载元数据的操作发生在哪里呢?发生在Storage的构造函数中,参考StorageReplicatedMergeTree::StorageReplicatedMergeTree -> MergeTreeData::loadDataParts。在loadDataParts


  • 首先遍历每一个disk下的表的数据目录,收集其中的part。注意这个过程是并行的,并行度为本表关联的disk数量。


  • 对于InMemory part, 加载wal文件中的数据。
  • 对于Wide或Compact part,记录part名到disk的映射关系。


  • 然后加载Wide和Compact part。loadDataPartsFromDisk
  • 最后加载InMemory part. loadDataPartsFromWAL
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
  ...
for (const auto & disk_ptr : disks)
    {
if (disk_ptr->isBroken())
continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
        pool.scheduleOrThrowOnError([&, disk_ptr]()
        {
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
            {
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
                if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
                    disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
                {
std::unique_lock lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
                            ErrorCodes::CORRUPTED_DATA);
                    write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
                        disk_wal_parts.push_back(std::move(part));
                }
    ...
if (num_parts > 0)
        loadDataPartsFromDisk(
            broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
if (!parts_from_wal.empty())
        loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
}

一个MergeTree表的In memory是相当有限的,我们重点分析Wide和Compact part的加载,即loadDataPartsFromDisk。每个part的加载流程如下:


  • 加载uuid.txt文件


  • 加载columns.txt文件


  • 加载checksums.txt文件


  • 加载任意mark文件。对于Compact part,只有一个mark文件:data.mrk3


  • 从checksums中获取每个column和secondary index文件的大小。


  • 加载primary.idx文件


  • 加载ttl.txt文件


  • 加载projection parts中的metadata文件。


  • 根据checksums校验metadata和data文件的一致性


  • 加载default_compression_codec.txt文件


注意MergeTree表加载part的过程是并行的,并发度默认为cpu硬件核数。当MergeTree表完成所有part的加载后,便可利用元数据构造的分区索引、主键索引、二级索引、Projection加速查询。这就是为什么clickhouse-server必须等待所有表完成加载后才可对外提供查询服务的原因。



为什么慢


从上面的流程可以看到,加载一个MergeTree part最多读取8个metadata 文件。假如clickhouse示例上有70w part, metadata文件的数量为560w。如果MergeTree还创建了一个或多个projection, metadata文件数量可能会超过1000w。对这么多文件的并发读取会瞬间把disk ioutil打满,disk read成为性能瓶颈。这就是为什么MergeTree启动慢的原因。



启动加速优化


优化方案


优化的核心在于减少disk io次数。因此我们决定引入RocksDB用于缓存metadata file。RocksDB启动之后会自身的LSM文件,文件个数远远少于metadata文件数,文件位置在磁盘上也更加集中。当所有part的metadata都缓存到RocksDB时,clickhouse启动时仅从RocksDB中就可获得所有part的metadata,总体来看disk io的次数大大减小了。


缓存结构


RocksDB中:


  • key: 考虑到同一张表的part可能分布在不同的disk中,所以key由两部分组成,disk name和part相对disk的路径,如default:store/b47/b47ee9c8-afa1-41ac-adfb-2e0dc42de819/6_7_7_0/primary.idx
  • value: metadata文件的内容


缓存一致性


既然用RockDB作为缓存层,那么如何保证缓存层和持久化层也就是磁盘文件的一致性呢?主要分为两大类场景:


  • 读meta:当需要读取某个part的元数据时,首先从缓存层查询,如果缓存未命中, 则读取持久化层的结果,并将该结果更新到缓存层并返回结果。如果缓存命中则直接返回结果。该操作主要发生在clickhouse-server启动阶段


  • 写meta:当需要更新某个part的元数据时,对缓存层和持久化层进行双写。该操作可能发生在插入数据、删除分区、后台数据合并/移动、ALTER TABLE等场景下。


那么是否需要考虑缓存的并发问题呢? 一个cache key唯一对应一个metadata文件,而clickhouse MergeTree的设计已经保证了不可能有多写或读写并发的情况出现。因此在设计缓存层时无需考虑这个问题。



如何检测不一致


增加了一层缓存层,那么


源码走读

相关文章
|
Ubuntu
LLVM编译源码
LLVM编译源码
470 0
|
存储 监控 算法
ClickHouse源码分析-压缩算法大揭秘
ClickHouse在近年来增加了很多压缩算法,最主要的改进还是为了更好的适应时序场景,提高压缩率,节省存储空间。本期就给大家带来ClickHouse的压缩算法介绍。
5839 0
ClickHouse源码分析-压缩算法大揭秘
|
8月前
|
数据采集 人工智能 JSON
Crawl4AI:为大语言模型打造的开源网页数据采集工具
随着大语言模型(LLMs)的快速发展,高质量数据成为智能系统的关键基础。**Crawl4AI**是一款专为LLMs设计的开源网页爬取工具,可高效提取并结构化处理网页数据,突破传统API限制,支持JSON、HTML或Markdown等格式输出。
671 3
Crawl4AI:为大语言模型打造的开源网页数据采集工具
|
存储 缓存
clickhouse新特性之————MergeTree启动加速(使用篇)
clickhouse新特性之————MergeTree启动加速(使用篇)
1300 0
|
存储 运维 监控
云时代,好用的数据迁移方案推荐
本文将介绍数据库迁移的步骤以及市面上常见的迁移工具,推荐大家选择能够支持“业务零停机迁移”的工具产品。同时,平台工具(例NineData) 的自动化体验及配套设施(例:数据校验工具、迁移限流、监控告警等)一般较为完善,是比较推荐的选择。
1502 1
云时代,好用的数据迁移方案推荐
|
监控 API 网络安全
​邮件通知提醒邮箱警告设置教程及API代码示例
**摘要:** 在系统管理中,邮件通知提醒用于及时报告异常和重要事件。本文提供AOKSend的设置教程和API代码示例,教你如何配置邮件警告。通过自动化邮件通知,可以提升响应速度,确保系统稳定性。步骤包括注册AOKSend账户、获取API密钥、设置SMTP配置、创建触发条件及编写Python API代码示例。利用AOKSend API发送警告邮件,如CPU使用率过高通知,可有效监控和测试,确保系统异常时能快速响应。
|
JavaScript 前端开发 开发者
Element-UI快速入门
Element-UI快速入门Element-UI快速入门
1512 0
Element-UI快速入门
|
Web App开发 安全 JavaScript
Electron 进程间通信的实现
Electron 进程间通信的实现
388 0
|
编译器 Linux vr&ar
C语言之预处理,动态库,静态库
C语言之预处理,动态库,静态库
198 1