7.选举机制源码分析

简介: scascas

一、总结框架图

对于Leader选举,其总体框架图如下图所示

  

说明:

  选举的父接口为Election,其定义了lookForLeader和shutdown两个方法,lookForLeader表示寻找Leader,shutdown则表示关闭,如关闭服务端之间的连接。

  AuthFastLeaderElection,同FastLeaderElection算法基本一致,只是在消息中加入了认证信息,其在3.4.0之后的版本中已经不建议使用。

  FastLeaderElection,其是标准的fast paxos算法的实现,基于TCP协议进行选举。

  LeaderElection,也表示一种选举算法,其在3.4.0之后的版本中已经不建议使用。

二、Election源码分析 

public interface Election {
    public Vote lookForLeader() throws InterruptedException;
    public void shutdown();
}

说明:可以看到Election接口定义的方法相当简单。

三、FastLeaderElection源码分析

2.1 类的继承关系 

public class FastLeaderElection implements Election {}

说明:FastLeaderElection实现了Election接口,其需要实现接口中定义的lookForLeader方法和shutdown方法,其是标准的Fast Paxos算法的实现,各服务器之间基于TCP协议进行选举。

2.2 类的内部类

FastLeaderElection有三个较为重要的内部类,分别为Notification、ToSend、Messenger。

1. Notification类 

static public class Notification {
        /*
         * Format version, introduced in 3.4.6
         */
        
        public final static int CURRENTVERSION = 0x1; 
        int version;
                
        /*
         * Proposed leader
         */
        // 被推选的leader的id
        long leader;
        /*
         * zxid of the proposed leader
         */
        // 被推选的leader的事务id
        long zxid;
        /*
         * Epoch
         */
        // 推选者的选举周期
        long electionEpoch;
        /*
         * current state of sender
         */
        // 推选者的状态
        QuorumPeer.ServerState state;
        /*
         * Address of sender
         */
        // 推选者的id
        long sid;
        /*
         * epoch of the proposed leader
         */
        // 被推选者的选举周期
        long peerEpoch;
        
        @Override
        public String toString() {
            return new String(Long.toHexString(version) + " (message format version), " 
                    + leader + " (n.leader), 0x"
                    + Long.toHexString(zxid) + " (n.zxid), 0x"
                    + Long.toHexString(electionEpoch) + " (n.round), " + state
                    + " (n.state), " + sid + " (n.sid), 0x"
                    + Long.toHexString(peerEpoch) + " (n.peerEpoch) ");
        }
    }
    
    static ByteBuffer buildMsg(int state,
            long leader,
            long zxid,
            long electionEpoch,
            long epoch) {
        byte requestBytes[] = new byte[40];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        /*
         * Building notification packet to send 
         */
        requestBuffer.clear();
        requestBuffer.putInt(state);
        requestBuffer.putLong(leader);
        requestBuffer.putLong(zxid);
        requestBuffer.putLong(electionEpoch);
        requestBuffer.putLong(epoch);
        requestBuffer.putInt(Notification.CURRENTVERSION);
        
        return requestBuffer;
    }

说明:Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息,其buildMsg方法将选举信息封装至ByteBuffer中再进行发送。

2. ToSend类  

static public class ToSend {
        static enum mType {crequest, challenge, notification, ack}
        ToSend(mType type,
                long leader,
                long zxid,
                long electionEpoch,
                ServerState state,
                long sid,
                long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.state = state;
            this.sid = sid;
            this.peerEpoch = peerEpoch;
        }
        /*
         * Proposed leader in the case of notification
         */
        //被推举的leader的id
        long leader;
        /*
         * id contains the tag for acks, and zxid for notifications
         */
        // 被推举的leader的最大事务id
        long zxid;
        /*
         * Epoch
         */
        // 推举者的选举周期
        long electionEpoch;
        /*
         * Current state;
         */
        // 推举者的状态
        QuorumPeer.ServerState state;
        /*
         * Address of recipient
         */
        // 推举者的id
        long sid;
        
        /*
         * Leader epoch
         */
        // 被推举的leader的选举周期
        long peerEpoch;
    }

说明:ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的id、zxid、选举周期等信息。

3. Messenger类

3.1 类的内部类

Messenger包含了WorkerReceiver和WorkerSender两个内部类

3.1.1 WorkerReceiver  

class WorkerReceiver implements Runnable {
    // 是否终止
    volatile boolean stop;
    // 服务器之间的连接
    QuorumCnxManager manager;
    WorkerReceiver(QuorumCnxManager manager) {
        this.stop = false;
        this.manager = manager;
    }
    public void run() {
        // 响应
        Message response;
        while (!stop) { // 不终止
            // Sleeps on receive
            try{
                // 从recvQueue中取出一个选举投票消息(从其他服务器发送过来)
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                // 无投票,跳过
                if(response == null) continue;
                /*
                         * If it is from an observer, respond right away.
                         * Note that the following predicate assumes that
                         * if a server is not a follower, then it must be
                         * an observer. If we ever have any other type of
                         * learner in the future, we'll have to change the
                         * way we check for observers.
                         */
                if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器
                    // 获取自己的投票
                    Vote current = self.getCurrentVote();
                    // 构造ToSend消息
                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                               current.getId(),
                                               current.getZxid(),
                                               logicalclock,
                                               self.getPeerState(),
                                               response.sid,
                                               current.getPeerEpoch());
                    // 放入sendqueue队列,等待发送
                    sendqueue.offer(notmsg);
                } else { // 包含服务器,表示接收到该服务器的选票消息
                    // Receive new message
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Receive new notification message. My id = "
                                  + self.getId());
                    }
                    /*
                             * We check for 28 bytes for backward compatibility
                             */
                    // 检查向后兼容性
                    if (response.buffer.capacity() < 28) {
                        LOG.error("Got a short response: "
                                  + response.buffer.capacity());
                        continue;
                    }
                    // 若容量为28,则表示可向后兼容
                    boolean backCompatibility = (response.buffer.capacity() == 28);
                    // 设置buffer中的position、limit等属性
                    response.buffer.clear();
                    // Instantiate Notification and set its attributes
                    // 创建接收通知
                    Notification n = new Notification();
                    // State of peer that sent this message
                    // 推选者的状态
                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                    switch (response.buffer.getInt()) { // 读取状态
                        case 0:
                            ackstate = QuorumPeer.ServerState.LOOKING;
                            break;
                        case 1:
                            ackstate = QuorumPeer.ServerState.FOLLOWING;
                            break;
                        case 2:
                            ackstate = QuorumPeer.ServerState.LEADING;
                            break;
                        case 3:
                            ackstate = QuorumPeer.ServerState.OBSERVING;
                            break;
                        default:
                            continue;
                    }
                    // 获取leader的id
                    n.leader = response.buffer.getLong();
                    // 获取zxid
                    n.zxid = response.buffer.getLong();
                    // 获取选举周期
                    n.electionEpoch = response.buffer.getLong();
                    n.state = ackstate;
                    // 设置服务器的id
                    n.sid = response.sid;
                    if(!backCompatibility){ // 不向后兼容
                        n.peerEpoch = response.buffer.getLong();
                    } else { // 向后兼容
                        if(LOG.isInfoEnabled()){
                            LOG.info("Backward compatibility mode, server id=" + n.sid);
                        }
                        // 获取选举周期
                        n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                    }
                    /*
                             * Version added in 3.4.6
                             */
                    // 确定版本号
                    n.version = (response.buffer.remaining() >= 4) ? 
                        response.buffer.getInt() : 0x0;
                    /*
                             * Print notification info
                             */
                    if(LOG.isInfoEnabled()){
                        printNotification(n);
                    }
                    /*
                             * If this server is looking, then send proposed leader
                             */
                    if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态
                        // 将消息放入recvqueue中
                        recvqueue.offer(n);
                        /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                        if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态
                           && (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟
                            // 创建新的投票
                            Vote v = getVote();
                            // 构造新的发送消息(本服务器自己的投票)
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                                       v.getId(),
                                                       v.getZxid(),
                                                       logicalclock,
                                                       self.getPeerState(),
                                                       response.sid,
                                                       v.getPeerEpoch());
                            // 将发送消息放置于队列,等待发送
                            sendqueue.offer(notmsg);
                        }
                    } else { // 推选服务器状态不为LOOKING
                        /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                        // 获取当前投票
                        Vote current = self.getCurrentVote(); 
                        if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Sending new notification. My id =  " +
                                          self.getId() + " recipient=" +
                                          response.sid + " zxid=0x" +
                                          Long.toHexString(current.getZxid()) +
                                          " leader=" + current.getId());
                            }
                            ToSend notmsg;
                            if(n.version > 0x0) { // 版本号大于0
                                // 构造ToSend消息
                                notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    current.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch());
                            } else { // 版本号不大于0
                                // 构造ToSend消息
                                Vote bcVote = self.getBCVote();
                                notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    bcVote.getId(),
                                    bcVote.getZxid(),
                                    bcVote.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    bcVote.getPeerEpoch());
                            }
                            // 将发送消息放置于队列,等待发送
                            sendqueue.offer(notmsg);
                        }
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted Exception while waiting for new message" +
                                   e.toString());
            }
        }
        LOG.info("WorkerReceiver is down");
    }
}

说明:WorkerReceiver实现了Runnable接口,是选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。其是QuorumCnxManager的Message转化为FastLeaderElection的Notification。

  其中,WorkerReceiver的主要逻辑在run方法中,其首先会从QuorumCnxManager中的recvQueue队列中取出其他服务器发来的选举消息,消息封装在Message数据结构中。然后判断消息中的服务器id是否包含在可以投票的服务器集合中,若不是,则会将本服务器的内部投票发送给该服务器,其流程如下  

if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器
    // 获取自己的投票
    Vote current = self.getCurrentVote();
    // 构造ToSend消息
    ToSend notmsg = new ToSend(ToSend.mType.notification,
                               current.getId(),
                               current.getZxid(),
                               logicalclock,
                               self.getPeerState(),
                               response.sid,
                               current.getPeerEpoch());
    // 放入sendqueue队列,等待发送
    sendqueue.offer(notmsg);
}

  若包含该服务器,则根据消息(Message)解析出投票服务器的投票信息并将其封装为Notification,然后判断当前服务器是否为LOOKING,若为LOOKING,则直接将Notification放入FastLeaderElection的recvqueue(区别于recvQueue)中。然后判断投票服务器是否为LOOKING状态,并且其选举周期小于当前服务器的逻辑时钟,则将本(当前)服务器的内部投票发送给该服务器,否则,直接忽略掉该投票。其流程如下  

if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态
    // 将消息放入recvqueue中
    recvqueue.offer(n);
    if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态
       && (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟
        // 创建新的投票
        Vote v = getVote();
        // 构造新的发送消息(本服务器自己的投票)
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                   v.getId(),
                                   v.getZxid(),
                                   logicalclock,
                                   self.getPeerState(),
                                   response.sid,
                                   v.getPeerEpoch());
        // 将发送消息放置于队列,等待发送
        sendqueue.offer(notmsg);
    }
}

  若本服务器的状态不为LOOKING,则会根据投票服务器中解析的version信息来构造ToSend消息,放入sendqueue,等待发送,起流程如下 

else { // 本服务器状态不为LOOKING
    // 获取当前投票
    Vote current = self.getCurrentVote(); 
    if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending new notification. My id =  " +
                      self.getId() + " recipient=" +
                      response.sid + " zxid=0x" +
                      Long.toHexString(current.getZxid()) +
                      " leader=" + current.getId());
        }
        ToSend notmsg;
        if(n.version > 0x0) { // 版本号大于0
            // 构造ToSend消息
            notmsg = new ToSend(
                ToSend.mType.notification,
                current.getId(),
                current.getZxid(),
                current.getElectionEpoch(),
                self.getPeerState(),
                response.sid,
                current.getPeerEpoch());
        } else { // 版本号不大于0
            // 构造ToSend消息
            Vote bcVote = self.getBCVote();
            notmsg = new ToSend(
                ToSend.mType.notification,
                bcVote.getId(),
                bcVote.getZxid(),
                bcVote.getElectionEpoch(),
                self.getPeerState(),
                response.sid,
                bcVote.getPeerEpoch());
        }
        // 将发送消息放置于队列,等待发送
        sendqueue.offer(notmsg);
    }
}

3.1.2 WorkerSender

class WorkerSender implements Runnable {
    // 是否终止
    volatile boolean stop;
    // 服务器之间的连接
    QuorumCnxManager manager;
    // 构造器
    WorkerSender(QuorumCnxManager manager){
        // 初始化属性
        this.stop = false;
        this.manager = manager;
    }
    public void run() {
        while (!stop) { // 不终止
            try {
                // 从sendqueue中取出ToSend消息
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                // 若为空,则跳过
                if(m == null) continue;
                // 不为空,则进行处理
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }
    void process(ToSend m) {
        // 构建消息
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
                                            m.leader,
                                            m.zxid, 
                                            m.electionEpoch, 
                                            m.peerEpoch);
        // 发送消息
        manager.toSend(m.sid, requestBuffer);
    }
}

说明:WorkerSender也实现了Runnable接口,为选票发送器,其会不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中,其过程是将FastLeaderElection的ToSend转化为QuorumCnxManager的Message。

3.2 类的属性

protected class Messenger {
    // 选票发送器
    WorkerSender ws;
    // 选票接收器
    WorkerReceiver wr;
}

说明:Messenger中维护了一个WorkerSender和WorkerReceiver,分别表示选票发送器和选票接收器

3.3 类的构造函数 

Messenger(QuorumCnxManager manager) {
    // 创建WorkerSender
    this.ws = new WorkerSender(manager);
    // 新创建线程
    Thread t = new Thread(this.ws,  "WorkerSender[myid=" + self.getId() + "]");
    // 设置为守护线程
    t.setDaemon(true);
    // 启动
    t.start();
    // 创建WorkerReceiver
    this.wr = new WorkerReceiver(manager);
    // 创建线程
    t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    // 设置为守护线程
    t.setDaemon(true);
    // 启动
    t.start();
}

说明:会启动WorkerSender和WorkerReceiver,并设置为守护线程。

2.3 类的属性 

public class FastLeaderElection implements Election {
    // 日志
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
    /**
     * Determine how much time a process has to wait
     * once it believes that it has reached the end of
     * leader election.
     */
    // 完成Leader选举之后需要等待时长
    final static int finalizeWait = 200;
    /**
     * Upper bound on the amount of time between two consecutive
     * notification checks. This impacts the amount of time to get
     * the system up again after long partitions. Currently 60 seconds.
     */
    // 两个连续通知检查之间的最大时长
    final static int maxNotificationInterval = 60000;
    /**
     * Connection manager. Fast leader election uses TCP for
     * communication between peers, and QuorumCnxManager manages
     * such connections.
     */
    // 管理服务器之间的连接
    QuorumCnxManager manager;
    // 选票发送队列,用于保存待发送的选票
    LinkedBlockingQueue<ToSend> sendqueue;
    
    // 选票接收队列,用于保存接收到的外部投票
    LinkedBlockingQueue<Notification> recvqueue;
    // 投票者
    QuorumPeer self;
    Messenger messenger;
    // 逻辑时钟
    volatile long logicalclock; /* Election instance */
    // 推选的leader的id
    long proposedLeader;
    // 推选的leader的zxid
    long proposedZxid;
    // 推选的leader的选举周期
    long proposedEpoch;
    // 是否停止选举
    volatile boolean stop;
}

说明:其维护了服务器之间的连接(用于发送消息)、发送消息队列、接收消息队列、推选者的一些信息(zxid、id)、是否停止选举流程标识等。

2.4 类的构造函数 

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
     // 字段赋值
     this.stop = false;
     this.manager = manager;
     // 初始化其他信息
     starter(self, manager);
 }

说明:构造函数中初始化了stop字段和manager字段,并且调用了starter函数,其源码如下   

private void starter(QuorumPeer self, QuorumCnxManager manager) {
     // 赋值,对Leader和投票者的ID进行初始化操作
     this.self = self;
     proposedLeader = -1;
     proposedZxid = -1;
     // 初始化发送队列
     sendqueue = new LinkedBlockingQueue<ToSend>();
     // 初始化接收队列
     recvqueue = new LinkedBlockingQueue<Notification>();
     // 创建Messenger,会启动接收器和发送器线程
     this.messenger = new Messenger(manager);
 }

说明:其完成在构造函数中未完成的部分,如会初始化FastLeaderElection的sendqueue和recvqueue,并且启动接收器和发送器线程。

2.5 核心函数分析

1. sendNotifications函数  

private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) { // 遍历投票参与者集合
        long sid = server.id;
        // 构造发送消息
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                   proposedLeader,
                                   proposedZxid,
                                   logicalclock,
                                   QuorumPeer.ServerState.LOOKING,
                                   sid,
                                   proposedEpoch);
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // 将发送消息放置于队列
        sendqueue.offer(notmsg);
    }
}

说明:其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送。

2. totalOrderPredicate函数 

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
              Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){ // 使用计票器判断当前服务器的权重是否为0
        return false;
    }
    // 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader
    // 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader
    // 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
             ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

说明:该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。

相关文章
|
13天前
|
数据采集 人工智能 安全
|
8天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
648 4
|
8天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
350 164
|
7天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
359 155