一、前言 前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。二、持久化总体框架 持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。
· TxnLog,接口类型,读取事务性日志的接口。 · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。 · Snapshot,接口类型,持久层快照接口。 · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。 · FileTxnSnapLog,封装了TxnLog和SnapShot。 · Util,工具类,提供持久化所需的API。 下面先来分析TxnLog和FileTxnLog的源码。三、TxnLog源码分析 TxnLog是接口,规定了对日志的响应操作。其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。四、FileTxnLog源码分析 对于LogFile而言,其格式可分为如下三部分 LogFile: FileHeader TxnList ZeroPad FileHeader格式如下 FileHeader: { magic 4bytes (ZKLG) version 4bytes dbid 8bytes } TxnList格式如下 TxnList: Txn || Txn TxnList Txn格式如下 Txn: checksum Txnlen TxnHeader Record 0x42 Txnlen格式如下 Txnlen: len 4bytes TxnHeader格式如下 TxnHeader: { sessionid 8bytes cxid 4bytes zxid 8bytes time 8bytes type 4bytes } ZeroPad格式如下 ZeroPad: 0 padded to EOF (filled during preallocation stage) 了解LogFile的格式对于理解源码会有很大的帮助。4.1 属性 4.2. 核心函数 1. append函数说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下 ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤ ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④ ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤ ⑤ 填充数据,填充0,进入⑥ ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦ ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧ ⑧ 将更新的ByteBuffer写入磁盘文件,返回trueappend间接调用了padLog函数,其源码如下 说明:padLog其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。2. getLogFiles函数 说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。 ① 对所有log文件按照zxid进行升序排序,进入② ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③ ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④ ④ 转化后返回getLogFiles函数调用了sortDataDir,其源码如下说明:getLogFiles其用于排序log文件,可以选择根据zxid进行升序或降序。getLogFiles函数间接调用了getZxidFromName,其源码如下: 说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。3. getLastLoggedZxid函数 说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下 ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入② ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③ ③ 遍历所有事务并提取出相应的zxid,最后返回。其中getLastLoggedZxid调用了read函数,其源码如下 说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下 说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下 说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。 ① 读取事务的crcValue值,用于后续的验证,进入② ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③ ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。 ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。4. commit函数 说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下 ① 若日志流logStream不为空,则强制刷新至磁盘,进入② ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③ ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④ ④ 移除所有流并关闭。5. truncate函数
Java
运行代码复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
// 获取迭代器
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
long pos = input.getPosition();
// now, truncate at the current position
// 从当前位置开始清空
RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
raf.setLength(pos);
raf.close();
while (itr.goToNextLog()) { // 存在下一个log文件
if (!itr.logFile.delete()) { // 删除
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
// 关闭迭代器
close(itr);
}
return true;
}
说明:该函数用于清空大于给定zxid的所有事务日志。五、总结 对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,本章节需重点记住:append函数实现日志追加,记录通过事务的crcValue验证,决定是否更新通过getLogFiles获取全部日志文件并排序通过getLastLoggedZxid找到最大的zxid,保证后续函数决定下一个日志文件id通过commit提交,真正生成日志文件通过trancate清空指定事务日志