Writing isolated assertions provides value for ad-hoc debugging, but production environments require a structured approach. A data validation suite aggregates individual checks into a unified execution engine. This engine reads configuration files, applies logic to datasets, and produces a consolidated report on data health.
In this section, we will engineer a lightweight validation framework in Python. This exercise demonstrates the mechanics behind enterprise tools like Great Expectations or Soda. By building it from scratch, you will understand how to decouple validation logic from data processing code.
A validation suite consists of three distinct components:

- Expectation definitions: declarative rules describing what valid data looks like, stored as configuration rather than code.
- A validation engine: the code that reads those rules, runs the corresponding checks against a dataset, and records the outcomes.
- A results report: a structured summary of which checks passed or failed, and at what severity, used to decide whether the pipeline proceeds.

Separating these components allows you to update data quality rules without modifying the underlying pipeline code.
The flow of data and metadata through a validation architecture. The engine acts as the interface between static rules and dynamic data.
Hardcoding assertions (e.g., assert df['id'].notnull().all()) inside your ETL scripts makes them difficult to manage. Instead, we define expectations in a dictionary or a JSON file. This acts as a policy-as-code layer.
Consider a dataset of user transactions. We expect the transaction_id to be unique, the amount to be positive, and the currency to be a valid ISO code. The configuration below encodes the first two expectations, along with a range check on user_age.
validation_config = [
    {
        "column": "transaction_id",
        "test": "is_unique",
        "severity": "block"
    },
    {
        "column": "amount",
        "test": "is_positive",
        "severity": "block"
    },
    {
        "column": "user_age",
        "test": "in_range",
        "params": {"min": 18, "max": 120},
        "severity": "warn"
    }
]
Notice the severity field. Not all data failures are critical. A duplicate transaction_id breaks the pipeline (Block), but an unusual user_age might only require investigation (Warn).
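Because the rules are plain data, the same configuration can live in a version-controlled JSON file instead of a Python literal. Below is a minimal sketch, assuming a file named validation_rules.json (an illustrative name) that contains the list shown above:

import json

# Load the rules from a version-controlled file.
# The filename is an assumption for illustration, not a fixed convention.
with open("validation_rules.json") as f:
    validation_config = json.load(f)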
The engine reads the config and maps string identifiers (like "is_unique") to actual Python functions. We will use Pandas for this implementation, though the pattern applies equally to PySpark or SQL.
import pandas as pd
class DataValidator:
    def __init__(self, df):
        self.df = df
        self.results = []

    def check_is_unique(self, col, severity):
        """Asserts that a column contains no duplicates."""
        is_valid = self.df[col].is_unique
        self._log_result("is_unique", col, is_valid, severity)

    def check_is_positive(self, col, severity):
        """Asserts that numeric values are > 0."""
        # Check if type is numeric first to avoid errors
        if pd.api.types.is_numeric_dtype(self.df[col]):
            is_valid = (self.df[col] > 0).all()
        else:
            is_valid = False
        self._log_result("is_positive", col, is_valid, severity)

    def check_in_range(self, col, severity, min_val, max_val):
        """Asserts values fall within a specific range."""
        is_valid = self.df[col].between(min_val, max_val).all()
        self._log_result("in_range", col, is_valid, severity)

    def _log_result(self, test_name, col, status, severity):
        self.results.append({
            "test": test_name,
            "column": col,
            "status": "PASS" if status else "FAIL",
            "severity": severity
        })

    def run_suite(self, config):
        for rule in config:
            test_type = rule["test"]
            col = rule["column"]
            severity = rule["severity"]
            params = rule.get("params", {})

            # Dynamic dispatch
            if test_type == "is_unique":
                self.check_is_unique(col, severity)
            elif test_type == "is_positive":
                self.check_is_positive(col, severity)
            elif test_type == "in_range":
                self.check_in_range(col, severity, params["min"], params["max"])

        return pd.DataFrame(self.results)
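A side note on the dynamic dispatch step: the if/elif chain works, but a common refinement (not used in the implementation above) is a dictionary mapping test names to bound methods, so new checks do not require edits to the control flow. A sketch of run_suite rewritten that way, shown unindented for readability:

def run_suite(self, config):
    # Alternative dispatch: map test names to methods instead of if/elif.
    dispatch = {
        "is_unique": self.check_is_unique,
        "is_positive": self.check_is_positive,
        "in_range": lambda col, sev, **p: self.check_in_range(col, sev, p["min"], p["max"]),
    }
    for rule in config:
        handler = dispatch.get(rule["test"])
        if handler is not None:
            handler(rule["column"], rule["severity"], **rule.get("params", {}))
    return pd.DataFrame(self.results)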
With the class defined, we instantiate it with a DataFrame containing sample data. In a production pipeline, this DataFrame would be the result of an extraction or transformation step immediately prior to loading.
# Sample Data
data = pd.DataFrame({
    "transaction_id": [101, 102, 103, 101],  # Duplicate 101
    "amount": [50.0, -10.0, 100.0, 25.0],    # Negative value
    "user_age": [25, 30, 150, 40]            # 150 is out of range
})
# Execution
validator = DataValidator(data)
report = validator.run_suite(validation_config)
print(report)
The output provides a clear status of the data quality:
| test | column | status | severity |
|---|---|---|---|
| is_unique | transaction_id | FAIL | block |
| is_positive | amount | FAIL | block |
| in_range | user_age | FAIL | warn |
Rule-based checks (like null checks) are deterministic. However, as discussed in the section on statistical profiling, we often need to validate against distributions. We can extend our DataValidator to include statistical assertions.
For example, ensuring the average transaction value remains consistent with historical norms. If the historical mean is \$50 ($\mu = 50$) with a standard deviation of \$10 ($\sigma = 10$), our assertion logic becomes a bounds check on the current batch mean, for instance a three-sigma band: $\mu - 3\sigma \le \bar{x} \le \mu + 3\sigma$, which here means the mean must fall between \$20 and \$80.
We can add this to our validator methods:
def check_mean_within_range(self, col, severity, min_mean, max_mean):
    """Asserts that the column mean falls within an expected historical band."""
    current_mean = self.df[col].mean()
    is_valid = min_mean <= current_mean <= max_mean
    self._log_result("mean_within_range", col, is_valid, severity)
This allows the suite to catch subtle data drift issues where the data format is correct (valid floats), but the business logic is skewed (e.g., a currency conversion bug causing all values to drop by 90%).
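To make the new check usable from configuration, the suite also needs a rule entry and a matching dispatch branch. A minimal sketch follows; the bounds of 20 and 80 are the illustrative three-sigma values from above, not fixed thresholds:

# Register the statistical expectation alongside the rule-based ones.
validation_config.append({
    "column": "amount",
    "test": "mean_within_range",
    "params": {"min": 20.0, "max": 80.0},  # mu - 3*sigma and mu + 3*sigma
    "severity": "warn"
})

# Inside run_suite, add a corresponding branch:
#     elif test_type == "mean_within_range":
#         self.check_mean_within_range(col, severity, params["min"], params["max"])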
When running these suites in Continuous Integration (CI) or scheduled pipelines, visualization helps in quickly identifying the health of a batch. Below is a representation of how a suite might visualize the results of 100 checks across different categories.
A summary of test execution showing passed vs. failed tests, categorized by severity.
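A simple version of such a summary can be produced directly from the report DataFrame with pandas and matplotlib; this is a sketch, not a prescribed charting approach:

import matplotlib.pyplot as plt

# Count checks by severity and status, then render a stacked bar chart.
summary = report.groupby(["severity", "status"]).size().unstack(fill_value=0)
summary.plot(kind="bar", stacked=True, title="Validation results by severity")
plt.xlabel("Severity")
plt.ylabel("Number of checks")
plt.tight_layout()
plt.show()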
The final step in writing a validation suite is determining the exit code. A suite should programmatically decide whether the pipeline continues.
The gate logic is:

- Filter the report for rows where status == 'FAIL'.
- If any failing check has severity == 'block', raise an exception to halt the pipeline.
- If failures carry only severity == 'warn', send a notification (e.g., to Slack or PagerDuty) but allow the pipeline to proceed.

The enforce_gate function below implements the blocking path:

def enforce_gate(report):
    failed_blocks = report[
        (report['status'] == 'FAIL') &
        (report['severity'] == 'block')
    ]
    if not failed_blocks.empty:
        error_msg = f"Blocking quality checks failed: {failed_blocks['test'].tolist()}"
        raise ValueError(error_msg)
    print("Quality gate passed.")
# This would raise a ValueError based on our sample data
# enforce_gate(report)
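In a CI job or scheduled task, the unhandled ValueError is typically enough to mark the step as failed, but the exit code can also be made explicit. A small sketch, assuming the report DataFrame produced by the run above:

import sys

try:
    enforce_gate(report)
except ValueError as exc:
    print(f"Pipeline halted: {exc}")
    sys.exit(1)  # non-zero exit fails the CI step or orchestrator task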
By wrapping your validation logic in a suite, you transform data quality from a manual, reactive process into an automated, proactive engineering discipline. This structure serves as the foundation for the observability systems we will explore in the next chapter.