趋近智
处理以太字节甚至拍字节衡量的数据集时,将它们存储在单台机器的文件系统上会变得不切实际,甚至不可能。庞大的数据量超出了本地磁盘容量,而在分布式训练期间,多个机器(工作节点)需要并发读取这些数据,这会造成一个很大的瓶颈。分布式文件系统(DFS)和云对象存储提供了处理这些挑战所需的基础设施,它们具有可扩展性、高可靠性和并行访问能力。
这些系统与本地文件系统根本不同,它们将数据分布在多个物理存储节点上,同时向用户或应用程序呈现统一的视图。这种分布实现了横向扩展;随着数据增长,你可以增加更多存储节点,而不会受到单台机器容量的限制。
两个主要特点使分布式存储适合大规模数据管理:
HDFS起源于Apache Hadoop生态系统,专为存储具有流式数据访问模式的超大文件而设计。它采用主/从架构:
HDFS通常会将每个数据块(例如,128MB或256MB的块)在不同的DataNodes上复制三份以实现容错。
优点:
缺点:
当LLM数据预处理流程严重依赖现有Hadoop/Spark基础设施,或者组织管理大型本地集群时,HDFS常被使用。
像Amazon S3(简单存储服务)、Google Cloud Storage(GCS)和Azure Blob Storage这样的云对象存储服务已经成为在云中存储大型数据集的事实标准。它们与传统文件系统的运作方式不同:
my-data/processed/part-0001.arrow)。多个计算工作节点从中心分布式存储系统并发访问数据。
优点:
缺点:
对于大多数基于云的LLM开发,对象存储是首选,因为它的可扩展性、易用性以及与更广泛的云生态系统的集成。
训练框架需要有效的方法从这些分布式系统中读取数据。在训练前简单地将数太字节的数据下载到每个工作节点的本地磁盘是不切实际的。相反,数据通常在训练期间直接从分布式存储进行流式传输。
像fsspec(文件系统规范)这样的库为各种存储后端提供了统一的Python接口,包括本地文件、HDFS、S3、GCS和Azure Blob Storage。PyTorch的DataLoader可以与利用这些库的自定义Dataset实现结合使用。
这里有一个使用torch.utils.data.IterableDataset从存储在S3类桶中的多个文本文件逐行流式传输数据的例子:
import torch
import s3fs # 或者boto3,或其他相关库
import gzip
from torch.utils.data import IterableDataset, DataLoader
class S3StreamingTextDataset(IterableDataset):
def __init__(self, bucket_name, prefix, shuffle_files=True):
super().__init__()
self.fs = s3fs.S3FileSystem() # 假设凭据已配置
self.bucket = bucket_name
self.file_paths = self.fs.glob(f"{bucket_name}/{prefix}/*.txt.gz")
self.shuffle_files = shuffle_files
print(f"在"
f"s3://{bucket_name}/{prefix}" f"中找到了 {len(self.file_paths)} 个文件")
def __iter__(self):
worker_info = torch.utils.data.get_worker_info()
files_to_process = self.file_paths
if worker_info is not None:
# 将文件分配给不同的工作节点
num_workers = worker_info.num_workers
worker_id = worker_info.id
files_to_process = [
f for i, f in enumerate(self.file_paths)
if i % num_workers == worker_id
]
if self.shuffle_files:
# 为这个工作节点轮次打乱文件
# 注意:要实现真正的打乱,请首先考虑打乱全局索引
import random
random.shuffle(files_to_process)
for file_path in files_to_process:
print(f"工作节点 {worker_info.id if worker_info else 0}: "
f"正在处理 {file_path}")
try:
# 使用s3fs直接打开文件
with self.fs.open(file_path, 'rb') as f_remote:
with gzip.open(f_remote, 'rt', encoding='utf-8') as f_gz:
for line in f_gz:
# 如果需要,在这里预处理/分词行
# 为简单起见,只返回原始行
yield line.strip()
except Exception as e:
print(f"工作节点 {worker_info.id if worker_info else 0}: "
f"处理 {file_path} 时出错: {e}")
# 决定如何处理错误:跳过文件、记录日志、还是抛出异常?
continue
# 使用示例
# dataset = S3StreamingTextDataset(
# bucket_name='my-llm-data',
# prefix='processed_data'
# )
# dataloader = DataLoader(
# dataset,
# batch_size=32,
# num_workers=4
# ) # 4 worker processes
# for batch in dataloader:
# # 处理文本行批次
# # print(batch)
# pass
这个例子展示了流式传输:每个工作进程(DataLoader中的num_workers)被分配S3中文件的一个子集,并直接从对象存储逐行读取,同时进行解压。这避免了将整个数据集下载到本地,并允许在多个工作节点和文件之间进行并行处理。更完善的实现可能会以更大的块读取数据,使用像Apache Arrow或Parquet(在上一节中讨论过)这样的优化文件格式,并实施更强大的混洗策略。
选择合适的分布式存储系统是重要的第一步。对于现在开始的大多数项目,特别是那些利用云计算的项目,S3、GCS或Azure Blob等托管对象存储提供了可扩展性、持久性和易用性的吸引人组合,并且与专为流式访问设计的现代分布式训练框架和数据加载器很好地集成。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造