Close

Kafka - ConsumerRebalanceListener Example

[Last Updated: Jun 25, 2020]

The interface ConsumerRebalanceListener is a callback interface that the user can implement to listen for the events raised when a partition rebalance is triggered.

package org.apache.kafka.clients.consumer;
/**
 * Callback interface for reacting to changes in the set of partitions
 * assigned to a consumer.
 */
public interface ConsumerRebalanceListener {

    /**
     * Called during a rebalance operation when the consumer has to give up some partitions.
     *
     * @param partitions the partitions the consumer has to give up
     */
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    /**
     * Called after the partition re-assignment completes and before the
     * consumer starts fetching data, and only as the result of a Consumer#poll() call.
     *
     * @param partitions the partitions passed to the consumer by the re-assignment
     */
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

This interface can be implemented to trigger some custom actions when the set of partitions assigned to the consumer changes.

In this simple example we will see in which scenarios the methods of this interface are called.

Example

Run Kafka server as described here.

Example Config

package com.logicbig.example;

import java.util.Properties;

/**
 * Central place for the client settings shared by the examples.
 */
public class ExampleConfig {
  public static final String BROKERS = "localhost:9092";

  private static final String STRING_SERIALIZER =
          "org.apache.kafka.common.serialization.StringSerializer";
  private static final String STRING_DESERIALIZER =
          "org.apache.kafka.common.serialization.StringDeserializer";

  /**
   * Builds a fresh set of producer settings: the broker list, {@code acks=all}
   * and String serializers for both key and value.
   *
   * @return a new, independent {@link Properties} instance on every call
   */
  public static Properties getProducerProps() {
      Properties producerSettings = newPropsWithBrokers();
      producerSettings.setProperty("acks", "all");
      producerSettings.setProperty("key.serializer", STRING_SERIALIZER);
      producerSettings.setProperty("value.serializer", STRING_SERIALIZER);
      return producerSettings;
  }

  /**
   * Builds a fresh set of consumer settings: the broker list, the group id
   * {@code testGroup}, auto-commit switched off and String deserializers for
   * both key and value.
   *
   * @return a new, independent {@link Properties} instance on every call
   */
  public static Properties getConsumerProps() {
      Properties consumerSettings = newPropsWithBrokers();
      consumerSettings.setProperty("group.id", "testGroup");
      consumerSettings.setProperty("enable.auto.commit", "false");
      consumerSettings.setProperty("key.deserializer", STRING_DESERIALIZER);
      consumerSettings.setProperty("value.deserializer", STRING_DESERIALIZER);
      return consumerSettings;
  }

  // Both client kinds connect to the same brokers, so that entry is set in one place.
  private static Properties newPropsWithBrokers() {
      Properties settings = new Properties();
      settings.setProperty("bootstrap.servers", BROKERS);
      return settings;
  }
}

Creating example topic

Let's create a topic with 3 partitions.

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Creates the example topic (if it is not there yet) and prints its partition layout.
 */
public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic-2020-6-24", 3);
  }

  /**
   * Creates {@code topicName} with {@code numPartitions} partitions and a replication
   * factor of 1 unless a topic with that name already exists, then prints the
   * topic's partition count and partition ids.
   *
   * @param topicName     name of the topic to create or describe
   * @param numPartitions number of partitions to request when the topic is created
   * @throws Exception if any of the admin calls to the broker fails
   */
  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);

      // try-with-resources: the admin client is released even when a broker call throws
      try (AdminClient admin = AdminClient.create(config)) {
          //checking if topic already exists
          boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
          if (alreadyExists) {
              System.out.printf("topic already exists: %s%n", topicName);
          } else {
              //creating new topic
              System.out.printf("creating topic: %s%n", topicName);
              NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
              admin.createTopics(Collections.singleton(newTopic)).all().get();
          }

          //describing
          System.out.println("-- describing topic --");
          admin.describeTopics(Collections.singleton(topicName)).all().get()
               .forEach((topic, desc) -> {
                   System.out.println("Topic: " + topic);
                   System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                           desc.partitions()
                               .stream()
                               .map(p -> Integer.toString(p.partition()))
                               .collect(Collectors.joining(",")));
               });
      }
  }
}
creating topic: example-topic-2020-6-24
-- describing topic --
Topic: example-topic-2020-6-24
Partitions: 3, partition ids: 0,1,2

Using ConsumerRebalanceListener

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Starts several consumers of the same group a few seconds apart and logs every
 * partition revocation/assignment each of them is notified about.
 */
public class ConsumerRebalanceListenerExample {
  private static final String TOPIC = "example-topic-2020-6-24";
  private static final int CONSUMER_COUNT = 3;

  public static void main(String[] args) throws InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_COUNT);
      try {
          for (int i = 0; i < CONSUMER_COUNT; i++) {
              String consumerName = "consumer-" + i;
              executorService.execute(() -> startConsumer(consumerName));
              // stagger the start-up so that each consumer joins the group separately
              Thread.sleep(3000);
          }
      } finally {
          // always stop accepting work, even if the sleep above was interrupted
          executorService.shutdown();
      }
      executorService.awaitTermination(3, TimeUnit.MINUTES);
  }

  /**
   * Runs one consumer: subscribes to the example topic with a listener that prints
   * rebalance callbacks, polls once for up to 10 seconds, then closes the consumer.
   *
   * @param name label used in the printed messages to tell the consumers apart
   */
  private static void startConsumer(String name) {
      Properties consumerProps = ExampleConfig.getConsumerProps();
      // try-with-resources: the consumer is closed even when subscribe/poll throws
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          consumer.subscribe(Collections.singleton(TOPIC),
                  new ConsumerRebalanceListener() {
                      @Override
                      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                          System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,
                                  formatPartitions(partitions));
                      }

                      @Override
                      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                          System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,
                                  formatPartitions(partitions));
                      }
                  });
          System.out.printf("starting consumerName: %s%n", name);
          consumer.poll(Duration.ofSeconds(10));
          System.out.printf("closing consumerName: %s%n", name);
      }
  }

  /**
   * Renders each partition as {@code "topic: <topic>, partition: <id>"}.
   *
   * @param partitions the partitions to render
   * @return one formatted string per partition, in the collection's iteration order
   */
  private static List<String> formatPartitions(Collection<TopicPartition> partitions) {
      return partitions.stream().map(topicPartition ->
              String.format("topic: %s, partition: %s", topicPartition.topic(), topicPartition.partition()))
                       .collect(Collectors.toList());
  }
}
starting consumerName: consumer-0
onPartitionsAssigned - consumerName: consumer-0, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 2, topic: example-topic-2020-6-24, partition: 1]
starting consumerName: consumer-1
onPartitionsRevoked - consumerName: consumer-0, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 2, topic: example-topic-2020-6-24, partition: 1]
onPartitionsAssigned - consumerName: consumer-1, partitions: [topic: example-topic-2020-6-24, partition: 2]
onPartitionsAssigned - consumerName: consumer-0, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 1]
starting consumerName: consumer-2
onPartitionsRevoked - consumerName: consumer-1, partitions: [topic: example-topic-2020-6-24, partition: 2]
onPartitionsRevoked - consumerName: consumer-0, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 1]
onPartitionsAssigned - consumerName: consumer-2, partitions: [topic: example-topic-2020-6-24, partition: 2]
onPartitionsAssigned - consumerName: consumer-0, partitions: [topic: example-topic-2020-6-24, partition: 0]
onPartitionsAssigned - consumerName: consumer-1, partitions: [topic: example-topic-2020-6-24, partition: 1]
closing consumerName: consumer-0
onPartitionsRevoked - consumerName: consumer-0, partitions: [topic: example-topic-2020-6-24, partition: 0]
onPartitionsRevoked - consumerName: consumer-1, partitions: [topic: example-topic-2020-6-24, partition: 1]
onPartitionsRevoked - consumerName: consumer-2, partitions: [topic: example-topic-2020-6-24, partition: 2]
onPartitionsAssigned - consumerName: consumer-2, partitions: [topic: example-topic-2020-6-24, partition: 2]
onPartitionsAssigned - consumerName: consumer-1, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 1]
closing consumerName: consumer-1
onPartitionsRevoked - consumerName: consumer-1, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 1]
onPartitionsRevoked - consumerName: consumer-2, partitions: [topic: example-topic-2020-6-24, partition: 2]
onPartitionsAssigned - consumerName: consumer-2, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 2, topic: example-topic-2020-6-24, partition: 1]
closing consumerName: consumer-2
onPartitionsRevoked - consumerName: consumer-2, partitions: [topic: example-topic-2020-6-24, partition: 0, topic: example-topic-2020-6-24, partition: 2, topic: example-topic-2020-6-24, partition: 1]

As seen above, at first all three partitions are assigned to consumer-0, the only consumer in the group.

As consumer-1 joins, a partition rebalance is triggered. All three partitions assigned to consumer-0 are revoked, and then two of them are assigned back to it. The one remaining partition is assigned to consumer-1.

As consumer-2 joins, all partition memberships are revoked and each consumer is assigned one partition.

When consumer-0 closes, all partition memberships are revoked and consumer-1 is assigned two partitions and consumer-2 is assigned one partition.

When consumer-1 closes, all partition memberships are revoked again and consumer-2 is assigned all partitions.

When consumer-2 also closes, all partition memberships are revoked and there are no more consumers alive to receive partition assignments.

Example Project

Dependencies and Technologies Used:

  • kafka_2.13 2.5.0 Apache Kafka
  • JDK 8
  • Maven 3.5.4

ConsumerRebalanceListener Example Select All Download
  • kafka-consumer-rebalance-listener-example
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • ConsumerRebalanceListenerExample.java

    See Also