Vector 基于多索引表架构应对海量多租户的大规模向量检索

本文涉及的产品
对象存储 OSS,OSS 加速器 50 GB 1个月
简介: OSS 向量 Bucket 支持多索引架构,通过按租户/业务创建独立索引,实现数据隔离、并发检索与结果合并,兼顾高安全性与毫秒级响应,轻松应对千万级向量的多租户 RAG 与语义搜索场景。

随着 RAG 和语义搜索需求的增长,向量检索系统常面临两类挑战:

  • 多租户隔离:SaaS 服务商为大量企业客户提供知识库,或企业内部各部门拥有独立知识库,要求数据严格隔离。
  • 超大规模数据:单索引数据量达到千万甚至亿级时,检索延迟显著增加,难以满足实时性要求。

OSS 向量 Bucket 支持同一账号在同一地域创建大量向量索引(Index)。通过多索引架构,可以按租户或业务维度拆分数据,兼顾隔离性和检索性能。

多索引架构的优势

  • 数据隔离:不同租户或业务的数据存放在独立索引中,从底层避免跨租户数据泄露。
  • 检索提速:将大表拆分为多张小表,缩小单次检索范围;配合并发检索多个索引再合并结果,可有效降低总响应耗时。
  • 运维灵活:不同索引可独立配置维度、模型、相似度算法。删除某个租户的数据只需删除对应索引,无需逐条过滤删除。

通过 CLI 按租户导入数据

oss-vectors-embed CLI 工具支持将指定文件写入指定索引,实现按租户或业务维度的定向导入。

安装方式请参见使用OSS Vectors Embed CLI工具写入和检索向量数据

开始前,请确保满足以下条件:

  • 已配置环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRETDASHSCOPE_API_KEY
  • 已创建向量 Bucket 和各租户对应的向量索引。

将以下示例中的占位符替换为实际值:

占位符

说明

<your-account-id>

阿里云账号 ID

<your-vector-bucket>

向量 Bucket 名称

按租户写入不同索引

将不同租户的数据写入各自独立的索引,实现数据隔离。

# 将租户 A 的文档写入租户 A 的索引
oss-vectors-embed \
  --account-id "<your-account-id>" \
  --vectors-region cn-hangzhou \
  put \
  --vector-bucket-name "<your-vector-bucket>" \
  --index-name "tenantcompanya" \
  --model-id text-embedding-v4 \
  --text-value "租户A的知识库文档内容" \
  --key "doc_001" \
  --metadata '{"tenant": "company_a", "category": "faq"}'
# 将租户 B 的文档写入租户 B 的索引
oss-vectors-embed \
  --account-id "<your-account-id>" \
  --vectors-region cn-hangzhou \
  put \
  --vector-bucket-name "<your-vector-bucket>" \
  --index-name "tenantcompanyb" \
  --model-id text-embedding-v4 \
  --text-value "租户B的知识库文档内容" \
  --key "doc_001" \
  --metadata '{"tenant": "company_b", "category": "manual"}'

按租户定向检索

检索时只查询目标租户的索引,天然实现数据隔离。

# 仅在租户 A 的索引中检索
oss-vectors-embed \
  --account-id "<your-account-id>" \
  --vectors-region cn-hangzhou \
  query \
  --vector-bucket-name "<your-vector-bucket>" \
  --index-name "tenantcompanya" \
  --model-id text-embedding-v4 \
  --text-value "常见问题" \
  --top-k 5 \
  --return-metadata

通过 SDK 构建多索引架构

Python SDK

开始前请安装 alibabacloud-oss-v2 SDK:

pip install alibabacloud-oss-v2

确保已配置环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET

创建多租户索引

以租户 ID 为后缀命名索引,批量创建独立的向量索引。

import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = "<your-account-id>"
REGION = "cn-hangzhou"
BUCKET = "<your-vector-bucket>"
def create_vector_client():
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)
client = create_vector_client()
# 批量为租户创建索引
tenant_ids = ["companya", "companyb", "companyc"]
for tenant_id in tenant_ids:
    index_name = f"tenant{tenant_id}"
    result = client.put_vector_index(oss_vectors.models.PutVectorIndexRequest(
        bucket=BUCKET,
        index_name=index_name,
        dimension=1024,
        data_type="float32",
        distance_metric="cosine",
    ))
    print(f"索引 {index_name} 创建完成,status_code={result.status_code}")

运行后输出:

索引 tenantcompanya 创建完成,status_code=200
索引 tenantcompanyb 创建完成,status_code=200
索引 tenantcompanyc 创建完成,status_code=200

按租户写入数据

将不同租户的数据写入各自的索引。

import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = "<your-account-id>"
REGION = "cn-hangzhou"
BUCKET = "<your-vector-bucket>"
def create_vector_client():
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)
client = create_vector_client()
# 向租户 A 的索引写入数据
result = client.put_vectors(oss_vectors.models.PutVectorsRequest(
    bucket=BUCKET,
    index_name="tenantcompanya",
    vectors=[
        {
            "key": "faq_001",
            "data": {"float32": [0.1] * 1024},  # 向量维度需与索引一致
            "metadata": {"tenant": "company_a", "category": "faq"}
        }
    ]
))
print(f"租户 A 写入完成,status_code={result.status_code}")
# 向租户 B 的索引写入数据
result = client.put_vectors(oss_vectors.models.PutVectorsRequest(
    bucket=BUCKET,
    index_name="tenantcompanyb",
    vectors=[
        {
            "key": "manual_001",
            "data": {"float32": [0.2] * 1024},  # 向量维度需与索引一致
            "metadata": {"tenant": "company_b", "category": "manual"}
        }
    ]
))
print(f"租户 B 写入完成,status_code={result.status_code}")

运行后输出:

租户 A 写入完成,status_code=200
租户 B 写入完成,status_code=200

并发检索多个索引并合并结果

将大表拆分为多张小表后,通过并发检索多个索引再合并排序,降低总响应耗时。

from concurrent.futures import ThreadPoolExecutor, as_completed
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = "<your-account-id>"
REGION = "cn-hangzhou"
BUCKET = "<your-vector-bucket>"
def create_vector_client():
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)
def search_index(client, index_name, query_vector, top_k=10):
    """检索单个索引"""
    result = client.query_vectors(oss_vectors.models.QueryVectorsRequest(
        bucket=BUCKET,
        index_name=index_name,
        query_vector=query_vector,
        return_metadata=True,
        return_distance=True,
        top_k=top_k,
    ))
    return {
        "index": index_name,
        "status_code": result.status_code,
        "vectors": result.vectors or [ ],
    }
def parallel_search(index_names, query_vector, top_k=10):
    """并发检索多个索引并合并结果"""
    client = create_vector_client()
    all_vectors = [ ]
    with ThreadPoolExecutor(max_workers=len(index_names)) as executor:
        futures = {
            executor.submit(search_index, client, idx, query_vector, top_k): idx
            for idx in index_names
        }
        for future in as_completed(futures):
            result = future.result()
            print(f"索引 {result['index']} 返回 {len(result['vectors'])} 条结果")
            all_vectors.extend(result["vectors"])
    # 按 distance 升序排序(距离越小越相似),取全局 TopK
    all_vectors.sort(key=lambda v: v.get("distance", float("inf")))
    return all_vectors[:top_k]
# 并发检索 3 个分表索引
indices = ["tenantcompanya", "tenantcompanyb", "tenantcompanyc"]
query_vec = {"float32": [0.1] * 1024}  # 向量维度需与索引一致
results = parallel_search(indices, query_vec, top_k=5)
print(f"\n合并后全局 Top5:")
for v in results:
    print(f"  key={v.get('key')}, distance={v.get('distance')}, metadata={v.get('metadata')}")

运行后输出:

索引 tenantcompanya 返回 1 条结果
索引 tenantcompanyb 返回 1 条结果
索引 tenantcompanyc 返回 0 条结果
合并后全局 Top5:
  key=faq_001, distance=0.0, metadata={'tenant': 'company_a', 'category': 'faq'}
  key=manual_001, distance=0.19999998807907104, metadata={'tenant': 'company_b', 'category': 'manual'}

说明:并发检索多个索引后,在客户端按 distance 排序合并结果。如需更高精度,可引入 Rerank 模型进行二次精排。

Go SDK

开始前请安装 alibabacloud-oss-go-sdk-v2 SDK:

go get github.com/aliyun/alibabacloud-oss-go-sdk-v2

确保已配置环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET

创建多租户索引

package main
import (
  "context"
  "fmt"
  "log"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
const (
  region     = "cn-hangzhou"
  bucketName = "<your-vector-bucket>"
  accountId  = "<your-account-id>"
)
func main() {
  cfg := oss.LoadDefaultConfig().
    WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
    WithRegion(region).
    WithAccountId(accountId)
  client := vectors.NewVectorsClient(cfg)
  // 批量为租户创建索引
  tenantIDs := [ ]string{"companya", "companyb", "companyc"}
  for _, tenantID := range tenantIDs {
    indexName := fmt.Sprintf("tenant%s", tenantID)
    result, err := client.PutVectorIndex(context.TODO(), &vectors.PutVectorIndexRequest{
      Bucket:         oss.Ptr(bucketName),
      IndexName:      oss.Ptr(indexName),
      Dimension:      oss.Ptr(1024),
      DataType:       oss.Ptr("float32"),
      DistanceMetric: oss.Ptr("cosine"),
    })
    if err != nil {
      log.Printf("索引 %s 创建失败: %v", indexName, err)
      continue
    }
    fmt.Printf("索引 %s 创建完成,status_code=%d\n", indexName, result.StatusCode)
  }
}

运行后输出:

索引 tenantcompanya 创建完成,status_code=200
索引 tenantcompanyb 创建完成,status_code=200
索引 tenantcompanyc 创建完成,status_code=200

并发检索多个索引并合并结果

package main
import (
  "context"
  "fmt"
  "log"
  "sort"
  "sync"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
const (
  region     = "cn-hangzhou"
  bucketName = "<your-vector-bucket>"
  accountId  = "<your-account-id>"
  dimension  = 1024
)
func makeVector(val float32, dim int) [ ]float32 {
  v := make([ ]float32, dim)
  for i := range v {
    v[i] = val
  }
  return v
}
func main() {
  cfg := oss.LoadDefaultConfig().
    WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
    WithRegion(region).
    WithAccountId(accountId)
  client := vectors.NewVectorsClient(cfg)
  indices := [ ]string{"tenantcompanya", "tenantcompanyb", "tenantcompanyc"}
  queryVector := map[string]any{"float32": makeVector(0.1, dimension)}
  var mu sync.Mutex
  var allVectors [ ]map[string]any
  var wg sync.WaitGroup
  for _, indexName := range indices {
    wg.Add(1)
    go func(idx string) {
      defer wg.Done()
      result, err := client.QueryVectors(context.TODO(), &vectors.QueryVectorsRequest{
        Bucket:         oss.Ptr(bucketName),
        IndexName:      oss.Ptr(idx),
        QueryVector:    queryVector,
        ReturnMetadata: oss.Ptr(true),
        ReturnDistance:  oss.Ptr(true),
        TopK:           oss.Ptr(10),
      })
      if err != nil {
        log.Printf("索引 %s 检索失败: %v", idx, err)
        return
      }
      fmt.Printf("索引 %s 返回 %d 条结果\n", idx, len(result.Vectors))
      mu.Lock()
      allVectors = append(allVectors, result.Vectors...)
      mu.Unlock()
    }(indexName)
  }
  wg.Wait()
  // 按 distance 升序排序,取全局 Top5
  sort.Slice(allVectors, func(i, j int) bool {
    di, _ := allVectors[i]["distance"].(float64)
    dj, _ := allVectors[j]["distance"].(float64)
    return di < dj
  })
  topK := 5
  if len(allVectors) < topK {
    topK = len(allVectors)
  }
  fmt.Printf("\n合并后全局 Top%d:\n", topK)
  for _, v := range allVectors[:topK] {
    fmt.Printf("  key=%v, distance=%v, metadata=%v\n", v["key"], v["distance"], v["metadata"])
  }
}

运行后输出:

索引 tenantcompanya 返回 1 条结果
索引 tenantcompanyc 返回 0 条结果
索引 tenantcompanyb 返回 1 条结果
合并后全局 Top2:
  key=faq_001, distance=0, metadata=map[category:faq tenant:company_a]
  key=manual_001, distance=0.19999998807907104, metadata=map[category:manual tenant:company_b]

最佳实践

  • 索引命名规范:以租户 ID 或业务维度作为索引名称的后缀(如 tenant{tenantid})。索引名仅支持小写字母和数字,不支持下划线和连字符。
  • 租户数较多时:直接利用索引名称进行逻辑隔离。OSS 向量索引的创建是秒级的,管理开销极低。
  • 追求极低延迟时:当单索引超过千万级数据量,按业务逻辑(如时间、类别)进行水平拆分,通过并发检索多个索引再合并结果。
  • 结果重排(Rerank):多索引表的检索结果合并后,可以根据距离相似度进行简单重排,也可引入 Rerank 模型进行二次排序。
  • 索引清理:删除某个租户或业务的数据只需调用 DeleteVectorIndex 删除对应索引,无需逐条过滤删除。
相关实践学习
对象存储OSS快速上手——如何使用ossbrowser
本实验是对象存储OSS入门级实验。通过本实验,用户可学会如何用对象OSS的插件,进行简单的数据存、查、删等操作。
相关文章
|
存储 缓存 文件存储
如何保证分布式文件系统的数据一致性
分布式文件系统需要向上层应用提供透明的客户端缓存,从而缓解网络延时现象,更好地支持客户端性能水平扩展,同时也降低对文件服务器的访问压力。当考虑客户端缓存的时候,由于在客户端上引入了多个本地数据副本(Replica),就相应地需要提供客户端对数据访问的全局数据一致性。
32698 79
如何保证分布式文件系统的数据一致性
|
前端开发 容器
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局(上)
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局
17750 20
|
设计模式 存储 监控
设计模式(C++版)
看懂UML类图和时序图30分钟学会UML类图设计原则单一职责原则定义:单一职责原则,所谓职责是指类变化的原因。如果一个类有多于一个的动机被改变,那么这个类就具有多于一个的职责。而单一职责原则就是指一个类或者模块应该有且只有一个改变的原因。bad case:IPhone类承担了协议管理(Dial、HangUp)、数据传送(Chat)。good case:里式替换原则定义:里氏代换原则(Liskov 
36681 19
设计模式(C++版)
|
存储 编译器 C语言
抽丝剥茧C语言(初阶 下)(下)
抽丝剥茧C语言(初阶 下)
|
机器学习/深度学习 人工智能 自然语言处理
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
24757 14
|
机器学习/深度学习 弹性计算 监控
重生之---我测阿里云U1实例(通用算力型)
阿里云产品全线降价的一力作,2023年4月阿里云推出新款通用算力型ECS云服务器Universal实例,该款服务器的真实表现如何?让我先测为敬!
36660 15
重生之---我测阿里云U1实例(通用算力型)
|
SQL 存储 弹性计算
Redis性能高30%,阿里云倚天ECS性能摸底和迁移实践
Redis在倚天ECS环境下与同规格的基于 x86 的 ECS 实例相比,Redis 部署在基于 Yitian 710 的 ECS 上可获得高达 30% 的吞吐量优势。成本方面基于倚天710的G8y实例售价比G7实例低23%,总性价比提高50%;按照相同算法,相对G8a,性价比为1.4倍左右。
|
存储 算法 Java
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的限流器RateLimiter功能服务
随着互联网的快速发展,越来越多的应用程序需要处理大量的请求。如果没有限制,这些请求可能会导致应用程序崩溃或变得不可用。因此,限流器是一种非常重要的技术,可以帮助应用程序控制请求的数量和速率,以保持稳定和可靠的运行。
29838 52
下一篇
开通oss服务