Etcd源码分析: put流程

简介: put操作 put操作是etcd v3 client支持的命令,和v2的set用法差不多 但是需要注意的是,如果你在一个3节点的etcd集群中,A节点切换为v3 client版本,然后put进了一对key-value,在B节点,还是v2的client,这个时候你get不到数据的,如果在B节点切换到.

put操作

put操作是etcd v3 client支持的命令,和v2的set用法差不多

但是需要注意的是,如果你在一个3节点的etcd集群中,A节点切换为v3 client版本,然后put进了一对key-value,在B节点,还是v2的client,这个时候你get不到数据的,如果在B节点切换到了v3 client,这个时候才可以get到数据

简单的说,v2和v3 client,插入数据到同一个etcd集群中,数据不能互通

put流程分析

client端

put命令接受的入口,在/etcdctl/ctlv3/command/put_command.go中的NewPutCommand()函数中,采用了一个corba结构体接受命令参数,实际Run执行的命令是putCommandFunc()

func putCommandFunc(cmd *cobra.Command, args []string) {
    key, value, opts := getPutOp(cmd, args)  //解析参数

    ctx, cancel := commandCtx(cmd)
    resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
    cancel()
    if err != nil {
        ExitWithError(ExitError, err)
    }
    display.Put(*resp) //打印返回的结果
}

该函数的核心就是mustClientFromCmd(cmd).Put(ctx, key, value, opts...)

mustClientFromCmd()函数返回的是一个clientv3.Client结构体指针

相当于上面调用了clientv3.Client.Put()

/clientV3/client.go 中,可以看到Client 结构体的定义,

type Client struct {
    Cluster
    KV
    Lease

里面内嵌一个KV的interface,由下面的代码具体实现接口

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

继续往下,是到了func (c *kVClient) Put(......),该函数里面,通过grpc的调用发到了server端 grpc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, c.cc, opts...)

server端

/etcdserver/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到对应的方法_KV_Put_Handler

var _KV_serviceDesc = grpc.ServiceDesc{
    ServiceName: "etcdserverpb.KV",
    HandlerType: (*KVServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Range",
            Handler:    _KV_Range_Handler,
        },
        {
            MethodName: "Put",
            Handler:    _KV_Put_Handler,
        },

往下追踪 srv.(KVServer).Put(ctx, in) -> (s *EtcdServer) Put() -> ··· ··· -> (s *EtcdServer) processInternalRaftRequestOnce(...)

在该函数里面有一句关键调用 s.r.Propose(cctx, data)

sEtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了

(n *node) Propose() -> step(), 该函数代码较短,来看看

func (n *node) step(ctx context.Context, m pb.Message) error {
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }

    select {
    case ch <- m:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
}

这段代码主要就是根据消息类型来把传进来的pb.Message赋值给channel n.recvc或者n.propc,上面的Propose()定义了pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}),所以就是赋值给了n.propc

raft协议里的流程

在一个raft集群启动完成以后, (n *node) run() 函数就是其运行的主函数, 里面是一个死循环, 循环中会根据channel来响应各种事件, 从而跳转状态

上节有说到channel propc里面被塞入了数据, run() 函数里面就会有对应的处理,代码如下:

        case m := <-propc:
            m.From = r.id
            r.Step(m)

对应的(r *raft) Step(m pb.Message)函数也是raft协议中的核心函数, 负责状态机的跳转, 里面主要有2个逻辑

  • 根据传进来的term和本身的term的大小, 决定要做的动作, 具体的逻辑原理可见raft协议原理;
  • 根据消息类型m.Type做不同的处理

注意其最后一段

    default:
        r.step(r, m)

step()是一个类似函数借口的东西,根据节点的类型不同而调用不同的函数,比如leader节点该函数就是raft.stepLeader()

在node run的死循环中,看看开头

             // readyc 和 advance 只有一个是有效值
        if advancec != nil {
            readyc = nil
        } else {
            rd = newReady(r, prevSoftSt, prevHardSt)
                   
                   // 如果raft.msgs中队列大小不为0 也会返回true 表示有数据发出
            if rd.containsUpdates() {
                readyc = n.readyc
            } else {
                readyc = nil
            }
        }

如果advancec是nil,说明刚commit了,可以创建ready channel来继续去把ready commit的commit了。

(未完待续)

相关文章
|
XML JSON Go
etcd源码分析 - 3.【打通核心流程】PUT键值对的执行链路
在上一讲,我们一起看了etcd server是怎么匹配到对应的处理函数的,如果忘记了请回顾一下。 今天,我们再进一步,看看`PUT`操作接下来是怎么执行的。
86 0
etcd源码分析 - 2.【打通核心流程】PUT键值对匹配处理函数
在阅读了etcd server的启动流程后,我们对很多关键性函数的入口都有了初步印象。 那么,接下来我们一起看看对键值对的修改,在etcd server内部是怎么流转的。
80 0
etcd源码分析 - 2.【打通核心流程】PUT键值对匹配处理函数
|
IDE Go 开发工具
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
在上一讲,我们梳理了`EtcdServer`的关键函数`processInternalRaftRequestOnce`里的四个细节。 其中,`wait.Wait`组件使用里,我们还遗留了一个细节实现,也就是请求的处理结果是怎么通过channel返回的。
113 0
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
|
6月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
102 0
|
6月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
75 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
|
6月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
53 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
|
6月前
|
安全 Java API
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】
163 0
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(1)
|
6月前
|
缓存 Java 容器
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(2)
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】
143 0
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】(2)
|
6月前
|
索引
HashMap的put方法的具体流程
HashMap的put方法的具体流程
|
Kubernetes 监控 容器
etcd源码分析 - 1.【打通核心流程】etcd server的启动流程
在第一阶段,我将从主流程出发,讲述一个`PUT`指令是怎么将数据更新到`etcd server`中的。今天,我们先来看看server是怎么启动的。
169 0