Throughout this chapter, we've examined several advanced Python constructs: generators for memory-efficient iteration, context managers for reliable resource handling, and functional programming patterns for cleaner data transformations. Now, let's synthesize these concepts by building a practical, reusable component for a hypothetical machine learning data pipeline.
Our goal is to create a component that can read data from a source (like a file), apply a series of cleaning and transformation steps, and yield processed data ready for downstream consumption, all while being memory-efficient and robust.
Imagine we need a component that:

- reads records lazily from a file, one at a time, instead of loading everything up front,
- applies a configurable sequence of cleaning and transformation functions to each record,
- handles the underlying file resource reliably, even when errors occur, and
- yields processed records ready for downstream consumption.

This component could be used as an early stage in an ML pipeline, feeding data into feature extractors or model training routines without loading the entire dataset into memory.
Let's consider how the techniques from this chapter help meet these requirements:

- Generators let us read and process the data lazily, yielding one record at a time so memory use stays low.
- Context managers provide reliable resource handling. They can be implemented with __enter__/__exit__ or more simply with the @contextmanager decorator from the contextlib module.
- Functional patterns let us express cleaning and transformation steps as small, composable functions; map or generator expressions can apply these functions sequentially to the data stream.

Let's build this component step by step. We'll use a function-based approach combined with contextlib for simplicity.
We start with a generator-based context manager that takes a file path and yields a file handle. We use contextlib.contextmanager to handle the file resource.
import csv
from contextlib import contextmanager

@contextmanager
def file_reader_context(file_path, **kwargs):
    """A context manager for opening and yielding lines from a file."""
    try:
        # Open the file when entering the context
        f = open(file_path, 'r', **kwargs)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        # Option: re-raise, yield an empty iterator, or handle differently
        yield iter([])  # Yield an empty iterator on error
        return
    except OSError as e:
        print(f"An error occurred opening {file_path}: {e}")
        yield iter([])  # Yield an empty iterator on other errors
        return

    print(f"DEBUG: Opened file: {file_path}")
    try:
        yield f  # The file handle is yielded to the 'with' block
    finally:
        # Ensure the file is closed when exiting the context
        f.close()
        print(f"DEBUG: Closed file: {file_path}")
# Example usage of the context manager directly
# with file_reader_context('data.csv') as file_handle:
#     for line in file_handle:
#         print(line.strip())
This file_reader_context uses @contextmanager to manage the file lifecycle. It yields the file handle f, which can then be iterated over within a with statement. Error handling for FileNotFoundError is included.
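For comparison, the same resource management could also be written with an explicit class implementing __enter__ and __exit__, as listed among the options earlier. This is a minimal sketch covering only the happy path; it is not used in the rest of the pipeline, which sticks with the @contextmanager version.

class FileReaderContext:
    """Minimal class-based counterpart to file_reader_context (sketch only)."""

    def __init__(self, file_path, **kwargs):
        self.file_path = file_path
        self.kwargs = kwargs
        self.file = None

    def __enter__(self):
        # Acquire the resource when the 'with' block is entered
        self.file = open(self.file_path, 'r', **self.kwargs)
        return self.file

    def __exit__(self, exc_type, exc_value, traceback):
        # Release the resource on exit, even if the block raised
        if self.file is not None:
            self.file.close()
        return False  # do not suppress exceptions

The decorator-based version is shorter and reads top to bottom, which is why we use it here.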
Let's define some simple example functions for cleaning and transformation. In a real scenario, these would be more complex.
def strip_whitespace(row):
    """Removes leading/trailing whitespace from each element in a list."""
    return [str(item).strip() for item in row]

def convert_to_float(row, column_indices):
    """Attempts to convert specific columns to float, handling errors."""
    processed_row = list(row)  # Create a mutable copy
    for index in column_indices:
        if 0 <= index < len(processed_row):
            try:
                processed_row[index] = float(processed_row[index])
            except (ValueError, TypeError):
                # Handle conversion errors, e.g., set to None or NaN
                processed_row[index] = None
    return processed_row

def filter_incomplete_rows(row, expected_length):
    """Filters out rows that don't have the expected number of columns."""
    return row if len(row) == expected_length else None
Note that filter_incomplete_rows returns None for rows to be discarded. We'll need to handle this later.
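Before wiring these helpers into a pipeline, a quick sanity check shows how they behave on a couple of hand-made rows (the sample values here are made up for illustration):

raw_row = ['  value1 ', ' 1.0 ', ' 2.5 ']
clean_row = strip_whitespace(raw_row)
print(clean_row)                                              # ['value1', '1.0', '2.5']
print(convert_to_float(clean_row, column_indices=[1, 2]))     # ['value1', 1.0, 2.5]
print(convert_to_float(['x', 'oops'], column_indices=[1]))    # ['x', None]
print(filter_incomplete_rows(clean_row, expected_length=3))   # ['value1', '1.0', '2.5']
print(filter_incomplete_rows(['a', 'b'], expected_length=3))  # None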
Now, let's combine the file reader context and the processing functions into our main pipeline component function. This function will accept the file path and a list of processing functions.
import csv
from contextlib import contextmanager
from functools import partial  # Useful for functions needing extra arguments

# Assume file_reader_context, strip_whitespace, convert_to_float,
# filter_incomplete_rows are defined as above

def data_processing_pipeline(file_path, processing_steps, delimiter=','):
    """
    Creates a generator that reads a file and applies processing steps.

    Args:
        file_path (str): Path to the input CSV file.
        processing_steps (list): A list of functions to apply sequentially.
        delimiter (str): Delimiter for the CSV file.

    Yields:
        Processed data rows (e.g., lists).
    """
    print(f"Pipeline started for file: {file_path}")
    with file_reader_context(file_path, encoding='utf-8') as file_handle:
        # Create a CSV reader iterator over the file handle
        reader = csv.reader(file_handle, delimiter=delimiter)

        # Chain processing steps lazily using map
        data_stream = reader
        for step_func in processing_steps:
            # Apply each processing function to the stream
            # Note: map applies the function to each item yielded by data_stream
            data_stream = map(step_func, data_stream)

        # Add a final filter step to remove None values introduced by filters
        processed_stream = (item for item in data_stream if item is not None)

        # Yield items from the fully processed stream
        yield from processed_stream

    print(f"Pipeline finished for file: {file_path}")
This function data_processing_pipeline demonstrates several key ideas:

- It uses file_reader_context in a with statement for resource safety.
- It reads rows through csv.reader, which is itself an iterator.
- Processing steps are chained using map. Each map object is an iterator, processing items lazily as they are requested from the previous step. This creates a processing chain without storing intermediate results for the whole dataset.
- The generator expression (item for item in data_stream if item is not None) filters out any None values potentially introduced by filtering functions like filter_incomplete_rows.
- yield from efficiently yields all items from the final processed stream.

Let's define a sequence of processing steps and use our component. We'll use functools.partial to pre-configure functions like convert_to_float and filter_incomplete_rows with their specific arguments (column_indices, expected_length).
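First, though, here is a small, self-contained sketch of the laziness claim made above. The tag_step factory and the sample rows are hypothetical, defined only to make the flow of items visible:

def tag_step(name):
    """Hypothetical step factory: prints when the step actually runs."""
    def step(item):
        print(f"  {name} saw {item!r}")
        return item
    return step

rows = iter([['a'], ['b']])
stream = rows
for func in (tag_step('clean'), tag_step('transform')):
    stream = map(func, stream)

print("Steps chained; nothing has run yet.")
print("First item:", next(stream))   # 'clean' and 'transform' run for ['a'] only now
print("Second item:", next(stream))  # and again, only for ['b'], when it is requested

With that behavior in mind, here is the actual processing sequence and a test run: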
from functools import partial

# Define the processing sequence
# Assume our CSV has 3 columns, and we want to convert columns 1 and 2 (0-indexed) to float.
processing_pipeline_steps = [
    strip_whitespace,
    partial(convert_to_float, column_indices=[1, 2]),
    partial(filter_incomplete_rows, expected_length=3)
]

# Example: Create a dummy CSV file for testing
dummy_csv_content = """ header1, header2, header3
value1 , 1.0 , 2.5
value2 , 3.5 , invalid
value3,4.2, 5.1, extra_col
value4 , 6.0 , 7.8
"""
dummy_file_path = 'sample_data.csv'
with open(dummy_file_path, 'w') as f:
    f.write(dummy_csv_content)

# Run the pipeline
processed_data_generator = data_processing_pipeline(
    dummy_file_path,
    processing_pipeline_steps
)

# Consume the data from the generator
print("\nConsuming processed data:")
try:
    # Skip header row if necessary (can be added as another step)
    header = next(processed_data_generator)
    print(f"Skipped Header: {header}")

    for processed_row in processed_data_generator:
        print(f"Processed Row: {processed_row}")
except StopIteration:
    print("No data yielded (or only header found).")
except Exception as e:
    print(f"An error occurred during processing: {e}")

# Clean up the dummy file (optional)
import os
# os.remove(dummy_file_path)
Expected Output:

Consuming processed data:
Pipeline started for file: sample_data.csv
DEBUG: Opened file: sample_data.csv
Skipped Header: ['header1', None, None]
Processed Row: ['value1', 1.0, 2.5]  # 'invalid' in the next row becomes None via convert_to_float error handling
Processed Row: ['value2', 3.5, None]
Processed Row: ['value4', 6.0, 7.8]  # Row with 'extra_col' is filtered out
DEBUG: Closed file: sample_data.csv
Pipeline finished for file: sample_data.csv

Two details are worth noting. "Pipeline started" and "DEBUG: Opened file" only appear after the first next() call, because data_processing_pipeline is itself a generator function and its body does not run until the consumer requests data. Also, the header row passes through convert_to_float like every other row, so its last two columns become None; skipping the header before conversion could be added as an extra step.
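Because the pipeline yields one processed row at a time, a downstream consumer can compute aggregates or derive features without materializing the dataset. The running-mean consumer below is purely illustrative and assumes the dummy file and steps defined above:

# The earlier generator is exhausted, so create a fresh one.
row_stream = data_processing_pipeline(dummy_file_path, processing_pipeline_steps)
next(row_stream)  # skip the header row, as before

total, count = 0.0, 0
for row in row_stream:
    if row[1] is not None:  # ignore rows where float conversion failed
        total += row[1]
        count += 1

mean = total / count if count else float('nan')
print(f"Mean of column 1: {mean}")  # 3.5 for the sample file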
This hands-on example demonstrates how combining generators, context managers, and functional patterns leads to a flexible, memory-efficient, and robust data processing component.

- Memory efficiency: data flows lazily through map and generator expressions.
- Reusability: the data_processing_pipeline function is generic. It can work with different files and different sets of processing functions.
- Separation of concerns: splitting resource management (file_reader_context), individual processing steps (small functions), and the pipeline orchestration (data_processing_pipeline) makes the code easier to understand, test, and modify.
- Configurability: partial allows pre-configuring functions, making the processing_pipeline_steps list clean and readable. New steps can be easily added or existing ones reordered.

This component represents a building block. You could enhance it further by adding more sophisticated error logging, support for different file formats, or integration with parallel processing techniques (covered later in the course) for CPU-bound transformations. However, the core structure effectively leverages advanced Python constructs for building better ML pipelines.
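To make the reusability and configurability points concrete, the same data_processing_pipeline can be run over the same file with a different list of steps. The lowercase_column helper below is hypothetical, defined only for this illustration:

def lowercase_column(row, index):
    """Hypothetical extra step: lower-cases one column if it exists."""
    new_row = list(row)
    if 0 <= index < len(new_row):
        new_row[index] = str(new_row[index]).lower()
    return new_row

# A different, text-only configuration of the same pipeline
alternative_steps = [
    strip_whitespace,
    partial(lowercase_column, index=0),
]

for row in data_processing_pipeline(dummy_file_path, alternative_steps):
    print(row)

Nothing in the pipeline itself had to change; only the step list did.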