版本 | 日期 | 备注 |
---|---|---|
1.0 | 2020.3.12 | 文章首发 |
1.0.1 | 2020.3.16 | 改进部分大小写问题及形容方式 |
1.0.2 | 2020.3.21 | 改进可能会引起错误理解的部分 |
1.0.3 | 2020.3.29 | 修改标题 |
1.0.4 | 2020.4.18 | 改进小结部分 |
1.0.5 | 2020.6.26 | 更新部分部分解释,改进注释风格 |
1.0.6 | 2020.7.6 | 增加部分详细解释 |
前言
在上篇文章中,我们简单提到了Zookeeper的几个核心点。在这篇文章中,我们就来探索其存储技术。在开始前,读者可以考虑思考下列问题:
- Zookeeper的数据存储是如何实现的?
- Zookeeper进行一次写操作的时候,会发生什么å?
- 当一个Zookeeper新加入现有集群时,如何同步现集群中的数据?
抱着问题,我们进入下面的内容。
Zookeper本地存储模型
众所周知,Zookeeper不擅长大量数据的读写,因为:
- 本质上就是一个内存里的字典。
- 持久化节点的写入由于WAL会导致刷盘,过大的数据会引起额外的
seek
。 - 同样的,在zk启动时,所有的数据会从WAL的日志中读出。如果过大,也会导致启动时间较长。
而内存中的数据,也被称为ZkDatabase(Zk的内存数据库),由它来负责管理Zk的会话DataTree存储和事务日志,它也会定时向磁盘dump快照数据,在Zk启动时,也会通过事务日志和快照数据来恢复内存中的数据。
既然Zk的数据是在内存里的,那么它是如何解决数据持久化问题的呢?上一段我们已经提到了:即通过事务日志——WAL,在每次写请求前,都会根据目前的zxid来写log,将请求先记录到日志中。
接下来,我们来谈谈WAL的优化措施。
WAL的优化
WAL优化方案1:Group Commit
一般的WAL中每次写完END都要调用一次耗时的sync API,这其实是会影响到系统的性能。为了解决这个问题,我们可以一次提交多个数据写入——只在最后一个数据写入的END日志之后,才调用sync API。like this:
- without group commit:
BEGIN Data1 END Sync
BEGIN Data2 END Sync
BEGIN Data3 END Sync
- with group commit:
BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync
凡事都有代价,这可能会引起数据一致性相关的问题。
WAL优化方案2:File Padding
在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的 block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先 seek 到 inode 所在的位置,然后回到新添加 block 的位置进行日志追加,这些都是发生在写事务日志时,这会明显拖慢系统的性能。
为了减少这些 seek,我们可以预先为 WAL 分配 block。例如 ZooKeeper 当检测到当前事务日志文件不足4KB时,就会填充0使该文件到64MB(这里0仅仅作为填充位)。并新建一个64MB的文件。
所以这也是Zookeeper不擅长读写大数据的原因之一,这会引起大量的block分配。
WAL优化方案3:Snapshot
如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。
除了解决启动时间过长的问题之外,快照还可以减少存储空间的使用。WAL 的多个日志条目有可能是对同一个数据的改动,通过快照,就可以只保留最新的数据改动(Merge)。
Zk的确采用了这个方案来做优化。还带来的一个好处是:在一个节点加入时,可以用最新的Snapshot传过去便于同步数据。
源码解析
本节内容都以3.5.7版本为例
核心接口和类
- TxnLog:接口类型,提供读写事务日志的API。
- FileTxnLog:基于文件的TxnLog实现。
- Snapshot:快照接口类型,提供序列化、反序列化、访问快照API。
- FileSnapshot:基于文件的Snapshot实现。
- FileTxnSnapLog:TxnLog和Snapshot的封装
- DataTree:Zookeeper的内存数据结构,ZNode构成的树。
- DataNode:表示一个ZNode。
TxnLog
TxnLog是我们前面提到的事务日志。那么接下来我们就来看它的相关源码。
先看注释:
package org.apache.zookeeper.server.persistence;
import ...
/**
* This class implements the TxnLog interface. It provides api's
* to access the txnlogs and add entries to it.
* <p>
* The format of a Transactional log is as follows:
* <blockquote><pre>
* LogFile:
* FileHeader TxnList ZeroPad
*
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
*
* TxnList:
* Txn || Txn TxnList
*
* Txn:
* checksum Txnlen TxnHeader Record 0x42
*
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
* </pre></blockquote>
*/
public class FileTxnLog implements TxnLog, Closeable {
在注释中,我们可以看到一个FileLog由三部分组成:
- FileHeader
- TxnList
- ZerdPad
关于FileHeader,可以理解其为一个标示符。TxnList则为主要内容。ZeroPad是一个终结符。
TxnLog.append
我们来看看最典型的append方法,可以将其理解WAL过程中的核心方法:
/**
* append an entry to the transaction log
* @param hdr the header of the transaction
* @param txn the transaction part of the entry
* returns true iff something appended, otw false
*/
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) {
//为null意味着这是一个读请求,直接返回
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
if (logStream==null) {
//为空的话则new一个Stream
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader"); //写file header
// Make sure that the magic number is written before padding.
logStream.flush(); // zxid必须比日志先落盘
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos); //加入需要Flush的队列
}
filePadding.padFile(fos.getChannel()); //确定是否要扩容。每次64m扩容
byte[] buf = Util.marshallTxnEntry(hdr, txn); //序列化写入
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm(); //生成butyArray的checkSum
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");//写入日志里
Util.writeTxnBytes(oa, buf);
return true;
}
这里有个zxid(ZooKeeper Transaction Id),有点像MySQL的GTID。每次对Zookeeper的状态的改变都会产生一个zxid,zxid是全局有序的,如果zxid1小于zxid2,则zxid1在zxid2之前发生。
简单分析一下写入过程:
- 确定要写的事务日志:当Zk启动完成或日志写满时,会与日志文件断开连接。这个时候会根据zxid创建一个日志。
- 是否需要预分配:如果检测到当前日志剩余空间不足4KB时
- 事务序列化
- 为每个事务生成一个Checksum,目的是为了校验数据的完整性和一致性。
- 写入文件,不过是写在Buffer里,并未落盘。
- 落盘。根据用户配置来决定是否强制落盘。
TxnLog.commit
这个方法被调用的时机大致有:
- 服务端比较闲的时候去调用
- 到请求数量超出1000时,调用。之前提到过GroupCommit,其实就是在这个时候调用的。
- zk的shutdown钩子被调用时,调用
/**
* commit the logs. make sure that everything hits the
* disk
*/
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);//对应fdataSync
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
if(serverStats != null) {
serverStats.incrementFsyncThresholdExceedCount();
}
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "File size is " + channel.size() + " bytes. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
代码非常的简单。如果logStream还有,那就先刷下去。然后遍历待flush的队列(是个链表,用来保持操作顺序),同时还会关注写入的时间,如果过长,则会打一个Warn的日志。
DataTree和DataNode
DataTree是Zk的内存数据结构——就是我们之前说到的MTable。它以树状结构来组织DataNode。
这么听起来可能有点云里雾里,不妨直接看一下DataNode的相关代码。
public class DataNode implements Record {
/** the data for this datanode */
byte data[];
/**
* the acl map long for this datanode. the datatree has the map
*/
Long acl;
/**
* the stat for this node that is persisted to disk.
*/
public StatPersisted stat;
/**
* the list of children for this node. note that the list of children string
* does not contain the parent path -- just the last part of the path. This
* should be synchronized on except deserializing (for speed up issues).
*/
private Set<String> children = null;
.....
}
如果用过ZkClient的小伙伴,可能非常熟悉。这就是我们根据一个path获取数据时返回的相关属性——这就是用来描述存储数据的一个类。注意,DataNode还会维护它的Children。
简单了解DataNode后,我们来看一下DataTree。为了避免干扰,我们选出最关键的成员变量:
public class DataTree {
private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);
/**
* This hashtable provides a fast lookup to the datanodes. The tree is the
* source of truth and is where all the locking occurs
*/
private final ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
/**
* This hashtable lists the paths of the ephemeral nodes of a session.
*/
private final Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
.......
}
我们可以看到,DataTree本质上是通过一个ConcurrentHashMap来存储DataNode的(临时节点也是)。保存的是 DataNode 的 path 到 DataNode 的映射。
那为什么要保存两个状态呢?这得看调用它们被调用的场景:
- 一般CRUD ZNode的请求都是走ConcurrentHashMap的
- 序列化DataTree的时候会从Root节点开始遍历所有节点
如果需要获取所有节点的信息,显然遍历树会比一个个从ConcurrentHashMap 拿快。
接下来看一下序列化的相关代码:
DataNode的序列化方法
/**
* this method uses a stringbuilder to create a new path for children. This
* is faster than string appends ( str1 + str2).
*
* @param oa
* OutputArchive to write to.
* @param path
* a string builder.
* @throws IOException
* @throws InterruptedException
*/
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
String pathString = path.toString();
DataNode node = getNode(pathString);
if (node == null) {
return;
}
String children[] = null;
DataNode nodeCopy;
synchronized (node) {
StatPersisted statCopy = new StatPersisted();
copyStatPersisted(node.stat, statCopy);
//we do not need to make a copy of node.data because the contents
//are never changed
nodeCopy = new DataNode(node.data, node.acl, statCopy);
Set<String> childs = node.getChildren();
children = childs.toArray(new String[childs.size()]);
}
serializeNodeData(oa, pathString, nodeCopy);
path.append('/');
int off = path.length();
for (String child : children) {
// since this is single buffer being resused
// we need
// to truncate the previous bytes of string.
path.delete(off, Integer.MAX_VALUE);
path.append(child);
serializeNode(oa, path);
}
}
可以看到,的确是通过DataNode的Children来遍历所有节点。
DataNode的反序列化方法
接下来看一下反序列化的代码:
public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
String path = ia.readString("path");
while (!"/".equals(path)) {
DataNode node = new DataNode();
ia.readRecord(node, "node");
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException("Invalid Datatree, unable to find " +
"parent " + parentPath + " of path " + path);
}
parent.addChild(path.substring(lastSlash + 1));
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
nodes.put("/", root);
// we are done with deserializing the
// the datatree
// update the quotas - create path trie
// and also update the stat nodes
setupQuota();
aclCache.purgeUnused();
}
因为序列化的时候是前序遍历。所以反序列化时是先反序列化父亲节点,再反序列化孩子节点。
Snapshot
那么DataTree在什么情况下会序列化呢?在这里就要提到快照了。
前面提到过:如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。
除了减少WAL日志,Snapshot还会在Zk全量同步时被用到——当一个全新的ZkServer(这个一般叫Learner)被加入集群时,Leader服务器会将本机上的数据全量同步给新来的ZkServer。
序列化
接下来看一下代码入口:
/**
* serialize the datatree and session into the file snapshot
* @param dt the datatree to be serialized
* @param sessions the sessions to be serialized
* @param snapShot the file to store snapshot into
*/
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
throws IOException {
if (!close) {
try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
serialize(dt, sessions, oa, header);
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
sessOS.flush();
}
} else {
throw new IOException("FileSnap has already been closed");
}
}
JavaIO的基础知识在这不再介绍,有兴趣的人可以自行查阅资料或看 从一段代码谈起——浅谈JavaIO接口。
本质就是创建文件,并调用DataTree的序列化方法,DataTree的序列化其实就是遍历DataNode去序列化,最后将这些序列化的内容写入文件。
反序列化
/**
* deserialize a data tree from the most recent snapshot
* @return the zxid of the snapshot
*/
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt, sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}
简单来说,先读取Snapshot文件们。并反序列化它们,组成DataTree。
小结
在本文中,笔者和大家一起学习了Zk的底层存储技术。在此处,我们做个简单的回顾:
- zk的数据主要维护在内存中。在写入内存前,会做WAL,同时也会定期的做快照持久化到磁盘
- WAL的常见优化手段有三种:Group Commit、File Padding、Snapshot
另外,Zk中序列化技术用的是Apache Jute——本质上调用了JavaDataOutput和Input,较为简单。故没在本文中展开。