Etcd源码分析: put流程

简介: put操作 put操作是etcd v3 client支持的命令,和v2的set用法差不多 但是需要注意的是,如果你在一个3节点的etcd集群中,A节点切换为v3 client版本,然后put进了一对key-value,在B节点,还是v2的client,这个时候你get不到数据的,如果在B节点切换到.

put操作

put操作是etcd v3 client支持的命令,和v2的set用法差不多

但是需要注意的是,如果你在一个3节点的etcd集群中,A节点切换为v3 client版本,然后put进了一对key-value,在B节点,还是v2的client,这个时候你get不到数据的,如果在B节点切换到了v3 client,这个时候才可以get到数据

简单的说,v2和v3 client,插入数据到同一个etcd集群中,数据不能互通

put流程分析

client端

put命令接受的入口,在/etcdctl/ctlv3/command/put_command.go中的NewPutCommand()函数中,采用了一个corba结构体接受命令参数,实际Run执行的命令是putCommandFunc()

func putCommandFunc(cmd *cobra.Command, args []string) {
    key, value, opts := getPutOp(cmd, args)  //解析参数

    ctx, cancel := commandCtx(cmd)
    resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
    cancel()
    if err != nil {
        ExitWithError(ExitError, err)
    }
    display.Put(*resp) //打印返回的结果
}

该函数的核心就是mustClientFromCmd(cmd).Put(ctx, key, value, opts...)

mustClientFromCmd()函数返回的是一个clientv3.Client结构体指针

相当于上面调用了clientv3.Client.Put()

/clientV3/client.go 中,可以看到Client 结构体的定义,

type Client struct {
    Cluster
    KV
    Lease

里面内嵌一个KV的interface,由下面的代码具体实现接口

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

继续往下,是到了func (c *kVClient) Put(......),该函数里面,通过grpc的调用发到了server端 grpc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, c.cc, opts...)

server端

/etcdserver/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到对应的方法_KV_Put_Handler

var _KV_serviceDesc = grpc.ServiceDesc{
    ServiceName: "etcdserverpb.KV",
    HandlerType: (*KVServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Range",
            Handler:    _KV_Range_Handler,
        },
        {
            MethodName: "Put",
            Handler:    _KV_Put_Handler,
        },

往下追踪 srv.(KVServer).Put(ctx, in) -> (s *EtcdServer) Put() -> ··· ··· -> (s *EtcdServer) processInternalRaftRequestOnce(...)

在该函数里面有一句关键调用 s.r.Propose(cctx, data)

sEtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了

(n *node) Propose() -> step(), 该函数代码较短,来看看

func (n *node) step(ctx context.Context, m pb.Message) error {
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }

    select {
    case ch <- m:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
}

这段代码主要就是根据消息类型来把传进来的pb.Message赋值给channel n.recvc或者n.propc,上面的Propose()定义了pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}),所以就是赋值给了n.propc

raft协议里的流程

在一个raft集群启动完成以后, (n *node) run() 函数就是其运行的主函数, 里面是一个死循环, 循环中会根据channel来响应各种事件, 从而跳转状态

上节有说到channel propc里面被塞入了数据, run() 函数里面就会有对应的处理,代码如下:

        case m := <-propc:
            m.From = r.id
            r.Step(m)

对应的(r *raft) Step(m pb.Message)函数也是raft协议中的核心函数, 负责状态机的跳转, 里面主要有2个逻辑

  • 根据传进来的term和本身的term的大小, 决定要做的动作, 具体的逻辑原理可见raft协议原理;
  • 根据消息类型m.Type做不同的处理

注意其最后一段

    default:
        r.step(r, m)

step()是一个类似函数借口的东西,根据节点的类型不同而调用不同的函数,比如leader节点该函数就是raft.stepLeader()

在node run的死循环中,看看开头

             // readyc 和 advance 只有一个是有效值
        if advancec != nil {
            readyc = nil
        } else {
            rd = newReady(r, prevSoftSt, prevHardSt)
                   
                   // 如果raft.msgs中队列大小不为0 也会返回true 表示有数据发出
            if rd.containsUpdates() {
                readyc = n.readyc
            } else {
                readyc = nil
            }
        }

如果advancec是nil,说明刚commit了,可以创建ready channel来继续去把ready commit的commit了。

(未完待续)

相关文章
|
XML JSON Go
etcd源码分析 - 3.【打通核心流程】PUT键值对的执行链路
在上一讲,我们一起看了etcd server是怎么匹配到对应的处理函数的,如果忘记了请回顾一下。 今天,我们再进一步,看看`PUT`操作接下来是怎么执行的。
243 0
|
存储 JSON NoSQL
ETCD教程-4.深入ETCD
目前etcd主要经历了3个大的版本,分别为etcd 0.4版本、etcd 2.0版本和etcd 3.0版本。
1143 0
ETCD教程-4.深入ETCD
|
负载均衡 算法 微服务
基于gRPC的注册发现与负载均衡的原理和实战
基于gRPC的注册发现与负载均衡的原理和实战
|
5月前
|
存储 运维 Kubernetes
Java启动参数JVM_OPTS="-Xms512m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError"
本文介绍了Java虚拟机(JVM)常用启动参数配置,包括设置初始堆内存(-Xms512m)、最大堆内存(-Xmx1024m)及内存溢出时生成堆转储文件(-XX:+HeapDumpOnOutOfMemoryError),用于性能调优与故障排查。
502 0
|
9月前
|
存储 人工智能 缓存
DeepSeek 3FS解读与源码分析(1):高效训练之道
本文从设计文档和源码,深入对 3FS 在文件系统和 AI workload 方面做一系列的解读。如有错误欢迎指正。
|
10月前
|
Java
java引入本地 MultipartFile 实现多部分文件上传
在Java中,`MultipartFile`通常用于处理通过HTML表单上传的文件。但在某些情况下,需要直接从本地文件系统获取文件并上传。本文介绍如何创建一个实现了`MultipartFile`接口的本地类`LocalMultipartFile`,将本地文件转换为`MultipartFile`对象,简化文件上传流程。此方法适用于批量上传等场景,避免了表单上传的复杂性。代码示例展示了如何实现和使用该类进行文件上传操作。作者:华科云商小彭。链接:[稀土掘金](https://juejin.cn/post/7377559533785530431)。
500 18
|
10月前
|
存储 数据可视化 搜索推荐
「从0到1搭建知识库:设计团队的效率革命」
在快节奏的设计行业中,团队协作的效率和质量至关重要。本文探讨了如何通过搭建高效的设计团队知识库解决信息分散、规范不统一等问题,提升团队协同效率。内容涵盖知识库的核心价值、常见痛点、搭建方法、运营策略及未来趋势,帮助团队实现从混乱到高效的转变。知识库不仅能统一设计规范、沉淀最佳实践,还能提高新人上手速度。文章还介绍了内容框架设计、流程化管理、工具赋能等黄金方法论,并展望了智能化与场景化的未来趋势。
685 10
|
机器学习/深度学习 编解码 算法
ICCV 2023 | 当尺度感知调制遇上Transformer,会碰撞出怎样的火花?
近年来,基于Transformer和CNN的视觉基础模型取得巨大成功。有许多研究进一步地将Transformer结构与CNN架构结合,设计出了更为高效的hybrid CNN-Transformer Network,但它们的精度仍然不尽如意。本文介绍了一种新的基础模型SMT(Scale-Aware Modulation Transformer),它以更低的参数量(params)和计算量(flops)取得了大幅性能的提升。
|
存储 缓存 索引
etcd raft 处理流程图系列3-wal的存储和运行
etcd raft 处理流程图系列3-wal的存储和运行
221 1
|
Kubernetes API 索引