Maintaining a streaming pipeline is straightforward when the data structure remains static. The operational complexity spikes when business requirements dictate a change in the event structure while the system is processing thousands of events per second. You cannot simply stop the pipeline, flush the topics, and restart with a new schema. You must implement a strategy for schema evolution in flight.
This section executes a controlled schema upgrade using Apache Avro and the Confluent Schema Registry. We will simulate a common production scenario: adding a new dimension to an analytics event without breaking existing consumers or requiring a hard stop of the Flink cluster.
Before writing code, we must define the rules of engagement between producers and consumers. In a decoupled architecture, these components evolve independently. The Schema Registry enforces compatibility modes to prevent "poison pill" scenarios where a consumer crashes upon reading an unrecognized format.
For high-throughput systems, Backward Compatibility is the standard choice: a consumer using the new schema can still read data written with the old schema, so consumers are upgraded first and producers follow.
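Compatibility is configured per subject in the Schema Registry. The sketch below uses Confluent's Java SchemaRegistryClient to pin the transactions-value subject (the same subject and registry URL used later in this section) to BACKWARD before any new schema version is registered; treat it as one illustration rather than the only way to set the mode.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Client-side cache of up to 100 schemas; the URL points at the registry used in this section.
SchemaRegistryClient client = new CachedSchemaRegistryClient("http://registry:8081", 100);

// BACKWARD: a consumer on the new schema can still read data written with the previous schema,
// which is exactly what allows consumers to be upgraded before producers.
client.updateCompatibility("transactions-value", "BACKWARD");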
During a Backward Compatibility rollout, the upgraded consumer (V2) asks the Schema Registry for the writer schema of each record and projects V1 records into the V2 structure, so the application code never encounters a ClassCastException.
We start with a simple Transaction event. In a typical project, this Avro definition resides in a .avsc file under src/main/resources/avro.
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.pipeline.events",
  "fields": [
    {"name": "transaction_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
When this schema is registered, the Schema Registry assigns it a unique ID (e.g., ID 1). The Kafka Producer serializes the data and prepends this ID to the payload.
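On the producer side, the Confluent KafkaAvroSerializer handles both steps: it registers (or looks up) the schema under the transactions-value subject and writes the Confluent wire format of a magic byte, the 4-byte schema ID, and the Avro-encoded payload. The following sketch assumes the Avro-generated Transaction class and its builder; the generated setter names are an assumption based on the field names above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import com.pipeline.events.Transaction;

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "broker1:9092");
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", KafkaAvroSerializer.class.getName());
producerProps.put("schema.registry.url", "http://registry:8081");

KafkaProducer<String, Transaction> producer = new KafkaProducer<>(producerProps);

Transaction tx = Transaction.newBuilder()
    .setTransactionId("tx-1001")
    .setAmount(42.50)
    .setTimestamp(System.currentTimeMillis())
    .build();

// The serializer prepends the registered schema ID to every serialized value.
producer.send(new ProducerRecord<>("transactions", "tx-1001", tx));
producer.flush();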
Marketing requires tracking the promotion_code associated with transactions. To add this field safely, we must account for the existing data in the topic that lacks this field.
If we add the field as mandatory (non-nullable), the V2 Consumer will fail when trying to read V1 data because the V1 data does not contain a promotion_code. To maintain Backward Compatibility, we must assign a default value.
Here is the evolved V2 schema:
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.pipeline.events",
  "fields": [
    {"name": "transaction_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"},
    {
      "name": "promotion_code",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
The critical addition is "default": null. This instructs the Avro deserializer: "If you encounter a record that is missing promotion_code (i.e., a V1 record), fill the gap with null."
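You can observe this projection outside of Kafka entirely by resolving a V1-encoded record against the V2 reader schema with plain Avro. The sketch below is standalone; the .avsc file names are assumptions for this example.

import java.io.ByteArrayOutputStream;
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ProjectionDemo {
    public static void main(String[] args) throws Exception {
        Schema v1 = new Schema.Parser().parse(new File("src/main/resources/avro/transaction_v1.avsc"));
        Schema v2 = new Schema.Parser().parse(new File("src/main/resources/avro/transaction_v2.avsc"));

        // Encode a record with the V1 writer schema (no promotion_code field exists yet).
        GenericRecord v1Record = new GenericRecordBuilder(v1)
            .set("transaction_id", "tx-1001")
            .set("amount", 42.50)
            .set("timestamp", System.currentTimeMillis())
            .build();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(v1).write(v1Record, encoder);
        encoder.flush();

        // Decode with V1 as the writer schema and V2 as the reader schema.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord projected = new GenericDatumReader<GenericRecord>(v1, v2).read(null, decoder);

        // Avro schema resolution fills the missing field with the declared default.
        System.out.println(projected.get("promotion_code")); // prints null
    }
}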
In Flink, simply regenerating the Java class from the Avro definition is insufficient. You must configure the deserializer to strictly adhere to the specific class structure generated by the V2 schema.
When using the FlinkKafkaConsumer, the deserialization schema must be built with Schema Registry support so that every record, whichever schema version wrote it, is decoded into the generated V2 class. Flink provides this as ConfluentRegistryAvroDeserializationSchema in the flink-avro-confluent-registry module.

import java.util.Properties;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import com.pipeline.events.Transaction;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("group.id", "transaction-processor");

// Resolve each message's writer schema against the registry and project it
// onto the specific (generated) Transaction class, i.e. the V2 reader schema.
ConfluentRegistryAvroDeserializationSchema<Transaction> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        Transaction.class,
        "http://registry:8081"
    );

FlinkKafkaConsumer<Transaction> consumer = new FlinkKafkaConsumer<>(
    "transactions",
    deserializer,
    props
);
The forSpecific factory is the mechanism that forces the deserializer to project the writer schema (whatever version is in the message) onto the reader schema (the V2 class compiled into the Flink jar); it is the Flink-side equivalent of setting Confluent's specific.avro.reader=true (SPECIFIC_AVRO_READER_CONFIG) on the KafkaAvroDeserializer. Without it, the consumer acts as a generic reader: records arrive as GenericRecord instances shaped by whichever schema wrote them, default values may not be applied, and code expecting the Transaction class fails at runtime.
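Downstream operators still have to handle the null that fills the gap for V1 records. Continuing from the consumer defined above, a short sketch (the getter name getPromotionCode is an assumption based on the Avro-generated V2 class):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env.addSource(consumer);

// V1 records are projected into the V2 class with promotion_code == null,
// so the business logic treats a missing code as "no promotion applied".
DataStream<String> promotions = transactions
    .map(tx -> tx.getPromotionCode() == null ? "NONE" : tx.getPromotionCode().toString())
    .returns(Types.STRING);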
To verify the stability of the pipeline during the upgrade, we monitor the deserialization error rate. A successful rollout is characterized by zero serialization exceptions during the transition period where both V1 and V2 schemas coexist in the stream.
The expected data availability during the transition, where $N$ is the total number of records consumed and $E$ is the number of deserialization errors, should be:

$$\frac{N - E}{N} = 1$$

If $E > 0$, the compatibility contract has been violated.
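To make the error count observable without failing the job, one option is to wrap the deserialization schema so that unreadable records increment a Flink metric and are skipped. This is a sketch, not part of Flink or Confluent; the class name and metric name are illustrative.

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;
import com.pipeline.events.Transaction;

public class CountingDeserializationSchema implements DeserializationSchema<Transaction> {
    private final DeserializationSchema<Transaction> inner;
    private transient Counter errors;

    public CountingDeserializationSchema(DeserializationSchema<Transaction> inner) {
        this.inner = inner;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
        errors = context.getMetricGroup().counter("deserializationErrors");
    }

    @Override
    public Transaction deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            errors.inc();
            return null; // a null return tells the Kafka source to skip the record
        }
    }

    @Override
    public boolean isEndOfStream(Transaction nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return inner.getProducedType();
    }
}

Wrapping the registry-aware deserializer (new CountingDeserializationSchema(deserializer)) keeps the pipeline running while the deserializationErrors counter exposes any compatibility violation on the metrics dashboard.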
Tracking the upgrade timeline confirms that consumer lag never grows uncontrollably due to schema mismatches: throughput remains constant while the consumers are upgraded (around minute 10) and again while the producers are upgraded (around minute 20), indicating a successful schema evolution under Backward Compatibility.
If you attempt to register a schema that violates the compatibility mode (e.g., deleting a mandatory field in the producer while the consumer still expects it), the Schema Registry will reject the registration request with a 409 Conflict.
To prevent this from happening during deployment, you should run a compatibility check in your CI/CD pipeline using the Schema Registry Maven plugin or REST API before building the artifact.
# Example using Schema Registry API to check compatibility before deployment
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", ...}"}' \
http://registry:8081/compatibility/subjects/transactions-value/versions/latest
If the response is {"is_compatible": true}, you proceed. If false, the pipeline halts, protecting the production environment from corruption.
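The same gate can run inside the build itself, as a small Java check or unit test that talks to the registry. A sketch, assuming a recent Confluent client where testCompatibility accepts a ParsedSchema; the .avsc path is an assumption.

import java.nio.file.Files;
import java.nio.file.Paths;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient client = new CachedSchemaRegistryClient("http://registry:8081", 100);

// Ask the registry whether the candidate V2 schema is compatible with the
// versions already registered under transactions-value.
String candidate = Files.readString(Paths.get("src/main/resources/avro/transaction_v2.avsc"));
boolean compatible = client.testCompatibility("transactions-value", new AvroSchema(candidate));

if (!compatible) {
    throw new IllegalStateException("V2 schema breaks BACKWARD compatibility; aborting the build");
}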
By adhering to a strict Backward Compatibility strategy and assigning default values to new Avro fields, you decouple the lifecycles of your producers and consumers. This allows your Flink applications to adapt to changing business logic without downtime or data reprocessing.