趋近智
构建可伸缩数据摄取管道的设计和实现考量在此呈现。这种管道是任何大型分布式检索增强生成(RAG)系统的一个必要基础。一个能够处理高容量、高速率数据的架构被概述,从而确保RAG系统基于新近、准确处理的信息运行。
这里的目标并非提供单一、庞大的代码方案,而是为你提供一个蓝图和必要的战略思考,使用常见、强大的分布式计算工具来构建这样的管道。我们将侧重于集成以下组件:Apache Kafka 用于消息队列,Apache Spark 用于分布式处理,以及与可伸缩向量 (vector)数据库的交互。
总体而言,我们的数据摄取管道将由多个阶段组成,旨在实现并行性、容错性和可维护性。下图展示了典型的数据流从各种数据源流向你的RAG系统知识库。
一个典型的RAG可伸缩数据摄取管道架构,强调了从数据源经过处理到存储的数据流。
让我们详细分析各组件及其实现考量。
Apache Kafka 作为所有流向你RAG系统的数据的弹性、高吞吐量 (throughput)的入口点。其分布式特性和发布-订阅模型解耦了数据生产者与消费者,使它们能够独立伸缩。
raw_documents: 用于新增或批量加载的文档。消息可能包含数据指针(例如S3路径)或较小文档的完整内容。cdc_stream: 用于源数据库的变更数据捕获(CDC)事件,支持近实时更新。acks=all) 并采用适当的重试机制。对于极高吞吐量,考虑采用异步发送并进行细致的批处理。Apache Spark 非常适合处理繁重任务:从Kafka消费数据、执行转换、生成嵌入 (embedding)并写入存储。你可能会使用Spark Structured Streaming进行连续处理,或使用Spark Batch进行周期性的大规模更新。
你的Spark应用程序将订阅 raw_documents Kafka主题。
此阶段将原始文档转换为适合嵌入的、可管理且有意义的块。
以下是一个从Kafka消费并应用分块函数的PySpark示例代码段:
# 假设 'spark' 是一个 SparkSession 且 'kafka_bootstrap_servers' 已定义
# 从 Kafka 读取
raw_documents_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", "raw_documents") \
.option("startingOffsets", "latest") \ # Or "earliest" for full reprocessing
.load()
# 反序列化 (JSON示例, 适用于 Avro/Protobuf)
# 假设 value 是一个 JSON 字符串: {"doc_id": "id123", "content_path": "s3://..."}
schema = StructType([
StructField("doc_id", StringType()),
StructField("content_path", StringType())
# 根据你的模式添加其他字段
])
parsed_df = raw_documents_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
# 用于获取内容的UDF (高度简化)
def fetch_content(path):
# 在实际场景中: 使用合适的库 (如 boto3 用于 S3 等)
# 添加错误处理和重试
# 在本示例中, 假设它返回文本内容
if path.startswith("s3://"): # 模拟逻辑
return "This is fetched content for " + path
return ""
fetch_content_udf = udf(fetch_content, StringType())
fetched_df = parsed_df.withColumn("text_content", fetch_content_udf(col("content_path")))
# 用于分块的UDF (你的高级逻辑的占位符)
def chunk_document(doc_id, text_content):
# 在此处实现你选择的分块策略
# 返回 (块ID, 块文本, 块元数据) 元组的列表
chunks = []
# 示例: 简单的固定大小分块
chunk_size = 500
overlap = 50
for i in range(0, len(text_content), chunk_size - overlap):
chunk_text = text_content[i:i+chunk_size]
chunk_id = f"{doc_id}_chunk_{len(chunks)}"
chunk_metadata = {"original_doc_id": doc_id, "offset": i}
chunks.append((chunk_id, chunk_text, chunk_metadata))
return chunks
# 定义 explode 的输出模式
chunk_schema = ArrayType(StructType([
StructField("chunk_id", StringType()),
StructField("chunk_text", StringType()),
StructField("chunk_metadata", MapType(StringType(), StringType()))
]))
chunk_document_udf = udf(chunk_document, chunk_schema)
chunked_df = fetched_df \
.withColumn("chunks", chunk_document_udf(col("doc_id"), col("text_content"))) \
.select(explode(col("chunks")).alias("chunk_data")) \
.select("chunk_data.*")
# chunked_df 现在包含列: chunk_id, chunk_text, chunk_metadata
这通常是摄取管道中计算最密集的部分。
以下是调用外部嵌入服务的PySpark示例代码段:
# 假设 'chunked_df' 来自上一步
# 假设 'embedding_service_url' 已定义
def get_embeddings_batch(partition_iterator):
# 此函数处理数据分区
# 它批处理块并调用外部嵌入服务
# import requests # 或你偏好的HTTP客户端库
batches = []
current_batch = []
batch_size = 64 # 可配置
for row in partition_iterator:
current_batch.append({"id": row.chunk_id, "text": row.chunk_text})
if len(current_batch) >= batch_size:
batches.append(list(current_batch)) # 创建副本
current_batch = []
if current_batch: # 添加剩余项
batches.append(list(current_batch))
results = []
for batch_data in batches:
# response = requests.post(embedding_service_url, json={"inputs": batch_data})
# response.raise_for_status() # 检查HTTP错误
# embeddings_response = response.json()["embeddings"] # [{id: "...", vector: [...]}, ...]
# 模拟响应,用于说明
embeddings_response = [{"id": item["id"], "vector": [0.1] * 768} for item in batch_data] # 将 768 替换为你的维度
# 将嵌入与原始行数据重新关联
# 这需要通过ID匹配
# 为简化起见,假设顺序得以保留或使用ID进行匹配
original_rows_in_batch = [row for row in batch_data] # 此逻辑已简化
for i, emb_data in enumerate(embeddings_response):
# 通过ID查找原始行(或假设顺序)
original_row_text = ""
original_row_metadata = {}
for item in original_rows_in_batch:
if item["id"] == emb_data["id"]:
original_row_text = item["text"] # 我们需要传递此信息或重新连接
# 这意味着你需要传递 chunk_text 和 chunk_metadata
# 或者再次查询 chunked_df。更详细的方法可能涉及
# 通过 chunk_id 将嵌入结果重新连接到 'chunked_df'。
# 为简化起见,我们假设可以访问或传递它。
# 示例的这一部分突出了 mapPartitions 中状态管理的复杂性。
# 为简化起见,我们假设 chunk_text 和 chunk_metadata 已在 batch_data 中传递
# 或可以检索到。我们在此处进行模拟。
original_row_text = "text_for_" + emb_data["id"]
original_row_metadata = {"original_doc_id": "doc_for_" + emb_data["id"]}
results.append((emb_data["id"], original_row_text, original_row_metadata, emb_data["vector"]))
return iter(results)
# 嵌入输出模式
embedding_schema = StructType([
StructField("chunk_id", StringType()),
StructField("chunk_text", StringType()), # 存储时传递很重要
StructField("chunk_metadata", MapType(StringType(), StringType())), # 传递元数据
StructField("embedding_vector", ArrayType(FloatType()))
])
# 使用 mapInPandas 或 mapPartitions 以提高外部调用的效率
# mapPartitions 对批处理逻辑提供更多控制
embedded_df = chunked_df.repartition(200) \
.mapPartitions(get_embeddings_batch, schema=embedding_schema) # 调整分区数
注意: 上述 get_embeddings_batch 函数已简化。在生产系统中,你需要错误处理、将结果与原始数据高效重新关联(可能通过传递更多上下文 (context)或在 mapPartitions 调用后使用连接),以及潜在的异步HTTP调用以提高吞吐量 (throughput)。
块嵌入 (embedding)后,它们以及它们的文本和元数据都需要被存储。
foreachBatch(在Structured Streaming中)或 save 操作将数据并行写入你选择的向量数据库(例如Milvus、Weaviate、Pinecone、Qdrant)。许多向量数据库提供Spark连接器或客户端库,可在UDF或 mapPartitions 中使用。以下是将数据写入Spark Structured Streaming中通用向量数据库接收器的示例:
# 假设 'embedded_df' 来自上一步
def write_to_vector_db_and_metadata_store(batch_df, batch_id):
# 此函数为Structured Streaming中的每个微批次调用
# 持久化以避免执行多个操作时重新计算
batch_df.persist()
# --- 写入向量数据库 ---
# 示例: 使用向量数据库客户端
# vector_db_client = VectorDBClient(config)
def process_partition_for_vector_db(iterator):
# vector_db_client_partition_instance = VectorDBClient(config) # 为每个分区初始化
records_to_insert = []
for row in iterator:
# vector_payload = {**row.chunk_metadata, "text": row.chunk_text} # Optional payload
records_to_insert.append(
# vector_db_client_partition_instance.Record(
# id=row.chunk_id,
# vector=row.embedding_vector,
# payload=vector_payload
# )
(row.chunk_id, row.embedding_vector, {"text": row.chunk_text, **row.chunk_metadata}) # Simplified
)
if len(records_to_insert) >= 100: # 批量插入
# vector_db_client_partition_instance.upsert(collection_name="my_rag_collection", records=records_to_insert)
print(f"VectorDB: 插入 {len(records_to_insert)} 条记录 (模拟)")
records_to_insert = []
if records_to_insert:
# vector_db_client_partition_instance.upsert(collection_name="my_rag_collection", records=records_to_insert)
print(f"VectorDB: 插入 {len(records_to_insert)} 条记录 (模拟)")
# vector_db_client_partition_instance.close()
return iter([]) # 必须返回一个迭代器
batch_df.select("chunk_id", "embedding_vector", "chunk_text", "chunk_metadata") \
.rdd.mapPartitions(process_partition_for_vector_db).collect() # .collect() 以在 foreachBatch 内触发操作
# --- 写入元数据存储 ---
# 示例: 将数据写入S3上的Parquet文件作为简单的元数据存储
# 在生产环境中,使用合适的数据库连接器(JDBC、Cassandra连接器等)
# metadata_path = f"s3://your-bucket/metadata_store/batch-{batch_id}/"
# batch_df.select("chunk_id", "chunk_text", "chunk_metadata") \
# .write.mode("append").parquet(metadata_path)
print(f"元数据存储: 为批次 {batch_id} 写入元数据 (模拟)")
batch_df.unpersist()
# 假设 'embedded_df' 是一个流式DataFrame
query = embedded_df \
.writeStream \
.foreachBatch(write_to_vector_db_and_metadata_store) \
.option("checkpointLocation", "s3://your-bucket/checkpoints/rag_ingestion/") \
.trigger(processingTime="1 minute") \ # 配置触发间隔
.start()
# query.awaitTermination() # 在脚本中
这种 foreachBatch 方法允许你使用批处理DataFrame操作,使其更容易与各种数据接收器集成。确保你的向量数据库客户端和元数据存储客户端可序列化,或在每个任务/分区中正确初始化。
为了使你的RAG系统知识库保持最新,你需要处理源系统的更新和删除。CDC机制,通常使用Debezium等工具将变更流式传输到Kafka,是必不可少的。
cdc_stream Kafka主题消费数据。CREATE、UPDATE、DELETE)和受影响的数据(主键、更改的字段)。
raw_documents 主题,或者如果CDC事件包含足够信息则直接处理。尽管详细的编排将在第5章中讨论,但考虑以下几点很重要:
raw_documents 和 cdc_stream)。spark.streaming.backpressure.enabled 和相关参数 (parameter)。你现在已经了解了专为大型分布式RAG系统设计的可伸缩数据摄取管道。这个蓝图强调模块化、容错性以及强大分布式工具的应用。简而言之,你需采用Kafka进行解耦和队列管理,Spark进行并行处理和复杂转换,并仔细集成向量 (vector)和元数据存储。处理CDC事件对于保持数据新鲜度很重要。
请记住,工具和配置的具体选择将取决于你的确切需求、现有基础设施以及数据特性。然而,这里概述的原则为构建一个能够有效支持你的专家级RAG应用的高性能数据管道提供了坚实的基础。下一步是将这些设计转化为代码,进行全面测试,并迭代优化性能和成本。
简洁的语法。内置调试功能。从第一天起就可投入生产。
为 ApX 背后的 AI 系统而构建
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•