Kafka Exiting Consumers and Poll Loop
Kafka : Exiting Consumer Poll Loop Safelyđź”—
1. The Problem: Infinite poll() Loopđź”—
A typical Kafka consumer continuously runs in an infinite loop like this:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
}
} finally {
consumer.close();
}
The consumer must poll() continuously to:
- Fetch new records,
- Send heartbeats to Kafka (so Kafka knows the consumer is alive), and
- Maintain its membership in the consumer group.
But this raises a question: How do you stop the consumer safely when you want to shut down your application?
If you just break the loop or kill the thread abruptly:
- You might lose messages that were fetched but not yet processed or committed.
- Kafka won’t immediately know this consumer is gone, so it will take a session timeout (typically 10 seconds or more) before Kafka rebalances partitions to another consumer.
2. The Challenge: poll() May Be Waitingđź”—
The poll() call can block for a certain duration (for example, up to the timeout specified in poll(Duration.ofMillis(x))).
So if you just try to stop the thread directly, it might still be waiting for poll() to return — meaning your shutdown could hang for several seconds.
You need a way to interrupt the poll safely.
3. The Solution: consumer.wakeup()đź”—
Kafka provides a special mechanism for this:
This is the only thread-safe method in the Kafka Consumer API.
You can safely call it from another thread, even while the main thread is blocked in poll().
What happens when wakeup() is calledđź”—
- If the consumer is currently waiting inside
poll(), the method immediately causespoll()to exit and throw aWakeupException. - If the consumer is not inside poll() at that moment, the next time
poll()is called, it will throwWakeupException.
This gives you a clean, predictable way to break out of the loop.
4. Using a Shutdown Hookđź”—
If your consumer runs in the main thread, you can use a Shutdown Hook (a special JVM mechanism that runs code before the process terminates).
Example:
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Detected shutdown, calling consumer.wakeup()...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
This means:
- When you stop your application (for example, by pressing
Ctrl+C), the shutdown hook runs in a separate thread. - That thread calls
consumer.wakeup()to interrupt the main thread’s poll loop.
5. Handling the WakeupExceptionđź”—
Once wakeup() is called, the consumer’s next poll() will throw a WakeupException.
You typically use this pattern:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
}
} catch (WakeupException e) {
// This is expected during shutdown, so no action needed.
} finally {
consumer.close();
System.out.println("Consumer closed.");
}
Key points:
- The
WakeupExceptionis not an error — it’s just Kafka’s way of telling you thatpoll()was interrupted. - You don’t need to log or rethrow it.
- But before you exit, you must call
consumer.close().
6. Why consumer.close() Is Importantđź”—
Calling close() ensures:
- Offsets are committed if
enable.auto.commit=trueor if you have uncommitted offsets and auto-commit on close is enabled. - The consumer sends a “leave group” message to the group coordinator.
- Kafka immediately triggers a rebalance, redistributing this consumer’s partitions to others in the same group.
If you skip close():
- Kafka will think the consumer might still be alive.
- It will wait for the session timeout to expire (default 10 seconds or longer) before rebalancing.
- This delays recovery and causes processing downtime for other consumers.
7. Summaryđź”—
| Step | Action | Purpose |
|---|---|---|
| 1 | Consumer runs an infinite poll() loop |
To fetch data and send heartbeats |
| 2 | Add a Shutdown Hook | To catch application termination |
| 3 | Call consumer.wakeup() from another thread |
Safely interrupt poll() |
| 4 | Catch WakeupException |
Exit the loop gracefully |
| 5 | Call consumer.close() |
Commit offsets and trigger rebalance |
8. Why This Mattersđź”—
A clean shutdown is essential in Kafka because:
- It prevents data duplication or loss.
- It avoids unnecessary delays in rebalancing.
- It keeps your consumer group’s state consistent.
In production systems, this shutdown pattern is standard — you’ll find it in nearly all well-designed Kafka consumer implementations.