一旦海量文本数据集被清理、处理并存储,通常是在分布式文件系统或云存储中,重点转向如何将这些数据高效地送入分布式训练环境。将多TB级的数据集完全加载到内存中是不可能的。即使在训练开始前将它们完全从磁盘读取也常常不切实际。在这种情况下,流式数据加载器变得极其重要。流式数据加载器不是预先加载整个数据集(映射式数据集),而是在训练过程中,直接从存储中动态地逐个样本或逐块读取数据。这种方法能保持较低的内存占用,并允许训练几乎立即开始,但它也带来了新的复杂性,尤其是在I/O性能、数据混洗以及分布式环境中的协调方面。对速度的要求:避免I/O瓶颈现代GPU处理数据的速度非常快。如果数据加载器无法以比GPU消耗更快的速度提供训练批次,昂贵的加速器就会处于闲置状态,浪费计算资源,并大大延长训练时间。这就是I/O瓶颈问题。流式数据加载器必须设计成:高效读取: 对所选存储格式采用优化的读取模式(例如,对HDFS上的文件进行顺序读取,对S3等对象存储进行并行请求)。使用Apache Arrow或Parquet等二进制格式(在“数据存储格式”中介绍)通常比即时解析原始文本文件快得多。并行加载: 使用多个CPU工作进程并行获取、解码和预处理数据,使数据加载与GPU计算重叠进行。预取数据: 维护一个已准备好的批次队列,供GPU使用,这样当前批次处理完成后,下一个批次即可立即使用。PyTorch的torch.utils.data.DataLoader内置支持并行工作进程(num_workers)和预取(prefetch_factor),这些对于流式处理来说必不可少。然而,数据如何逐样本获取和处理的核心逻辑位于IterableDataset内部。PyTorch中的可迭代式数据集PyTorch区分映射式数据集(实现__getitem__和__len__)和可迭代式数据集(实现__iter__)。对于流式处理大型数据集,IterableDataset是自然而然的选择。IterableDataset的__iter__方法负责一次生成一个已处理的样本。当与DataLoader和多个工作进程(num_workers > 0)一起使用时,PyTorch会处理工作负载的分配。每个工作进程都会获得自己的迭代器实例,通常配置为处理数据分片的不同子集。下面是一个为流式处理设计的IterableDataset基本结构:import torch import os import random from torch.utils.data import IterableDataset, DataLoader class StreamingTextDataset(IterableDataset): def __init__(self, data_dir, shard_pattern="shard_*.txt", shuffle_shards=True): super().__init__() self.data_dir = data_dir self.shard_files = sorted([ os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith(".txt") # 简化匹配 ]) self.shuffle_shards = shuffle_shards def _iter_shard(self, shard_file): # 实际中,请使用高效的读取和解析方式 # (例如,对于Arrow/Parquet) worker_info = torch.utils.data.get_worker_info() worker_id = worker_info.id if worker_info else 0 print(f"Worker {worker_id}: Processing {shard_file}") try: with open(shard_file, 'r', encoding='utf-8') as f: for line in f: # 模拟处理:生成已处理样本 # (例如,分词后的ID) # 替换为实际的分词/处理逻辑 processed_sample = line.strip() if processed_sample: yield processed_sample except Exception as e: print(f"处理分片 {shard_file} 时出错:{e}") def __iter__(self): worker_info = torch.utils.data.get_worker_info() current_shards = self.shard_files if self.shuffle_shards: # 在不同epoch间一致地打乱分片顺序 # 但每个epoch的顺序不同 # 如果通过DataLoader worker_init_fn可用,则使用epoch号作为种子 g = torch.Generator() # 简化种子设置 seed = int(torch.randint(0, 10000, (1,)).item()) g.manual_seed(seed) shard_indices = torch.randperm( len(current_shards), generator=g ).tolist() current_shards = [self.shard_files[i] for i in shard_indices] if worker_info is None: # 单进程加载 worker_id = 0 num_workers = 1 shards_for_worker = current_shards else: # 多进程加载 worker_id = worker_info.id num_workers = worker_info.num_workers # 为工作进程分配分片(简单的轮询分配) shards_for_worker = [ s for i, s in enumerate(current_shards) if i % num_workers == worker_id ] print( f"Worker {worker_id}/{num_workers}: " f"Assigned {len(shards_for_worker)} shards." ) for shard_file in shards_for_worker: yield from self._iter_shard(shard_file) # 使用示例 # 假设'my_large_dataset_shards/'包含类似shard_001.txt、 # shard_002.txt等文件,... # dataset = StreamingTextDataset( # data_dir='my_large_dataset_shards/' # ) # dataloader = DataLoader( # dataset, # batch_size=32, # num_workers=4, # prefetch_factor=2 # ) # # for epoch in range(3): # print(f"\n--- Epoch {epoch} ---") # for batch in dataloader: # # 使用批次进行模型训练步骤 # # print(f"接收到的批次大小:{len(batch)}") # # 此处批次将是字符串列表 # pass # 替换为训练逻辑在这个示例中:__init__ 确定数据分片(例如,单个文件)。__iter__ 为每个工作进程调用。它根据worker_info确定此特定工作进程应处理哪些分片。它还可以选择打乱分片的顺序。_iter_shard 处理从单个分片读取并生成样本。大规模混洗:混洗缓冲区真正的随机混洗需要加载整个数据集,这是不可能的。流式加载器通常使用近似混洗技术。分片级混洗: 如代码所示,打乱分片的处理顺序可以提供粗粒度的随机性。一个分片内的所有样本仍然是连续的。混洗缓冲区: 更有效的方法是维护一个内存中的缓冲区。加载器将样本读入缓冲区,并在生成样本时,从缓冲区中随机选择一个并用从存储中读取的下一个样本替换它。digraph G { rankdir=LR; node [shape=box, style=filled, color="#e9ecef"]; edge [color="#495057"]; subgraph cluster_0 { label = "数据存储(分片)"; style=filled; color="#f8f9fa"; Shard1 [label="分片 1"]; Shard2 [label="分片 2"]; ShardN [label="分片 N"]; } Reader [label="读取器\n(填充缓冲区)", shape=ellipse, color="#a5d8ff"]; Buffer [label="混洗缓冲区\n(大小 M)", shape=cylinder, color="#ffe066"]; Sampler [label="采样器\n(生成样本)", shape=ellipse, color="#b2f2bb"]; Trainer [label="训练过程", shape=component, color="#ffc9c9"]; Shard1 -> Reader; Shard2 -> Reader; ShardN -> Reader; Reader -> Buffer [label="填充"]; Buffer -> Sampler [label="随机样本"]; Sampler -> Buffer [label="替换"]; Sampler -> Trainer [label="生成批次"]; }混洗缓冲区按顺序读取数据,但从固定大小的内存缓冲区中随机生成样本,提供比仅混洗分片更好的局部混洗效果。混洗缓冲区的大小($M$)是一个权衡:更大的缓冲区提供更好的随机性,但每个工作进程消耗更多内存。专用于流式处理的库尽管可以使用原始PyTorch IterableDataset构建流式加载器,但有几个库提供预构建的、高度优化的解决方案,专门用于大规模深度学习:webdataset: 非常适合以文件序列形式存储的数据,特别是.tar归档文件。它提供了灵活的管道用于解码、增强和混洗数据流。它广泛用于多模态数据集,但也适用于文本。MosaicML StreamingDataset: 为云原生训练而设计。它以其自己的优化格式(MDS)将数据存储在对象存储(如S3)上。它自动处理高效的分片、混洗(使用分片混洗和混洗缓冲区思想),以及在工作进程和节点间的顺畅恢复。它首先要求将数据集转换为MDS格式。Hugging Face datasets: 这个常用的库现在包含了流式处理能力(load_dataset(..., streaming=True))。它允许直接从Hugging Face Hub或本地存储迭代处理大型数据集,而无需先下载所有内容。它与它们的分词器和模型结合良好。使用这些库可以大大简化开发,并且由于对I/O、缓存和混洗的专门优化,通常能提供更好的性能。恢复与状态管理长时间的LLM训练任务不可避免地会遇到中断(硬件故障、抢占)。流式数据加载器必须支持恢复,这意味着它可以在数据流中精确地从上次中断的位置重新开始。这要求将数据加载器的状态与模型权重和优化器状态一同进行检查点保存(参见第19章:“检查点与容错”)。状态通常包含:每个工作进程正在处理的最后一个分片的标识符。在该分片内的确切位置(字节偏移或样本索引)。每个工作进程的混洗缓冲区内容和状态。用于混洗的随机数生成器状态。像StreamingDataset这样的库会自动管理此状态。如果构建自定义加载器,则需要在检查点保存时明确地从所有工作进程收集此状态,并在恢复时将其还原。性能调优num_workers: 设置此参数以利用多个CPU核心进行数据加载。最佳值取决于CPU、存储速度和数据处理的复杂程度。可以从每个GPU可用的物理CPU核心数量开始试验。prefetch_factor: 控制每个工作进程预加载的批次数量。通常将值设置为2是一个不错的起始点。数据格式: 二进制格式(Arrow、Parquet、MDS)通常比文本或JSON格式的读取和解析速度更快。压缩方式的选择也重要(例如,Zstandard速度快)。存储位置: 从快速的本地存储(如NVMe SSD)读取数据比通过网络从云对象存储读取更快,尽管像StreamingDataset这样的库已针对后者进行了优化。混洗缓冲区大小: 平衡内存使用与混洗质量。批次大小: 较大的批次能提高GPU利用率,但需要更多内存,并可能影响模型收敛(参见第17章:“LLM的优化算法”)。有效管理和流式传输数据是训练大型语言模型的重要基础设施组成部分。通过选择合适的存储格式、运用分布式文件系统,并实现高效、可恢复的流式数据加载器,可以确保您的GPU持续获得数据,从而实现成功的大规模训练。