引言
在上一篇文章[[LSM-Tree - LevelDb了解和实现]]中介绍了LevelDb,并且介绍了相关的数据结构和核心组件,最终介绍了有关LevelDB的核心读写部分以及为什么在这个数据库中写入的速度要比读取的速度快上好几倍。
这一节代码内容非常多,所以不建议在手机或者移动设备阅读,更适合在PC上观看。
LevelDB的源代码还是比较好懂的,好懂到我这个主要学JAVA只有基础C语言知识的人也能看懂,另一方面作者在关键的地方都给了注释,甚至告诉你为什么要这么设计,写的很好很棒让人落泪为什么自己没这样的同事,如果还是看不懂,作者也写了很多介绍文档在doc目录中告诉你核心组件的作用介绍。
总之,不要惧怕这个数据库,作为优秀代码和设计模式以及数据结构应用范本都非常值得学习和参考。
源码运行
LevelDB的编译是比较简单的,可以从官网直接克隆代码。github.com/google/leve…,具体操作步骤如下(可以参考源代码中的README文件):
git clone --recurse-submodules https://github.com/google/leveldb.git mkdir -p build && cd build cmake -DCMAKE_BUILD_TYPE=Release .. && cmake --build .
完成整个编译动作之后,我们可以新增一个动态库,一个静态库和test目录,接着就可以编写单元测试了,同时官方的源代码中有很多的单元测试可以提供自己编写的测试程序进行调试使用。
底层存储存储结构
除开某些元数据的存储,SSTable是整个数据库的主要结构,所有的SSTable文件本身的内容是不可修改的,虽然通常数据在内存中操作,但是数据不可能无限存储,当数据到达一定量之后就需要持久化到磁盘中,而压缩合并的处理就十分考验系统性能了,为此LevelDb使用分层的结构进行存储,下面我们从外部的使用结构开始来了解内部的设计。
首先是整个DB的源代码,DB源代码可以从下面的路径访问到:
我们需要了解DB存储结构,可以看到存储引擎的对外提供的接口十分简单,有点类似map的操作方法:
class LEVELDB_EXPORT DB { public: // 设置数据库的key-value结构,如果没有返回OK则视为操作失败, // 备注:考虑默认打开sync=true操作,`Put` 方法在内部最终会调用 `Write` 方法,只是在上层为调用者提供了两个不同的选择。 virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) = 0; // 成功返回OK,如果异常则不反悔OK,如果什么都返回,说明呗删除的Key不存在, virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; // 写入操作 virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; // 根据key获取数据 virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0; }
Get
和 Put
是 LevelDB 为上层提供的用于读写的接口,如果我们清楚内部的操作逻辑,那么这个数据库的内部结构和运作过程也基本清楚。
下面先从写入操作开始,看看数据是如何进入到LevelDb,以及内部是如何管理的:
write部分
关联:[[WriteLevel0Table]],[[Write]]。
Write的内部逻辑十分复杂,这里画一个基本的流程图:
我们从Write()
的方法切入,简化代码之后大致的流程如下:
// mark1 为写入构建足够的空间,此时可以不需要加锁。 Status status = MakeRoomForWrite(updates == nullptr); // mark2 通过 `AddRecord` 方法向日志中追加一条写操作的记录; status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); // mark3 如果日志记录成功,则将数据进行写入 if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); }
整个执行流程如下:
- 首先调用
MakeRoomForWrite
方法为即将进行的写入提供足够的空间。
- 如果当前空间不足需要冻结当前的memtable,发生 Minor Compaction 并创建一个新的
MemTable
对象; - 如果满足触发
Major Compaction
需要对数据进行压缩并且对于SSTable进行合并
- 通过
AddRecord
方法向日志中追加一条写操作记录。 - 最终调用memtable往内存结构中添加key/value,完成最终写入操作。
将写入操作的源代码逻辑简化之后最终如下:
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { Writer w(&mutex_); w.batch = my_batch; MakeRoomForWrite(my_batch == NULL); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(updates, last_sequence + 1); // 记录最终的操作记录点 last_sequence += WriteBatchInternal::Count(updates); // 日志编写 log_->AddRecord(WriteBatchInternal::Contents(updates)); // 将数据写入memtable WriteBatchInternal::InsertInto(updates, mem_); versions_->SetLastSequence(last_sequence); return Status::OK(); }
对于压缩合并在源码中有如下判断,系统会定时检查是否可以进行压缩合并,这一些if/else用于多线程并发写入的时候进行合并写入的操作,当发现有且他线程在操作就会等待结果或者等到拿到锁之后接管合并写入的操作。
如果对于下面的代码有疑问,可以阅读[[LSM-Tree - LevelDb了解和实现]]中关于“合并写入”的部分,为了节省时间,可以在网页中直接输入关键字“合并写入”快速定位,这里假设读者已经了解基本的工作流程,就不再赘述了。
#LevelDb合并写入操作
void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (background_compaction_scheduled_) { // Already scheduled // 正在压缩 } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions // DB正在被删除;不再有后台压缩 } else if (!bg_error_.ok()) { // Already got an error; no more changes // 已经发生异常,不能做更多改动。 } else if (imm_ == nullptr && manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // 不需要合并则不工作 } else { // 设置当前正常进行压缩合并 background_compaction_scheduled_ = true; // 开始压缩合并 env_->Schedule(&DBImpl::BGWork, this); } }
不可变memtable:
在write的函数内部有这样一串代码,此时会暂停解锁等待写入,这个写入又是干嘛的?
Status status = MakeRoomForWrite(updates == nullptr);
访问内部会发现通过一个while循环判断当前的 memtable状态,一旦发现memtable写入已经写满整个mem,则需要停止写入并且将当前的memtable转为imutiablememtable,并且创建新的mem切换写入,此时还会同时根据一些条件判断是否可以进行压缩mem。
源码中GUARDED_BY含义:
GUARDED_BY是数据成员的属性,该属性声明数据成员受给定功能保护。对数据的读操作需要共享访问,而写操作则需要互斥访问。 该 GUARDED_BY属性声明线程必须先锁定listener_list_mutex才能对其进行读写listener_list,从而确保增量和减量操作是原子的。
mem可以看作是当前的系统备忘录或者说一个临时的记账板,这种设计和大多数的日志或者关系型数据库类似,都是先写入日志在进行后续的所有操作,日志优先于记录操作。同时根据日志写入操作加锁来完成并发操作的正常运行。
MakeRoomForWrite方法中比较关键的部分都加了注释,很多操作作者都有介绍意图,代码逻辑都比较简单,多看几遍基本就了解了(C++语法看不懂不必过多纠结)
while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; } else if (allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { // 我们正接近于达到对L0文件数量的硬性限制。L0文件的数量。当我们遇到硬性限制时,与其将单个写操作延迟数而是在我们达到硬限制时,开始将每个mem单独写1ms以减少延迟变化。另外。这个延迟将一些CPU移交给压缩线程,因为 如果它与写入者共享同一个核心的话。 mutex_.Unlock(); env_->SleepForMicroseconds(1000); // 不要将一个单一的写入延迟超过一次 allow_delay = false; mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // 在当前的mem中还有空间 break; } else if (imm_ != nullptr) { // 我们已经填满了当前的memtable,但之前的的mem还在写入,所以需要等待 background_work_finished_signal_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { background_work_finished_signal_.Wait(); } else { // A试图切换到一个新的memtable并触发对旧memtable的压缩 assert(versions_->PrevLogNumber() == 0); // 新建文件号 uint64_t new_log_number = versions_->NewFileNumber(); //return next_file_number_++; WritableFile* lfile = nullptr; // 新建可写入文件, 内部通过一个map构建一个文件:文件状态的简易文件系统 // typedef std::map<std::string, FileState*> FileSystem; s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); if (!s.ok()) { // 避免死循环重复新增文件号 versions_->ReuseFileNumber(new_log_number); break; } delete log_; delete logfile_; logfile_ = lfile; logfile_number_ = new_log_number; // 写入日志 log_ = new log::Writer(lfile); // **重点:imm_ 就是immutable 他将引用指向当前已经写满的mem,其实和mem对象没什么区别,就是加了一个互斥共享锁而已(写互斥,读共享)** imm_ = mem_; has_imm_.store(true, std::memory_order_release); // 新建新的memtable mem_ = new MemTable(internal_comparator_); // 引用至新块 mem_->Ref(); force = false; // Do not force another compaction if have room // 尝试对于已满mem压缩合并 ,此处承接上文 MaybeScheduleCompaction(); } }
下面用一个简单的示意图了解上面的大致流程:
![[Pasted image 20220330174257.png]]
对于[[SSTable]]的原始理论来说,这样的实现结构显然是有出入的。观察上面的条件判断也可以知道,在通常情况下memtable可以通过短暂的延迟读写请求等待压缩完成,但是一旦发现mem占用的内存过大,此时就需要给当前的mem加锁变为_imu状态,然后创建一个新的 MemTable 实例并且把新进来的请求转到新的mem中,这样就可以继续接受外界的写操作,不再需要等待 Minor Compaction
的结束了。
再次注意此处会通过函数 MaybeScheduleCompaction 是否进行压缩合并的操作判断。