一、总结框架图
对于Leader选举,其总体框架图如下图所示
说明:
选举的父接口为Election,其定义了lookForLeader和shutdown两个方法,lookForLeader表示寻找Leader,shutdown则表示关闭,如关闭服务端之间的连接。
AuthFastLeaderElection,同FastLeaderElection算法基本一致,只是在消息中加入了认证信息,其在3.4.0之后的版本中已经不建议使用。
FastLeaderElection,其是标准的fast paxos算法的实现,基于TCP协议进行选举。
LeaderElection,也表示一种选举算法,其在3.4.0之后的版本中已经不建议使用。
二、Election源码分析
public interface Election { public Vote lookForLeader() throws InterruptedException; public void shutdown(); }
说明:可以看到Election接口定义的方法相当简单。
三、FastLeaderElection源码分析
2.1 类的继承关系
public class FastLeaderElection implements Election {}
说明:FastLeaderElection实现了Election接口,其需要实现接口中定义的lookForLeader方法和shutdown方法,其是标准的Fast Paxos算法的实现,各服务器之间基于TCP协议进行选举。
2.2 类的内部类
FastLeaderElection有三个较为重要的内部类,分别为Notification、ToSend、Messenger。
1. Notification类
static public class Notification { /* * Format version, introduced in 3.4.6 */ public final static int CURRENTVERSION = 0x1; int version; /* * Proposed leader */ // 被推选的leader的id long leader; /* * zxid of the proposed leader */ // 被推选的leader的事务id long zxid; /* * Epoch */ // 推选者的选举周期 long electionEpoch; /* * current state of sender */ // 推选者的状态 QuorumPeer.ServerState state; /* * Address of sender */ // 推选者的id long sid; /* * epoch of the proposed leader */ // 被推选者的选举周期 long peerEpoch; @Override public String toString() { return new String(Long.toHexString(version) + " (message format version), " + leader + " (n.leader), 0x" + Long.toHexString(zxid) + " (n.zxid), 0x" + Long.toHexString(electionEpoch) + " (n.round), " + state + " (n.state), " + sid + " (n.sid), 0x" + Long.toHexString(peerEpoch) + " (n.peerEpoch) "); } } static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) { byte requestBytes[] = new byte[40]; ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); /* * Building notification packet to send */ requestBuffer.clear(); requestBuffer.putInt(state); requestBuffer.putLong(leader); requestBuffer.putLong(zxid); requestBuffer.putLong(electionEpoch); requestBuffer.putLong(epoch); requestBuffer.putInt(Notification.CURRENTVERSION); return requestBuffer; }
说明:Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息,其buildMsg方法将选举信息封装至ByteBuffer中再进行发送。
2. ToSend类
static public class ToSend { static enum mType {crequest, challenge, notification, ack} ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch) { this.leader = leader; this.zxid = zxid; this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; this.peerEpoch = peerEpoch; } /* * Proposed leader in the case of notification */ //被推举的leader的id long leader; /* * id contains the tag for acks, and zxid for notifications */ // 被推举的leader的最大事务id long zxid; /* * Epoch */ // 推举者的选举周期 long electionEpoch; /* * Current state; */ // 推举者的状态 QuorumPeer.ServerState state; /* * Address of recipient */ // 推举者的id long sid; /* * Leader epoch */ // 被推举的leader的选举周期 long peerEpoch; }
说明:ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的id、zxid、选举周期等信息。
3. Messenger类
3.1 类的内部类
Messenger包含了WorkerReceiver和WorkerSender两个内部类
3.1.1 WorkerReceiver
class WorkerReceiver implements Runnable { // 是否终止 volatile boolean stop; // 服务器之间的连接 QuorumCnxManager manager; WorkerReceiver(QuorumCnxManager manager) { this.stop = false; this.manager = manager; } public void run() { // 响应 Message response; while (!stop) { // 不终止 // Sleeps on receive try{ // 从recvQueue中取出一个选举投票消息(从其他服务器发送过来) response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); // 无投票,跳过 if(response == null) continue; /* * If it is from an observer, respond right away. * Note that the following predicate assumes that * if a server is not a follower, then it must be * an observer. If we ever have any other type of * learner in the future, we'll have to change the * way we check for observers. */ if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器 // 获取自己的投票 Vote current = self.getCurrentVote(); // 构造ToSend消息 ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); // 放入sendqueue队列,等待发送 sendqueue.offer(notmsg); } else { // 包含服务器,表示接收到该服务器的选票消息 // Receive new message if (LOG.isDebugEnabled()) { LOG.debug("Receive new notification message. My id = " + self.getId()); } /* * We check for 28 bytes for backward compatibility */ // 检查向后兼容性 if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } // 若容量为28,则表示可向后兼容 boolean backCompatibility = (response.buffer.capacity() == 28); // 设置buffer中的position、limit等属性 response.buffer.clear(); // Instantiate Notification and set its attributes // 创建接收通知 Notification n = new Notification(); // State of peer that sent this message // 推选者的状态 QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; switch (response.buffer.getInt()) { // 读取状态 case 0: ackstate = QuorumPeer.ServerState.LOOKING; break; case 1: ackstate = QuorumPeer.ServerState.FOLLOWING; break; case 2: ackstate = QuorumPeer.ServerState.LEADING; break; case 3: ackstate = QuorumPeer.ServerState.OBSERVING; break; default: continue; } // 获取leader的id n.leader = response.buffer.getLong(); // 获取zxid n.zxid = response.buffer.getLong(); // 获取选举周期 n.electionEpoch = response.buffer.getLong(); n.state = ackstate; // 设置服务器的id n.sid = response.sid; if(!backCompatibility){ // 不向后兼容 n.peerEpoch = response.buffer.getLong(); } else { // 向后兼容 if(LOG.isInfoEnabled()){ LOG.info("Backward compatibility mode, server id=" + n.sid); } // 获取选举周期 n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid); } /* * Version added in 3.4.6 */ // 确定版本号 n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0; /* * Print notification info */ if(LOG.isInfoEnabled()){ printNotification(n); } /* * If this server is looking, then send proposed leader */ if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态 // 将消息放入recvqueue中 recvqueue.offer(n); /* * Send a notification back if the peer that sent this * message is also looking and its logical clock is * lagging behind. */ if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态 && (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟 // 创建新的投票 Vote v = getVote(); // 构造新的发送消息(本服务器自己的投票) ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock, self.getPeerState(), response.sid, v.getPeerEpoch()); // 将发送消息放置于队列,等待发送 sendqueue.offer(notmsg); } } else { // 推选服务器状态不为LOOKING /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ // 获取当前投票 Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态 if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id = " + self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId()); } ToSend notmsg; if(n.version > 0x0) { // 版本号大于0 // 构造ToSend消息 notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch()); } else { // 版本号不大于0 // 构造ToSend消息 Vote bcVote = self.getBCVote(); notmsg = new ToSend( ToSend.mType.notification, bcVote.getId(), bcVote.getZxid(), bcVote.getElectionEpoch(), self.getPeerState(), response.sid, bcVote.getPeerEpoch()); } // 将发送消息放置于队列,等待发送 sendqueue.offer(notmsg); } } } } catch (InterruptedException e) { System.out.println("Interrupted Exception while waiting for new message" + e.toString()); } } LOG.info("WorkerReceiver is down"); } }
说明:WorkerReceiver实现了Runnable接口,是选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。其是QuorumCnxManager的Message转化为FastLeaderElection的Notification。
其中,WorkerReceiver的主要逻辑在run方法中,其首先会从QuorumCnxManager中的recvQueue队列中取出其他服务器发来的选举消息,消息封装在Message数据结构中。然后判断消息中的服务器id是否包含在可以投票的服务器集合中,若不是,则会将本服务器的内部投票发送给该服务器,其流程如下
if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器 // 获取自己的投票 Vote current = self.getCurrentVote(); // 构造ToSend消息 ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); // 放入sendqueue队列,等待发送 sendqueue.offer(notmsg); }
若包含该服务器,则根据消息(Message)解析出投票服务器的投票信息并将其封装为Notification,然后判断当前服务器是否为LOOKING,若为LOOKING,则直接将Notification放入FastLeaderElection的recvqueue(区别于recvQueue)中。然后判断投票服务器是否为LOOKING状态,并且其选举周期小于当前服务器的逻辑时钟,则将本(当前)服务器的内部投票发送给该服务器,否则,直接忽略掉该投票。其流程如下
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态 // 将消息放入recvqueue中 recvqueue.offer(n); if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态 && (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟 // 创建新的投票 Vote v = getVote(); // 构造新的发送消息(本服务器自己的投票) ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock, self.getPeerState(), response.sid, v.getPeerEpoch()); // 将发送消息放置于队列,等待发送 sendqueue.offer(notmsg); } }
若本服务器的状态不为LOOKING,则会根据投票服务器中解析的version信息来构造ToSend消息,放入sendqueue,等待发送,起流程如下
else { // 本服务器状态不为LOOKING // 获取当前投票 Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态 if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id = " + self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId()); } ToSend notmsg; if(n.version > 0x0) { // 版本号大于0 // 构造ToSend消息 notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch()); } else { // 版本号不大于0 // 构造ToSend消息 Vote bcVote = self.getBCVote(); notmsg = new ToSend( ToSend.mType.notification, bcVote.getId(), bcVote.getZxid(), bcVote.getElectionEpoch(), self.getPeerState(), response.sid, bcVote.getPeerEpoch()); } // 将发送消息放置于队列,等待发送 sendqueue.offer(notmsg); } }
3.1.2 WorkerSender
class WorkerSender implements Runnable { // 是否终止 volatile boolean stop; // 服务器之间的连接 QuorumCnxManager manager; // 构造器 WorkerSender(QuorumCnxManager manager){ // 初始化属性 this.stop = false; this.manager = manager; } public void run() { while (!stop) { // 不终止 try { // 从sendqueue中取出ToSend消息 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); // 若为空,则跳过 if(m == null) continue; // 不为空,则进行处理 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } void process(ToSend m) { // 构建消息 ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); // 发送消息 manager.toSend(m.sid, requestBuffer); } }
说明:WorkerSender也实现了Runnable接口,为选票发送器,其会不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中,其过程是将FastLeaderElection的ToSend转化为QuorumCnxManager的Message。
3.2 类的属性
protected class Messenger { // 选票发送器 WorkerSender ws; // 选票接收器 WorkerReceiver wr; }
说明:Messenger中维护了一个WorkerSender和WorkerReceiver,分别表示选票发送器和选票接收器
3.3 类的构造函数
Messenger(QuorumCnxManager manager) { // 创建WorkerSender this.ws = new WorkerSender(manager); // 新创建线程 Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); // 设置为守护线程 t.setDaemon(true); // 启动 t.start(); // 创建WorkerReceiver this.wr = new WorkerReceiver(manager); // 创建线程 t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); // 设置为守护线程 t.setDaemon(true); // 启动 t.start(); }
说明:会启动WorkerSender和WorkerReceiver,并设置为守护线程。
2.3 类的属性
public class FastLeaderElection implements Election { // 日志 private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class); /** * Determine how much time a process has to wait * once it believes that it has reached the end of * leader election. */ // 完成Leader选举之后需要等待时长 final static int finalizeWait = 200; /** * Upper bound on the amount of time between two consecutive * notification checks. This impacts the amount of time to get * the system up again after long partitions. Currently 60 seconds. */ // 两个连续通知检查之间的最大时长 final static int maxNotificationInterval = 60000; /** * Connection manager. Fast leader election uses TCP for * communication between peers, and QuorumCnxManager manages * such connections. */ // 管理服务器之间的连接 QuorumCnxManager manager; // 选票发送队列,用于保存待发送的选票 LinkedBlockingQueue<ToSend> sendqueue; // 选票接收队列,用于保存接收到的外部投票 LinkedBlockingQueue<Notification> recvqueue; // 投票者 QuorumPeer self; Messenger messenger; // 逻辑时钟 volatile long logicalclock; /* Election instance */ // 推选的leader的id long proposedLeader; // 推选的leader的zxid long proposedZxid; // 推选的leader的选举周期 long proposedEpoch; // 是否停止选举 volatile boolean stop; }
说明:其维护了服务器之间的连接(用于发送消息)、发送消息队列、接收消息队列、推选者的一些信息(zxid、id)、是否停止选举流程标识等。
2.4 类的构造函数
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ // 字段赋值 this.stop = false; this.manager = manager; // 初始化其他信息 starter(self, manager); }
说明:构造函数中初始化了stop字段和manager字段,并且调用了starter函数,其源码如下
private void starter(QuorumPeer self, QuorumCnxManager manager) { // 赋值,对Leader和投票者的ID进行初始化操作 this.self = self; proposedLeader = -1; proposedZxid = -1; // 初始化发送队列 sendqueue = new LinkedBlockingQueue<ToSend>(); // 初始化接收队列 recvqueue = new LinkedBlockingQueue<Notification>(); // 创建Messenger,会启动接收器和发送器线程 this.messenger = new Messenger(manager); }
说明:其完成在构造函数中未完成的部分,如会初始化FastLeaderElection的sendqueue和recvqueue,并且启动接收器和发送器线程。
2.5 核心函数分析
1. sendNotifications函数
private void sendNotifications() { for (QuorumServer server : self.getVotingView().values()) { // 遍历投票参与者集合 long sid = server.id; // 构造发送消息 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } // 将发送消息放置于队列 sendqueue.offer(notmsg); } }
说明:其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送。
2. totalOrderPredicate函数
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ // 使用计票器判断当前服务器的权重是否为0 return false; } // 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader // 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader // 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
说明:该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。