趋近智
大师班
一旦海量文本数据集被清理、处理并存储,通常是在分布式文件系统或云存储中,重点转向如何将这些数据高效地送入分布式训练环境。将多TB级的数据集完全加载到内存中是不可能的。即使在训练开始前将它们完全从磁盘读取也常常不切实际。在这种情况下,流式数据加载器变得极其重要。
流式数据加载器不是预先加载整个数据集(映射式数据集),而是在训练过程中,直接从存储中动态地逐个样本或逐块读取数据。这种方法能保持较低的内存占用,并允许训练几乎立即开始,但它也带来了新的复杂性,尤其是在I/O性能、数据混洗以及分布式环境中的协调方面。
现代GPU处理数据的速度非常快。如果数据加载器无法以比GPU消耗更快的速度提供训练批次,昂贵的加速器就会处于闲置状态,浪费计算资源,并大大延长训练时间。这就是I/O瓶颈问题。
流式数据加载器必须设计成:
PyTorch的torch.utils.data.DataLoader内置支持并行工作进程(num_workers)和预取(prefetch_factor),这些对于流式处理来说必不可少。然而,数据如何逐样本获取和处理的核心逻辑位于IterableDataset内部。
PyTorch区分映射式数据集(实现__getitem__和__len__)和可迭代式数据集(实现__iter__)。对于流式处理大型数据集,IterableDataset是自然而然的选择。
IterableDataset的__iter__方法负责一次生成一个已处理的样本。当与DataLoader和多个工作进程(num_workers > 0)一起使用时,PyTorch会处理工作负载的分配。每个工作进程都会获得自己的迭代器实例,通常配置为处理数据分片的不同子集。
下面是一个为流式处理设计的IterableDataset基本结构:
import torch
import os
import random
from torch.utils.data import IterableDataset, DataLoader
class StreamingTextDataset(IterableDataset):
def __init__(self, data_dir, shard_pattern="shard_*.txt",
shuffle_shards=True):
super().__init__()
self.data_dir = data_dir
self.shard_files = sorted([
os.path.join(data_dir, f)
for f in os.listdir(data_dir)
if f.endswith(".txt") # 简化匹配
])
self.shuffle_shards = shuffle_shards
def _iter_shard(self, shard_file):
# 实际中,请使用高效的读取和解析方式
# (例如,对于Arrow/Parquet)
worker_info = torch.utils.data.get_worker_info()
worker_id = worker_info.id if worker_info else 0
print(f"Worker {worker_id}: Processing {shard_file}")
try:
with open(shard_file, 'r', encoding='utf-8') as f:
for line in f:
# 模拟处理:生成已处理样本
# (例如,分词后的ID)
# 替换为实际的分词/处理逻辑
processed_sample = line.strip()
if processed_sample:
yield processed_sample
except Exception as e:
print(f"处理分片 {shard_file} 时出错:{e}")
def __iter__(self):
worker_info = torch.utils.data.get_worker_info()
current_shards = self.shard_files
if self.shuffle_shards:
# 在不同epoch间一致地打乱分片顺序
# 但每个epoch的顺序不同
# 如果通过DataLoader worker_init_fn可用,则使用epoch号作为种子
g = torch.Generator()
# 简化种子设置
seed = int(torch.randint(0, 10000, (1,)).item())
g.manual_seed(seed)
shard_indices = torch.randperm(
len(current_shards), generator=g
).tolist()
current_shards = [self.shard_files[i] for i in shard_indices]
if worker_info is None:
# 单进程加载
worker_id = 0
num_workers = 1
shards_for_worker = current_shards
else:
# 多进程加载
worker_id = worker_info.id
num_workers = worker_info.num_workers
# 为工作进程分配分片(简单的轮询分配)
shards_for_worker = [
s for i, s in enumerate(current_shards)
if i % num_workers == worker_id
]
print(
f"Worker {worker_id}/{num_workers}: "
f"Assigned {len(shards_for_worker)} shards."
)
for shard_file in shards_for_worker:
yield from self._iter_shard(shard_file)
# 使用示例
# 假设'my_large_dataset_shards/'包含类似shard_001.txt、
# shard_002.txt等文件,...
# dataset = StreamingTextDataset(
# data_dir='my_large_dataset_shards/'
# )
# dataloader = DataLoader(
# dataset,
# batch_size=32,
# num_workers=4,
# prefetch_factor=2
# )
#
# for epoch in range(3):
# print(f"\n--- Epoch {epoch} ---")
# for batch in dataloader:
# # 使用批次进行模型训练步骤
# # print(f"接收到的批次大小:{len(batch)}")
# # 此处批次将是字符串列表
# pass # 替换为训练逻辑
在这个示例中:
__init__ 确定数据分片(例如,单个文件)。__iter__ 为每个工作进程调用。它根据worker_info确定此特定工作进程应处理哪些分片。它还可以选择打乱分片的顺序。_iter_shard 处理从单个分片读取并生成样本。真正的随机混洗需要加载整个数据集,这是不可能的。流式加载器通常使用近似混洗技术。
混洗缓冲区按顺序读取数据,但从固定大小的内存缓冲区中随机生成样本,提供比仅混洗分片更好的局部混洗效果。
混洗缓冲区的大小(M)是一个权衡:更大的缓冲区提供更好的随机性,但每个工作进程消耗更多内存。
尽管可以使用原始PyTorch IterableDataset构建流式加载器,但有几个库提供预构建的、高度优化的解决方案,专门用于大规模深度学习:
webdataset: 非常适合以文件序列形式存储的数据,特别是.tar归档文件。它提供了灵活的管道用于解码、增强和混洗数据流。它广泛用于多模态数据集,但也适用于文本。StreamingDataset: 为云原生训练而设计。它以其自己的优化格式(MDS)将数据存储在对象存储(如S3)上。它自动处理高效的分片、混洗(使用分片混洗和混洗缓冲区思想),以及在工作进程和节点间的顺畅恢复。它首先要求将数据集转换为MDS格式。datasets: 这个常用的库现在包含了流式处理能力(load_dataset(..., streaming=True))。它允许直接从Hugging Face Hub或本地存储迭代处理大型数据集,而无需先下载所有内容。它与它们的分词器和模型结合良好。使用这些库可以大大简化开发,并且由于对I/O、缓存和混洗的专门优化,通常能提供更好的性能。
长时间的LLM训练任务不可避免地会遇到中断(硬件故障、抢占)。流式数据加载器必须支持恢复,这意味着它可以在数据流中精确地从上次中断的位置重新开始。
这要求将数据加载器的状态与模型权重和优化器状态一同进行检查点保存(参见第19章:“检查点与容错”)。状态通常包含:
像StreamingDataset这样的库会自动管理此状态。如果构建自定义加载器,则需要在检查点保存时明确地从所有工作进程收集此状态,并在恢复时将其还原。
num_workers: 设置此参数以利用多个CPU核心进行数据加载。最佳值取决于CPU、存储速度和数据处理的复杂程度。可以从每个GPU可用的物理CPU核心数量开始试验。prefetch_factor: 控制每个工作进程预加载的批次数量。通常将值设置为2是一个不错的起始点。StreamingDataset这样的库已针对后者进行了优化。有效管理和流式传输数据是训练大型语言模型的重要基础设施组成部分。通过选择合适的存储格式、运用分布式文件系统,并实现高效、可恢复的流式数据加载器,可以确保您的GPU持续获得数据,从而实现成功的大规模训练。
这部分内容有帮助吗?
torch.utils.data - PyTorch Documentation, PyTorch Authors, 2025 - 解释了 PyTorch 中用于数据流式加载的基本构建块,包括 IterableDataset、DataLoader、num_workers 和 prefetch_factor。datasets 库的流式模式,直接从 Hugging Face Hub 或本地存储高效地迭代处理大规模数据集,而无需完全下载。© 2026 ApX Machine Learning用心打造