5,主结点同步数据到从结点(ZAB协议)
5.1,发送这个propose(第一阶段)
11,完成了这个链条中的第二个环节之后,就进入第三个环节,即ProposalRequestProcessor的这个结点。这一环节只要是为了同步数据到从结点,并且将数据同步到从结点之后,会将这个数据在本地磁盘里面保存一份
public class ProposalRequestProcessor implements RequestProcessor { //主要是会走这个方法 public void processRequest(Request request) throws RequestProcessorException { nextProcessor.processRequest(request); //propose处理这个request zks.getLeader().propose(request); //将数据写入到本地磁盘 syncProcessor.processRequest(request); } }
接下来查看这个propose方法,会对主结点中的数据进行一个预处理,并将数据发送给全部的从结点
public Proposal propose(Request request) throws XidRolloverException { //序列化 byte[] data = SerializeUtils.serializeRequest(request); proposalStats.setLastBufferSize(data.length); //对数据进行打包,里面会有几种数据类型,如ping,ack等 QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); //将这些数据全部发送出去 sendPacket(pp); }
这个sendPacket方法,就是会轮询的将数据发送给所有的这个follow从结点
void sendPacket(QuorumPacket qp) { synchronized (forwardingFollowers) { //循环发送 for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); } } }
数据发送完同时也会将数据存放在这个本地磁盘里面,主要是通过这个SyncRequestProcessor 类里面的这个processRequest线程实现,主要查看这个线程的run方法。就是leader主结点将数据存在本地磁盘
@Override public void run() { //将一些数据初始化到磁盘上面 zks.getZKDatabase().rollLog(); //调用一个flush方法,主要用于写日志文件 //主要是写一些事物文件和快照文件 flush(toFlush); }
5.2,Ack确认机制
12,主要是通过这个 AckRequestProcessor 类实现,里面有一个processRequest的这个方法,就是首先这个leader会先给自己发一个ack,这样在后面统计这个只有从结点的响应的ack的同时,还需要加上这个主结点的ack。
public void processRequest(Request request) { QuorumPeer self = leader.self; if(self != null) leader.processAck(self.getId(), request.zxid, null); else LOG.error("Null QuorumPeer"); }
通过这个processAck 方法可以知道,最终会将这个结点的sid存到主结点的一个hashset的一个集合里面。
//将这台机器的sid存在这个hashset里面 p.addAck(sid); //尝试判断这个票数是否大于一半,大于一半则提交 boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
13,主结点会给所有的从结点发送数据,会和这些从结点建立nio的一个连接,然后通过这个 LearnerHandler 类来发送这个消息。这个类继承了一个线程类,那么主要看这个类的run方法
public class LearnerHandler extends ZooKeeperThread { @Override public void run() { //会开始发送这个数据包 startSendingPackets(); } private void startSendingPackets() throws InterruptedException { public void run() { try { //开始发送数据包 sendPackets(); } catch (InterruptedException e) { LOG.warn("Unexpected interruption " + e.getMessage()); } } }.start(); } private void sendPackets() throws InterruptedException { QuorumPacket p; //从队列中获取数据 p = queuedPackets.poll(); //通过bio的方式,将序列化的数据写入到从结点 oa.writeRecord(p, "packet"); } }
14,主结点将消息发送到这个从结点之后,在这个Follow的这个类里面,通过这个followLeader方法来读取主结点的发过来的消息,同时也会将这个数据存储在这个本地磁盘里面
public class Follower extends Learner{ void followLeader() throws InterruptedException { while (this.isRunning()) { //读取传过来的数据 readPacket(qp); //处理这个packet的这个数据包 processPacket(qp); } } }
从结点处理主结点发送的这个数据包的具体实现如下
//处理这个数据包的过程如下 protected void processPacket(QuorumPacket qp) throws Exception{ case Leader.PING: ping(qp); break; //将传过来的数据写入到磁盘 case Leader.PROPOSAL: fzk.logRequest(hdr, txn); break; case Leader.COMMIT: fzk.commit(qp.getZxid()); break; ... }
15,从结点在处理完数据之后,会通过这个 SendAckRequestProcessor 类里面的 processRequest 方法来给这个主结点返回一个ack
public class SendAckRequestProcessor implements RequestProcessor, Flushable { public void processRequest(Request si) { //构建一个返回一个ACK的一个数据包 QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null); //将数据写入,通过这个bio的连接,将数据写会给这个主结点 learner.writePacket(qp, false); } }
16,从结点同步完数据之后,会返回一个ack的一个确认机制,主结点主要是 LearnerHandler 线程类的run方法里面实现,里面有一个while循环一直接收这个从结点发的消息。类型为ack时会和之前的流程一样,将这个从结点的sid存放在一个hashset的一个集合里面,最后会去尝试这个commit提交,大于一半就会提交
public class LearnerHandler extends ZooKeeperThread { @Override public void run() { while (true) { //leader获取数据 qp = new QuorumPacket(); ia.readRecord(qp, "packet"); //获取数据的类型 switch (qp.getType()) { //这个ack会走和主结点ack一样的流程 case Leader.ACK: ...; break; case Leader.PING: ...; break; case Leader.REVALIDATE: ...; break; case Leader.REQUEST: ...; break; } } } }
5.3,commit提交(第二阶段)
17,在主结点获取到这个ack之后,都会有一个尝试commit的提交操作,如果这个票数过半,那么就会走这个正式的commit的提交操作。就是说leader会再发起一个请求,告诉这些从结点也可以进行数据的提交,就是将之前存在日志里面的数据加载到内存里面,那么其他客户端来查询就可以从这个从结点里面查出这个数据。从结点数据提交之后,这个主结点的数据也会提交。
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { commit(zxid); inform(p); //从结点提交之后,主结点这边也会进行一个提交 zk.commitProcessor.commit(p.request); }
然后主要查看这个commit 方法
public void commit(long zxid) { synchronized(this){ lastCommitted = zxid; } //又会构建一个数据包,这个类型是COMMIT类型,同时返回一个zxid QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); //轮询的方式发送给所有的从结点, sendPacket(qp); } void sendPacket(QuorumPacket qp) { //轮询的方式发送 synchronized (forwardingFollowers) { for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); } } }
再查看这个inform 方法,可以发现这个数据也会同步给Observer的结点
public void inform(Proposal proposal) { QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null); sendObserverPacket(qp); }
在从结点提交完数据之后,主结点也会提交数据,就是将存在磁盘里面的数据加载到内存里面
protected void processCommitted() { Request request; //从队列中获取消息 request = committedRequests.poll(); Request pending = nextPending.get(); sendToNextProcessor(pending); }
6,服务端走完最后两个链条结点
18,接下来进入链条的倒数第二个结点ToBeAppliedRequestProcessor
static class ToBeAppliedRequestProcessor implements RequestProcessor{ public void processRequest(Request request) throws RequestProcessorException { //责任链模式,直接进入下一个request next.processRequest(request); } }
19,那么直接进入责任链里面的最后一个节点 FinalRequestProcessor
public class FinalRequestProcessor implements RequestProcessor { public void processRequest(Request request) { //处理事务 rc = zks.processTxn(request); } }
接下来可以查看这个processTxn的这个方法,就是将这个内存中的数据存储到对应的树形结构里面
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,Record txn) { rc = getZKDatabase().processTxn(hdr, txn); } public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { //将结果加入到zookeeper的树形结构中 return dataTree.processTxn(hdr, txn); }
7,服务端给客户端反馈
20,依旧是在这个责任链模式的最后一个结点FinalRequestProcessor,里面会有一个服务端给客户端的响应。就是告知客户端这条命令执行是否成功失败
public class FinalRequestProcessor implements RequestProcessor { public void processRequest(Request request) { //给客户端响应 cnxn.sendResponse(hdr, rsp, "response"); } }
然后就是查看这个sendResponse方法,
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); //对数据进行序列化 BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { baos.write(fourBytes); bos.writeRecord(h, "header"); if (r != null) { bos.writeRecord(r, tag); } baos.close(); } catch (IOException e) { LOG.error("Error serializing response"); } byte b[] = baos.toByteArray(); serverStats().updateClientResponseSize(b.length - 4); ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); //以流的方式发送 sendBuffer(bb); }
在看这个sendBuffer方法,将封装给客户端的数据返回
@Override public void sendBuffer(ByteBuffer sendBuffer) { if (sendBuffer == ServerCnxnFactory.closeConn) { close(); return; } channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener( onSendBufferDoneListener); }
8,客户端接收反馈
21,由于这个客户端和这个服务一开始就建立了这个nio连接或者netty连接,因此在服务端给客户端发送这个数据的时候,客户端这边也可以立马收到响应。
依旧是在这个ClientCnxn类里面,找到这个这个客户端的doTransport方法,就是会去处理对应的事件
public class ClientCnxn { clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); }
在nio的这个doTransport方法里面,会去判断这个事件的读写
@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { //判断是读事件开始写事件 if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } }
22,客户端这边主要是通过这个doIO方法来读取这个io流的数据
@Override void doIO(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { //读取这个io sendThread.readResponse(incomingBuffer); } void readResponse(ByteBuffer incomingBuffer) throws IOException { //客户端这边会有一个watcher的监听器 WatchedEvent we = new WatchedEvent(event); //会将这个事件加入到队列中 eventThread.queueEvent( we ); }
接下来查看这个重点的queueEvent方法,就是会将这个事件加入到一个阻塞队列里面。在zookeeper客户端启动的时候,就会创建两个线程,一个就是用于监听机制的eventThread线程,监听的事件就是现在加入的事件。
private void queueEvent(WatchedEvent event,Set<Watcher> materializedWatchers) { WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); waitingEvents.add(pair); }
三,总结
1,ZAB的消息广播总结
一个zookeeper的原子消息的协议,主要通过两阶段提交的方式实现:
1,在第一阶段,首先zookeeper的客户端和这个服务端的leader主结点会通过nio或者netty的方式建立连接,然后客户端可以向这个主结点里面发送数据。
2,主结点接收到数据之后,leader主结点会向这个从结点发送一个proposal的一个命令,并且会以轮询的方式发给所有从结点,同时会将这个data数据和事务id一起发送给从结点
3,主结点发送完这个命令之后,leader主结点会同步将数据存在本地磁盘里面,并且给自己投一个ack的票
4,从结点将主结点发来的数据会先存储在本地磁盘,并且给主结点返回一个ack
5,主结点会去统计这个ack的票数,就是从结点所返回的ack和自己投票的ack
6,在第二阶段,如果ack的票数大于一半,那么主结点就会给从结点发送一个commit提交命令
7,在从结点里面的数据一开始是存储在磁盘的,在接收到这个commit命令之后,会将数据存储到内存
8,主结点也会将存储在磁盘的数据加入这个内存里面
9,主结点最后会给客户端一个数据变动的Event事件,并给这个客户端返回一个命令操作的结果
2,zookeeper的脑分裂问题
就是说在一段很短的时间内,这个网络不稳定或者说这个出现这个断网的现象,那么可能造成leader和follow无法通信的情况,那么的从结点就会认为这个主结点可能挂了,因此集群的从结点就会重新进行一个主结点的选举,在短时间内,这个之前的主结点又恢复了,那么此时会有两个这个主结点,就是造成了这个脑裂问题,这样就会有大量的数据丢失。
解决答案 :就是通过这个zab解决。就是说在如果出现脑分裂,那么就会有两个主结点,其中后面这个新选举的主结点会有从结点,而这个出现网络故障的主结点没有这个从结点。根据这个两阶段提交,如果外面有数据写进来,会先写到磁盘里面,此时两个主结点磁盘都有数据,但是需要通过投票机制,超过一半的投票才能将数据提交到内存里面,这个没有从结点的主结点获取的ack票数不能超过一半,那么就不能触发commit提交,那么就不能将数据加载到内存,就不会出现这个数据丢失的情况。