Feature engineering transforms raw data into formats suitable for machine learning algorithms. In batch processing, this often means SQL queries over static partitions of a data warehouse. However, these static partitions introduce significant latency: by the time a feature vector is computed, the user context may have changed, rendering the prediction irrelevant. Online feature generation moves this computation to the stream processing layer, calculating feature values as events arrive so that the model input reflects the immediate state of the environment.

## Stateful Stream Aggregations

The primary mechanism for generating features on data streams is the stateful aggregation. Unlike stateless transformations such as filtering or mapping, feature generation requires context. To calculate the "average transaction value over the last hour" or the "number of clicks in the past 5 minutes," the processor must retain history.

In Apache Flink, we achieve this through windowing logic combined with keyed state. The raw stream is partitioned by a logical entity, such as a `user_id` or `device_id`, using the `keyBy` operator. This ensures that all events for a specific entity are routed to the same parallel task slot, allowing local state access without network shuffling during the window accumulation phase.

*Figure: Raw Event Stream → KeyBy(User ID) → Sliding Window (1 hr window, 5 min slide) ⇄ Managed State (RocksDB) → Feature Vector. Data flow for a window aggregation: events are sharded by key, processed against local state, and emitted as updated feature vectors.*

## Optimizing with Incremental Aggregations

A common performance bottleneck in feature generation arises from buffering raw events. If a window spans 24 hours and contains millions of events, storing all payloads in the state backend until the window triggers is inefficient: it increases memory pressure and causes CPU spikes during the evaluation phase.

To mitigate this, we use incremental aggregation. Instead of storing the events, Flink updates a compact accumulator state. For a simple average, the state consists only of the running sum and the count. When a new event $x_t$ arrives, we update the state immediately:

$$ S_t = S_{t-1} + x_t, \qquad N_t = N_{t-1} + 1 $$

When the window closes or triggers, the feature is computed simply as $S_t / N_t$. This approach ensures that the storage complexity remains $O(1)$ regardless of the number of events in the window.

In Flink, this is implemented using the `AggregateFunction` interface, which requires defining an accumulator, an `add` method, and a result retrieval method. The `merge` method is also critical for session windows, where multiple partial aggregates may need to be combined.
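As a minimal sketch of this pattern, assuming a hypothetical `Transaction` event type with a numeric `amount` field, an incremental average can be expressed as:

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Assumes a hypothetical event POJO:
//   public class Transaction { public String userId; public double amount; }
//
// Incremental average: the accumulator holds only (running sum, count),
// so state stays O(1) no matter how many events the window contains.
public class AverageAggregate
        implements AggregateFunction<Transaction, Tuple2<Double, Long>, Double> {

    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return Tuple2.of(0.0, 0L);
    }

    @Override
    public Tuple2<Double, Long> add(Transaction event, Tuple2<Double, Long> acc) {
        // Fold the event into the accumulator immediately; the raw payload is never buffered.
        return Tuple2.of(acc.f0 + event.amount, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        // Combines partial aggregates, e.g. when session windows merge.
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
```

Applied to a keyed, windowed stream, for example `stream.keyBy(t -> t.userId).window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5))).aggregate(new AverageAggregate())`, Flink folds each arriving event into the accumulator and emits only the final average when the window fires.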
## Sliding Windows and Hop Performance

Sliding windows are ubiquitous in feature engineering. A feature defined as "count of login failures in the last 15 minutes, updated every minute" implies a window size of 15 minutes and a slide (hop) of 1 minute.

While simple to define, sliding windows can be computationally expensive if not tuned. A single event belongs to $\text{Size}/\text{Slide}$ windows. For a 1-hour window sliding every minute, every event belongs to 60 overlapping windows. If Flink were to duplicate the event for each window bucket, the state size would explode.

Flink optimizes this by slicing the stream into "panes" based on the greatest common divisor of the window size and the slide interval. However, for high-throughput feature generation, it is often more efficient to use an Exponential Moving Average (EMA). The EMA places greater weight on recent data points and decays older information mathematically, eliminating the need for strict window boundaries and buffer management.

The recursive formula for the EMA is:

$$ EMA_t = \alpha \cdot x_t + (1 - \alpha) \cdot EMA_{t-1} $$

Here, $\alpha$ is the smoothing factor, with $0 < \alpha < 1$; a higher $\alpha$ discounts older observations faster. This calculation requires storing only the previous EMA value, making it highly efficient for low-latency feature pipelines.
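A minimal sketch of this as a keyed Flink operator, assuming the stream has already been partitioned with `keyBy` so that each key carries its own EMA in `ValueState`:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// EMA feature: per-key state is a single Double, so there is no window
// buffer to manage and no trigger to fire. Apply after keyBy(...).
public class EmaFeature extends RichFlatMapFunction<Double, Double> {

    private final double alpha;               // smoothing factor, 0 < alpha < 1
    private transient ValueState<Double> ema; // previous EMA value for this key

    public EmaFeature(double alpha) {
        this.alpha = alpha;
    }

    @Override
    public void open(Configuration parameters) {
        ema = getRuntimeContext().getState(
                new ValueStateDescriptor<>("ema", Double.class));
    }

    @Override
    public void flatMap(Double x, Collector<Double> out) throws Exception {
        Double prev = ema.value();
        // Seed with the first observation, then apply the recursive formula.
        double next = (prev == null) ? x : alpha * x + (1 - alpha) * prev;
        ema.update(next);
        out.collect(next);
    }
}
```

Each incoming event immediately yields an updated feature value, so downstream consumers always read the freshest estimate for that key.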
## Handling Time Skew and Watermarks

Real-time features depend heavily on the correctness of time. If network issues delay an event, it may arrive after the window for its timestamp has theoretically closed. In fraud detection, processing a transaction out of order could produce a feature vector that misses a critical signal, resulting in a false negative.

Flink handles this via watermarks. A watermark flows with the stream and declares that no more events with a timestamp less than $T$ will arrive. You must configure the watermark generation strategy to balance latency against completeness:

- **Strict/low latency:** Watermarks are generated tightly behind the maximum observed timestamp. This reduces the delay in emitting features but increases the risk of dropping late data.
- **Accommodating/high completeness:** Watermarks lag significantly behind the maximum timestamp. This allows late events to be included in the aggregation but delays the availability of the feature.

For online inference, we often cannot afford to wait for late data. The standard pattern is to configure a short allowed-lateness period and redirect any extremely late data to a side output for monitoring, ensuring the main feature pipeline remains low-latency.

## Dealing with High Cardinality

When generating features for millions of users, the state backend becomes the limiting factor. Storing feature state on the Java heap is rarely feasible due to garbage collection pauses. Instead, we configure RocksDB as the state backend: RocksDB stores state on local disk (SSD) and relies on off-heap memory for caching.

However, RocksDB adds serialization overhead, since every state access requires serializing the key and value to bytes. To optimize this:

- Use primitive arrays or specialized serialization formats (such as Protobuf) instead of heavy Java objects for accumulators.
- Enable incremental checkpointing to reduce the I/O bandwidth required to persist state snapshots to distributed storage (S3/HDFS).
- Monitor the `state.backend.rocksdb.block-cache-usage` metric. If the cache is too small, read amplification will occur as the system repeatedly fetches data from disk.

The following chart illustrates the latency trade-off when interacting with disk-based state versus in-memory computation as throughput scales.

*Figure: Latency vs. Throughput by State Backend. 99th-percentile latency (ms) against throughput (events/sec) for Heap (memory) and RocksDB (SSD). Heap state offers lower initial latency but degrades rapidly under memory pressure (GC), while RocksDB maintains stable performance at higher throughputs.*

## Feature Encoding in Streams

Beyond numerical aggregations, models often require categorical features. In batch systems, techniques like one-hot encoding or target encoding are applied using the full vocabulary of the dataset. In streams, the vocabulary is unbounded and evolving.

For one-hot encoding, maintaining a dynamic dictionary in state is dangerous due to unbounded growth. A common stream-friendly alternative is feature hashing (the "hashing trick"): we hash the categorical string into a fixed integer space.

$$ \text{index} = \mathrm{hash}(\text{category}) \bmod \text{vector\_size} $$

This collision-prone method trades accuracy for constant memory usage, a necessary compromise in streaming environments. Alternatively, for target encoding (replacing a category with the mean of the target variable), we use the same incremental aggregation logic described earlier: maintain a running sum of the target and a count for each category key, updating the map in real time (see the sketch at the end of this section).

By mastering these aggregation and state management techniques, you ensure that the feature vectors serving your AI models are both computationally efficient and temporally relevant.
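To close, a minimal sketch of both encoders in plain Java. Note two simplifications: `hashIndex` uses `String.hashCode()` for brevity (a stronger hash such as MurmurHash would be preferable in practice), and the in-process `HashMap` stands in for what would typically be Flink `MapState` in a real pipeline:

```java
import java.util.HashMap;
import java.util.Map;

public class StreamingEncoders {

    /** Feature hashing: project a category into a fixed index space of size vectorSize. */
    public static int hashIndex(String category, int vectorSize) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(category.hashCode(), vectorSize);
    }

    /** Incremental target encoding: running mean of the target per category. */
    private static final class TargetStats { double sum; long count; }
    private final Map<String, TargetStats> stats = new HashMap<>();

    public double updateTargetEncoding(String category, double target) {
        TargetStats s = stats.computeIfAbsent(category, k -> new TargetStats());
        s.sum += target;   // same running-sum/count accumulator as the windowed average
        s.count += 1;
        return s.sum / s.count;
    }
}
```

Both encoders keep only a constant-size record per hash bucket or category, mirroring the $O(1)$ accumulator discipline used throughout this section.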