Kafka Serializers Avro Pt1
Serializing with Apache Avro
What is Avro?
Apache Avro is a data serialization system: it defines how data is structured, stored, and transmitted between systems in a compact and efficient way.
It was created by Doug Cutting (also the creator of Hadoop and Lucene) to solve a common problem in distributed systems:
"How do you exchange complex structured data between systems written in different languages, without losing type information or causing compatibility issues?"
How Avro Works
At its core, Avro works with two components:
- Schema: defines the structure (fields, types, defaults) of your data.
- Data: the actual binary or JSON-encoded data following that schema.
So Avro separates "what the data looks like" (schema) from "what the data is" (values).
Avro Schema Example (in JSON)
Here's how a simple Avro schema looks:
{
"type": "record",
"name": "Employee",
"namespace": "com.company",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "department", "type": ["null", "string"], "default": null}
]
}
This defines a record with three fields:
- id (integer)
- name (string)
- department (optional string, can be null)
Serialization and Deserialization
- Serialization: Converting data (e.g., a Python object) into Avro binary using the schema.
- Deserialization: Reading Avro binary data back into a usable form using the same or compatible schema.
Because the schema is embedded in Avro files, the files are self-describing: any system that supports Avro can read them.
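As a concrete illustration, here is a minimal sketch of serializing and deserializing in Python with the third-party fastavro library (a library choice assumed here, not mandated by the text). The employees.avro file name is just an example.
from fastavro import parse_schema, reader, writer
schema = parse_schema({
    "type": "record",
    "name": "Employee",
    "namespace": "com.company",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "department", "type": ["null", "string"], "default": None},
    ],
})
records = [
    {"id": 1, "name": "Alice", "department": "Engineering"},
    {"id": 2, "name": "Bob", "department": None},
]
# Serialization: write the records as a self-describing Avro container file
# (the schema is embedded in the file header)
with open("employees.avro", "wb") as out:
    writer(out, schema, records)
# Deserialization: read the file back; the embedded writer schema is used to decode
with open("employees.avro", "rb") as fo:
    for record in reader(fo):
        print(record)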
Why Avro Is Special
Here are Avro's key advantages:
Feature | Description |
---|---|
Schema-based | Data has a well-defined structure stored in JSON format. |
Compact Binary Format | Binary encoding reduces file size and improves I/O performance. |
Schema Evolution | You can change schemas over time (add, rename, remove fields) without breaking existing consumers. |
Language Neutral | Works with many languages (Java, Python, C++, Go, etc.). |
Great for Messaging Systems | Used in Kafka, Redpanda, and Schema Registry setups. |
Splittable and Compressible | Ideal for big data systems (Hadoop, Spark, Hive). |
Schema Evolution in Avro
This is the most powerful part, and the reason Avro is heavily used in Kafka.
Suppose your producer's old schema was the Employee record shown earlier. Now you update it to:
{
"type": "record",
"name": "Employee",
"fields": [
{"name": "id", "type": "int"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
Avro allows backward and forward compatibility:
- Backward-compatible: New consumers can read old data.
- Forward-compatible: Old consumers can read new data (using defaults for new fields).
That's why Kafka uses Avro with a Schema Registry: to ensure producers and consumers can evolve independently.
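To see schema resolution in action, here is a minimal sketch using the assumed fastavro library: a record written with the old Employee schema is read with the updated reader schema, which ignores the old name/department fields and fills the new email field from its default.
import io
from fastavro import parse_schema, schemaless_reader, schemaless_writer
old_schema = parse_schema({
    "type": "record", "name": "Employee", "namespace": "com.company",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "department", "type": ["null", "string"], "default": None},
    ],
})
new_schema = parse_schema({
    "type": "record", "name": "Employee", "namespace": "com.company",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "email", "type": ["null", "string"], "default": None},
    ],
})
# Write a record with the old (writer) schema
buf = io.BytesIO()
schemaless_writer(buf, old_schema, {"id": 7, "name": "Alice", "department": None})
# Read it back with the new (reader) schema: unknown writer fields are skipped,
# missing reader fields take their defaults
buf.seek(0)
print(schemaless_reader(buf, old_schema, new_schema))  # {'id': 7, 'email': None}
Note that the new schema in this sketch carries the same namespace as the old one so the record names match during resolution.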
Where Avro Is Commonly Used
Use Case | Description |
---|---|
Data Lakes (HDFS, S3) | Store schema-defined data for Spark/Hive. |
Kafka Messaging | Producers publish Avro messages, Schema Registry keeps schema versions. |
ETL Pipelines | Efficient and schema-safe data transfer between stages. |
Analytics | Compact binary format makes large datasets efficient to query. |
Avro vs. Other Formats
Feature | Avro | JSON | Parquet | ORC |
---|---|---|---|---|
Storage Type | Row-based | Text | Columnar | Columnar |
Schema | Yes (JSON-defined) | No | Yes | Yes |
Compression | Yes | Poor | Excellent | Excellent |
Best For | Streaming, Kafka | Debugging, APIs | Analytics, OLAP | Analytics, OLAP |
Human Readable | No | Yes | No | No |
So Avro is ideal for data interchange (Kafka, APIs, log pipelines), while Parquet/ORC are better for data storage and querying.
Summary
Apache Avro = Schema + Compact Binary + Evolution Support. It's designed for:
- Cross-language data exchange
- Streaming and message serialization
- Schema evolution without breaking systems
And that's why it's a favorite in modern data pipelines, especially with Kafka, Redpanda, Spark, and Schema Registry.
Caveats While Using Avro
Caveat 1:
"The schema used for writing the data and the schema expected by the reading application must be compatible."
What this means
When Avro serializes (writes) and deserializes (reads) data, both sides must agree on the structure of the data, at least enough to interpret the fields correctly.
If the producer (writer) and consumer (reader) use incompatible schemas, the data can't be decoded properly.
Example
Writer's Schema (v1)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}
Reader's Schema (v2)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "full_name", "type": "string"}
]
}
Here, the field name changed from "name" to "full_name".
Result: not compatible. Avro won't know how to map "name" to "full_name", so the reader will fail to interpret the data.
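A minimal sketch of this failure, again with the assumed fastavro library: data written with the v1 schema cannot be resolved against the v2 reader schema, because "full_name" has no default and no matching writer field, so the library raises a schema-resolution error.
import io
from fastavro import parse_schema, schemaless_reader, schemaless_writer
writer_v1 = parse_schema({
    "type": "record", "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
})
reader_v2 = parse_schema({
    "type": "record", "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "full_name", "type": "string"},
    ],
})
buf = io.BytesIO()
schemaless_writer(buf, writer_v1, {"id": 1, "name": "Alice"})
buf.seek(0)
try:
    schemaless_reader(buf, writer_v1, reader_v2)
except Exception as exc:  # fastavro reports the unresolvable "full_name" field
    print("Cannot resolve schemas:", exc)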
Compatible Evolution Example
If we add a new optional field, Avro can handle that gracefully.
Writer's Schema (old)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}
Reader's Schema (new)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
Compatible: the new field email has a default value, so old data still works fine.
Compatibility Rules (Simplified)
Avro defines several compatibility types:
Compatibility Type | Meaning |
---|---|
Backward | New readers can read old data. |
Forward | Old readers can read new data. |
Full | Both backward + forward. |
Breaking | Schema changes that break both directions (e.g. removing required fields). |
You can configure which rule to enforce in systems like the Confluent Schema Registry.
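For example, here is a minimal sketch of setting the compatibility rule for one subject through the Confluent Schema Registry REST API. The registry URL and the subject name users-avro-value are placeholders, and the requests package is assumed to be installed.
import requests
SCHEMA_REGISTRY_URL = "http://schema-registry-host:8081"  # placeholder address
subject = "users-avro-value"  # value schema for the "users-avro" topic
resp = requests.put(
    f"{SCHEMA_REGISTRY_URL}/config/{subject}",
    json={"compatibility": "BACKWARD"},  # BACKWARD, FORWARD, FULL, NONE, or *_TRANSITIVE
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"compatibility": "BACKWARD"}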
Caveat 2:
"The deserializer will need access to the schema that was used when writing the data, even when it is different from the schema expected by the application that accesses the data."
What this means
To read Avro data, you always need:
- The writer's schema (used when data was created)
- The reader's schema (used by your application)
Avro uses both together to resolve differences.
If your application only knows its own schema (reader schema) but not the original one, it can't interpret the binary data, because Avro binary doesn't include field names, just encoded values.
Analogy
Think of Avro data as a compressed shopping list:
- The writer's schema says the order and type of the items: 1. "Milk" (string), 2. "Eggs" (int)
- The data only stores the values: "Amul", 12
If the reader doesn't know the original order (schema), it can't tell which value corresponds to which field.
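You can see this directly by dumping the raw bytes of a schemaless Avro record. The sketch below (again assuming fastavro) shows that only the encoded values are present, with no field names.
import io
from fastavro import parse_schema, schemaless_writer
schema = parse_schema({
    "type": "record", "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
})
buf = io.BytesIO()
schemaless_writer(buf, schema, {"id": 1, "name": "Alice"})
# Only zigzag-encoded ints and string bytes, no field names:
print(buf.getvalue())  # b'\x02\nAlice'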
Caveat 3:
"In Avro files, the writing schema is included in the file itself, but there is a better way to handle this for Kafka messages."
What this means
When Avro is used for files (like on HDFS or S3), the schema is embedded inside the file header. That's fine for static data: every file carries its own schema.
But in Kafka, embedding the full schema inside every message would be inefficient:
- Each message might be just a few KB,
- But the schema (JSON) could be several hundred bytes.
That would cause massive duplication and network overhead.
The "Better Way" (Hinted at in the text)
Instead of embedding schemas in every Kafka message, systems like Confluent Schema Registry store schemas centrally.
Here's how it works:
- When a producer sends a message:
  - It registers its schema once with the Schema Registry.
  - It gets a schema ID (like 42).
  - It sends the binary Avro data prefixed with that schema ID (see the wire-format sketch after this list).
- When a consumer reads a message:
  - It looks up schema ID 42 in the Schema Registry.
  - It retrieves the writer's schema and deserializes the message correctly.
This way:
- Small message sizes
- Centralized schema management
- Schema evolution tracking
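The framing used by Confluent's serializers is a single magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary payload. Here is a minimal sketch of packing and unpacking that prefix; the schema ID 42 and the payload bytes are illustrative only.
import struct
def unpack_confluent_message(raw: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("Not a Schema Registry framed message")
    return schema_id, raw[5:]
# Frame an (already serialized) Avro payload with schema ID 42
framed = struct.pack(">bI", 0, 42) + b"\x02\nAlice"
print(unpack_confluent_message(framed))  # (42, b'\x02\nAlice')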
Summary Table
Concept | In Avro Files | In Kafka Messages |
---|---|---|
Where is schema stored? | Embedded inside file | Stored in Schema Registry |
Message overhead | Higher (includes schema) | Lower (schema ID only) |
Schema evolution | File-based | Centrally managed (versioned) |
Typical use case | Batch systems (HDFS, S3, Hive) | Streaming systems (Kafka, Redpanda) |
In Short
Caveat | Meaning | Solution |
---|---|---|
Writer/reader schemas must be compatible | Data won't deserialize if incompatible | Follow Avro compatibility rules |
Reader needs the writer's schema | Required to decode binary data | Include schema in file or fetch from registry |
Don't embed the schema per message in Kafka | Wasteful and redundant | Use Schema Registry (stores schema IDs) |
Demo Producer Code with Schema Registry in Python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schema as JSON string or loaded from file
user_schema_str = """
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
"""
# Kafka + Schema Registry configuration (example placeholders)
conf = {
'bootstrap.servers': 'kafka-broker1:9092,kafka-broker2:9092',
'schema.registry.url': 'http://schema-registry-host:8081',
# If SR needs auth, add 'schema.registry.basic.auth.user.info': 'user:password'
}
# Parse the schemas; AvroProducer expects parsed Avro schema objects, not plain dicts
value_schema = avro.loads(user_schema_str)
key_schema = avro.loads('"string"')  # keys are plain strings
producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=value_schema)
topic = "users-avro"
def produce_user(user_obj):
# key_schema can also be provided; here we use string keys
key = str(user_obj["id"])
producer.produce(topic=topic, value=user_obj, key=key)
producer.flush() # flush per message for demo; in production batch/async flush
if __name__ == "__main__":
user = {"id": 1, "name": "Alice", "email": "alice@example.com"}
produce_user(user)
print("Sent Avro message to", topic)
Demo Producer Code in Java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
Producer<String, Customer> producer = new KafkaProducer<>(props);
// We keep producing new events until someone ctrl-c
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " +
customer.toString());
ProducerRecord<String, Customer> record =
new ProducerRecord<>(topic, customer.getName(), customer);
producer.send(record);
}
The Big Picture
This Java code is an Avro Kafka producer. It:
- Connects to Kafka.
- Uses Confluent's Avro serializers.
- Generates Avro objects (Customer records).
- Sends them continuously to a Kafka topic.
Step-by-step Explanation
1. Create configuration properties
Properties is a Java Map-like object that stores key-value configuration settings for the Kafka producer.
2. Configure Kafka connection and serialization
bootstrap.servers tells the producer where the Kafka brokers are located.
The producer will connect to this address to find the rest of the cluster.
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
These lines configure how keys and values are converted into bytes before being sent over the network.
- Both the key and value use the Confluent Avro serializer.
- The serializer:
  - Converts Avro objects (like Customer) into Avro binary format.
  - Registers the Avro schema with the Schema Registry.
  - Sends the schema ID (a small integer) along with the serialized message.
So the consumer can later retrieve the schema and deserialize properly.
The schema.registry.url property tells the serializer where the Schema Registry is (e.g., http://localhost:8081).
The serializer uses this URL to:
- Check if the schema for Customer already exists in the registry.
- Register it if not.
- Retrieve schema IDs for encoding messages.
3. Create the Kafka topic variable
Just defines the target Kafka topic to send messages to.
4. Create the producer instance
This creates the Kafka producer client.
- Producer<K, V> is a generic class parameterized by key and value types. Here:
  - K = String (the customer name, used as the key)
  - V = Customer (the Avro object)
The producer will use the serializers defined earlier to encode these before sending.
5. Generate and send Avro messages in a loop
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record =
new ProducerRecord<>(topic, customer.getName(), customer);
producer.send(record);
}
Let's break this down line by line.
Customer customer = CustomerGenerator.getNext();
This uses a helper class CustomerGenerator (not shown here) that probably:
- Creates random or mock customer data.
- Returns a Customer object (which is an Avro-generated Java class).
System.out.println(...)
Just logs the generated customer to the console for visibility.
ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getName(), customer);
This creates the message record to be sent to Kafka.
- The topic: "customerContacts".
- The key: customer.getName().
- The value: the entire Customer Avro object.
Kafka uses the key to determine which partition the message goes to (the same key always goes to the same partition).
producer.send(record);
This sends the record asynchronously to Kafka.
The Avro serializer will:
- Serialize Customer into binary Avro.
- Register or look up the schema in the Schema Registry.
- Encode the schema ID + binary payload.
- Send the message to the Kafka broker.
while (true) {...}
Runs indefinitely: it keeps producing customer messages until you stop it with Ctrl + C.
In a real-world app, you might:
- Add a Thread.sleep() for pacing,
- Limit the message count, or
- Gracefully close the producer with producer.close().
What the Customer class is
This class is likely generated automatically from an Avro schema using the Avro Java code generator.
For example, the Avro schema could look like:
{
"namespace": "example.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "phoneNumber", "type": ["null", "string"], "default": null}
]
}
After generating Java classes from this schema (with avro-maven-plugin or avro-tools), you can use the Customer object directly in code; the Avro serializer knows how to handle it.
Summary Table
Line | What it does | Why it matters |
---|---|---|
bootstrap.servers | Broker list | Connect to Kafka cluster |
key.serializer / value.serializer | Avro serializers | Convert Java objects to bytes |
schema.registry.url | Registry URL | Store and retrieve schemas |
KafkaProducer<>(props) | Creates producer | Main client that talks to Kafka |
CustomerGenerator.getNext() | Generates Avro object | Produces mock data |
new ProducerRecord<>(...) | Wraps message | Defines topic, key, and value |
producer.send(record) | Sends async | Pushes data to Kafka |
while (true) | Infinite loop | Keeps producing |
What Happens Behind the Scenes
- You produce a Customer Avro object.
- The serializer looks up (or registers) its schema in the Schema Registry.
- The Schema Registry returns a unique schema ID.
- The producer encodes the schema ID together with the serialized Customer object and sends it to Kafka.
In Simple Terms
This Java code:
Continuously generates random customer data, serializes each record in Avro format using Confluent Schema Registry, and sends it to a Kafka topic named customerContacts.
The Customer class is not a regular Plain Old Java Object but a specialized one generated from an Avro schema. We can generate these classes using avro-tools.jar or the Avro Maven plugin.