45-微服务技术栈(高级):分布式协调服务zookeeper源码篇(持久化FileTxnSnapLog)

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

一、前言

  前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

二、FileTxnSnapLog源码分析

2.1 类的属性

public class FileTxnSnapLog {

   //the direcotry containing the

   //the transaction logs

   // 日志文件目录

   private final File dataDir;

   //the directory containing the

   //the snapshot directory

   // 快照文件目录

   private final File snapDir;

   // 事务日志

   private TxnLog txnLog;

   // 快照

   private SnapShot snapLog;

   // 版本号

   public final static int VERSION = 2;

   // 版本

   public final static String version = "version-";


   // Logger

   private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class);

}

说明:类的属性中包含了TxnLog和SnapShot接口,即对FileTxnSnapLog的很多操作都会转发给TxnLog和SnapLog进行操作,这是一种典型的组合方法。

2.2 内部类

  FileTxnSnapLog包含了PlayBackListener内部类,用来接收事务应用过程中的回调,在Zookeeper数据恢复后期,会有事务修正过程,此过程会回调PlayBackListener来进行对应的数据修正。其源码如下 

public interface PlayBackListener {

   void onTxnLoaded(TxnHeader hdr, Record rec);

}

 说明:在完成事务操作后,会调用到onTxnLoaded方法进行相应的处理。

2.3 构造函数

  FileTxnSnapLog的构造函数如下

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {

   LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

   // 在datadir和snapdir下生成version-2目录

   this.dataDir = new File(dataDir, version + VERSION);

   this.snapDir = new File(snapDir, version + VERSION);

   if (!this.dataDir.exists()) { // datadir存在但无法创建目录,则抛出异常

       if (!this.dataDir.mkdirs()) {

           throw new IOException("Unable to create data directory "

                                 + this.dataDir);

       }

   }

   if (!this.snapDir.exists()) { // snapdir存在但无法创建目录,则抛出异常

       if (!this.snapDir.mkdirs()) {

           throw new IOException("Unable to create snap directory "

                                 + this.snapDir);

       }

   }

   // 给属性赋值

   txnLog = new FileTxnLog(this.dataDir);

   snapLog = new FileSnap(this.snapDir);

}

说明:对于构造函数而言,其会在传入的datadir和snapdir目录下新生成version-2的目录,并且会判断目录是否创建成功,之后会创建txnLog和snapLog。

2.4 核心函数分析

1. restore函数 

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {

   // 根据snap文件反序列化dt和sessions

   snapLog.deserialize(dt, sessions);

   //

   FileTxnLog txnLog = new FileTxnLog(dataDir);

   // 获取比最后处理的zxid+1大的log文件的迭代器

   TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);

   // 最大的zxid

   long highestZxid = dt.lastProcessedZxid;


   TxnHeader hdr;

   try {

       while (true) {

           // itr在read函数调用后就已经指向第一个合法的事务

           // 获取事务头

           hdr = itr.getHeader();

           if (hdr == null) { // 事务头为空

               // 表示日志文件为空

               return dt.lastProcessedZxid;

           }

           // 事务头的zxid小于snapshot中的最大zxid并且其不为0,则会报错,因为上面获取的是+1

           if (hdr.getZxid() < highestZxid && highestZxid != 0) {

               LOG.error("{}(higestZxid) > {}(next log) for type {}",

                         new Object[] { highestZxid, hdr.getZxid(),

                                       hdr.getType() });

           } else {

               // 重新赋值highestZxid

               highestZxid = hdr.getZxid();

           }

           try {

               // 在datatree上处理事务

               processTransaction(hdr,dt,sessions, itr.getTxn());

           } catch(KeeperException.NoNodeException e) {

               throw new IOException("Failed to process transaction type: " +

                                     hdr.getType() + " error: " + e.getMessage(), e);

           }

           // 每处理完一个事务都会进行回调

           listener.onTxnLoaded(hdr, itr.getTxn());

           if (!itr.next()) // 已无事务,跳出循环

               break;

       }

   } finally {

       if (itr != null) { // 迭代器不为空,则关闭

           itr.close();

       }

   }

   // 返回最高的zxid

   return highestZxid;

}

说明:restore用于恢复datatree和sessions,其步骤大致如下

  ① 根据snapshot文件反序列化datatree和sessions,进入②

  ② 获取比snapshot文件中的zxid+1大的log文件的迭代器,以对log文件中的事务进行迭代,进入③

  ③ 迭代log文件的每个事务,并且将该事务应用在datatree中,同时会调用onTxnLoaded函数进行后续处理,进入④

  ④ 关闭迭代器,返回log文件中最后一个事务的zxid(作为最高的zxid)

其中会调用到FileTxnLog的read函数,read函数在FileTxnLog中已经进行过分析,会调用processTransaction函数,其源码如下 

public void processTransaction(TxnHeader hdr,DataTree dt,

                              Map<Long, Integer> sessions, Record txn)

   throws KeeperException.NoNodeException {

   // 事务处理结果

   ProcessTxnResult rc;

   switch (hdr.getType()) { // 确定事务类型

       case OpCode.createSession: // 创建会话

           // 添加进会话

           sessions.put(hdr.getClientId(),

                        ((CreateSessionTxn) txn).getTimeOut());

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,

                                        "playLog --- create session in log: 0x"

                                        + Long.toHexString(hdr.getClientId())

                                        + " with timeout: "

                                        + ((CreateSessionTxn) txn).getTimeOut());

           }

           // give dataTree a chance to sync its lastProcessedZxid

           // 处理事务

           rc = dt.processTxn(hdr, txn);

           break;

       case OpCode.closeSession: // 关闭会话

           // 会话中移除

           sessions.remove(hdr.getClientId());

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,

                                        "playLog --- close session in log: 0x"

                                        + Long.toHexString(hdr.getClientId()));

           }

           // 处理事务

           rc = dt.processTxn(hdr, txn);

           break;

       default:

           // 处理事务

           rc = dt.processTxn(hdr, txn);

   }


   /**

        * Snapshots are lazily created. So when a snapshot is in progress,

        * there is a chance for later transactions to make into the

        * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS

        * errors could occur. It should be safe to ignore these.

        */

   if (rc.err != Code.OK.intValue()) { // 忽略处理结果中可能出现的错误

       LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()

                 + ", error: " + rc.err + ", path: " + rc.path);

   }

}

说明:processTransaction会根据事务头中记录的事务类型(createSession、closeSession、其他类型)来进行相应的操作,对于createSession类型而言,其会将会话和超时时间添加至会话map中,对于closeSession而言,会话map会根据客户端的id号删除其会话(因为存储时候key=clientId),同时,所有的操作都会调用到dt.processTxn函数,其源码如下 

public ProcessTxnResult processTxn(TxnHeader header, Record txn)

{

   // 事务处理结果

   ProcessTxnResult rc = new ProcessTxnResult();

   try {

       // 从事务头中解析出相应属性并保存至rc中

       rc.clientId = header.getClientId();

       rc.cxid = header.getCxid();

       rc.zxid = header.getZxid();

       rc.type = header.getType();

       rc.err = 0;

       rc.multiResult = null;

       switch (header.getType()) { // 确定事务类型

           case OpCode.create: // 创建结点

               // 显示转化

               CreateTxn createTxn = (CreateTxn) txn;

               // 获取创建结点路径

               rc.path = createTxn.getPath();

               // 创建结点

               createNode(

                   createTxn.getPath(),

                   createTxn.getData(),

                   createTxn.getAcl(),

                   createTxn.getEphemeral() ? header.getClientId() : 0,

                   createTxn.getParentCVersion(),

                   header.getZxid(), header.getTime());

               break;

           case OpCode.delete: // 删除结点

               // 显示转化

               DeleteTxn deleteTxn = (DeleteTxn) txn;

               // 获取删除结点路径

               rc.path = deleteTxn.getPath();

               // 删除结点

               deleteNode(deleteTxn.getPath(), header.getZxid());

               break;

           case OpCode.setData: // 写入数据

               // 显示转化

               SetDataTxn setDataTxn = (SetDataTxn) txn;

               // 获取写入数据结点路径

               rc.path = setDataTxn.getPath();

               // 写入数据

               rc.stat = setData(setDataTxn.getPath(), setDataTxn

                                 .getData(), setDataTxn.getVersion(), header

                                 .getZxid(), header.getTime());

               break;

           case OpCode.setACL: // 设置ACL

               // 显示转化

               SetACLTxn setACLTxn = (SetACLTxn) txn;

               // 获取路径

               rc.path = setACLTxn.getPath();

               // 设置ACL

               rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),

                                setACLTxn.getVersion());

               break;

           case OpCode.closeSession: // 关闭会话

               // 关闭会话

               killSession(header.getClientId(), header.getZxid());

               break;

           case OpCode.error: // 错误

               // 显示转化

               ErrorTxn errTxn = (ErrorTxn) txn;

               // 记录错误

               rc.err = errTxn.getErr();

               break;

           case OpCode.check: // 检查

               // 显示转化

               CheckVersionTxn checkTxn = (CheckVersionTxn) txn;

               // 获取路径

               rc.path = checkTxn.getPath();

               break;

           case OpCode.multi: // 多个事务

               // 显示转化

               MultiTxn multiTxn = (MultiTxn) txn ;

               // 获取事务列表

               List<Txn> txns = multiTxn.getTxns();

               rc.multiResult = new ArrayList<ProcessTxnResult>();

               boolean failed = false;

               for (Txn subtxn : txns) { // 遍历事务列表

                   if (subtxn.getType() == OpCode.error) {

                       failed = true;

                       break;

                   }

               }

               boolean post_failed = false;

               for (Txn subtxn : txns) { // 遍历事务列表,确定每个事务类型并进行相应操作

                   // 处理事务的数据

                   ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());

                   Record record = null;

                   switch (subtxn.getType()) {

                       case OpCode.create:

                           record = new CreateTxn();

                           break;

                       case OpCode.delete:

                           record = new DeleteTxn();

                           break;

                       case OpCode.setData:

                           record = new SetDataTxn();

                           break;

                       case OpCode.error:

                           record = new ErrorTxn();

                           post_failed = true;

                           break;

                       case OpCode.check:

                           record = new CheckVersionTxn();

                           break;

                       default:

                           throw new IOException("Invalid type of op: " + subtxn.getType());

                   }

                   assert(record != null);

                   // 将bytebuffer转化为record(初始化record的相关属性)

                   ByteBufferInputStream.byteBuffer2Record(bb, record);


                   if (failed && subtxn.getType() != OpCode.error){ // 失败并且不为error类型

                       int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()

                           : Code.OK.intValue();


                       subtxn.setType(OpCode.error);

                       record = new ErrorTxn(ec);

                   }


                   if (failed) { // 失败

                       assert(subtxn.getType() == OpCode.error) ;

                   }


                   // 生成事务头

                   TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),

                                                    header.getZxid(), header.getTime(),

                                                    subtxn.getType());

                   // 递归调用处理事务

                   ProcessTxnResult subRc = processTxn(subHdr, record);

                   // 保存处理结果

                   rc.multiResult.add(subRc);

                   if (subRc.err != 0 && rc.err == 0) {

                       rc.err = subRc.err ;

                   }

               }

               break;

       }

   } catch (KeeperException e) {

       if (LOG.isDebugEnabled()) {

           LOG.debug("Failed: " + header + ":" + txn, e);

       }

       rc.err = e.code().intValue();

   } catch (IOException e) {

       if (LOG.isDebugEnabled()) {

           LOG.debug("Failed: " + header + ":" + txn, e);

       }

   }

   /*

        * A snapshot might be in progress while we are modifying the data

        * tree. If we set lastProcessedZxid prior to making corresponding

        * change to the tree, then the zxid associated with the snapshot

        * file will be ahead of its contents. Thus, while restoring from

        * the snapshot, the restore method will not apply the transaction

        * for zxid associated with the snapshot file, since the restore

        * method assumes that transaction to be present in the snapshot.

        *

        * To avoid this, we first apply the transaction and then modify

        * lastProcessedZxid.  During restore, we correctly handle the

        * case where the snapshot contains data ahead of the zxid associated

        * with the file.

        */

   // 事务处理结果中保存的zxid大于已经被处理的最大的zxid,则重新赋值

   if (rc.zxid > lastProcessedZxid) {

       lastProcessedZxid = rc.zxid;

   }


   /*

        * Snapshots are taken lazily. It can happen that the child

        * znodes of a parent are created after the parent

        * is serialized. Therefore, while replaying logs during restore, a

        * create might fail because the node was already

        * created.

        *

        * After seeing this failure, we should increment

        * the cversion of the parent znode since the parent was serialized

        * before its children.

        *

        * Note, such failures on DT should be seen only during

        * restore.

        */

   if (header.getType() == OpCode.create &&

       rc.err == Code.NODEEXISTS.intValue()) { // 处理在恢复数据过程中的结点创建操作

       LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +

                 " path:" + rc.path + " err: " + rc.err);

       int lastSlash = rc.path.lastIndexOf('/');

       String parentName = rc.path.substring(0, lastSlash);

       CreateTxn cTxn = (CreateTxn)txn;

       try {

           setCversionPzxid(parentName, cTxn.getParentCVersion(),

                            header.getZxid());

       } catch (KeeperException.NoNodeException e) {

           LOG.error("Failed to set parent cversion for: " +

                     parentName, e);

           rc.err = e.code().intValue();

       }

   } else if (rc.err != Code.OK.intValue()) {

       LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +

                 " : error: " + rc.err);

   }

   return rc;

}

说明:processTxn用于处理事务,即将事务操作应用到DataTree【参见下面源码】内存数据库中,以恢复成最新的数据。其操作所有数据节点都是借助于DataNode,数据会类似于树状存储,

以删除为例:

public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException {

   int lastSlash = path.lastIndexOf('/');

   String parentName = path.substring(0, lastSlash);

   String childName = path.substring(lastSlash + 1);

   DataNode node = nodes.get(path);

   if (node == null) {

       throw new KeeperException.NoNodeException();

   }

   nodes.remove(path);

   DataNode parent = nodes.get(parentName);

   if (parent == null) {

       throw new KeeperException.NoNodeException();

   }

   synchronized (parent) {

       parent.removeChild(childName);

       parent.stat.setPzxid(zxid);

       long eowner = node.stat.getEphemeralOwner();

       if (eowner != 0) {

           HashSet<String> nodes = ephemerals.get(eowner);

           if (nodes != null) {

               synchronized (nodes) {

                   nodes.remove(path);

               }

           }

       }

       node.parent = null;

   }

   ......

}

2. save函数  

public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)

   throws IOException {

   // 获取最后处理的zxid

   long lastZxid = dataTree.lastProcessedZxid;

   // 生成snapshot文件

   File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));

   LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);

   // 序列化datatree、sessionsWithTimeouts至snapshot文件

   snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);


}

说明:save函数用于将sessions和datatree保存至snapshot文件中,其大致步骤如下

  ① 获取内存数据库中已经处理的最新的zxid,进入②

  ② 根据zxid和快照目录生成snapshot文件,进入③

  ③ 将datatree(内存数据库)、sessionsWithTimeouts序列化至快照文件。

  其他的函数或多或少都是调用TxnLog和SnapLog中的相应函数,之前已经进行过分析,这里不再累赘。

三、总结

本篇博文分析了FileTxnSnapLog的源码,其主要:

  • 封装了TxnLog和SnapLog来进行相应的处理
  • 提供了从snapshot文件和log文件中恢复内存数据库的接口
  • 其对于事务的维护,或说事务的存储结构是一个个DataTree,代码层面看到的是DataNode,其增删都是追加/删除父子节点,类似于树状操作
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
26天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
60 3
|
9天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
43 1
|
2月前
|
存储 NoSQL Redis
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
Redis持久化、RDB和AOF方案、Redis主从集群、哨兵、分片集群、散列插槽、自动手动故障转移
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
75 3
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
44 0
|
4月前
|
canal 缓存 NoSQL
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;先删除缓存还是先修改数据库,双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
|
3月前
|
Java 数据库连接 微服务
揭秘微服务架构下的数据魔方:Hibernate如何玩转分布式持久化,实现秒级响应的秘密武器?
【8月更文挑战第31天】微服务架构通过将系统拆分成独立服务,提升了可维护性和扩展性,但也带来了数据一致性和事务管理等挑战。Hibernate 作为强大的 ORM 工具,在微服务中发挥关键作用,通过二级缓存和分布式事务支持,简化了对象关系映射,并提供了有效的持久化策略。其二级缓存机制减少数据库访问,提升性能;支持 JTA 保证跨服务事务一致性;乐观锁机制解决并发数据冲突。合理配置 Hibernate 可助力构建高效稳定的分布式系统。
64 0
|
3月前
|
Java 应用服务中间件 数据库
SpringCloud:服务保护和分布式事务详解
SpringCloud:服务保护和分布式事务详解
129 0
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2

相关产品

  • 微服务引擎
  • 下一篇
    无影云桌面