CreateCollection API执行流程(addCollectionMetaStep)_milvus源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: CreateCollection API执行流程(addCollectionMetaStep)_milvus源码解析

CreateCollection API执行流程(addCollectionMetaStep)源码解析

milvus版本:v2.3.2

CreateCollection这个API流程较长,也是milvus的核心API之一,涉及的内容比较复杂。这里介绍和channel相关的流程。

整体架构:

architecture.png

CreateCollection(addCollectionMetaStep)的数据流向:

watchChannelstep数据流.jpg

1.客户端sdk发出CreateCollection API请求。

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)

客户端SDK向proxy发送一个CreateCollection API请求,创建一个名为hello_milvus的collection。

hello_milvus.jpg

2.客户端接受API请求,将request封装为createCollectionTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ......
    // request封装为task
    cct := &createCollectionTask{
   
   
        ctx:                     ctx,
        Condition:               NewTaskCondition(ctx),
        CreateCollectionRequest: request,
        rootCoord:               node.rootCoord,
    }

    ......
    // 将task压入ddQueue队列
    if err := node.sched.ddQueue.Enqueue(cct); err != nil {
   
   
        ......
    }

    ......
    // 等待cct执行完
    if err := cct.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

3.执行createCollectionTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    var err error
    t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
    return err
}

从代码可以看出调用了rootCoord的CreateCollection接口。

4.进入rootCoord的CreateCollection接口。

代码路径:internal\rootcoord\root_coord.go

继续将请求封装为rootcoord里的createCollectionTask

func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ......
    // 封装为createCollectionTask
    t := &createCollectionTask{
   
   
        baseTask: newBaseTask(ctx, c),
        Req:      in,
    }
    // 加入调度
    if err := c.scheduler.AddTask(t); err != nil {
   
   
        ......
    }
    // 等待task完成
    if err := t.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

5.执行createCollectionTask的Prepare、Execute、NotifyDone方法。

Execute()为核心方法。

代码路径:internal\rootcoord\create_collection_task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    // collID为collectionID,在Prepare()里分配
    // partIDs为partitionID,在Prepare()里分配
    collID := t.collID
    partIDs := t.partIDs
    // 产生时间戳
    ts, err := t.getCreateTs()
    if err != nil {
   
   
        return err
    }
    // vchanNames为虚拟channel,在Prepare()里分配
    // chanNames为物理channel,在Prepare()里分配
    vchanNames := t.channels.virtualChannels
    chanNames := t.channels.physicalChannels

    startPositions, err := t.addChannelsAndGetStartPositions(ctx, ts)
    if err != nil {
   
   
        t.core.chanTimeTick.removeDmlChannels(t.channels.physicalChannels...)
        return err
    }
    // 填充partition,创建collection的时候,默认只有一个名为"Default partition"的partition。
    partitions := make([]*model.Partition, len(partIDs))
    for i, partID := range partIDs {
   
   
        partitions[i] = &model.Partition{
   
   
            PartitionID:               partID,
            PartitionName:             t.partitionNames[i],
            PartitionCreatedTimestamp: ts,
            CollectionID:              collID,
            State:                     pb.PartitionState_PartitionCreated,
        }
    }
    // 填充collection
    // 可以看出collection由collID、dbid、schemaName、fields、vchanName、chanName、partition、shardNum等组成
    collInfo := model.Collection{
   
   
        CollectionID:         collID,
        DBID:                 t.dbID,
        Name:                 t.schema.Name,
        Description:          t.schema.Description,
        AutoID:               t.schema.AutoID,
        Fields:               model.UnmarshalFieldModels(t.schema.Fields),
        VirtualChannelNames:  vchanNames,
        PhysicalChannelNames: chanNames,
        ShardsNum:            t.Req.ShardsNum,
        ConsistencyLevel:     t.Req.ConsistencyLevel,
        StartPositions:       toKeyDataPairs(startPositions),
        CreateTime:           ts,
        State:                pb.CollectionState_CollectionCreating,
        Partitions:           partitions,
        Properties:           t.Req.Properties,
        EnableDynamicField:   t.schema.EnableDynamicField,
    }

    clone := collInfo.Clone()

    existedCollInfo, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp)
    if err == nil {
   
   
        equal := existedCollInfo.Equal(*clone)
        if !equal {
   
   
            return fmt.Errorf("create duplicate collection with different parameters, collection: %s", t.Req.GetCollectionName())
        }

        log.Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts))
        return nil
    }
    // 分为多个step执行,每一个undoTask由todoStep和undoStep构成
    // 执行todoStep,报错则执行undoStep
    undoTask := newBaseUndoTask(t.core.stepExecutor)
    undoTask.AddStep(&expireCacheStep{
   
   
        baseStep:        baseStep{
   
   core: t.core},
        dbName:          t.Req.GetDbName(),
        collectionNames: []string{
   
   t.Req.GetCollectionName()},
        collectionID:    InvalidCollectionID,
        ts:              ts,
    }, &nullStep{
   
   })
    undoTask.AddStep(&nullStep{
   
   }, &removeDmlChannelsStep{
   
   
        baseStep:  baseStep{
   
   core: t.core},
        pChannels: chanNames,
    }) 
    undoTask.AddStep(&addCollectionMetaStep{
   
   
        baseStep: baseStep{
   
   core: t.core},
        coll:     &collInfo,
    }, &deleteCollectionMetaStep{
   
   
        baseStep:     baseStep{
   
   core: t.core},
        collectionID: collID,
        ts: ts,
    })

    undoTask.AddStep(&nullStep{
   
   }, &unwatchChannelsStep{
   
   
        baseStep:     baseStep{
   
   core: t.core},
        collectionID: collID,
        channels:     t.channels,
        isSkip:       !Params.CommonCfg.TTMsgEnabled.GetAsBool(),
    })
    undoTask.AddStep(&watchChannelsStep{
   
   
        baseStep: baseStep{
   
   core: t.core},
        info: &watchInfo{
   
   
            ts:             ts,
            collectionID:   collID,
            vChannels:      t.channels.virtualChannels,
            startPositions: toKeyDataPairs(startPositions),
            schema: &schemapb.CollectionSchema{
   
   
                Name:        collInfo.Name,
                Description: collInfo.Description,
                AutoID:      collInfo.AutoID,
                Fields:      model.MarshalFieldModels(collInfo.Fields),
            },
        },
    }, &nullStep{
   
   })
    undoTask.AddStep(&changeCollectionStateStep{
   
   
        baseStep:     baseStep{
   
   core: t.core},
        collectionID: collID,
        state:        pb.CollectionState_CollectionCreated,
        ts:           ts,
    }, &nullStep{
   
   })

    return undoTask.Execute(ctx)
}

创建collection涉及多个步骤,可以看出这里依次分为expireCacheStep、addCollectionMetaStep、watchChannelsStep、changeCollectionStateStep这几个步骤,addCollectionMetaStep是关于etcd元数据的step,已在另一篇文章对其进行详细解析。本篇幅对watchChannelsStep进行解析。

6.进入watchChannelsStep,执行其Execute()方法。

代码路径:internal\rootcoord\step.go

func (s *watchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
   
   
    err := s.core.broker.WatchChannels(ctx, s.info)
    return nil, err
}

在这里重点研究s.core.broker.WatchChannels()这个方法做了什么事情。

调用栈如下:

s.core.broker.WatchChannels()
  |--WatchChannels()(internal\rootcoord\broker.go)
    |--b.s.dataCoord.WatchChannels()
      |--WatchChannels()(internal\datacoord\services.go)
        |--s.channelManager.Watch()
          |--c.updateWithTimer()(internal\datacoord\channel_manager.go)
            |--c.store.Update()
              |--c.update()(internal\datacoord\channel_store.go)
                |--c.txn()(同上)
                  |--c.store.MultiSaveAndRemove()(同上)
                    |--MultiSaveAndRemove()(internal\kv\etcd\etcd_kv.go)
        |--s.meta.catalog.MarkChannelAdded()

watch_channel堆栈.jpg

WatchChannels这个操作最终是在etcd写入kv。那么我们研究写入的kv是什么。

根据堆栈顺序来进行分析。

1.WatchChannels()方法

代码路径:internal\datacoord\services.go

// WatchChannels notifies DataCoord to watch vchannels of a collection.
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
   
   
    log := log.Ctx(ctx).With(
        zap.Int64("collectionID", req.GetCollectionID()),
        zap.Strings("channels", req.GetChannelNames()),
    )
    log.Info("receive watch channels request")
    resp := &datapb.WatchChannelsResponse{
   
   
        Status: merr.Success(),
    }

    if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
   
   
        return &datapb.WatchChannelsResponse{
   
   
            Status: merr.Status(err),
        }, nil
    }
    // req.GetChannelNames()得到的值为:
    // by-dev-rootcoord-dml_2_445674962009727985v0
    // by-dev-rootcoord-dml_3_445674962009727985v1
    for _, channelName := range req.GetChannelNames() {
   
   
        ch := &channel{
   
   
            Name:            channelName,
            CollectionID:    req.GetCollectionID(),
            StartPositions:  req.GetStartPositions(),
            Schema:          req.GetSchema(),
            CreateTimestamp: req.GetCreateTimestamp(),
        }
        // 循环执行watch()
        err := s.channelManager.Watch(ctx, ch)
        if err != nil {
   
   
            log.Warn("fail to watch channelName", zap.Error(err))
            resp.Status = merr.Status(err)
            return resp, nil
        }
        // 向etcd写入另外一个kv
        if err := s.meta.catalog.MarkChannelAdded(ctx, ch.Name); err != nil {
   
   
            // TODO: add background task to periodically cleanup the orphaned channel add marks.
            log.Error("failed to mark channel added", zap.Error(err))
            resp.Status = merr.Status(err)
            return resp, nil
        }
    }

    return resp, nil
}

函数入参req的值如下:

req值.jpg

在这里有2个channelName,是虚拟channel,为什么是2个channel?因为客户端SDK创建collection传入了shards_num=2。一个shard对应一个虚拟channel。

channel名称by-dev-rootcoord-dml_2_445674962009727985v0中的445674962009727985是collectionID。

2.进入到s.channelManager.Watch()

代码路径:internal\datacoord\channel_manager.go

// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
func (c *ChannelManager) Watch(ctx context.Context, ch *channel) error {
   
   
    log := log.Ctx(ctx)
    c.mu.Lock()
    defer c.mu.Unlock()
    // 使用分配策略:datacoord.AverageAssignPolicy
    updates := c.assignPolicy(c.store, []*channel{
   
   ch})
    if len(updates) == 0 {
   
   
        return nil
    }
    log.Info("try to update channel watch info with ToWatch state",
        zap.String("channel", ch.String()),
        zap.Array("updates", updates))
    // 操作etcd
    err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
    if err != nil {
   
   
        log.Warn("fail to update channel watch info with ToWatch state",
            zap.String("channel", ch.String()), zap.Array("updates", updates), zap.Error(err))
    }
    return err
}

updates的值为:

updates.jpg

updates变量是一个ChannelOpSet类型。这时候ChannelWatchInfos为空。

type ChannelOpSet []*ChannelOp

type ChannelOp struct {
   
   
    Type              ChannelOpType
    NodeID            int64
    Channels          []*channel
    ChannelWatchInfos []*datapb.ChannelWatchInfo
}

3.进入c.updateWithTimer()

代码路径:internal\datacoord\channel_manager.go

func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.ChannelWatchState) error {
   
   
    channelsWithTimer := []string{
   
   }
    // updates此时数组长度为1
    for _, op := range updates {
   
   
        if op.Type == Add {
   
   
            // 填充ChannelWatchInfos
            channelsWithTimer = append(channelsWithTimer, c.fillChannelWatchInfoWithState(op, state)...)
        }
    }
    // 操作etcd
    err := c.store.Update(updates)
    if err != nil {
   
   
        log.Warn("fail to update", zap.Array("updates", updates), zap.Error(err))
        c.stateTimer.removeTimers(channelsWithTimer)
    }
    c.lastActiveTimestamp = time.Now()
    return err
}

4.进入c.store.Update()

代码路径:internal\datacoord\channel_store.go

// Update applies the channel operations in opSet.
func (c *ChannelStore) Update(opSet ChannelOpSet) error {
   
   
    totalChannelNum := 0
    for _, op := range opSet {
   
   
        totalChannelNum += len(op.Channels)
    }
    // totalChannelNum = 1
    // maxOperationsPerTxn = 64
    if totalChannelNum <= maxOperationsPerTxn {
   
   
        // 走这条路径
        return c.update(opSet)
    }
    // 如果超过则分批执行
    ......
}

5.进入c.update(opSet)

代码路径:internal\datacoord\channel_store.go

// update applies the ADD/DELETE operations to the current channel store.
func (c *ChannelStore) update(opSet ChannelOpSet) error {
   
   
    // Update ChannelStore's kv store.
    // 操作etcd
    if err := c.txn(opSet); err != nil {
   
   
        return err
    }

    // Update node id -> channel mapping.
    for _, op := range opSet {
   
   
        switch op.Type {
   
   
        case Add:
            for _, ch := range op.Channels {
   
   
                if c.checkIfExist(op.NodeID, ch) {
   
   
                    continue // prevent adding duplicated channel info
                }
                // Append target channels to channel store.
                c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, ch)
            }
        case Delete:
            // Remove target channels from channel store.
            del := make(map[string]struct{
   
   })
            for _, ch := range op.Channels {
   
   
                del[ch.Name] = struct{
   
   }{
   
   }
            }
            prev := c.channelsInfo[op.NodeID].Channels
            curr := make([]*channel, 0, len(prev))
            for _, ch := range prev {
   
   
                if _, ok := del[ch.Name]; !ok {
   
   
                    curr = append(curr, ch)
                }
            }
            c.channelsInfo[op.NodeID].Channels = curr
        default:
            return errUnknownOpType
        }
        metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(op.NodeID, 10)).Set(float64(len(c.channelsInfo[op.NodeID].Channels)))
    }
    return nil
}

6.进入c.txn(opSet)

代码路径:internal\datacoord\channel_store.go

// txn updates the channelStore's kv store with the given channel ops.
func (c *ChannelStore) txn(opSet ChannelOpSet) error {
   
   
    saves := make(map[string]string)
    var removals []string
    for _, op := range opSet {
   
   
        for i, ch := range op.Channels {
   
   
            // 构建key的规则
            k := buildNodeChannelKey(op.NodeID, ch.Name)
            switch op.Type {
   
   
            case Add:
                // 构建value,ChannelWatchInfo
                info, err := proto.Marshal(op.ChannelWatchInfos[i])
                if err != nil {
   
   
                    return err
                }
                saves[k] = string(info)
            case Delete:
                removals = append(removals, k)
            default:
                return errUnknownOpType
            }
        }
    }
    return c.store.MultiSaveAndRemove(saves, removals)
}

因为op.Type是Add,所以removals是nil。

key的值:

channelwatch/1/by-dev-rootcoord-dml_2_445674962009727985v0

规则为:channelwatch/{nodeID}/{chName}

saves变量的值:

saves.jpg

后面已经不用再跟踪下去。

使用etcd-manager查看etcd。

channelwatch.jpg

7.进入s.meta.catalog.MarkChannelAdded()

代码路径:internal\metastore\kv\datacoord\kv_catalog.go

func (kc *Catalog) MarkChannelAdded(ctx context.Context, channel string) error {
   
   
    // 构建key的规则:datacoord-meta/channel-removal/{channelName}
    key := buildChannelRemovePath(channel)
    // 构建value:NonRemoveFlagTomestone = "non-removed"
    err := kc.MetaKv.Save(key, NonRemoveFlagTomestone)
    if err != nil {
   
   
        log.Error("failed to mark channel added", zap.String("channel", channel), zap.Error(err))
        return err
    }
    log.Info("NON remove flag tombstone added", zap.String("channel", channel))
    return nil
}

构建key的规则:

datacoord-meta/channel-removal/{channelName}

channel-removal.jpg

总结:

1.CreateCollection的addCollectionMetaStep会创建2种类型的key。

  • channelwatch/{nodeID}/{chName}
  • datacoord-meta/channel-removal/{channelName}
目录
相关文章
|
2天前
|
缓存 测试技术 API
API的封装步骤流程
API封装流程是一个系统化的过程,旨在将内部功能转化为可复用的接口供外部调用。流程包括明确需求、设计接口、选择技术和工具、编写代码、测试、文档编写及部署维护。具体步骤为确定业务功能、数据来源;设计URL、请求方式、参数及响应格式;选择开发语言、框架和数据库技术;实现数据连接、业务逻辑、错误处理;进行功能、性能测试;编写详细文档;部署并持续维护。通过这些步骤,确保API稳定可靠,提高性能。
|
18天前
|
XML JSON API
淘宝京东商品详情数据解析,API接口系列
淘宝商品详情数据包括多个方面,如商品标题、价格、图片、描述、属性、SKU(库存量单位)库存、视频等。这些数据对于买家了解商品详情以及卖家管理商品都至关重要。
|
21天前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
20天前
|
API C# 开发框架
WPF与Web服务集成大揭秘:手把手教你调用RESTful API,客户端与服务器端优劣对比全解析!
【8月更文挑战第31天】在现代软件开发中,WPF 和 Web 服务各具特色。WPF 以其出色的界面展示能力受到欢迎,而 Web 服务则凭借跨平台和易维护性在互联网应用中占有一席之地。本文探讨了 WPF 如何通过 HttpClient 类调用 RESTful API,并展示了基于 ASP.NET Core 的 Web 服务如何实现同样的功能。通过对比分析,揭示了两者各自的优缺点:WPF 客户端直接处理数据,减轻服务器负担,但需处理网络异常;Web 服务则能利用服务器端功能如缓存和权限验证,但可能增加服务器负载。希望本文能帮助开发者根据具体需求选择合适的技术方案。
56 0
|
20天前
|
监控 测试技术 API
|
20天前
|
UED 开发工具 iOS开发
Uno Platform大揭秘:如何在你的跨平台应用中,巧妙融入第三方库与服务,一键解锁无限可能,让应用功能飙升,用户体验爆棚!
【8月更文挑战第31天】Uno Platform 让开发者能用同一代码库打造 Windows、iOS、Android、macOS 甚至 Web 的多彩应用。本文介绍如何在 Uno Platform 中集成第三方库和服务,如 Mapbox 或 Google Maps 的 .NET SDK,以增强应用功能并提升用户体验。通过 NuGet 安装所需库,并在 XAML 页面中添加相应控件,即可实现地图等功能。尽管 Uno 平台减少了平台差异,但仍需关注版本兼容性和性能问题,确保应用在多平台上表现一致。掌握正确方法,让跨平台应用更出色。
26 0
|
1月前
|
机器人 API Python
智能对话机器人(通义版)会话接口API使用Quick Start
本文主要演示了如何使用python脚本快速调用智能对话机器人API接口,在参数获取的部分给出了具体的获取位置截图,这部分容易出错,第一次使用务必仔细参考接入参数获取的位置。
100 1
|
22天前
|
存储 JSON API
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
——在成长的路上,我们都是同行者。这篇关于商品详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦! 淘宝API接口(如淘宝开放平台提供的API)允许开发者获取淘宝商品的各种信息,包括商品详情。然而,需要注意的是,直接访问淘宝的商品数据API通常需要商家身份或开发者权限,并且需要遵循淘宝的API使用协议。
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
|
1月前
|
SQL 存储 数据处理
|
1月前
|
XML JSON API
RESTful API设计最佳实践:构建高效、可扩展的接口
【8月更文挑战第17天】RESTful API设计是一个涉及多方面因素的复杂过程。通过遵循上述最佳实践,开发者可以构建出更加高效、可扩展、易于维护的API。然而,值得注意的是,最佳实践并非一成不变,随着技术的发展和业务需求的变化,可能需要不断调整和优化API设计。因此,保持对新技术和最佳实践的关注,是成为一名优秀API设计师的关键。

推荐镜像

更多