趋近智
分散检索工作负载是构建能够处理海量数据集的RAG系统的根本。分片向量索引是实现这种分布的主要方法,允许多个节点或进程并行处理查询和存储向量嵌入。本次实践练习将演示一个分片向量索引的简化实现,使用FAISS模拟其核心运作。虽然为了清晰起见将使用本地配置,但所演示的原理可直接应用于大规模分布式向量数据库和搜索系统。
我们的目标是理解如何划分数据、将索引和查询操作路由到适当的分片,以及聚合结果。本次动手操作将巩固您对先前讨论的分布式检索策略的理解。
本次实践中,我们将使用Python,以及Facebook AI Research的FAISS库进行高效相似性搜索,并使用NumPy进行数值运算。FAISS使我们能够在内存中创建和管理向量索引,使其成为演示分片逻辑的理想选择,无需完整分布式数据库设置的额外开销。
设想我们有大量需要索引的文档嵌入。我们将创建 N_SHARDS 个较小的索引,而非单一的整体索引。每个文档嵌入都将被分配到一个分片。
主要组成部分:
我们假设嵌入是 D 维向量。
首先,我们需要初始化分片。在真实的分布式系统中,每个分片可能是一个独立的服务器或管理FAISS索引实例的进程。在此,我们将用一个FAISS索引列表来模拟此过程。
import faiss
import numpy as np
# 配置
N_SHARDS = 4
D_EMBEDDING = 128 # 嵌入的维度
K_NEIGHBORS = 5 # 要检索的邻居数量
# 初始化分片
# 为简化起见,我们使用IndexFlatL2,它适用于较小的数据集。
# 在实际使用中,对于大规模应用,您会使用更高级的索引类型,例如IndexIVFPQ。
shards = [faiss.IndexFlatL2(D_EMBEDDING) for _ in range(N_SHARDS)]
shard_doc_ids = [[] for _ in range(N_SHARDS)] # 用于存储每个分片的原始文档ID
print(f"已初始化 {N_SHARDS} 个分片,每个分片用于 {D_EMBEDDING} 维向量。")
接下来,我们定义一个分片函数。对文档的唯一标识符进行简单的模运算是一种常见方法,用于相对均匀地分布数据,前提是ID分布良好。
def get_shard_index(doc_id_numeric, num_shards):
"""根据给定的数字文档ID确定分片索引。"""
return doc_id_numeric % num_shards
本次实践中,我们假设 doc_id_numeric 是一个整数。如果您的ID是字符串,则需要先将其哈希为整数。
现在,我们来模拟摄取一些数据。我们将生成随机嵌入并为其分配文档ID。每个嵌入及其ID都将被路由到相应的分片。
NUM_DOCUMENTS = 10000
np.random.seed(42) # 为了可重复性
# 生成虚拟文档嵌入和ID
# 在真实系统中,这些嵌入来自您的嵌入模型
all_embeddings = np.random.rand(NUM_DOCUMENTS, D_EMBEDDING).astype('float32')
# 为简化起见,分配连续数字ID
all_doc_ids = np.arange(NUM_DOCUMENTS)
# 将数据摄取到分片中
for i in range(NUM_DOCUMENTS):
doc_id = all_doc_ids[i]
embedding = all_embeddings[i:i+1] # FAISS需要一个二维数组
shard_idx = get_shard_index(doc_id, N_SHARDS)
shards[shard_idx].add(embedding)
shard_doc_ids[shard_idx].append(doc_id) # 存储从FAISS索引到原始ID的映射
# 验证分片中的数据量
for i, shard in enumerate(shards):
print(f"分片 {i} 包含 {shard.ntotal} 个嵌入。")
至此,我们的 NUM_DOCUMENTS 个嵌入已分布到 N_SHARDS 个FAISS索引中。每个分片都更小,并且可以独立管理。
当查询到达时,它必须被分派到所有分片,因为任何分片都可能包含相关向量。这是一种被称为“散发-收集”的常见模式。
# 模拟一个查询嵌入
query_embedding = np.random.rand(1, D_EMBEDDING).astype('float32')
all_shard_distances = []
all_shard_original_ids = []
# 1. 将查询散发到所有分片 & 2. 收集结果
for i, shard_index_instance in enumerate(shards):
# 在当前分片上执行搜索
# 我们从每个分片请求 K_NEIGHBORS 个邻居,如果 K_NEIGHBORS 较小
# 且结果稀疏,可能需要更多。对于Top-K,通常 K_shard_query > K_final_query。
distances, faiss_indices = shard_index_instance.search(query_embedding, K_NEIGHBORS)
# 3. 映射到原始ID
# faiss_indices 包含相对于该分片的索引。
# -1 表示在该分片中未找到更多与查询向量匹配的邻居。
for j in range(distances.shape[1]): # 遍历为查询找到的邻居
if faiss_indices[0, j] != -1: # 如果找到了有效的邻居
original_doc_id = shard_doc_ids[i][faiss_indices[0, j]]
all_shard_distances.append(distances[0, j])
all_shard_original_ids.append(original_doc_id)
# 4. 聚合与重新排序
if all_shard_distances:
# 合并距离和原始ID
results = sorted(zip(all_shard_distances, all_shard_original_ids))
# 获取全局Top-K结果
final_top_k_results = results[:K_NEIGHBORS]
print(f"\n分片索引中的Top {K_NEIGHBORS} 个结果:")
for dist, doc_id in final_top_k_results:
print(f" 文档ID: {doc_id}, 距离: {dist:.4f}")
else:
print("\n在任何分片中均未找到结果。")
此代码模拟了基本操作:摄取时对数据进行分片,并执行散发-收集查询,随后进行结果聚合。
上述过程可以可视化如下:
分片向量索引系统中的查询处理。查询路由器将搜索分发到所有分片,聚合器合并部分结果以生成最终列表。
尽管分片提高了可扩展性,但它也为专业人员带来了一系列需要考量的问题:
N_SHARDS):选择正确的分片数量很重要。分片过少,并行度不足。分片过多,管理分片和聚合结果的开销可能会增加延迟,特别是当每个分片对典型查询返回的结果很少时。这通常取决于总数据量、查询量和硬件资源。微妙或巧妙)分片策略或重新平衡机制。k:在此示例中,我们从每个分片查询了 K_NEIGHBORS 个结果。这是一种简化做法。为了确保找到真正的全局Top K_NEIGHBORS,您通常需要在聚合之前从每个分片检索超过 K_NEIGHBORS 个项(例如,K_NEIGHBORS 或 K_NEIGHBORS + buffer_size),特别是当距离分布在分片之间存在显著差异,或者当 K_NEIGHBORS 很小时。具体的数量取决于数据分布和所需召回率。本次动手实践演示了分片向量索引的核心原理。您已经看到了如何将数据分布到多个逻辑(或在真实系统中是物理)分片中,以及如何实现带有结果聚合的散发-收集查询模式。
理解这些运作方式对于设计和问题排查分布式检索管道的性能很重要。随着您进一步学习,您会将分片与其他方法结合,例如复制、分片内部的复杂索引结构(例如FAISS中的IVFADC),以及高级重新排序,以构建真正有弹性和高性能的系统。
简洁的语法。内置调试功能。从第一天起就可投入生产。
为 ApX 背后的 AI 系统而构建
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造