Writing isolated assertions provides value for ad-hoc debugging, but production environments require a structured approach. A data validation suite aggregates individual checks into a unified execution engine. This engine reads configuration files, applies the corresponding logic to datasets, and produces a consolidated report on data health.

In this section, we will engineer a lightweight validation framework in Python. This exercise demonstrates the mechanics behind enterprise tools like Great Expectations or Soda. By building it from scratch, you will understand how to decouple validation logic from data processing code.

## The Architecture of a Validation Suite

A validation suite consists of three distinct components:

1. **The Contract (Configuration):** A declarative definition of what the data should look like.
2. **The Validator (Engine):** The code that accepts the data and the contract, and executes the necessary logic.
3. **The Report (Artifact):** The structured output detailing which tests passed, which failed, and the severity of those failures.

Separating these components allows you to update data quality rules without modifying the underlying pipeline code.

```dot
digraph G {
    rankdir=LR;
    node [fontname="Helvetica", shape=box, style=filled, color="#dee2e6"];
    edge [color="#868e96"];
    config [label="Validation Config\n(JSON/YAML)", fillcolor="#bac8ff"];
    data   [label="Input Data\n(DataFrame)", fillcolor="#b2f2bb"];
    engine [label="Validation\nEngine", shape=component, fillcolor="#e599f7"];
    report [label="Quality Report", fillcolor="#ffc9c9"];
    action [label="Action\n(Alert/Block)", fillcolor="#ffe066"];
    config -> engine;
    data -> engine;
    engine -> report;
    report -> action;
}
```

*The flow of data and metadata through a validation architecture. The engine acts as the interface between static rules and dynamic data.*

## Defining the Validation Contract

Hardcoding assertions (e.g., `assert df['id'].notnull().all()`) inside your ETL scripts makes them difficult to manage. Instead, we define expectations in a dictionary or a JSON file. This acts as a policy-as-code layer.

Consider a dataset of user transactions. We expect the `transaction_id` to be unique, the `amount` to be positive, and the `user_age` to fall within a realistic range.

```python
validation_config = [
    {"column": "transaction_id", "test": "is_unique", "severity": "block"},
    {"column": "amount", "test": "is_positive", "severity": "block"},
    {
        "column": "user_age",
        "test": "in_range",
        "params": {"min": 18, "max": 120},
        "severity": "warn",
    },
]
```

Notice the `severity` field. Not all data failures are critical. A duplicate `transaction_id` breaks the pipeline (Block), but an unusual `user_age` might just require investigation (Warn).

## Implementing the Validator Engine

The engine reads the config and maps string identifiers (like `"is_unique"`) to actual Python functions.
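If the contract lives in its own file rather than a Python literal, the engine can load it at runtime before dispatching any checks. Here is a minimal sketch, assuming the rules above are saved in a hypothetical `validation_rules.json`:

```python
import json

# Load the contract from a standalone file so that rules can change
# without touching the pipeline code that invokes the validator.
# "validation_rules.json" is a hypothetical path used for illustration.
with open("validation_rules.json") as f:
    validation_config = json.load(f)
```

A YAML variant behaves the same way via `yaml.safe_load`.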
We will use Pandas for this implementation, though the pattern applies equally to PySpark or SQL.

```python
import pandas as pd


class DataValidator:
    def __init__(self, df):
        self.df = df
        self.results = []

    def check_is_unique(self, col, severity):
        """Asserts that a column contains no duplicates."""
        is_valid = self.df[col].is_unique
        self._log_result("is_unique", col, is_valid, severity)

    def check_is_positive(self, col, severity):
        """Asserts that numeric values are > 0."""
        # Check if the type is numeric first to avoid errors
        if pd.api.types.is_numeric_dtype(self.df[col]):
            is_valid = (self.df[col] > 0).all()
        else:
            is_valid = False
        self._log_result("is_positive", col, is_valid, severity)

    def check_in_range(self, col, severity, min_val, max_val):
        """Asserts values fall within a specific range."""
        is_valid = self.df[col].between(min_val, max_val).all()
        self._log_result("in_range", col, is_valid, severity)

    def _log_result(self, test_name, col, status, severity):
        self.results.append({
            "test": test_name,
            "column": col,
            "status": "PASS" if status else "FAIL",
            "severity": severity,
        })

    def run_suite(self, config):
        for rule in config:
            test_type = rule["test"]
            col = rule["column"]
            severity = rule["severity"]
            params = rule.get("params", {})

            # Dispatch to the matching check method
            if test_type == "is_unique":
                self.check_is_unique(col, severity)
            elif test_type == "is_positive":
                self.check_is_positive(col, severity)
            elif test_type == "in_range":
                self.check_in_range(col, severity, params["min"], params["max"])

        return pd.DataFrame(self.results)
```

## Running the Suite and Interpreting Output

With the class defined, we instantiate it with a DataFrame containing sample data. In a production pipeline, this DataFrame would be the result of an extraction or transformation step immediately prior to loading.

```python
# Sample Data
data = pd.DataFrame({
    "transaction_id": [101, 102, 103, 101],  # Duplicate 101
    "amount": [50.0, -10.0, 100.0, 25.0],    # Negative value
    "user_age": [25, 30, 150, 40],           # 150 is out of range
})

# Execution
validator = DataValidator(data)
report = validator.run_suite(validation_config)
print(report)
```

The output provides a clear status of the data quality:

| test | column | status | severity |
| --- | --- | --- | --- |
| is_unique | transaction_id | FAIL | block |
| is_positive | amount | FAIL | block |
| in_range | user_age | FAIL | warn |

## Statistical Integration

Rule-based checks (like null checks) are deterministic. However, as discussed in the section on statistical profiling, we often need to validate against distributions. We can extend our `DataValidator` to include statistical assertions.

For example, we might ensure that the average transaction value $\mu$ remains consistent with historical norms. If the historical mean is \$50 and we allow a deviation $\sigma$ of \$10, our assertion logic becomes:

$$ 40 \le \mu_{current} \le 60 $$

We can add this to our validator methods:

```python
    def check_mean_within_range(self, col, severity, min_mean, max_mean):
        current_mean = self.df[col].mean()
        is_valid = min_mean <= current_mean <= max_mean
        self._log_result("mean_within_range", col, is_valid, severity)
```

This allows the suite to catch subtle data drift issues where the data format is correct (valid floats), but the business logic is skewed (e.g., a currency conversion bug causing all values to drop by 90%).

## Visualizing Test Results

When running these suites in Continuous Integration (CI) or scheduled pipelines, visualization helps in quickly identifying the health of a batch.
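The raw report contains one row per check; for a dashboard or CI summary it helps to first collapse it into counts per severity and status. A minimal sketch of that aggregation, reusing the `report` DataFrame from the run above:

```python
# Collapse the row-per-check report into pass/fail counts per severity,
# the shape a grouped bar chart or CI summary typically consumes.
summary = (
    report.groupby(["severity", "status"])
          .size()
          .reset_index(name="count")
)
print(summary)
```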
Below is a representation of how a suite might visualize the results of 100 checks across different categories.

```json
{
  "layout": {
    "title": "Validation Suite Results by Severity",
    "xaxis": {"title": "Test Result"},
    "yaxis": {"title": "Count"},
    "barmode": "group",
    "plot_bgcolor": "#f8f9fa"
  },
  "data": [
    {"type": "bar", "name": "Blocking Checks", "x": ["Pass", "Fail"], "y": [45, 2], "marker": {"color": "#fa5252"}},
    {"type": "bar", "name": "Warning Checks", "x": ["Pass", "Fail"], "y": [48, 5], "marker": {"color": "#fab005"}}
  ]
}
```

*A summary of test execution showing passed vs. failed tests, categorized by severity.*

## Enforcing Quality Gates

The final step in writing a validation suite is determining the exit code. A suite should programmatically decide whether the pipeline continues.

1. **Filter for Failures:** Select all rows in the report where `status == 'FAIL'`.
2. **Check Severity:** If any failure has `severity == 'block'`, raise an exception.
3. **Alerting:** If failures exist but are only `severity == 'warn'`, send a notification (e.g., to Slack or PagerDuty) but allow the pipeline to proceed (a sketch of this branch appears at the end of the section).

```python
def enforce_gate(report):
    failed_blocks = report[
        (report['status'] == 'FAIL') & (report['severity'] == 'block')
    ]
    if not failed_blocks.empty:
        error_msg = f"Blocking quality checks failed: {failed_blocks['test'].tolist()}"
        raise ValueError(error_msg)
    print("Quality gate passed.")


# This would raise a ValueError based on our sample data
# enforce_gate(report)
```

By wrapping your validation logic in a suite, you transform data quality from a manual, reactive process into an automated, proactive engineering discipline. This structure serves as the foundation for the observability systems we will explore in the next chapter.
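As a final illustration of step 3 above, the warn path can be handled by a small helper that sits alongside `enforce_gate`. This is a minimal sketch, not part of the suite itself; `send_alert` is a hypothetical placeholder for whatever notification client (Slack webhook, PagerDuty event, and so on) your team uses:

```python
def handle_warnings(report, send_alert):
    """Notify on warn-level failures without stopping the pipeline."""
    failed_warns = report[
        (report['status'] == 'FAIL') & (report['severity'] == 'warn')
    ]
    if not failed_warns.empty:
        # send_alert is a placeholder; swap in your real notification call.
        send_alert(f"Warning-level checks failed: {failed_warns['test'].tolist()}")


# During local testing, printing the message is enough:
# handle_warnings(report, send_alert=print)
```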