CreateCollection_dataSyncService_执行流程源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: CreateCollection_dataSyncService_执行流程源码解析

CreateCollectiondataSyncService执行流程源码解析

milvus版本:v2.3.2

CreateCollection这个API流程较长,也是milvus的核心API之一,涉及的内容比较复杂。这里介绍dataSyncService相关的流程。

这边文章基于【CreateCollection流程_addCollectionMetaStep_milvus源码解析】这篇文章。

整体架构:

architecture.png

CreateCollection 的数据流向:

watchChannelstep数据流.jpg

1.客户端sdk发出CreateCollection API请求。

客户端SDK向proxy发送一个CreateCollection API请求,创建一个名为hello_milvus的collection。

dataCoord会向etcd写入2种类型的key:

  • channelwatch/{nodeID}/{chName}
  • datacoord-meta/channel-removal/{channelName}

2.dataNode启动时会watch etcd的key(channelwatch/)

这样当dataCoord写入此类型的key,就会触发dataNode相应的动作。

下面进行源码分析:

1.从dataNode启动开始

堆栈:

Run()
  |--d.svr.Run()(cmd\components\data_node.go)
   |--s.start()(internal\distributed\datanode\service.go)
     |--s.datanode.Start()(同上)
       |--Start()(grpc调用internal\datanode\data_node.go)
         |--go node.StartWatchChannels(node.ctx)(同上)

go node.StartWatchChannels():开启一个goroutine进行watch。

2.进入node.StartWatchChannels()

代码路径:internal\datanode\event_manager.go

// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now)
func (node *DataNode) StartWatchChannels(ctx context.Context) {
   
   
    defer node.stopWaiter.Done()
    defer logutil.LogPanic()
    // 构建key的规则:channelwatch/{nodeID}
    watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID))
    log.Info("Start watch channel", zap.String("prefix", watchPrefix))
    // 在etcd上watch key,例如:channelwatch/5
    // 创建collection会产生key:channelwatch/{nodeID}/{chName}
    evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
    // after watch, first check all exists nodes first
    err := node.checkWatchedList()
    if err != nil {
   
   
        log.Warn("StartWatchChannels failed", zap.Error(err))
        return
    }
    // 处理watch事件
    for {
   
   
        select {
   
   
        case <-ctx.Done():
            log.Info("watch etcd loop quit")
            return
        case event, ok := <-evtChan:
            if !ok {
   
   
                ......
            }

            if err := event.Err(); err != nil {
   
   
                ......
            }
            // 处理watch的event
            for _, evt := range event.Events {
   
   
                // We need to stay in order until events enqueued
                node.handleChannelEvt(evt)
            }
        }
    }
}

node.handleChannelEvt(evt)用来处理具体的event,主要是PUT和DELETE事件。

创建collection会向etcd写入kv,属于PUT事件。

删除collection会删除etcd的kv,属于DELETE事件。

3.进入node.handleChannelEvt()

代码路径:internal\datanode\data_node.go

// handleChannelEvt handles event from kv watch event
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
   
   
    var e *event
    // 根据type填充event
    switch evt.Type {
   
   
    case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here
        e = &event{
   
   
            eventType: putEventType,
            version:   evt.Kv.Version,
        }

    case clientv3.EventTypeDelete:
        e = &event{
   
   
            eventType: deleteEventType,
            version:   evt.Kv.Version,
        }
    }
    node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value)
}

evt.Kv.Key的值:

by-dev/meta/channelwatch/6/by-dev-rootcoord-dml_0_445698323354747022v0

4.进入node.handleWatchInfo()

代码路径:internal\datanode\event_manager.go

func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
   
   
    switch e.eventType {
   
   
    case putEventType:
        // 反序列化得到ChannelWatchInfo
        watchInfo, err := parsePutEventData(data)
        if err != nil {
   
   
            log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
            return
        }

        if isEndWatchState(watchInfo.State) {
   
   
            log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
            return
        }

        if watchInfo.Progress != 0 {
   
   
            log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version))
            return
        }

        e.info = watchInfo
        // 填充虚拟channel名称
        e.vChanName = watchInfo.GetVchan().GetChannelName()
        log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String()))
    case deleteEventType:
        e.vChanName = parseDeleteEventKey(key)
        log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
    }

    actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager(
        node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
    ))
    // loaded=false
    if !loaded {
   
   
        actualManager.Run()
    }

    actualManager.handleEvent(*e)

    // Whenever a delete event comes, this eventManager will be removed from map
    if e.eventType == deleteEventType {
   
   
        if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded {
   
   
            m.Close()
        }
    }
}

变量e的值:

event.jpg

newChannelEventManager()返回一个channelEventManager结构体:

func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error,
    handleDel func(string), retryInterval time.Duration,
) *channelEventManager {
   
   
    return &channelEventManager{
   
   
        eventChan:         make(chan event, 10),
        closeChan:         make(chan struct{
   
   }),
        handlePutEvent:    handlePut,// 设置PUT处理函数
        handleDeleteEvent: handleDel,// 设置DELETE处理函数
        retryInterval:     retryInterval,
    }
}

handlePutEvent设置为node.handlePutEvent()。

handleDeleteEvent设置为node.handleDeleteEvent()。

4.进入actualManager.Run()

代码路径:internal\datanode\event_manager.go

func (e *channelEventManager) Run() {
   
   
    e.wg.Add(1)
    go func() {
   
   
        defer e.wg.Done()
        for {
   
   
            select {
   
   
            case event := <-e.eventChan:
                switch event.eventType {
   
   
                case putEventType:
                    // 处理PUT事件:node.handlePutEvent()
                    err := e.handlePutEvent(event.info, event.version)
                    if err != nil {
   
   
                        // logging the error is convenient for follow-up investigation of problems
                        log.Warn("handle put event failed", zap.String("vChanName", event.vChanName), zap.Error(err))
                    }
                case deleteEventType:
                    // 处理DELETE事件:node.handleDeleteEvent()
                    e.handleDeleteEvent(event.vChanName)
                }
            case <-e.closeChan:
                return
            }
        }
    }()
}

event.version的值为1

event.info的值如下:

eventinfo.jpg

5.进入e.handlePutEvent(),其实就是进入node.handlePutEvent()

代码路径:internal\datanode\event_manager.go

func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
   
   
    // 获取虚拟channel名称,例如:by-dev-rootcoord-dml_2_445698762473996462v0
    vChanName := watchInfo.GetVchan().GetChannelName()
    // 获取key,例如:channelwatch/7/by-dev-rootcoord-dml_2_445698762473996462v0
    key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
    tickler := newEtcdTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))

    switch watchInfo.State {
   
   
    case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
        // 走这条路径
        if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
   
   
            ......
        } else {
   
   
            log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
            watchInfo.State = datapb.ChannelWatchState_WatchSuccess
        }
    case datapb.ChannelWatchState_ToRelease:
        ......
    }

    v, err := proto.Marshal(watchInfo)
    if err != nil {
   
   
        ......
    }

    success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))

    if err != nil {
   
   
        ......
    }
    log.Info("handle put event success", zap.String("key", key),
        zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName))
    return nil
}

6.进入node.flowgraphManager.addAndStartWithEtcdTickler()

代码路径:internal\datanode\flow_graph_manager.go

func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
   
   
    log := log.With(zap.String("channel", vchan.GetChannelName()))
    if fm.flowgraphs.Contain(vchan.GetChannelName()) {
   
   
        log.Warn("try to add an existed DataSyncService")
        return nil
    }
    // 构建dataSyncService结构体
    dataSyncService, err := newServiceWithEtcdTickler(context.TODO(), dn, &datapb.ChannelWatchInfo{
   
   
        Schema: schema,
        Vchan:  vchan,
    }, tickler)
    if err != nil {
   
   
        log.Warn("fail to create new DataSyncService", zap.Error(err))
        return err
    }
    // 启动dataSyncService
    dataSyncService.start()
    fm.flowgraphs.Insert(vchan.GetChannelName(), dataSyncService)

    metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
    return nil
}

总结:

1.datanode对channelwatch/{nodeID}进行watch

2.当创建或者删除collection,对etcd进行PUT或者DELETE,触发datanode相应事件。

当PUT事件发生,触发dataSyncService的启动。

一个shard对应一个vchannel,同时启动一个dataSyncService。

目录
相关文章
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
10天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
6天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
手机上网流程解析
【9月更文挑战第5天】
|
20天前
|
持续交付 jenkins Devops
WPF与DevOps的完美邂逅:从Jenkins配置到自动化部署,全流程解析持续集成与持续交付的最佳实践
【8月更文挑战第31天】WPF与DevOps的结合开启了软件生命周期管理的新篇章。通过Jenkins等CI/CD工具,实现从代码提交到自动构建、测试及部署的全流程自动化。本文详细介绍了如何配置Jenkins来管理WPF项目的构建任务,确保每次代码提交都能触发自动化流程,提升开发效率和代码质量。这一方法不仅简化了开发流程,还加强了团队协作,是WPF开发者拥抱DevOps文化的理想指南。
39 1
|
12天前
|
缓存 网络协议 Linux
DNS的执行流程是什么?
DNS的执行流程是什么?
25 0
|
20天前
|
持续交付 jenkins C#
“WPF与DevOps深度融合:从Jenkins配置到自动化部署全流程解析,助你实现持续集成与持续交付的无缝衔接”
【8月更文挑战第31天】本文详细介绍如何在Windows Presentation Foundation(WPF)项目中应用DevOps实践,实现自动化部署与持续集成。通过具体代码示例和步骤指导,介绍选择Jenkins作为CI/CD工具,结合Git进行源码管理,配置构建任务、触发器、环境、构建步骤、测试及部署等环节,显著提升开发效率和代码质量。
37 0
|
20天前
|
C# 开发者 Windows
震撼发布:全面解析WPF中的打印功能——从基础设置到高级定制,带你一步步实现直接打印文档的完整流程,让你的WPF应用程序瞬间升级,掌握这一技能,轻松应对各种打印需求,彻底告别打印难题!
【8月更文挑战第31天】打印功能在许多WPF应用中不可或缺,尤其在需要生成纸质文档时。WPF提供了强大的打印支持,通过`PrintDialog`等类简化了打印集成。本文将详细介绍如何在WPF应用中实现直接打印文档的功能,并通过具体示例代码展示其实现过程。
79 0
|
20天前
|
监控 测试技术 API
|
22天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件

热门文章

最新文章

推荐镜像

更多