程序员必备的十大技能(进阶版)之分布式核心技术(二)

简介: 教程来源 http://xcfsr.cn/ Paxos、Raft、ZAB是三大分布式一致性协议:Paxos理论奠基但复杂;Raft分治设计(选举/日志/安全),易理解易实现;ZAB为ZooKeeper定制,基于ZXID实现原子广播与崩溃恢复。

三、分布式一致性协议

3.1 Paxos协议详解
Paxos是Leslie Lamport提出的分布式一致性协议,虽难理解但却是后续协议的基础。

// Paxos角色
public enum PaxosRole {
    PROPOSER,   // 提案者:提出value
    ACCEPTOR,   // 接受者:投票决定value
    LEARNER     // 学习者:学习最终决议
}

// Paxos两阶段流程
public class PaxosProtocol {

    // Phase 1: Prepare阶段
    // 1. Proposer生成提案编号n,向多数派Acceptor发送Prepare(n)
    // 2. Acceptor收到Prepare(n):若n > 已响应的最大编号,则承诺不再接受小于n的提案,并返回已接受的最高编号提案

    // Phase 2: Accept阶段
    // 1. Proposer收到多数派响应后,选择编号最大的提案值v,发送Accept(n, v)
    // 2. Acceptor收到Accept(n, v):若未响应过大于n的Prepare,则接受提案

    private static class Proposal {
        long number;
        Object value;
    }

    private static class Acceptor {
        private long lastPreparedNumber = 0;      // 已响应的最大Prepare编号
        private Proposal acceptedProposal = null; // 已接受的提案(编号和值)

        // 处理Prepare请求
        public synchronized PrepareResponse prepare(long proposalNumber) {
            if (proposalNumber > lastPreparedNumber) {
                lastPreparedNumber = proposalNumber;
                return new PrepareResponse(true, acceptedProposal);
            }
            return new PrepareResponse(false, acceptedProposal);
        }

        // 处理Accept请求
        public synchronized boolean accept(long proposalNumber, Object value) {
            if (proposalNumber >= lastPreparedNumber) {
                lastPreparedNumber = proposalNumber;
                acceptedProposal = new Proposal();
                acceptedProposal.number = proposalNumber;
                acceptedProposal.value = value;
                return true;
            }
            return false;
        }
    }

    private static class Proposer {
        private long proposalNumber;
        private Acceptor[] acceptors;

        public Object propose(Object value) {
            // Phase 1: Prepare
            proposalNumber = generateNextNumber();
            List<PrepareResponse> prepareResponses = new ArrayList<>();

            for (Acceptor acceptor : acceptors) {
                PrepareResponse response = acceptor.prepare(proposalNumber);
                if (response.isPromised) {
                    prepareResponses.add(response);
                }
            }

            // 检查是否获得多数派承诺
            if (prepareResponses.size() <= acceptors.length / 2) {
                throw new RuntimeException("无法获得多数派承诺");
            }

            // 选择编号最大的已接受提案的值
            Proposal maxProposal = null;
            for (PrepareResponse response : prepareResponses) {
                if (response.acceptedProposal != null) {
                    if (maxProposal == null || 
                        response.acceptedProposal.number > maxProposal.number) {
                        maxProposal = response.acceptedProposal;
                    }
                }
            }

            // Phase 2: Accept
            Object acceptValue = (maxProposal != null) ? maxProposal.value : value;
            int acceptCount = 0;

            for (Acceptor acceptor : acceptors) {
                if (acceptor.accept(proposalNumber, acceptValue)) {
                    acceptCount++;
                }
            }

            if (acceptCount > acceptors.length / 2) {
                return acceptValue;
            }

            throw new RuntimeException("无法获得多数派接受");
        }
    }
}

3.2 Raft协议详解
Raft是更易于理解的分布式一致性协议,将问题分解为:领导者选举、日志复制、安全性。

public class RaftNode {

    // 节点状态
    private enum NodeState {
        FOLLOWER, CANDIDATE, LEADER
    }

    private NodeState state = NodeState.FOLLOWER;

    // 持久化状态(所有节点)
    private long currentTerm = 0;              // 当前任期号
    private String votedFor = null;            // 当前任期投票给谁
    private List<LogEntry> log = new ArrayList<>(); // 日志条目

    // 易失性状态(所有节点)
    private long commitIndex = 0;              // 已知已提交的最高日志索引
    private long lastApplied = 0;              // 已应用到状态机的最高日志索引

    // 易失性状态(仅Leader)
    private Map<String, Long> nextIndex = new ConcurrentHashMap<>();   // 每个节点下一个日志索引
    private Map<String, Long> matchIndex = new ConcurrentHashMap<>();  // 每个节点已复制的最高索引

    private final List<RaftNode> peers;         // 集群中其他节点
    private final Timer electionTimer;          // 选举超时定时器
    private final Timer heartbeatTimer;         // 心跳定时器(Leader用)

    static class LogEntry {
        long term;          // 任期号
        int index;          // 日志索引
        String command;     // 命令(如SET key value)

        LogEntry(long term, int index, String command) {
            this.term = term;
            this.index = index;
            this.command = command;
        }
    }

    // 请求投票RPC
    private class RequestVoteRPC {
        long term;
        String candidateId;
        long lastLogIndex;
        long lastLogTerm;

        RequestVoteRPC(long term, String candidateId, long lastLogIndex, long lastLogTerm) {
            this.term = term;
            this.candidateId = candidateId;
            this.lastLogIndex = lastLogIndex;
            this.lastLogTerm = lastLogTerm;
        }
    }

    private class RequestVoteResponse {
        long term;
        boolean voteGranted;

        RequestVoteResponse(long term, boolean voteGranted) {
            this.term = term;
            this.voteGranted = voteGranted;
        }
    }

    // 领导者选举
    private void startElection() {
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = getNodeId();

        RequestVoteRPC request = new RequestVoteRPC(
            currentTerm, 
            getNodeId(), 
            getLastLogIndex(), 
            getLastLogTerm()
        );

        int votesReceived = 1;  // 投票给自己
        for (RaftNode peer : peers) {
            new Thread(() -> {
                RequestVoteResponse response = peer.requestVote(request);
                synchronized (this) {
                    if (response.term > currentTerm) {
                        // 发现更高任期,退化为Follower
                        becomeFollower(response.term);
                        return;
                    }
                    if (response.voteGranted) {
                        votesReceived++;
                        if (votesReceived > peers.size() / 2 && state == NodeState.CANDIDATE) {
                            becomeLeader();
                        }
                    }
                }
            }).start();
        }

        // 重置选举计时器
        resetElectionTimer();
    }

    // 处理投票请求
    private RequestVoteResponse requestVote(RequestVoteRPC request) {
        synchronized (this) {
            // 如果请求的任期小于当前任期,拒绝
            if (request.term < currentTerm) {
                return new RequestVoteResponse(currentTerm, false);
            }

            // 如果请求的任期大于当前任期,成为Follower
            if (request.term > currentTerm) {
                becomeFollower(request.term);
            }

            // 检查日志是否至少与本地一样新
            boolean logUpToDate = isLogUpToDate(request.lastLogTerm, request.lastLogIndex);

            // 如果尚未投票给其他人,或者投票给了同一个候选人,且日志足够新
            if ((votedFor == null || votedFor.equals(request.candidateId)) && logUpToDate) {
                votedFor = request.candidateId;
                resetElectionTimer();
                return new RequestVoteResponse(currentTerm, true);
            }

            return new RequestVoteResponse(currentTerm, false);
        }
    }

    // 日志复制
    private void replicateLog() {
        for (RaftNode peer : peers) {
            long nextIdx = nextIndex.getOrDefault(peer.getNodeId(), 1L);
            List<LogEntry> entries = new ArrayList<>();
            for (long i = nextIdx; i <= getLastLogIndex(); i++) {
                entries.add(log.get((int) i - 1));  // 索引从1开始
            }

            AppendEntriesRPC request = new AppendEntriesRPC(
                currentTerm,
                getNodeId(),
                nextIdx - 1,
                getTermAtIndex(nextIdx - 1),
                entries,
                commitIndex
            );

            peer.appendEntries(request, response -> {
                synchronized (this) {
                    if (response.success) {
                        // 更新matchIndex和nextIndex
                        long newMatchIndex = nextIdx + entries.size() - 1;
                        matchIndex.put(peer.getNodeId(), newMatchIndex);
                        nextIndex.put(peer.getNodeId(), newMatchIndex + 1);

                        // 提交日志
                        updateCommitIndex();
                    } else {
                        // 复制失败,减小nextIndex重试
                        if (response.term > currentTerm) {
                            becomeFollower(response.term);
                        } else {
                            nextIndex.put(peer.getNodeId(), Math.max(1, nextIdx - 1));
                        }
                    }
                }
            });
        }
    }

    // 更新提交索引
    private void updateCommitIndex() {
        for (int i = (int) commitIndex + 1; i <= getLastLogIndex(); i++) {
            // 检查日志是否在当前任期
            if (log.get(i - 1).term != currentTerm) {
                continue;
            }

            // 统计有多少节点复制了这条日志
            int replicatedCount = 1;  // 自己
            for (Long matchIdx : matchIndex.values()) {
                if (matchIdx >= i) {
                    replicatedCount++;
                }
            }

            // 超过半数节点复制,提交
            if (replicatedCount > peers.size() / 2) {
                commitIndex = i;
                applyLogToStateMachine();
            }
        }
    }

    // 成为Leader后的初始化
    private void becomeLeader() {
        state = NodeState.LEADER;
        log.info("节点 {} 成为Leader,任期 {}", getNodeId(), currentTerm);

        // 初始化nextIndex和matchIndex
        long lastLogIndex = getLastLogIndex();
        for (RaftNode peer : peers) {
            nextIndex.put(peer.getNodeId(), lastLogIndex + 1);
            matchIndex.put(peer.getNodeId(), 0L);
        }

        // 启动心跳
        startHeartbeat();
    }

    // 成为Follower
    private void becomeFollower(long term) {
        state = NodeState.FOLLOWER;
        currentTerm = term;
        votedFor = null;
        resetElectionTimer();
        log.info("节点 {} 成为Follower,任期 {}", getNodeId(), term);
    }
}

3.3 ZooKeeper的ZAB协议
ZAB协议是ZooKeeper使用的原子广播协议,类似Raft但有其特色。

// ZooKeeper节点状态
public class ZKNode {

    // ZXID结构:高32位为epoch(纪元),低32位为counter
    public static long createZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    // 启动模式
    public enum ServerState {
        LOOKING,   // 寻找Leader
        FOLLOWING, // 跟随者
        LEADING,   // 领导者
        OBSERVING  // 观察者(不参与投票)
    }

    // 事务处理
    public class ZKDatabase {
        private final TreeMap<Long, TxnLogEntry> txnLog = new TreeMap<>();
        private final DataTree dataTree = new DataTree();

        public long processTxn(TxnHeader header, Record txn) {
            long zxid = header.getZxid();
            txnLog.put(zxid, new TxnLogEntry(header, txn, dataTree));
            return zxid;
        }

        // 数据同步
        public void sync(long peerZxid) {
            // 获取peerZxid之后的所有事务
            SortedMap<Long, TxnLogEntry> proposals = txnLog.tailMap(peerZxid + 1);
            for (Map.Entry<Long, TxnLogEntry> entry : proposals.entrySet()) {
                // 发送给Follower
                sendProposal(entry.getValue());
            }
        }
    }

    // 领导者选举(FastLeaderElection)
    public class FastLeaderElection {

        private static class Vote {
            long zxid;
            long sid;  // Server ID

            boolean isBetter(Vote other) {
                if (zxid != other.zxid) {
                    return zxid > other.zxid;
                }
                return sid > other.sid;
            }
        }

        public Vote lookForLeader() {
            // 投票给自己
            Vote selfVote = new Vote();
            selfVote.zxid = getLastZxid();
            selfVote.sid = getServerId();

            sendNotifications(selfVote);

            // 收集其他节点的投票
            Map<Long, Vote> votes = new ConcurrentHashMap<>();
            votes.put(selfVote.sid, selfVote);

            while (true) {
                Notification n = recvQueue.poll(500, TimeUnit.MILLISECONDS);
                if (n == null) {
                    // 超时,重新发送通知
                    sendNotifications(selfVote);
                    continue;
                }

                Vote receivedVote = new Vote();
                receivedVote.zxid = n.zxid;
                receivedVote.sid = n.sid;

                // 更新逻辑时钟
                if (n.state == ServerState.LOOKING && n.logicalClock > logicalClock) {
                    logicalClock = n.logicalClock;
                    receivedVote = selfVote;
                }

                if (isBetterVote(receivedVote, selfVote)) {
                    selfVote = receivedVote;
                    sendNotifications(selfVote);
                }

                votes.put(n.sid, receivedVote);

                // 检查是否获得多数票
                int count = 0;
                for (Vote v : votes.values()) {
                    if (v.sid == selfVote.sid && v.zxid == selfVote.zxid) {
                        count++;
                    }
                }

                if (count > getQuorumSize()) {
                    // 确认投票
                    if (n.state == ServerState.LEADING) {
                        return selfVote;
                    }
                }
            }
        }
    }
}

来源:
http://htnus.cn/

相关文章
|
3天前
|
人工智能 API 开发者
阿里云发布为Agent而生的全新AI产品官网“千问云”,模型服务全面Skill、CLI化
5月20日,阿里云发布“千问云”(www.qianwenai.com)——专为Agent时代打造的AI模型服务平台,集成150+主流模型API,首创Skills与CLI工具链,支持模型选型、调用、用量管理等全链路自动化,助力开发者与Agent高效构建AI应用。
513 16
|
存储 安全 算法
一文理解UDS安全访问服务(0x27)
一文理解UDS安全访问服务(0x27)
一文理解UDS安全访问服务(0x27)
|
9天前
|
设计模式 人工智能 JSON
Agent Skill规范、构建与设计模式
文章从 Skill 的规范格式、三层渐进式加载机制、模型驱动触发逻辑出发,深入解析 Skill-Creator 的工程化开发范式。(文章内容基于作者个人技术实践与独立思考,旨在分享经验,仅代表个人观点。)
Agent Skill规范、构建与设计模式
|
3天前
|
消息中间件 负载均衡 算法
程序员必备的十大技能(进阶版)之分布式核心技术(一)
教程来源 http://unbgv.cn/ 本文系统剖析分布式核心技术,涵盖CAP/BASE理论、服务治理、一致性协议、分布式事务、锁、消息中间件、负载均衡、存储及可观测性九大维度,直击微服务演进中的核心挑战与落地实践。
|
3天前
|
人工智能
阿里云ai模型Token活动手动整理:免费领百炼Tokens、CodingPlan、TokenPlan及节省计划活动
2026年阿里云AI大模型Token五大优惠:开通百炼即领7000万Tokens(每模型100万,有效期90天);Coding Plan Pro版200元/月限量抢购;Token Plan团队版198元起免抢直购;AI节省计划最高5.3折;按量付费满200元返200元。
128 1
|
23天前
|
前端开发 JavaScript vr&ar
前端组件库 ——A‑Frame 知识点大全(一)
教程来源 https://tmywi.cn A-Frame 是 Mozilla 开发的开源 WebXR 框架,基于 Three.js 与 WebGL,以声明式 HTML(如 `&lt;a-scene&gt;` `&lt;a-box&gt;`)降低 VR/AR 开发门槛。支持跨平台运行,兼具组件化、高性能与易扩展性,让熟悉 HTML 的开发者快速构建沉浸式 3D 应用。
|
23天前
|
自然语言处理 前端开发 容器
前端组件库 ——FormMaking 知识点大全(二)
教程来源 https://zlpow.cn FormMaking字段体系涵盖基础、高级与布局三类字段,支持丰富配置;全局配置含国际化、默认属性及字段标识绑定,助力低代码高效构建企业级表单。
|
24天前
|
人工智能 API Go
AI中,几乎每天都在说的“Token”到底是什么?90%的人不知道!
把“请帮我写一篇关于如何学好Python的详细文章,字数大约2000字,要包含代码示例,语气要亲切”
3234 1
|
3天前
|
设计模式 网络协议 Java
程序员必备的十大技能(进阶版)之网络与高并发原理(一)
教程来源 http://yyvgt.cn/ 本文深度解析网络与高并发核心原理,涵盖TCP/IP协议栈、三次握手/四次挥手、滑动窗口、拥塞控制、五种I/O模型(含epoll机制)、零拷贝、Reactor/Proactor、Java NIO/Netty源码及全链路调优,助你突破性能瓶颈,成为高阶工程师。
|
3天前
|
存储 缓存 负载均衡
程序员必备的十大技能(进阶版)之分布式核心技术(五)
教程来源 http://yvyus.cn/ 本节系统讲解分布式核心能力:涵盖加权随机、最少活跃连接等负载均衡算法;Failover、Forking等容错策略;多级缓存与一致性保障;虚拟桶分片与平滑迁移;OpenTelemetry链路追踪及全链路日志透传,助力构建高可用、可观测的分布式系统。

热门文章

最新文章