When dealing with datasets measured in terabytes or even petabytes, storing them on a single machine's filesystem becomes impractical, if not impossible. The sheer volume exceeds local disk capacity, and accessing this data efficiently during distributed training, where multiple machines (workers) need to read concurrently, becomes a significant bottleneck. Distributed file systems (DFS) and cloud object storage provide the necessary infrastructure to handle these challenges, offering scalability, reliability, and parallel access.
These systems fundamentally differ from local filesystems by distributing data across multiple physical storage nodes while presenting a unified view to the user or application. This distribution allows for horizontal scaling; as your data grows, you can add more storage nodes rather than being limited by the capacity of a single machine.
Two primary characteristics make distributed storage suitable for large-scale data management: horizontal scalability, since capacity grows by adding nodes, and built-in fault tolerance, since replication keeps data available when individual machines or disks fail.
HDFS originated within the Apache Hadoop ecosystem and is designed for storing very large files with streaming data access patterns. It follows a primary/secondary architecture: a single NameNode (the primary) manages the filesystem namespace and block locations, while many DataNodes (the secondaries) store the actual data blocks.
HDFS typically replicates each data block (e.g., 128MB or 256MB chunks) three times across different DataNodes for fault tolerance.
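To see what three-way replication means for capacity planning, a quick back-of-the-envelope calculation helps. The numbers below (1 TiB dataset, 128 MiB blocks, replication factor 3) are illustrative defaults, not measurements from a real cluster:

# Rough capacity estimate for a replicated HDFS dataset (illustrative numbers)
dataset_bytes = 1 * 1024**4        # 1 TiB of raw text
block_size = 128 * 1024**2         # 128 MiB HDFS block size
replication = 3                    # default HDFS replication factor

num_blocks = -(-dataset_bytes // block_size)          # ceiling division -> 8192 blocks
raw_capacity_tib = dataset_bytes * replication / 1024**4

print(f"{num_blocks} blocks, {raw_capacity_tib:.1f} TiB of physical disk required")

In other words, every terabyte of logical data consumes roughly three terabytes of raw disk, which is part of the price paid for HDFS's fault tolerance.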
Pros:
- High throughput for large, sequential reads and writes, which matches streaming access patterns well.
- Data locality: compute frameworks such as Spark can schedule work on the nodes that hold the data.
- A mature, battle-tested ecosystem (Hadoop, Spark, Hive) for on-premise deployments.
Cons:
- The NameNode keeps all metadata in memory, so very large numbers of small files become a scaling and availability pressure point.
- Significant operational overhead: you provision, monitor, and upgrade the cluster yourself.
- Storage capacity traditionally scales together with compute nodes, which is less elastic than cloud object storage.
HDFS is often used when LLM data preprocessing pipelines heavily rely on existing Hadoop/Spark infrastructure or for organizations managing large on-premise clusters.
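If your pipeline does read from HDFS in Python, PyArrow provides a filesystem interface backed by libhdfs. The sketch below is a minimal illustration; the NameNode host, port, and paths are placeholders, and it assumes the Hadoop client libraries are available on the machine running it:

from pyarrow import fs

# Connect to the HDFS NameNode (host and port are placeholders; requires libhdfs)
hdfs = fs.HadoopFileSystem(host="namenode.internal", port=8020)

# Recursively list the text shards under a dataset directory
selector = fs.FileSelector("/datasets/llm/corpus", recursive=True)
shards = [info.path for info in hdfs.get_file_info(selector)
          if info.is_file and info.path.endswith(".txt")]

# Stream the first shard without copying it to local disk
with hdfs.open_input_stream(shards[0]) as stream:
    chunk = stream.read(1024 * 1024)  # read the first 1 MiB

print(f"Found {len(shards)} shards; read {len(chunk)} bytes from the first one")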
Cloud object storage services like Amazon S3 (Simple Storage Service), Google Cloud Storage (GCS), and Azure Blob Storage have become the de facto standard for storing large datasets in the cloud. They operate differently from traditional filesystems:
Data lives in buckets as objects in a flat namespace: each object is identified by a key (e.g., my-data/processed/part-0001.arrow) and is read and written through HTTP APIs rather than POSIX filesystem calls.
Diagram: multiple compute workers accessing data concurrently from a central distributed storage system.
Pros:
- Practically unlimited capacity with pay-as-you-go pricing and no storage cluster to operate.
- Very high durability and availability managed by the cloud provider.
- Tight integration with the wider cloud ecosystem (IAM, managed Spark, data warehouses, training services).
Cons:
- Higher and more variable per-request latency than local disks or data-local HDFS reads.
- Not a POSIX filesystem: no appends or atomic renames, and "directories" are only key prefixes.
- Request throttling, API charges, and egress costs can become significant at high read volumes.
For most cloud-based LLM development, object storage is the preferred choice due to its scalability, ease of use, and integration with the broader cloud ecosystem.
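The object model translates into a simple access pattern from Python. The snippet below is a hedged sketch using s3fs; the bucket and key names are hypothetical, and credentials are assumed to come from environment variables or an attached IAM role:

import s3fs

fs = s3fs.S3FileSystem()  # credentials from environment variables / IAM role

# Upload a local shard as an object under a key prefix (names are hypothetical)
fs.put("part-0001.arrow", "my-llm-data/processed/part-0001.arrow")

# "Directories" are just key prefixes; listing a prefix returns matching keys
keys = fs.ls("my-llm-data/processed/")
print(f"{len(keys)} objects under the prefix")

# Read an object back as a file-like stream over HTTP
with fs.open("my-llm-data/processed/part-0001.arrow", "rb") as f:
    header = f.read(8)

The same put/list/open pattern applies to GCS and Azure Blob Storage through their respective client libraries or fsspec backends.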
Training frameworks need efficient ways to read data from these distributed systems. Simply downloading terabytes of data to each worker's local disk before training is infeasible. Instead, data is typically streamed directly from the distributed storage during training.
Libraries like fsspec (Filesystem Spec) provide a unified Pythonic interface to various storage backends, including local files, HDFS, S3, GCS, and Azure Blob Storage. PyTorch's DataLoader can be used in conjunction with custom Dataset implementations that leverage these libraries.
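Because fsspec dispatches on the URL scheme, the same open call works for local disk, S3, or GCS; only the path changes. A brief illustration (the paths are placeholders, and the corresponding backend packages such as s3fs or gcsfs must be installed):

import fsspec

# The same call works for any backend fsspec supports; only the URL scheme differs
paths = [
    "file:///data/corpus/shard-0000.txt.gz",      # local filesystem
    "s3://my-llm-data/corpus/shard-0000.txt.gz",  # Amazon S3 (needs s3fs)
    "gs://my-llm-data/corpus/shard-0000.txt.gz",  # Google Cloud Storage (needs gcsfs)
]

for path in paths:
    # compression="infer" transparently decompresses the .gz stream while reading
    with fsspec.open(path, "rt", compression="infer", encoding="utf-8") as f:
        first_line = f.readline()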
Here's an example using torch.utils.data.IterableDataset to stream data line by line from multiple text files stored in an S3-like bucket:
import gzip
import random

import s3fs  # Or boto3, or another S3-compatible client library
import torch
from torch.utils.data import IterableDataset, DataLoader


class S3StreamingTextDataset(IterableDataset):
    def __init__(self, bucket_name, prefix, shuffle_files=True):
        super().__init__()
        self.fs = s3fs.S3FileSystem()  # Assumes credentials are configured
        self.bucket = bucket_name
        self.file_paths = self.fs.glob(f"{bucket_name}/{prefix}/*.txt.gz")
        self.shuffle_files = shuffle_files
        print(f"Found {len(self.file_paths)} files in "
              f"s3://{bucket_name}/{prefix}")

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        worker_id = worker_info.id if worker_info is not None else 0

        # Copy so shuffling below does not mutate the shared list
        files_to_process = list(self.file_paths)
        if worker_info is not None:
            # Distribute files across DataLoader worker processes
            num_workers = worker_info.num_workers
            files_to_process = [
                f for i, f in enumerate(self.file_paths)
                if i % num_workers == worker_id
            ]

        if self.shuffle_files:
            # Shuffle this worker's files each epoch
            # Note: For true shuffling, consider shuffling a global index first
            random.shuffle(files_to_process)

        for file_path in files_to_process:
            print(f"Worker {worker_id}: processing {file_path}")
            try:
                # Use s3fs to open the object directly and decompress on the fly
                with self.fs.open(file_path, 'rb') as f_remote:
                    with gzip.open(f_remote, 'rt', encoding='utf-8') as f_gz:
                        for line in f_gz:
                            # Preprocess/tokenize the line here if needed
                            # For simplicity, just yield the raw stripped line
                            yield line.strip()
            except Exception as e:
                # Decide how to handle errors: skip file, log, or raise?
                print(f"Worker {worker_id}: error processing {file_path}: {e}")
                continue


# Example Usage
# dataset = S3StreamingTextDataset(
#     bucket_name='my-llm-data',
#     prefix='processed_data'
# )
# dataloader = DataLoader(
#     dataset,
#     batch_size=32,
#     num_workers=4  # 4 worker processes
# )
# for batch in dataloader:
#     # Process the batch of text lines
#     # print(batch)
#     pass
This example demonstrates streaming: each worker process (num_workers in DataLoader) gets assigned a subset of the files in S3 and reads them line by line directly from the object storage, decompressing on the fly. This avoids downloading the entire dataset locally and allows parallel processing across multiple workers and files. More sophisticated implementations might read data in larger chunks, use optimized file formats like Apache Arrow or Parquet (discussed in the previous section), and implement more robust shuffling strategies.
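As one illustration of chunked reading, the sketch below points pyarrow.dataset at a hypothetical Parquet prefix in S3 and iterates over record batches instead of individual lines; the bucket, prefix, and column name are assumptions, not part of the example above:

import pyarrow.dataset as ds
import s3fs

# Point a pyarrow Dataset at Parquet shards in object storage (paths are hypothetical)
fs = s3fs.S3FileSystem()
dataset = ds.dataset("my-llm-data/processed_parquet/", filesystem=fs, format="parquet")

# Stream the data in sizeable record batches rather than line by line
for batch in dataset.to_batches(columns=["text"], batch_size=10_000):
    texts = batch.column("text").to_pylist()
    # Tokenize and collate `texts` here before feeding the training loop

Reading in batches amortizes per-request latency against the object store and lets the Arrow layer handle decompression and column selection.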
Choosing the right distributed storage system is a foundational step. For most projects starting today, particularly those leveraging cloud computing, managed object storage like S3, GCS, or Azure Blob offers a compelling combination of scalability, durability, and ease of use, integrating well with modern distributed training frameworks and data loaders designed for streaming access.