Skip to content

Kafka Exiting Consumers and Poll Loop

Kafka : Exiting Consumer Poll Loop Safelyđź”—


1. The Problem: Infinite poll() Loopđź”—

A typical Kafka consumer continuously runs in an infinite loop like this:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }
} finally {
    consumer.close();
}

The consumer must poll() continuously to:

  • Fetch new records,
  • Send heartbeats to Kafka (so Kafka knows the consumer is alive), and
  • Maintain its membership in the consumer group.

But this raises a question: How do you stop the consumer safely when you want to shut down your application?

If you just break the loop or kill the thread abruptly:

  • You might lose messages that were fetched but not yet processed or committed.
  • Kafka won’t immediately know this consumer is gone, so it will take a session timeout (typically 10 seconds or more) before Kafka rebalances partitions to another consumer.

2. The Challenge: poll() May Be Waitingđź”—

The poll() call can block for a certain duration (for example, up to the timeout specified in poll(Duration.ofMillis(x))).

So if you just try to stop the thread directly, it might still be waiting for poll() to return — meaning your shutdown could hang for several seconds.

You need a way to interrupt the poll safely.


3. The Solution: consumer.wakeup()đź”—

Kafka provides a special mechanism for this:

consumer.wakeup();

This is the only thread-safe method in the Kafka Consumer API. You can safely call it from another thread, even while the main thread is blocked in poll().

What happens when wakeup() is calledđź”—

  • If the consumer is currently waiting inside poll(), the method immediately causes poll() to exit and throw a WakeupException.
  • If the consumer is not inside poll() at that moment, the next time poll() is called, it will throw WakeupException.

This gives you a clean, predictable way to break out of the loop.


4. Using a Shutdown Hookđź”—

If your consumer runs in the main thread, you can use a Shutdown Hook (a special JVM mechanism that runs code before the process terminates).

Example:

final Thread mainThread = Thread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Detected shutdown, calling consumer.wakeup()...");
        consumer.wakeup();
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

This means:

  • When you stop your application (for example, by pressing Ctrl+C), the shutdown hook runs in a separate thread.
  • That thread calls consumer.wakeup() to interrupt the main thread’s poll loop.

5. Handling the WakeupExceptionđź”—

Once wakeup() is called, the consumer’s next poll() will throw a WakeupException. You typically use this pattern:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }
} catch (WakeupException e) {
    // This is expected during shutdown, so no action needed.
} finally {
    consumer.close();
    System.out.println("Consumer closed.");
}

Key points:

  • The WakeupException is not an error — it’s just Kafka’s way of telling you that poll() was interrupted.
  • You don’t need to log or rethrow it.
  • But before you exit, you must call consumer.close().

6. Why consumer.close() Is Importantđź”—

Calling close() ensures:

  1. Offsets are committed if enable.auto.commit=true or if you have uncommitted offsets and auto-commit on close is enabled.
  2. The consumer sends a “leave group” message to the group coordinator.
  3. Kafka immediately triggers a rebalance, redistributing this consumer’s partitions to others in the same group.

If you skip close():

  • Kafka will think the consumer might still be alive.
  • It will wait for the session timeout to expire (default 10 seconds or longer) before rebalancing.
  • This delays recovery and causes processing downtime for other consumers.

7. Summaryđź”—

Step Action Purpose
1 Consumer runs an infinite poll() loop To fetch data and send heartbeats
2 Add a Shutdown Hook To catch application termination
3 Call consumer.wakeup() from another thread Safely interrupt poll()
4 Catch WakeupException Exit the loop gracefully
5 Call consumer.close() Commit offsets and trigger rebalance

8. Why This Mattersđź”—

A clean shutdown is essential in Kafka because:

  • It prevents data duplication or loss.
  • It avoids unnecessary delays in rebalancing.
  • It keeps your consumer group’s state consistent.

In production systems, this shutdown pattern is standard — you’ll find it in nearly all well-designed Kafka consumer implementations.