CreateIndex API执行流程_milvus源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: CreateIndex API执行流程_milvus源码解析

CreateIndex API执行流程源码解析

milvus版本:v2.3.2

整体架构:

architecture.png

CreateIndex 的数据流向:

create_index数据流向.jpg

1.客户端sdk发出CreateIndex API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8


print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")


fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong")


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

print("Start Creating index IVF_FLAT")
index = {
   
   
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {
   
   "nlist": 8},
}

hello_milvus.create_index("embeddings", index,index_name="idx_embeddings")

客户端SDK向proxy发送一个CreateIndex API请求,在embeddings列上创建一个名为idx_embeddings的索引。

2.客户端接受API请求,将request封装为createIndexTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

// CreateIndex create index for collection.
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
   
   
    ......

    // request封装为task
    cit := &createIndexTask{
   
   
        ctx:                ctx,
        Condition:          NewTaskCondition(ctx),
        req:                request,
        rootCoord:          node.rootCoord,
        datacoord:          node.dataCoord,
        replicateMsgStream: node.replicateMsgStream,
    }

    ......
    // 将task压入ddQueue队列

    if err := node.sched.ddQueue.Enqueue(cit); err != nil {
   
   
        ......
    }

    ......
    // 等待cct执行完
    if err := cit.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

3.执行createIndexTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

代码路径:internal\proxy\task_index.go

func (cit *createIndexTask) Execute(ctx context.Context) error {
   
   
    ......
    req := &indexpb.CreateIndexRequest{
   
   
        CollectionID:    cit.collectionID,
        FieldID:         cit.fieldSchema.GetFieldID(),
        IndexName:       cit.req.GetIndexName(),
        TypeParams:      cit.newTypeParams,
        IndexParams:     cit.newIndexParams,
        IsAutoIndex:     cit.isAutoIndex,
        UserIndexParams: cit.newExtraParams,
        Timestamp:       cit.BeginTs(),
    }
    cit.result, err = cit.datacoord.CreateIndex(ctx, req)
    ......
    SendReplicateMessagePack(ctx, cit.replicateMsgStream, cit.req)
    return nil
}

从代码可以看出调用了datacoord的CreateIndex接口。

4.进入datacoord的CreateIndex接口。

代码路径:internal\datacoord\index_service.go

// CreateIndex create an index on collection.
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
// indexBuilder will find this task and assign it to IndexNode for execution.
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
   
   
    ......
    // 分配indexID,indexID=0
    indexID, err := s.meta.CanCreateIndex(req)
    ......

    if indexID == 0 {
   
   
        // 分配indexID
        indexID, err = s.allocator.allocID(ctx)
        ......
    }

    index := &model.Index{
   
   
        CollectionID:    req.GetCollectionID(),
        FieldID:         req.GetFieldID(),
        IndexID:         indexID,
        IndexName:       req.GetIndexName(),
        TypeParams:      req.GetTypeParams(),
        IndexParams:     req.GetIndexParams(),
        CreateTime:      req.GetTimestamp(),
        IsAutoIndex:     req.GetIsAutoIndex(),
        UserIndexParams: req.GetUserIndexParams(),
    }

    // Get flushed segments and create index

    err = s.meta.CreateIndex(index)
    ......

    // 将collectionID发送到channel,其它的goroutine进行消费。
    select {
   
   
    case s.notifyIndexChan <- req.GetCollectionID():
    default:
    }

    ......
}

变量index:

index_model.jpg

5.进入s.meta.CreateIndex()

代码路径:internal\datacoord\index_meta.go

func (m *meta) CreateIndex(index *model.Index) error {
   
   
    ......
    // 写入etcd元数据
    if err := m.catalog.CreateIndex(m.ctx, index); err != nil {
   
   
        ......
    }

    m.updateCollectionIndex(index)
    ......
}

在这里重点研究m.catalog.CreateIndex()这个方法做了什么事情。

func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
   
   
    key := BuildIndexKey(index.CollectionID, index.IndexID)

    value, err := proto.Marshal(model.MarshalIndexModel(index))
    if err != nil {
   
   
        return err
    }

    err = kc.MetaKv.Save(key, string(value))
    if err != nil {
   
   
        return err
    }
    return nil
}

在etcd会产生1个key。

==field-index/445834678636119060/445834678636519085==

value的值的结构为indexpb.FieldIndex,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&indexpb.FieldIndex{
   
   
    IndexInfo: &indexpb.IndexInfo{
   
   
        CollectionID:    index.CollectionID,
        FieldID:         index.FieldID,
        IndexName:       index.IndexName,
        IndexID:         index.IndexID,
        TypeParams:      index.TypeParams,
        IndexParams:     index.IndexParams,
        IsAutoIndex:     index.IsAutoIndex,
        UserIndexParams: index.UserIndexParams,
    },
    Deleted:    index.IsDeleted,
    CreateTime: index.CreateTime,
}

fieldindex.jpg

跟踪BuildIndexKey()函数,即可以得到key的规则。整理如下:

key规则:

  • 前缀/field-index/{collectionID}/{IndexID}

可以反映index属于哪个collection。Index的value可以反映属于哪个field。

不能反映属于哪个partition、哪个segment。

总结:

  • CreateIndex由proxy传递给协调器dataCoord操作etcd。
  • CreateIndex最终会在etcd上写入1种类型的key(其实还有一种,在另一篇中进行介绍)。
目录
相关文章
|
2天前
|
缓存 测试技术 API
API的封装步骤流程
API封装流程是一个系统化的过程,旨在将内部功能转化为可复用的接口供外部调用。流程包括明确需求、设计接口、选择技术和工具、编写代码、测试、文档编写及部署维护。具体步骤为确定业务功能、数据来源;设计URL、请求方式、参数及响应格式;选择开发语言、框架和数据库技术;实现数据连接、业务逻辑、错误处理;进行功能、性能测试;编写详细文档;部署并持续维护。通过这些步骤,确保API稳定可靠,提高性能。
|
18天前
|
XML JSON API
淘宝京东商品详情数据解析,API接口系列
淘宝商品详情数据包括多个方面,如商品标题、价格、图片、描述、属性、SKU(库存量单位)库存、视频等。这些数据对于买家了解商品详情以及卖家管理商品都至关重要。
|
20天前
|
API C# 开发框架
WPF与Web服务集成大揭秘:手把手教你调用RESTful API,客户端与服务器端优劣对比全解析!
【8月更文挑战第31天】在现代软件开发中,WPF 和 Web 服务各具特色。WPF 以其出色的界面展示能力受到欢迎,而 Web 服务则凭借跨平台和易维护性在互联网应用中占有一席之地。本文探讨了 WPF 如何通过 HttpClient 类调用 RESTful API,并展示了基于 ASP.NET Core 的 Web 服务如何实现同样的功能。通过对比分析,揭示了两者各自的优缺点:WPF 客户端直接处理数据,减轻服务器负担,但需处理网络异常;Web 服务则能利用服务器端功能如缓存和权限验证,但可能增加服务器负载。希望本文能帮助开发者根据具体需求选择合适的技术方案。
56 0
|
20天前
|
监控 测试技术 API
|
1月前
|
机器人 API Python
智能对话机器人(通义版)会话接口API使用Quick Start
本文主要演示了如何使用python脚本快速调用智能对话机器人API接口,在参数获取的部分给出了具体的获取位置截图,这部分容易出错,第一次使用务必仔细参考接入参数获取的位置。
100 1
|
22天前
|
存储 JSON API
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
——在成长的路上,我们都是同行者。这篇关于商品详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦! 淘宝API接口(如淘宝开放平台提供的API)允许开发者获取淘宝商品的各种信息,包括商品详情。然而,需要注意的是,直接访问淘宝的商品数据API通常需要商家身份或开发者权限,并且需要遵循淘宝的API使用协议。
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
|
1月前
|
SQL 存储 数据处理
|
1月前
|
XML JSON API
RESTful API设计最佳实践:构建高效、可扩展的接口
【8月更文挑战第17天】RESTful API设计是一个涉及多方面因素的复杂过程。通过遵循上述最佳实践,开发者可以构建出更加高效、可扩展、易于维护的API。然而,值得注意的是,最佳实践并非一成不变,随着技术的发展和业务需求的变化,可能需要不断调整和优化API设计。因此,保持对新技术和最佳实践的关注,是成为一名优秀API设计师的关键。
|
1月前
|
监控 API 数据安全/隐私保护
​邮件API触发式接口分析?邮件API接口好评榜
邮件API在企业通信和营销中至关重要,通过自动化邮件发送流程提升效率与客户满意度。本文解析邮件API触发式接口,即基于特定事件(如用户注册、购买产品)自动发送邮件的技术,能显著加快企业响应速度并增强用户体验。推荐市场上的优秀邮件API产品,包括SendGrid、Mailgun、Amazon SES、Postmark及新兴的AOKSend,它们各具特色,如高发送率、详细分析工具、灵活配置、强大的日志功能及用户友好的API接口,帮助企业根据不同需求选择最合适的邮件API解决方案。
|
1月前
|
存储 算法 Oracle
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
55 8

热门文章

最新文章

推荐镜像

更多