Modern data lakes rely on the decoupling of compute and storage. While this architecture lowers storage costs and increases scalability, it introduces a physical separation between the processing engine and the data persistence layer. Unlike a traditional Relational Database Management System (RDBMS), where the engine has direct, local access to the underlying storage medium, a data lake query engine must retrieve data over a network from object storage services such as Amazon S3, Azure Data Lake Storage, or Google Cloud Storage.

To bridge this gap and deliver interactive performance, we utilize distributed SQL engines built on Massively Parallel Processing (MPP) architectures. The most common examples in the modern data stack are Trino (formerly PrestoSQL), Apache Spark SQL, and Apache Hive LLAP. These systems allow you to run standard ANSI SQL queries against petabytes of data by dividing the work across a cluster of servers.

## Massively Parallel Processing Architecture

In an MPP system, the processing workload is shared among many independent nodes. These nodes do not share memory or disk (a "shared-nothing" architecture). Instead, they communicate and exchange data exclusively through the network.

The architecture generally consists of two primary node types:

- **The Coordinator (or Driver):** This node acts as the brain of the cluster. It accepts the SQL query from the client, parses it, optimizes the execution plan, and manages the distribution of tasks to worker nodes. It does not typically process the data itself.
- **The Workers (or Executors):** These nodes are the brawn. They connect directly to the object storage, read the data files, and perform the actual computation (filtering, aggregating, joining).
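If you have access to a running Trino cluster, you can see this division of roles directly. The query below is a minimal sketch that assumes Trino's built-in system catalog (enabled by default); other engines expose similar node metadata through their own interfaces.

```sql
-- List the nodes in a Trino cluster and their roles.
-- The "coordinator" column is true for the coordinator node and false for workers.
SELECT
    node_id,
    http_uri,
    coordinator,
    state
FROM system.runtime.nodes;
```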
When a query is submitted, the coordinator breaks the query down into stages. These stages are further divided into tasks, which are processed in parallel.

The following diagram illustrates the flow of control and data in a typical distributed query engine like Trino.

```dot
digraph G {
  rankdir=TB;
  node [shape=box, style="filled,rounded", fontname="Helvetica", fontsize=10, color="#dee2e6"];
  edge [fontname="Helvetica", fontsize=9, color="#868e96"];

  subgraph cluster_compute {
    label = "Compute Cluster";
    style = filled;
    color = "#f8f9fa";
    Coordinator [label="Coordinator Node\n(Parser, Planner, Scheduler)", fillcolor="#748ffc", fontcolor="white"];

    subgraph cluster_workers {
      label = "Worker Nodes";
      style = filled;
      color = "#e9ecef";
      Worker1 [label="Worker 1\n(Execute Task)", fillcolor="#a5d8ff"];
      Worker2 [label="Worker 2\n(Execute Task)", fillcolor="#a5d8ff"];
      Worker3 [label="Worker 3\n(Execute Task)", fillcolor="#a5d8ff"];
    }
  }

  Client [label="SQL Client\n(e.g., Jupyter, DBeaver)", shape=ellipse, fillcolor="#ffc9c9"];

  subgraph cluster_storage {
    label = "Object Storage Layer";
    style = filled;
    color = "#e9ecef";
    S3 [label="Data Lake Storage\n(Parquet/Avro Files)", shape=cylinder, fillcolor="#ced4da"];
  }

  Client -> Coordinator [label="Submit SQL"];
  Coordinator -> Worker1 [label="Assign Splits"];
  Coordinator -> Worker2 [label="Assign Splits"];
  Coordinator -> Worker3 [label="Assign Splits"];
  Worker1 -> S3 [label="Read File Range"];
  Worker2 -> S3 [label="Read File Range"];
  Worker3 -> S3 [label="Read File Range"];
  Worker1 -> Coordinator [label="Return Results"];
  Worker2 -> Coordinator [label="Return Results"];
  Worker3 -> Coordinator [label="Return Results"];
}
```

The coordinator manages the query plan and assigns tasks to workers, which interact directly with the storage layer.

## The Life Cycle of a Distributed Query

Understanding how a SQL statement transforms into a distributed operation is necessary for performance tuning. If a query is slow, the bottleneck usually lies in one of the specific phases of this lifecycle.

### 1. Parsing and Analysis

When the coordinator receives a SQL statement, it first verifies the syntax. It then consults the Metadata Catalog (such as the Hive Metastore or AWS Glue) to verify that the tables and columns exist and that the user has the required permissions. At this stage, the query is represented as an abstract syntax tree.

### 2. Logical Planning and Optimization

The engine converts the syntax tree into a logical plan. This is where the Cost-Based Optimizer (CBO) operates. The CBO evaluates different ways to execute the query. For example, if you are joining a small table of "Product Categories" with a massive table of "Sales," the CBO decides whether to broadcast the small table to all nodes or shuffle the large table.

The optimizer uses statistics (row counts, column distinct values, min/max values) to estimate the cost of operations. If these statistics are missing from your metadata catalog, the engine may choose an inefficient plan, resulting in slow performance.

### 3. Physical Planning and Scheduling

The logical plan is converted into a physical plan consisting of stages:

- **Source Stage:** Reading data from the lake.
- **Compute Stage:** Aggregating or filtering data in memory.
- **Sink Stage:** Writing results back or returning them to the client.

The coordinator breaks these stages into Splits. A split is a unit of parallelism. In the context of a data lake, a split usually corresponds to a specific range of bytes within a file stored in object storage.
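Two statements are useful when diagnosing these planning phases. The sketch below uses a hypothetical sales table and Trino syntax (statistics collection depends on the connector); the Spark SQL equivalents are noted in the comments.

```sql
-- Collect table and column statistics so the cost-based optimizer can choose
-- join strategies (Trino syntax; the Spark SQL equivalent is
-- ANALYZE TABLE sales COMPUTE STATISTICS FOR ALL COLUMNS).
ANALYZE sales;

-- Inspect the physical plan without executing the query. In Trino, the output
-- lists the plan as numbered Fragments (the stages described above), including
-- the exchange between the source stage and the aggregation stage.
EXPLAIN
SELECT region_id, count(*) AS sale_count
FROM sales
GROUP BY region_id;
```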
### 4. Execution and Data Shuffling

Workers execute the assigned splits. If the query is a simple SELECT * FROM table WHERE id = 1, the operation is "embarrassingly parallel": each worker scans its assigned files, filters the rows, and returns the matches.

However, if the query involves a GROUP BY or JOIN, data must move between workers. This process is called Shuffling. For example, to count sales by region_id, all rows belonging to region_id = 1 must end up on the same worker node to be counted. Since those rows are scattered across many files, the workers must exchange data over the cluster network.

The total execution time of a query is roughly the volume of data processed divided by the number of parallel workers, plus the overhead of the network exchange and per-request latency:

$$T_{\text{total}} \approx \frac{V_{\text{data}}}{N_{\text{workers}}} + T_{\text{shuffle}} + T_{\text{latency}}$$

In data lake architectures, $T_{\text{latency}}$ (listing files and establishing HTTP connections to S3) and $T_{\text{shuffle}}$ often dominate the execution time.

## In-Memory vs. Spill-to-Disk Engines

While most distributed engines share the architecture described above, they differ in how they handle memory constraints.

- **Interactive Engines (e.g., Trino, Presto):** These engines are designed for speed. They attempt to keep all intermediate data in memory. If a query requires more memory than the cluster has available (for example, a massive hash join), the query fails by default with an "Out of Memory" (OOM) error. This makes them excellent for ad-hoc analysis and dashboarding but less resilient for heavy ETL jobs.
- **Batch Engines (e.g., Apache Spark):** Spark is designed for resilience and throughput. If memory fills up during a large join or sort, Spark "spills" the data to the local disk of the worker node. The query continues to run, albeit more slowly, rather than failing. This makes Spark the preferred choice for the heavy data engineering pipelines and ingestion jobs described in Chapter 3.

## Optimizing for the Engine

Because the engine communicates with storage over a network, reducing the amount of data transferred is the most effective optimization technique.

- **Predicate Pushdown:** The engine "pushes" the filter conditions (the WHERE clause) down to the lowest level possible. With formats like Parquet, the worker can check the min/max statistics in the file footer and skip the file (or individual row groups) entirely if the value range cannot match the filter.
- **Column Pruning:** Since most data lake formats are columnar, the engine only requests the specific byte ranges for the columns explicitly selected in the SQL query. SELECT * is significantly more expensive than SELECT id, value because it forces the worker to read and transfer every column from object storage.

By understanding that the query engine is a distributed system orchestrating network requests and memory buffers, you can write SQL that aligns with the engine's strengths, specifically by filtering early and selecting only the necessary columns, as the final example below demonstrates.
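As a closing illustration of both techniques, compare the two queries below against the same hypothetical sales table (the table and column names are illustrative). Both are standard ANSI SQL; the performance difference comes entirely from how much data the workers must pull from object storage.

```sql
-- Anti-pattern: every column of every row is read from object storage and
-- transferred through the workers before any rows reach the client.
SELECT * FROM sales;

-- Better: only the byte ranges for order_id and amount are requested
-- (column pruning), and the date filter is pushed down so Parquet files and
-- row groups whose min/max statistics fall outside June 2024 are skipped.
SELECT order_id, amount
FROM sales
WHERE sale_date >= DATE '2024-06-01'
  AND sale_date <  DATE '2024-07-01';
```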