引言
Master选举的意义在于集群主节点在遭遇宕机时保障服务的可用性。
理解选举机制及相关算法,有利于了解ES底层的高可用原理,并学习及借鉴其设计思想。
理解Master选举
的另一个重要原因是:其为 ES 常见面试题之一!
本文我将结合Elasticsearch源码、文字、绘图的方式剖析Master选举的完整过程。有任何问题,欢迎在社区群与我探讨。
Master选举完整流程:
1、Master选举中的几个重要角色
- 主节点(active master):一般指的是集群中活跃的主节点,每个集群中只能有一个。
- 候选节点(master node):具备
master
角色的节点默认都有“被选举权”,即是一个候选节点。候选节点可以参与Master选举过程 - **投票节点(master node):**每个候选节点默认都有投票权,即每个候选节点默认都是一个投票节点,但如果配置了“voting_only ”的候选节点将只有选举权而没有被选举权,即仅投票节点。
- 专用主节点:即 node.roles: [master],一般指的是只保留master角色的候选节点。
- 仅投票节点:即 node.roles: [master, voting_only],指仅具备选举权,而被阉割了被选举权的master节点。仅投票节点的意义是在出现平票的时候投出关键票(决胜票)。仅投票节点因为没有被选举权,因此永远不会被选举为active master,即永远不会成为活跃的主节点,因此通常同时配置为数据节点,以提高资源利用率。
2、选举何时会发生(何时触发选举)
2.1 节点失效检测
在 Elasticsearch 源码的描述文件中有这样一段描述:
There are two fault detection processes running. The first is by the master, to ping all the other nodes in the cluster and verify that they are alive. And on the other end, each node pings to master to verify if its still alive or an election process needs to be initiated
从上述信息得知,在ES中有两个和选举相关的工作进程专门用于检查节点的存活状态,分别为:
- NodesFaultDetection:即NodesFD,用于定期检查集群中的节点是否存活。
- MasterFaultDetection:即MasterFD,作用是定期检查Master节点是否存活。
他们分别会通过 ping 的方式定期检测集群中的普通节点和 master 节点是否存活。
2.2 触发选举的两种情况
- 当
master-eligible
节点数量小于法定票数:当主节点侦测到候选节点数量小于法定票数的时候,会主动放弃主节点身份。 - 当主节点宕机。
3、选主流程
3.1 连接线程实现:innerJoinCluster
org.elasticsearch.discovery.zen.ZenDiscovery
连接线程的主要功能,这个函数保证在失效时加入集群或生成一个新的连接线程.
private void innerJoinCluster() { // 定义临时Master节点 DiscoveryNode masterNode = null; final Thread currentThread = Thread.currentThread(); // 开启选举上下文 nodeJoinController.startElectionContext(); // 如果 masterNode 不存在,连接线程控制已启动,并且提供的线程是当前活跃的joinThread while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) { // 选出临时Master,因为此时节点状态还没有发布给集群 masterNode = findMaster(); } // thread不再在currentJoinThread中,停止 if (!joinThreadControl.joinThreadActive(currentThread)) { logger.trace("thread is no longer in currentJoinThread. Stopping."); return; } // 如果当前节点就是 master 节点 if (transportService.getLocalNode().equals(masterNode)) { final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins); nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout, new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { synchronized (stateMutex) { joinThreadControl.markThreadAsDone(currentThread); } } @Override public void onFailure(Throwable t) { logger.trace("failed while waiting for nodes to join, rejoining", t); synchronized (stateMutex) { joinThreadControl.markThreadAsDoneAndStartNew(currentThread); } } } ); } else { // 组织任何尝试加入请求,因为当前节点不是master节点 nodeJoinController.stopElectionContext(masterNode + " elected"); // 当前节点向Master节点发送加入集群请求 final boolean success = joinElectedMaster(masterNode); synchronized (stateMutex) { if (success) { DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode(); if (currentMasterNode == null) { // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have // a valid master. logger.debug("no master node is set, despite of join request completing. retrying pings."); joinThreadControl.markThreadAsDoneAndStartNew(currentThread); } else if (currentMasterNode.equals(masterNode) == false) { // 更新集群状态信息 joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join"); } joinThreadControl.markThreadAsDone(currentThread); } else { // 如果加入失败,再次尝试加入 joinThreadControl.markThreadAsDoneAndStartNew(currentThread); } } } }
3.2 发现节点:DiscoveryNode
DiscoveryNode为集群发现节点信息,为所有 Discover 模块下节点的基类。包含节点的名称、节点Id、不同集群状态下的版本号、节点角色等信息。
3.3 选举临时Master节点:findMaster()
选主的入口方法为:findMaster() 方法。其作用为选出临时Master节点。完整的过程代码如下:
private DiscoveryNode findMaster() { // 获取当前集群活动节点列表,不包含本节点,pingTimeout:默认3秒 List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList(); if (fullPingResponses == null) { logger.trace("No full ping responses"); return null; } if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); if (fullPingResponses.size() == 0) { sb.append(" {none}"); } else { for (ZenPing.PingResponse pingResponse : fullPingResponses) { sb.append("\n\t--> ").append(pingResponse); } } logger.trace("full ping responses:{}", sb); } // 获取当前节点 final DiscoveryNode localNode = transportService.getLocalNode(); // 本地节点在不在活动节点列表中 assert fullPingResponses.stream().map(ZenPing.PingResponse::node) .filter(n -> n.equals(localNode)).findAny().isPresent() == false; // 添加本地节点 fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState())); // 过滤无效选票,即不具备候选节点资格的节点的投票 final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger); // 定义activeMasters,存儲当前活跃的activeMaster列表,如果此列表为空,代表集祥中没有主节点,此时从masterCandidates(候选节点列表)中选Master List<DiscoveryNode> activeMasters = new ArrayList<>(); // 我们不能在pingMasters列表中包合本地节点,否则我们可能会在没有来自ZenDiscover#inner#Joincluster() // 中其他节点的任何检查/验证的情况下选择自己 // 首先過历所有节点,将每个节点所认为的当前Master节点加入activeMaster列表中(不包含本节点),正常情况下应为一个 for (ZenPing.PingResponse pingResponse : pingResponses) { // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without // any check / verifications from other nodes in ZenDiscover#innerJoinCluster() // pingResponse.master()为当前节点所认为的主节点,即潘丹当前节点所认为的主节点存在,并且不是当前节点本身 if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) { activeMasters.add(pingResponse.master()); } } // nodes discovered during pinging // ping期间发现的节点,去掉不具备master-eligible资格的节点(候选节点列表) List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { // 如果响应当前发出ping请求的节点是一个候选节点 if (pingResponse.node().isMasterNode()) { masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion())); } } // activeMaster不存在 if (activeMasters.isEmpty()) { // 判断候选节点的数量是否满足 candidates.size() >= minimumMasterNodes:候选节点数 大于 法定票数 if (electMaster.hasEnoughCandidates(masterCandidates)) { // 选出 获胜者 选举过程 final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates); logger.trace("candidate {} won election", winner); return winner.getNode(); } else { // if we don't have enough master nodes, we bail, because there are not enough master to elect from // 票数不足 达不到法定最小票数 logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again", masterCandidates, electMaster.minimumMasterNodes()); return null; } } else { // 如果存在 active master,判断是否当前节点 assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; // lets tie break between discovered nodes return electMaster.tieBreakActiveMasters(activeMasters); } }
当主节点不存在的时候,首先判断候选节点的数量是否满足:候选节点数 > 法定票数,即:candidates.size() >= minimumMasterNodes
只有满足条件时,才会发起选举,判断是否满足法定票数的逻辑如下所示:
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) { if (candidates.isEmpty()) { return false; } if (minimumMasterNodes < 1) { return true; } assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() : "duplicates ahead: " + candidates; return candidates.size() >= minimumMasterNodes; }
选主实现:electMaster
实现:org.elasticsearch.discovery.zen.ElectMasterService#electMaster
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) { assert hasEnoughCandidates(candidates); List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates); sortedCandidates.sort(MasterCandidate::compare); return sortedCandidates.get(0); }
调用 ElectMasterService.electMaster
选举的是排序后的第一个MasterCandidate(即master-eligible node)。
先按照集群状态排序,最新的状态排在前面,再按照节点id排序,id小的排在前面
集群状态版本越新就排在越前面,当clusterStateVersion越大,优先级越高。这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。
因为Master当选后,就会以这个版本的clusterState为基础进行更新。
// 集群状态版本越新就排在越前面,当clusterStateVersion越大,优先级越高。 // 这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。 // 因为Master当选后,就会以这个版本的clusterState为基础进行更新。 // (一个例外是集群全部重启,所有节点都没有meta,需要先选出一个master,然后master再通过持久化的数据进行meta恢复,再进行meta同步)。 public static int compare(MasterCandidate c1, MasterCandidate c2) { // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted // list, so if c2 has a higher cluster state version, it needs to come first. int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); // 版本相同的时候,按照节点的Id比较(Id为节点第一次启动时随机生成) // 当clusterStateVersion相同时,节点的Id越小,优先级越高。即总是倾向于选择Id小的Node, // 这个Id是节点第一次启动时生成的一个随机字符串。之所以这么设计,是为了让选举结果尽可能稳定,不要出现都想当master而选不出来的情况。 if (ret == 0) { ret = compareNodes(c1.getNode(), c2.getNode()); } return ret; }
此时为临时activeMaster吗,因为此时节点状态还没有发布到集群状态信息里。
未完待续…
声明:
本文为博主原创,文中图片均为自己绘制,保留有Processon原图,任何雷同图片均出自于此,本文无任何搬运,只是在当前平台发布时间较晚。任何质疑,本人都可提供最早发布的证据!
感谢各位博友的支持与厚爱!