API请求执行流程_milvus源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: API请求执行流程_milvus源码解析

API请求执行流程

1.milvus客户端发起api rpc请求,请求内容为request。

2.proxy接受api请求,将request包装为task。

3.将task压入队列。

4.调度器执行队列中的task。

api请求执行流程.jpg

以创建collection的API(CreateCollection)为例:

1.客户端发起创建collection的请求。

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print(f"start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus2", schema, consistency_level="Strong",shards_num=2)

2.proxy接受客户端发送过来的request,将其包装为createCollectionTask。

将createCollectionTask压入队列ddTaskQueue,等待调度器执行。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ...省略...
    // 将request包装为createCollectionTask
    cct := &createCollectionTask{
   
   
        ctx:                     ctx,
        Condition:               NewTaskCondition(ctx),
        CreateCollectionRequest: request,
        rootCoord:               node.rootCoord,
    }

    ...省略...
    // 将task压入队列ddTaskQueue
    if err := node.sched.ddQueue.Enqueue(cct); err != nil {
   
   
        log.Warn(
            rpcFailedToEnqueue(method),
            zap.Error(err))

    ...省略...
    // 等待task执行完成
    if err := cct.WaitToFinish(); err != nil {
   
   
        log.Warn(
            rpcFailedToWaitToFinish(method),
            zap.Error(err),
            zap.Uint64("BeginTs", cct.BeginTs()),
            zap.Uint64("EndTs", cct.EndTs()))

    ...省略...
}

3.调度器执行队列中的task。

会依次执行cct的PreExecute()、Execute()、PostExecute()方法。

PreExecute()一般用来做预处理。

Execute()真正执行task的任务。

PostExecute()用来task完成后执行的动作,一般直接返回nil,也就是什么都不做。

代码路径:internal\proxy\task.go

type createCollectionTask struct {
   
   
    Condition
    *milvuspb.CreateCollectionRequest
    ctx       context.Context
    rootCoord types.RootCoordClient
    result    *commonpb.Status
    schema    *schemapb.CollectionSchema
}

func (t *createCollectionTask) PreExecute(ctx context.Context) error {
   
   
    ...省略...
}

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    var err error
    t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
    return err
}

func (t *createCollectionTask) PostExecute(ctx context.Context) error {
   
   
    return nil
}

为什么会是PreExecute()、Execute()、PostExecute()这个顺序,这个就需要阅读task调度器的源码了。

代码路径:internal\proxy\task_scheduler.go

核心代码如下:

task压入队列后执行的是processTask()方法。

func (sched *taskScheduler) processTask(t task, q taskQueue) {
   
   
    ......

    err := t.PreExecute(ctx)

    ......
    err = t.Execute(ctx)

    ......
    err = t.PostExecute(ctx)
    ......
}

这里再思考另一个问题,processTask()是由谁调用的,调度器是什么时候启动的。

task_scheduler.go有一个方法Start()。由这个方法启动一个goroutine进行调度。

// ddQueue *ddTaskQueue
// dmQueue *dmTaskQueue
// dqQueue *dqTaskQueue
// dcQueue *ddTaskQueue

func (sched *taskScheduler) Start() error {
   
   
    sched.wg.Add(1)
    // ddQueue的调度,数据定义的task
    go sched.definitionLoop()

    sched.wg.Add(1)
    // dcQueue的调度,数据控制的task
    go sched.controlLoop()

    sched.wg.Add(1)
    // dmQueue的调度,数据操作的task
    go sched.manipulationLoop()

    sched.wg.Add(1)
    // dqQueue的调度,数据查询的task
    go sched.queryLoop()

    return nil
}

createCollectionTask是数据定义语言,走go sched.definitionLoop()这条路径。

// definitionLoop schedules the ddl tasks.
func (sched *taskScheduler) definitionLoop() {
   
   
    defer sched.wg.Done()
    for {
   
   
        select {
   
   
        case <-sched.ctx.Done():
            return
        case <-sched.ddQueue.utChan():
            if !sched.ddQueue.utEmpty() {
   
   
                t := sched.scheduleDdTask()
                sched.processTask(t, sched.ddQueue)
            }
        }
    }
}

在这里可以看到processTask()方法的调用。for循环里,只要通道有值就会调用processTask()方法。

这样PreExecute()、Execute()、PostExecute()的逻辑就搞清楚了。

目录
相关文章
|
2天前
|
缓存 测试技术 API
API的封装步骤流程
API封装流程是一个系统化的过程,旨在将内部功能转化为可复用的接口供外部调用。流程包括明确需求、设计接口、选择技术和工具、编写代码、测试、文档编写及部署维护。具体步骤为确定业务功能、数据来源;设计URL、请求方式、参数及响应格式;选择开发语言、框架和数据库技术;实现数据连接、业务逻辑、错误处理;进行功能、性能测试;编写详细文档;部署并持续维护。通过这些步骤,确保API稳定可靠,提高性能。
手机上网流程解析
【9月更文挑战第5天】
|
18天前
|
XML JSON API
淘宝京东商品详情数据解析,API接口系列
淘宝商品详情数据包括多个方面,如商品标题、价格、图片、描述、属性、SKU(库存量单位)库存、视频等。这些数据对于买家了解商品详情以及卖家管理商品都至关重要。
|
20天前
|
持续交付 jenkins Devops
WPF与DevOps的完美邂逅:从Jenkins配置到自动化部署,全流程解析持续集成与持续交付的最佳实践
【8月更文挑战第31天】WPF与DevOps的结合开启了软件生命周期管理的新篇章。通过Jenkins等CI/CD工具,实现从代码提交到自动构建、测试及部署的全流程自动化。本文详细介绍了如何配置Jenkins来管理WPF项目的构建任务,确保每次代码提交都能触发自动化流程,提升开发效率和代码质量。这一方法不仅简化了开发流程,还加强了团队协作,是WPF开发者拥抱DevOps文化的理想指南。
39 1
|
12天前
|
缓存 网络协议 Linux
DNS的执行流程是什么?
DNS的执行流程是什么?
25 0
|
12天前
|
存储 JSON API
Python编程:解析HTTP请求返回的JSON数据
使用Python处理HTTP请求和解析JSON数据既直接又高效。`requests`库的简洁性和强大功能使得发送请求、接收和解析响应变得异常简单。以上步骤和示例提供了一个基础的框架,可以根据你的具体需求进行调整和扩展。通过合适的异常处理,你的代码将更加健壮和可靠,为用户提供更加流畅的体验。
39 0
|
20天前
|
持续交付 jenkins C#
“WPF与DevOps深度融合:从Jenkins配置到自动化部署全流程解析,助你实现持续集成与持续交付的无缝衔接”
【8月更文挑战第31天】本文详细介绍如何在Windows Presentation Foundation(WPF)项目中应用DevOps实践,实现自动化部署与持续集成。通过具体代码示例和步骤指导,介绍选择Jenkins作为CI/CD工具,结合Git进行源码管理,配置构建任务、触发器、环境、构建步骤、测试及部署等环节,显著提升开发效率和代码质量。
37 0
|
20天前
|
C# 开发者 Windows
震撼发布:全面解析WPF中的打印功能——从基础设置到高级定制,带你一步步实现直接打印文档的完整流程,让你的WPF应用程序瞬间升级,掌握这一技能,轻松应对各种打印需求,彻底告别打印难题!
【8月更文挑战第31天】打印功能在许多WPF应用中不可或缺,尤其在需要生成纸质文档时。WPF提供了强大的打印支持,通过`PrintDialog`等类简化了打印集成。本文将详细介绍如何在WPF应用中实现直接打印文档的功能,并通过具体示例代码展示其实现过程。
79 0
|
20天前
|
API C# 开发框架
WPF与Web服务集成大揭秘:手把手教你调用RESTful API,客户端与服务器端优劣对比全解析!
【8月更文挑战第31天】在现代软件开发中,WPF 和 Web 服务各具特色。WPF 以其出色的界面展示能力受到欢迎,而 Web 服务则凭借跨平台和易维护性在互联网应用中占有一席之地。本文探讨了 WPF 如何通过 HttpClient 类调用 RESTful API,并展示了基于 ASP.NET Core 的 Web 服务如何实现同样的功能。通过对比分析,揭示了两者各自的优缺点:WPF 客户端直接处理数据,减轻服务器负担,但需处理网络异常;Web 服务则能利用服务器端功能如缓存和权限验证,但可能增加服务器负载。希望本文能帮助开发者根据具体需求选择合适的技术方案。
56 0
|
20天前
|
监控 测试技术 API

热门文章

最新文章

推荐镜像

更多