趋近智
大师班
当处理TB或PB级别的数据集时,这些数据集可能分散存储在分布式文件系统或对象存储中的数千个文件中,顺序扫描整个数据集以查找特定记录或子集会变得非常缓慢且成本高昂。想象一下,您可能只需要检索来自特定网站的文档,或者在预处理阶段被标记为高质量的文档;读取每一个字节的数据显然不切实际。这就是数据索引成为一种必不可少的技术,用于大规模数据的高效管理。
在此背景下,索引的作用类似于关系数据库中的索引或书本末尾的索引。它是一种辅助数据结构,能够根据特定条件快速查找数据记录,无需彻底扫描主要数据源。对于LLM数据集,索引通常涉及创建从元数据属性或记录标识符到相应数据物理位置的映射。
有效的索引直接支持LLM开发生命周期中的多项重要操作:
source、quality_score或language等元数据构建的索引,能让您在不进行完整数据集扫描的情况下,识别出相关文件或记录。可以采用多种策略,通常组合使用,取决于数据格式、存储系统和访问模式。
这可能是最常用的方法。它涉及创建辅助文件或结构,将元数据值映射到拥有这些值的数据记录列表。
结构: 这可以简单到是序列化的Python字典、JSON文件,或者更结构化的格式,例如专门用于索引的Parquet文件。例如,您可能有一个将数据源映射到包含该源数据文件的索引:
metadata_index = {
"common_crawl__file_ABC": {
"source": "Common Crawl",
"language": "en",
"quality_score": 0.85,
"token_count": 1500,
"location": {"file": "/path/to/data/part-001.parquet", "row_group": 5}
},
"book_corpus_xyz": {
"source": "Book Corpus",
"language": "en",
"quality_score": 0.95,
"token_count": 35000,
"location": {"file": "/path/to/data/part-099.parquet", "row_group": 12}
},
# ... 数百万或数十亿更多条目
}
# 使用示例:查找高质量的Common Crawl文档
cc_high_quality_docs = []
for doc_id, metadata in metadata_index.items():
if metadata["source"] == "Common Crawl" and metadata["quality_score"] > 0.9:
cc_high_quality_docs.append(doc_id) # 或者直接存储位置
print(f"找到 {len(cc_high_quality_docs)} 个高质量的Common Crawl文档。")
```
当数据未以整齐的可按行寻址的格式存储时(例如单个文本文件或不可分割容器中的记录),偏移量索引变得重要。该索引存储大型文件中每个逻辑记录的精确起始字节位置和长度。
(记录ID, 文件路径, 起始字节, 字节数)。一个偏移量索引,将记录ID映射到大型数据文件中其精确的字节位置(偏移量和长度)。
某些数据格式,例如Apache Parquet,在文件和行组级别内置了对元数据和索引的支持。Parquet在行组内存储列的统计信息(最小值/最大值)。查询引擎可以使用这些元数据来跳过读取整个行组,如果过滤条件不可能匹配该组内的数据(谓词下推)。虽然这不能完全取代所有用例中专门的记录级索引,但借助这些特点可以在数据加载期间显著加快基于索引列的过滤速度。
训练期间,这些索引的主要使用者是数据加载器。在PyTorch等框架中,自定义的Dataset实现可以使用索引高效检索特定项目或实施复杂的采样。
import torch
from torch.utils.data import Dataset
import pickle
# 假设 'metadata_index.pkl' 映射文档ID
# -> {'位置': {'文件': ..., '偏移量': ..., '长度': ...}, ...}
# 假设 'doc_ids_for_epoch.pkl' 包含文档ID列表
# 用于当前epoch/迭代,
# 可能已根据源权重或其他标准预采样。
class IndexedTextDataset(Dataset):
def __init__(self, index_path, doc_ids_path):
print(f"正在从 {index_path} 加载索引...")
with open(index_path, 'rb') as f:
self.metadata_index = pickle.load(f)
print(f"正在从 {doc_ids_path} 加载文档ID列表...")
with open(doc_ids_path, 'rb') as f:
self.doc_ids = pickle.load(f) # 此epoch的文档ID列表
print("索引和文档列表加载完毕。")
def __len__(self):
return len(self.doc_ids)
def __getitem__(self, idx):
# 获取当前索引对应的实际文档ID
doc_id = self.doc_ids[idx]
# 使用元数据索引查找文档位置
try:
metadata = self.metadata_index[doc_id]
location = metadata['location']
file_path = location['file']
offset = location['offset']
length = location['length']
except KeyError:
# 处理文档ID可能缺失的情况...
# (如果ID是从索引派生而来,理想情况不应发生)
print(f"警告:在索引中未找到文档ID {doc_id}。")
# 根据期望的行为,返回一个虚拟项或引发错误
return {"text": "", "doc_id": doc_id, "error": True}
# 打开特定文件并定位到正确位置
try:
with open(file_path, 'r', encoding='utf-8') as f:
f.seek(offset)
text_content = f.read(length)
# 在这里,您通常会对 text_content 进行分词
# tokenized_output = tokenizer(text_content, ...)
# 为简单起见,我们只返回文本
return {"text": text_content, "doc_id": doc_id}
except Exception as e:
print(f"从 {file_path} 的偏移量 {offset} 读取文档 {doc_id} 时出错: {e}")
return {"text": "", "doc_id": doc_id, "error": True}
# 使用方法:
# index_file = '/path/to/metadata_with_offsets.pkl'
# doc_ids_file = '/path/to/sampled_doc_ids_epoch_1.pkl'
# dataset = IndexedTextDataset(index_path=index_file, doc_ids_path=doc_ids_file)
# 通常通过准备 doc_ids_file 来实现随机打乱
# data_loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=False)
# # 迭代加载器的示例
# for batch in data_loader:
# # 处理文档批次(例如,输入到模型)
# # print(batch['doc_id'], len(batch['text']))
# pass
这个示例展示了Dataset如何接收索引idx(从0到len(self)-1),将其映射到预采样的doc_id,在主索引中查找该doc_id的位置,然后从正确的文件在特定偏移量处执行有针对性的读取。这避免了扫描不相关的数据。
实施数据索引涉及权衡:
然而,收益通常超过成本,特别是对于大规模LLM训练:
比较有索引和无索引时数据访问操作所需的时间,突出了索引为有针对性检索和过滤提供的显著加速。请注意时间轴上的对数刻度。
总之,数据索引是一种基础方法,用于管理LLM训练所需的海量数据集。它在存储PB级数据与高效访问训练、分析和评估所需的特定数据片段之间架起了一座桥梁,构成可扩展数据处理管道的一个重要组成部分。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造