
Kafka Request Processing Part 1


1. High-level overview: Kafka as a request/response server

Kafka brokers are fundamentally request-response servers built on top of a binary TCP protocol. Every action that clients (producers, consumers, admin tools, or other brokers replicating data) perform, such as producing messages, fetching messages, creating topics, or fetching metadata, happens through this protocol.

  • Clients always initiate connections (brokers never call clients).
  • Requests flow in one direction: client → broker.
  • Responses flow back: broker → client.

This simple design, in which clients initiate and brokers respond, allows Kafka to scale horizontally with predictable connection behavior.


2. Kafka binary protocol

Kafka has its own binary protocol defined over TCP. It is versioned and explicitly documented in Kafka's source code and official documentation. Every request follows a well-defined structure containing a header and a payload.

Request Header Components

  • API Key (Request Type): Identifies the operation, e.g., Produce, Fetch, Metadata, OffsetCommit, FindCoordinator, etc.
  • API Version: Indicates the version of the API so brokers can interact correctly with older/newer clients. This enables forward and backward compatibility.
  • Correlation ID: A unique integer assigned by the client per request. The broker includes this same ID in the response. Used by clients (and humans reading logs) to match requests and responses for debugging.
  • Client ID: A string identifying the client application, often useful for logging, quotas, and metrics. For example, a Spark job's Kafka source might use spark-streaming-consumer as the client ID.

Every request type has its own payload schema (for example, a ProduceRequest has topic, partition, and message batch data). Kafka serializes and deserializes these messages efficiently using its internal protocol definitions.
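
To make the header layout concrete, here is a rough, hand-rolled sketch in Python of how a client library might frame a request before writing it to the TCP socket. It follows the documented non-flexible header layout (INT16 api_key, INT16 api_version, INT32 correlation_id, length-prefixed client_id, with the whole request preceded by an INT32 size); the API key 3 for Metadata comes from the protocol specification, while the version, correlation ID, and client ID values are purely illustrative. Real client libraries do all of this for you.

import struct

# Sketch of the non-flexible request header:
#   api_key (INT16), api_version (INT16), correlation_id (INT32), client_id (STRING)
# Newer "flexible" API versions add tagged fields, which this sketch omits.
def encode_request_header(api_key, api_version, correlation_id, client_id):
    client_bytes = client_id.encode("utf-8")
    fixed = struct.pack(">hhih", api_key, api_version, correlation_id, len(client_bytes))
    return fixed + client_bytes

def frame_request(header, payload):
    # Every request on the wire is preceded by a 4-byte big-endian size field.
    body = header + payload
    return struct.pack(">i", len(body)) + body

header = encode_request_header(
    api_key=3,                               # 3 = Metadata in the protocol spec
    api_version=1,                           # an old, non-flexible version for simplicity
    correlation_id=42,                       # echoed back by the broker in the response
    client_id="spark-streaming-consumer",
)
print(frame_request(header, payload=b"").hex())  # payload omitted in this sketch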


3. Processing order and ordering guarantees

Kafka guarantees that:

All requests sent over a single TCP connection are processed in the order they were received.

This is extremely important because:

  • It ensures message ordering per partition (since producers send messages to one leader partition in order).
  • It prevents reordering caused by concurrent network delivery.
  • If a client sends multiple produce or fetch requests, Kafka processes them serially per connection.

This means: within a single connection, request n will always be fully processed before request n+1.
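
On the producer side, this per-connection guarantee is usually paired with the max.in.flight.requests.per.connection setting (covered again in the client configuration summary later). A minimal sketch, assuming the kafka-python library and a broker at localhost:9092, both of which are placeholders:

from kafka import KafkaProducer

# With retries enabled, allowing only one in-flight request per connection
# ensures a retried batch cannot overtake a later batch on the same connection.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    acks="all",
    retries=5,
    max_in_flight_requests_per_connection=1,
)

producer.send("orders", b"first")
producer.send("orders", b"second")   # processed by the broker after "first"
producer.flush()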


4. Broker threading model: acceptors, processors, and handlers

A Kafka broker is built around a multi-threaded, event-driven network model. It manages potentially tens of thousands of simultaneous client connections, so efficient concurrency is critical.

Here's the workflow for every connection:

Step 1: Acceptor Thread

  • Each Kafka broker port (by default 9092 for plaintext, or 9093 for SSL) has an acceptor thread.
  • The acceptor listens for new TCP connections using the Java NIO (non-blocking I/O) framework.
  • When a new client connection arrives, the acceptor:
    • accepts it,
    • configures it for non-blocking mode,
    • hands it over to one of the network processor threads for further handling.

Step 2: Network Processor Threads (a.k.a. Network Threads)

  • These are responsible for:
    • reading data from client sockets,
    • parsing Kafka requests,
    • enqueuing the parsed requests into a request queue,
    • taking completed responses from a response queue and writing them back to the sockets.

You can control how many network threads are created using:

num.network.threads=3   # Default is usually 3

This is often increased on brokers that handle heavy network I/O (for example, many producers/consumers).

Each processor thread can handle many connections using non-blocking I/O multiplexing.
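
As a toy illustration of "one thread, many connections", the sketch below uses Python's selectors module to multiplex non-blocking sockets in a single loop. It is only an analogy for the pattern described above (the broker itself is built on Java NIO), and the port number is arbitrary.

import selectors
import socket

# One thread playing both the acceptor and processor roles for many
# non-blocking sockets -- an analogy, not actual broker code.
sel = selectors.DefaultSelector()
server = socket.socket()
server.bind(("localhost", 9999))
server.listen()
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, data="accept")

while True:
    for key, _ in sel.select(timeout=1):
        if key.data == "accept":                     # acceptor: new connection arrives
            conn, _ = key.fileobj.accept()
            conn.setblocking(False)                  # configure for non-blocking mode
            sel.register(conn, selectors.EVENT_READ, data="client")
        else:                                        # processor: socket is readable
            data = key.fileobj.recv(4096)
            if data:
                key.fileobj.sendall(data)            # echo back as a stand-in "response"
            else:
                sel.unregister(key.fileobj)          # client closed the connection
                key.fileobj.close()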


5. Internal queues: Request and Response Queues

Kafka uses two key in-memory queues per broker:

  1. Request Queue
  2. Response Queue

Request Queue

  • All incoming parsed requests go here.
  • Requests in this queue are picked up by I/O handler threads (a.k.a. I/O threads or KafkaRequestHandler threads).
  • Each handler thread processes the request (for example, appending messages to a log, reading data, updating offsets).
  • Number of handler threads is configurable:

num.io.threads=8

  • Each handler thread executes business logic depending on the request type.

Response Queue

  • Once processing is done, a handler thread places a response object into the response queue.
  • The network processor picks up this response and sends it back to the client.
  • The response contains the correlation ID, allowing the client to map the reply to its original request.
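
The decoupling between network threads and handler threads can be pictured with a small thread-pool sketch in plain Python. It mirrors the shape of the design described above, not Kafka's actual implementation; the queue sizes and the handle_request placeholder are made up for the example.

import queue
import threading

NUM_IO_THREADS = 8                              # analogous to num.io.threads
request_queue = queue.Queue(maxsize=500)        # analogous to queued.max.requests
response_queue = queue.Queue()

def handle_request(request):
    # Placeholder for the real work: append to the log, read data, update offsets...
    return {"correlation_id": request["correlation_id"], "error_code": 0}

def io_handler():
    while True:
        request = request_queue.get()           # blocks until a request is available
        response_queue.put(handle_request(request))
        request_queue.task_done()

for _ in range(NUM_IO_THREADS):
    threading.Thread(target=io_handler, daemon=True).start()

# "Network thread" side: enqueue a parsed request, later pick up its response
# and write it back to the client socket (socket handling omitted here).
request_queue.put({"api_key": 0, "correlation_id": 42, "records": b"..."})
print(response_queue.get())                     # {'correlation_id': 42, 'error_code': 0}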

6. Delayed responses and "purgatories"

Not every request can be completed immediately. Kafka has a mechanism called purgatory: a waiting area for requests that must be delayed until certain conditions are met.

Example scenarios:

  1. Consumer Fetch Requests
    • If a consumer issues a fetch and the partition has no new data yet, the broker does not respond immediately.
    • Instead, it holds the fetch request in purgatory until:
      • new data arrives, or
      • a timeout (e.g., fetch.max.wait.ms) expires.
    • This long-polling mechanism reduces unnecessary network churn and improves efficiency.
  2. Producer Acknowledgments
    • When a producer sends data with acks=all, the broker only responds once all in-sync replicas have confirmed receipt.
    • The request remains in purgatory until this replication condition is met.
  3. Admin Requests
    • Example: a DeleteTopics request.
    • Topic deletion is asynchronous: the request is acknowledged only once deletion is initiated or completed. Until then, it may wait in purgatory.

Purgatory management:

  • Kafka maintains several purgatories for different request types (e.g., one for produce, one for fetch).
  • Internally, these are managed as time-indexed data structures that efficiently wake up waiting requests when conditions are satisfied.
  • Requests in purgatory are tracked by keys like topic-partition identifiers or completion conditions.
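
The sketch below shows the idea in miniature: delayed operations are keyed (for example by topic-partition), each knows how to test its completion condition, and each carries a deadline. Kafka's real purgatory is built on a hierarchical timing wheel in Java; this plain-Python version only illustrates the bookkeeping.

import time

class DelayedOperation:
    def __init__(self, key, try_complete, on_complete, on_expire, timeout_ms):
        self.key = key
        self.try_complete = try_complete        # returns True once the condition is met
        self.on_complete = on_complete          # e.g., enqueue the response
        self.on_expire = on_expire              # e.g., respond with whatever is available
        self.deadline = time.monotonic() + timeout_ms / 1000.0

class Purgatory:
    def __init__(self):
        self.waiting = {}                       # key -> list of DelayedOperation

    def add(self, op):
        if op.try_complete():                   # some operations can finish immediately
            op.on_complete()
        else:
            self.waiting.setdefault(op.key, []).append(op)

    def check_and_complete(self, key):
        # Called when something changes for this key, e.g. new data was appended
        # (fetch purgatory) or a follower acknowledged a write (produce purgatory).
        still_waiting = []
        for op in self.waiting.pop(key, []):
            if op.try_complete():
                op.on_complete()
            else:
                still_waiting.append(op)
        if still_waiting:
            self.waiting[key] = still_waiting

    def expire_overdue(self):
        now = time.monotonic()
        for key in list(self.waiting):
            live = []
            for op in self.waiting[key]:
                if op.deadline <= now:
                    op.on_expire()              # e.g., fetch.max.wait.ms elapsed
                else:
                    live.append(op)
            self.waiting[key] = live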

7. Error handling and correlation IDs

Every broker response (success or failure) includes:

  • The same correlation ID as the request,
  • A response code (error code or success indicator),
  • Possibly an error message.

This enables:

  • Clients to match responses to requests asynchronously,
  • Operators to trace specific requests in logs using correlation IDs,
  • Brokers to log meaningful error lines (e.g., "Request 1557 from client=producer_app failed with NOT_LEADER_FOR_PARTITION").

8. Putting it all together (end-to-end flow)

Let's trace a Produce request step by step:

  1. Producer Client sends a ProduceRequest to the leader broker for a given partition over a TCP connection.
  2. Broker's Acceptor Thread accepts the connection (if new) and assigns it to a Network Processor.
  3. Network Processor Thread reads the bytes, parses the request, and enqueues it into the Request Queue.
  4. IO Handler Thread picks the request from the queue, appends the messages to the commit log on disk, triggers replication to followers, and waits for acknowledgment if required.
  5. If the producer used acks=all, the request waits in ProduceRequestPurgatory until all ISR replicas have replicated the message.
  6. Once the condition is met, the response is enqueued in the Response Queue.
  7. Network Processor Thread dequeues the response and writes it to the producer's TCP connection.
  8. The Producer Client receives the response (with matching correlation ID) and marks the batch as successfully acknowledged.

This process happens thousands of times per second per broker across thousands of connections.
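
Seen from the producer's side, the whole flow collapses into a few lines of client code. A minimal sketch using the kafka-python library (broker address and topic name are placeholders); the blocking future.get() call returns once the broker's response, matched by correlation ID, has arrived.

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all")

future = producer.send("orders", key=b"order-1", value=b'{"amount": 42}')
try:
    metadata = future.get(timeout=10)       # waits for the acks=all response
    print(f"acked: partition={metadata.partition} offset={metadata.offset}")
except KafkaError as exc:
    print(f"produce failed: {exc}")
finally:
    producer.flush()
    producer.close()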


9. Configuration summary (key performance knobs)

  • num.network.threads: Number of threads handling network I/O (socket reads and writes).
  • num.io.threads: Number of threads processing the business logic of requests.
  • queued.max.requests: Maximum number of requests that can sit in the request queue before the network threads stop reading new ones.
  • replica.fetch.max.bytes: Maximum data size per fetch request for replication.
  • fetch.max.wait.ms: Maximum wait time for fetch requests (affects the consumer fetch purgatory).

10. Why this architecture matters

Kafka's broker threading and request queue model:

  • Enables high throughput (hundreds of thousands of requests per second),
  • Ensures ordering and consistency per connection,
  • Supports long-polling and asynchronous operations efficiently,
  • Allows for fault isolation: network I/O, request processing, and delayed response management are handled by distinct thread pools,
  • Provides clear metrics: each queue, thread pool, and purgatory exposes metrics that are vital for monitoring (e.g., queue sizes, response times, request rates).

When you later monitor Kafka (via JMX or Prometheus), you'll see metrics like:

  • RequestQueueSize
  • ResponseQueueSize
  • RequestHandlerAvgIdlePercent
  • NetworkProcessorAvgIdlePercent
  • ProduceRequestPurgatorySize
  • FetchRequestPurgatorySize

These correspond exactly to the architectural components described above.


In summary

  • Kafka brokers are TCP servers that handle structured binary requests from clients, other brokers, and the controller.
  • Requests are processed strictly in order per connection.
  • The threading model (acceptor โ†’ network โ†’ handler) ensures scalability and concurrency.
  • Internal queues decouple I/O from processing.
  • โ€œPurgatoriesโ€ efficiently handle delayed operations like long polling and replication acknowledgment.
  • Configuration parameters and metrics directly map to these internal components.


Let's now unpack how Kafka clients discover partition leaders, how metadata management works, and how the broker-client interaction ensures that produce and fetch requests always go to the right leader.

This explanation builds on the previous section about Kafka's internal threading model and request queues, focusing now on what happens inside those requests, how clients choose where to send them, and how metadata refreshes maintain correctness.


1. The three main request types handled by Kafka brokers

Once I/O (request handler) threads pick up requests from the request queue, they process different kinds of client operations. The three dominant categories are:

1. Produce Requests

  • Sent by producers (e.g., Java/Python/Go clients).
  • Contain batches of records (messages) to be appended to a specific topic partition.
  • Must be sent to the leader replica of that partition.
  • The broker:
    • validates the request,
    • appends the data to its local log,
    • waits for acknowledgments from ISR replicas (depending on the acks setting),
    • sends a response back (success or error).

2. Fetch Requests

  • Sent by consumers and follower replicas.
  • Consumers use them to read messages.
  • Followers use them to replicate messages from the leader.
  • Fetch requests must also go to the leader broker for a partition.

3. Admin Requests

  • Sent by administrative clients (e.g., using Kafka Admin API).
  • Examples:
    • CreateTopics
    • DeleteTopics
    • DescribeCluster
    • ListOffsets
    • AlterConfigs
  • These can be sent to any broker, because admin requests are metadata-oriented and do not depend on specific partition leadership.
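
For example, a CreateTopics request can be issued through any bootstrap broker with an admin client. A minimal sketch using the kafka-python library; the broker address, client id, topic name, and partition/replication counts are placeholders:

from kafka.admin import KafkaAdminClient, NewTopic

# Admin requests are metadata-oriented, so any broker can receive them.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id="ops-tooling")

admin.create_topics([
    NewTopic(name="orders", num_partitions=3, replication_factor=3),
])
admin.close()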

2. Why produce and fetch requests must go to the leader

Each Kafka partition has:

  • One leader replica (which handles all reads/writes),
  • Zero or more follower replicas (which replicate data asynchronously).

Kafka enforces single-leader semantics per partition to maintain ordering and consistency:

  • The leader is the authoritative source for appending messages.
  • Producers can't write to followers, and consumers can't fetch from followers (unless fetch-from-follower, introduced by KIP-392 for rack-aware reads, is explicitly enabled).

If a client sends a request to a broker that is not the leader, the broker immediately rejects it with the error:

NOT_LEADER_FOR_PARTITION

or in newer Kafka versions:

NOT_LEADER_OR_FOLLOWER

This tells the client:

"You've sent this request to the wrong broker: the partition's leader has changed."


3. How clients know where the leader is: metadata requests

This is where the Metadata API comes into play.

Metadata Request

  • A client sends a MetadataRequest to any broker in the cluster.
  • The request lists one or more topics that the client cares about.
  • The broker responds with a metadata map describing:
    • each topic,
    • each partition within that topic,
    • the broker IDs and endpoints for all replicas of that partition,
    • which broker currently acts as leader for each partition.

Example: MetadataResponse

Topic    Partition   Leader       Replicas         ISR
orders   0           Broker 101   101, 102, 103    101, 103
orders   1           Broker 102   101, 102, 103    102, 103
orders   2           Broker 103   101, 102, 103    101, 102, 103

From this, the client learns:

  • If it wants to send messages to orders-0, it must connect to broker 101.
  • If it wants to fetch from orders-1, it must contact broker 102.
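
You can observe this mapping from a client yourself. As one illustration, assuming the confluent-kafka Python client and a broker reachable at localhost:9092, list_topics() issues a MetadataRequest and exposes the leader, replica, and ISR information per partition:

from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
metadata = admin.list_topics(timeout=10)        # MetadataRequest to a bootstrap broker

topic = metadata.topics["orders"]               # assumes the topic exists
for p in topic.partitions.values():
    print(f"orders-{p.id}: leader={p.leader} replicas={p.replicas} isr={p.isrs}")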

4. The broker's role in serving metadata

Every Kafka broker maintains a metadata cache that is constantly updated by the controller broker.

  • The controller (a special broker elected via ZooKeeper or KRaft, depending on Kafka mode) manages cluster metadata: who is leader for each partition, which brokers are alive, etc.
  • Whenever leadership changes (e.g., due to broker failure), the controller broadcasts metadata updates to all brokers.
  • Therefore, any broker can respond to a metadata request, not just the controller.

This is why:

Clients can send metadata requests to any broker in the cluster.


5. Client caching and metadata refreshes

Client-side metadata cache

  • After receiving metadata, Kafka clients cache it locally.
  • The cache maps:

{ topic → [partition → leader_broker_id] }

  • This allows producers and consumers to route requests directly to the correct leader broker without asking the cluster each time.

Refresh intervals

The clientโ€™s cached metadata can become stale over time, especially if:

  • Brokers are added or removed,
  • Partitions are rebalanced,
  • A leader fails and a new one is elected.

To handle this, clients automatically refresh metadata periodically.

Controlled by:

metadata.max.age.ms=300000  # default = 5 minutes

Meaning:

  • Every 5 minutes (by default), the client will re-fetch metadata proactively.
  • This keeps the routing information current even if no errors occur.

Error-triggered refresh

If a client receives NOT_LEADER_FOR_PARTITION or UNKNOWN_TOPIC_OR_PARTITION, it immediately triggers a forced metadata refresh before retrying the request.

This reactive behavior ensures that clients recover quickly from leadership changes.
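
Putting both refresh paths together, here is a schematic of how a client-side cache plus periodic and error-triggered refresh typically fits together. This is plain Python for illustration, not any real client's internals; the fetch_metadata callable stands in for an actual MetadataRequest.

import time

METADATA_MAX_AGE_S = 300                      # mirrors metadata.max.age.ms = 300000

class ClientMetadataCache:
    """Schematic cache: topic -> {partition -> leader_broker_id}."""

    def __init__(self, fetch_metadata):
        self.fetch_metadata = fetch_metadata  # callable that issues a MetadataRequest
        self.leaders = {}
        self.last_refresh = 0.0

    def refresh(self):
        self.leaders = self.fetch_metadata()  # e.g. {"transactions": {0: 1, 1: 2}}
        self.last_refresh = time.monotonic()

    def leader_for(self, topic, partition):
        # Proactive refresh once the cache is older than metadata.max.age.ms.
        if time.monotonic() - self.last_refresh > METADATA_MAX_AGE_S:
            self.refresh()
        return self.leaders[topic][partition]

    def on_error(self, error_code):
        # Reactive refresh after an error that signals stale routing information.
        if error_code in ("NOT_LEADER_FOR_PARTITION", "UNKNOWN_TOPIC_OR_PARTITION"):
            self.refresh()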


6. Typical produce/fetch cycle with metadata lookups

Example workflow: Producer

  1. The producer starts up and sends a MetadataRequest for topic transactions.
  2. The broker responds with:

Partition 0 → Leader = Broker 1
Partition 1 → Leader = Broker 2

  3. The producer caches this mapping:

transactions-0 → Broker 1
transactions-1 → Broker 2

  4. The producer sends ProduceRequest batches to the appropriate brokers.
  5. Later, if broker 2 goes down and broker 3 becomes leader for transactions-1, the producer's next produce to broker 2 fails with:

NOT_LEADER_FOR_PARTITION

  6. The producer triggers a metadata refresh, updates its cache, and resends the request to broker 3.
  7. Processing continues seamlessly after a brief retry delay.
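
In practice the client library drives steps 5 through 7 for you: it refreshes metadata and retries internally, and application code only hears about the failure if all retries are exhausted. A sketch of step 4 onward with kafka-python (broker address, topic, and retry values are placeholders):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    acks="all",
    retries=5,                 # NOT_LEADER errors trigger refresh + retry internally
    retry_backoff_ms=100,
)

def on_success(record_metadata):
    print(f"written to {record_metadata.topic}-{record_metadata.partition} "
          f"at offset {record_metadata.offset}")

def on_error(exc):
    print(f"gave up after retries: {exc}")

producer.send("transactions", b"payment-123").add_callback(on_success).add_errback(on_error)
producer.flush()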

Example workflow: Consumer

  1. Consumer subscribes to topic transactions.
  2. Sends a MetadataRequest to any broker.
  3. Receives mapping of partitions to leaders.
  4. Connects to leader brokers directly and sends FetchRequests.
  5. If a broker fails and leadership changes, the consumer detects fetch errors → refreshes metadata → reconnects.
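
The consumer side looks similar in code. A minimal sketch with kafka-python (broker address and group id are placeholders); metadata lookup, leader routing, and refresh-on-error all happen inside the iteration/poll loop:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers="localhost:9092",
    group_id="payments-service",
    auto_offset_reset="earliest",
)

# Each iteration is backed by FetchRequests sent to the current partition leaders.
for message in consumer:
    print(f"{message.topic}-{message.partition} @ {message.offset}: {message.value!r}")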

7. Configuration summary (client-side)

  • metadata.max.age.ms: Maximum time before the client automatically refreshes metadata (default 5 minutes).
  • reconnect.backoff.ms: Time to wait before reconnecting to a failed broker.
  • retry.backoff.ms: Time to wait before retrying a failed produce/fetch request.
  • max.in.flight.requests.per.connection: Number of unacknowledged requests allowed per connection; ordering across retries is preserved when it is ≤ 1.
  • bootstrap.servers: Initial list of brokers used to obtain metadata. Used only for discovery; clients may later connect to other brokers.
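
In client code these map onto constructor options. A kafka-python example with the parameters above; the values shown are library defaults or purely illustrative, not recommendations:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092"],   # discovery only
    metadata_max_age_ms=300_000,                          # proactive metadata refresh
    reconnect_backoff_ms=50,
    retry_backoff_ms=100,
    max_in_flight_requests_per_connection=5,              # set to 1 for strict ordering with retries
)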

8. Internal resiliency and efficiency

This metadata-driven routing model provides several advantages:

Efficiency

  • Clients communicate directly with the brokers that own the data they need. There's no central proxy or router.
  • Reduces network hops and central bottlenecks.

Resiliency

  • Clients automatically recover from leadership changes by re-fetching metadata.
  • No single broker failure halts all operations because metadata is available cluster-wide.

Scalability

  • Each broker only handles produce/fetch requests for the partitions it leads, distributing load evenly across the cluster.

9. Putting it all together: end-to-end summary

  1. Clients initiate requests over TCP using Kafkaโ€™s binary protocol.
  2. Brokers accept connections, enqueue requests, and process them via I/O threads.
  3. Produce and Fetch requests must go to the leader broker for each partition.
  4. Metadata requests provide clients with up-to-date partition-to-leader mappings.
  5. Clients cache this metadata and refresh it periodically or when errors indicate stale information.
  6. If a broker leadership changes, clients retry the request after refreshing metadata.
  7. All brokers can answer metadata requests because they maintain an updated metadata cache synced from the controller.

10. Key takeaway

Kafka's metadata-driven client routing is what enables the system to be:

  • Decentralized (no single broker handles all traffic),
  • Highly available (clients self-heal after leader changes),
  • Efficient (requests go straight to the right broker),
  • Ordered and consistent (per-partition request routing ensures proper sequencing).