Close

Kafka - Understanding Consumer Group with examples

[Last Updated: Aug 11, 2020]

In Kafka, each consumer group is composed of many consumer instances for scalability and fault tolerance.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different partitions.

Examples

In this example we will understand how topic partitions are assigned across multiple consumers in a consumer group.

If we have three partitions for a topic and we start three consumers for the same topic then each consumer is assigned one partition automatically.

If we have three partitions for a topic and we start two consumers for the same topic then one consumer is assigned one partition and other is assigned two partitions.

If we have three partitions for a topic and we start one consumer for the same topic then one consumer is assigned all three partitions.

If we have three partitions for a topic and we start four consumers for the same topic then three of four consumers are assigned one partition each, and one consumer will not receive any messages.

Let's start Kafka server as described here.

Creating a topic with 3 partitions

Let's create a topic with three partitions using Kafka Admin API.

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic", 3);
  }

  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
      AdminClient admin = AdminClient.create(config);

      //checking if topic already exists
      boolean alreadyExists = admin.listTopics().names().get().stream()
                                   .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
      if (alreadyExists) {
          System.out.printf("topic already exits: %s%n", topicName);
      } else {
          //creating new topic
          System.out.printf("creating topic: %s%n", topicName);
          NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
          admin.createTopics(Collections.singleton(newTopic)).all().get();
      }

      //describing
      System.out.println("-- describing topic --");
      admin.describeTopics(Collections.singleton(topicName)).all().get()
           .forEach((topic, desc) -> {
               System.out.println("Topic: " + topic);
               System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                       desc.partitions()
                           .stream()
                           .map(p -> Integer.toString(p.partition()))
                           .collect(Collectors.joining(",")));
           });

      admin.close();
  }
}
creating topic: example-topic
-- describing topic --
Topic: example-topic
Partitions: 3, partition ids: 0,1,2

Publishing and Consuming Messages

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerGroupExample{
  private final static int PARTITION_COUNT = 3;
  private final static String TOPIC_NAME = "example-topic";
  private final static int MSG_COUNT = 4;
  private static int totalMsgToSend;
  private static AtomicInteger msg_received_counter = new AtomicInteger(0);

  public static void run(int consumerCount, String[] consumerGroups) throws Exception {
      int distinctGroups = new TreeSet<>(Arrays.asList(consumerGroups)).size();
      totalMsgToSend = MSG_COUNT * PARTITION_COUNT * distinctGroups;
      ExecutorService executorService = Executors.newFixedThreadPool(consumerCount + 1);
      for (int i = 0; i < consumerCount; i++) {
          String consumerId = Integer.toString(i + 1);
          int finalI = i;
          executorService.execute(() -> startConsumer(consumerId, consumerGroups[finalI]));
      }
      executorService.execute(ConsumerGroupExample::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  private static void startConsumer(String consumerId, String consumerGroup) {
      System.out.printf("starting consumer: %s, group: %s%n", consumerId, consumerGroup);
      Properties consumerProps = ExampleConfig.getConsumerProps(consumerGroup);
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
      consumer.subscribe(Collections.singleton(TOPIC_NAME));
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
          for (ConsumerRecord<String, String> record : records) {
              msg_received_counter.incrementAndGet();
              System.out.printf("consumer id:%s, partition id= %s, key = %s, value = %s"
                              + ", offset = %s%n",
                      consumerId, record.partition(),record.key(), record.value(), record.offset());
          }

          consumer.commitSync();
          if(msg_received_counter.get()== totalMsgToSend){
              break;
          }
      }
  }

  private static void sendMessages() {
      Properties producerProps = ExampleConfig.getProducerProps();
      KafkaProducer producer = new KafkaProducer<>(producerProps);
      int key = 0;
      for (int i = 0; i < MSG_COUNT; i++) {
          for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
              String value = "message-" + i;
              key++;
              System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                      TOPIC_NAME, key, value, partitionId);
              producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId,
                      Integer.toString(key), value));
          }
      }
  }
}

Running 3 Consumers

public class Running3Consumers {
  public static void main(String[] args) throws Exception {
      String[] consumerGroups = new String[3];
      Arrays.fill(consumerGroups, "test-consumer-group");
      ConsumerGroupExample.run(3, consumerGroups);
  }
}
starting consumer: 1, group: test-consumer-group
starting consumer: 2, group: test-consumer-group
starting consumer: 3, group: test-consumer-group
Sending message topic: example-topic, key: 1, value: message-0, partition id: 0
Sending message topic: example-topic, key: 2, value: message-0, partition id: 1
Sending message topic: example-topic, key: 3, value: message-0, partition id: 2
Sending message topic: example-topic, key: 4, value: message-1, partition id: 0
Sending message topic: example-topic, key: 5, value: message-1, partition id: 1
Sending message topic: example-topic, key: 6, value: message-1, partition id: 2
Sending message topic: example-topic, key: 7, value: message-2, partition id: 0
Sending message topic: example-topic, key: 8, value: message-2, partition id: 1
Sending message topic: example-topic, key: 9, value: message-2, partition id: 2
Sending message topic: example-topic, key: 10, value: message-3, partition id: 0
Sending message topic: example-topic, key: 11, value: message-3, partition id: 1
Sending message topic: example-topic, key: 12, value: message-3, partition id: 2
consumer id:2, partition id= 2, key = 3, value = message-0, offset = 4
consumer id:1, partition id= 1, key = 2, value = message-0, offset = 4
consumer id:3, partition id= 0, key = 1, value = message-0, offset = 4
consumer id:1, partition id= 1, key = 5, value = message-1, offset = 5
consumer id:2, partition id= 2, key = 6, value = message-1, offset = 5
consumer id:1, partition id= 1, key = 8, value = message-2, offset = 6
consumer id:3, partition id= 0, key = 4, value = message-1, offset = 5
consumer id:1, partition id= 1, key = 11, value = message-3, offset = 7
consumer id:2, partition id= 2, key = 9, value = message-2, offset = 6
consumer id:2, partition id= 2, key = 12, value = message-3, offset = 7
consumer id:3, partition id= 0, key = 7, value = message-2, offset = 6
consumer id:3, partition id= 0, key = 10, value = message-3, offset = 7

As seen above all three partitions are individually assigned to each consumer i.e. consumer 1 is assigned partition 1, consumer 2 is assigned partition 2 and consumer 3 is assigned partition 0.

Running 2 Consumers

public class Running2Consumers {
  public static void main(String[] args) throws Exception {
      String[] consumerGroups = new String[2];
      Arrays.fill(consumerGroups, "test-consumer-group");
      ConsumerGroupExample.run(2, consumerGroups);
  }
}
starting consumer: 1, group: test-consumer-group
starting consumer: 2, group: test-consumer-group
Sending message topic: example-topic, key: 1, value: message-0, partition id: 0
Sending message topic: example-topic, key: 2, value: message-0, partition id: 1
Sending message topic: example-topic, key: 3, value: message-0, partition id: 2
Sending message topic: example-topic, key: 4, value: message-1, partition id: 0
Sending message topic: example-topic, key: 5, value: message-1, partition id: 1
Sending message topic: example-topic, key: 6, value: message-1, partition id: 2
Sending message topic: example-topic, key: 7, value: message-2, partition id: 0
Sending message topic: example-topic, key: 8, value: message-2, partition id: 1
Sending message topic: example-topic, key: 9, value: message-2, partition id: 2
Sending message topic: example-topic, key: 10, value: message-3, partition id: 0
Sending message topic: example-topic, key: 11, value: message-3, partition id: 1
Sending message topic: example-topic, key: 12, value: message-3, partition id: 2
consumer id:2, partition id= 2, key = 3, value = message-0, offset = 8
consumer id:2, partition id= 2, key = 6, value = message-1, offset = 9
consumer id:2, partition id= 2, key = 9, value = message-2, offset = 10
consumer id:2, partition id= 2, key = 12, value = message-3, offset = 11
consumer id:1, partition id= 0, key = 1, value = message-0, offset = 8
consumer id:1, partition id= 0, key = 4, value = message-1, offset = 9
consumer id:1, partition id= 0, key = 7, value = message-2, offset = 10
consumer id:1, partition id= 0, key = 10, value = message-3, offset = 11
consumer id:1, partition id= 1, key = 2, value = message-0, offset = 8
consumer id:1, partition id= 1, key = 5, value = message-1, offset = 9
consumer id:1, partition id= 1, key = 8, value = message-2, offset = 10
consumer id:1, partition id= 1, key = 11, value = message-3, offset = 11

As seen above consumer 1 is assigned two partitions i.e. 0 and 1, whereas, consumer 2 is assigned one partition i.e. 2.

Running 1 Consumer

public class Running1Consumers {
  public static void main(String[] args) throws Exception {
      ConsumerGroupExample.run(1, new String[]{"test-consumer-group"});
  }
}
starting consumer: 1, group: test-consumer-group
Sending message topic: example-topic, key: 1, value: message-0, partition id: 0
Sending message topic: example-topic, key: 2, value: message-0, partition id: 1
Sending message topic: example-topic, key: 3, value: message-0, partition id: 2
Sending message topic: example-topic, key: 4, value: message-1, partition id: 0
Sending message topic: example-topic, key: 5, value: message-1, partition id: 1
Sending message topic: example-topic, key: 6, value: message-1, partition id: 2
Sending message topic: example-topic, key: 7, value: message-2, partition id: 0
Sending message topic: example-topic, key: 8, value: message-2, partition id: 1
Sending message topic: example-topic, key: 9, value: message-2, partition id: 2
Sending message topic: example-topic, key: 10, value: message-3, partition id: 0
Sending message topic: example-topic, key: 11, value: message-3, partition id: 1
Sending message topic: example-topic, key: 12, value: message-3, partition id: 2
consumer id:1, partition id= 2, key = 3, value = message-0, offset = 12
consumer id:1, partition id= 2, key = 6, value = message-1, offset = 13
consumer id:1, partition id= 2, key = 9, value = message-2, offset = 14
consumer id:1, partition id= 2, key = 12, value = message-3, offset = 15
consumer id:1, partition id= 0, key = 1, value = message-0, offset = 12
consumer id:1, partition id= 0, key = 4, value = message-1, offset = 13
consumer id:1, partition id= 0, key = 7, value = message-2, offset = 14
consumer id:1, partition id= 0, key = 10, value = message-3, offset = 15
consumer id:1, partition id= 1, key = 2, value = message-0, offset = 12
consumer id:1, partition id= 1, key = 5, value = message-1, offset = 13
consumer id:1, partition id= 1, key = 8, value = message-2, offset = 14
consumer id:1, partition id= 1, key = 11, value = message-3, offset = 15

As seen above, consumer 1 is assigned all three partitions.

Running 4 Consumers

public class Running4Consumers {
  public static void main(String[] args) throws Exception {
      String[] consumerGroups = new String[4];
      Arrays.fill(consumerGroups, "test-consumer-group");
      ConsumerGroupExample.run(4, consumerGroups);
  }
}
starting consumer: 1, group: test-consumer-group
starting consumer: 2, group: test-consumer-group
starting consumer: 3, group: test-consumer-group
starting consumer: 4, group: test-consumer-group
Sending message topic: example-topic, key: 1, value: message-0, partition id: 0
Sending message topic: example-topic, key: 2, value: message-0, partition id: 1
Sending message topic: example-topic, key: 3, value: message-0, partition id: 2
Sending message topic: example-topic, key: 4, value: message-1, partition id: 0
Sending message topic: example-topic, key: 5, value: message-1, partition id: 1
Sending message topic: example-topic, key: 6, value: message-1, partition id: 2
Sending message topic: example-topic, key: 7, value: message-2, partition id: 0
Sending message topic: example-topic, key: 8, value: message-2, partition id: 1
Sending message topic: example-topic, key: 9, value: message-2, partition id: 2
Sending message topic: example-topic, key: 10, value: message-3, partition id: 0
Sending message topic: example-topic, key: 11, value: message-3, partition id: 1
Sending message topic: example-topic, key: 12, value: message-3, partition id: 2
consumer id:4, partition id= 1, key = 2, value = message-0, offset = 16
consumer id:4, partition id= 1, key = 5, value = message-1, offset = 17
consumer id:4, partition id= 1, key = 8, value = message-2, offset = 18
consumer id:4, partition id= 1, key = 11, value = message-3, offset = 19
consumer id:1, partition id= 0, key = 1, value = message-0, offset = 16
consumer id:1, partition id= 0, key = 4, value = message-1, offset = 17
consumer id:1, partition id= 0, key = 7, value = message-2, offset = 18
consumer id:1, partition id= 0, key = 10, value = message-3, offset = 19
consumer id:2, partition id= 2, key = 3, value = message-0, offset = 16
consumer id:2, partition id= 2, key = 6, value = message-1, offset = 17
consumer id:2, partition id= 2, key = 9, value = message-2, offset = 18
consumer id:2, partition id= 2, key = 12, value = message-3, offset = 19

As seen above, consumer 3 didn't receive any message, whereas, all three remaining consumers are individually assigned each partition.

Consumers with different consumer groups

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumers.

public class RunningConsumersWithDifferentGroups {
  public static void main(String[] args) throws Exception {
      String[] consumerGroups = new String[3];
      for (int i = 0; i < consumerGroups.length; i++) {
          consumerGroups[i] ="test-consumer-group-"+i;
      }
      ConsumerGroupExample.run(3, consumerGroups);
  }
}
starting consumer: 1, group: test-consumer-group-0
starting consumer: 3, group: test-consumer-group-2
starting consumer: 2, group: test-consumer-group-1
Sending message topic: example-topic, key: 1, value: message-0, partition id: 0
Sending message topic: example-topic, key: 2, value: message-0, partition id: 1
Sending message topic: example-topic, key: 3, value: message-0, partition id: 2
Sending message topic: example-topic, key: 4, value: message-1, partition id: 0
Sending message topic: example-topic, key: 5, value: message-1, partition id: 1
Sending message topic: example-topic, key: 6, value: message-1, partition id: 2
Sending message topic: example-topic, key: 7, value: message-2, partition id: 0
Sending message topic: example-topic, key: 8, value: message-2, partition id: 1
Sending message topic: example-topic, key: 9, value: message-2, partition id: 2
Sending message topic: example-topic, key: 10, value: message-3, partition id: 0
Sending message topic: example-topic, key: 11, value: message-3, partition id: 1
Sending message topic: example-topic, key: 12, value: message-3, partition id: 2
consumer id:3, partition id= 2, key = 3, value = message-0, offset = 24
consumer id:3, partition id= 2, key = 6, value = message-1, offset = 25
consumer id:3, partition id= 2, key = 9, value = message-2, offset = 26
consumer id:3, partition id= 2, key = 12, value = message-3, offset = 27
consumer id:3, partition id= 0, key = 1, value = message-0, offset = 24
consumer id:3, partition id= 0, key = 4, value = message-1, offset = 25
consumer id:3, partition id= 0, key = 7, value = message-2, offset = 26
consumer id:3, partition id= 0, key = 10, value = message-3, offset = 27
consumer id:3, partition id= 1, key = 2, value = message-0, offset = 24
consumer id:3, partition id= 1, key = 5, value = message-1, offset = 25
consumer id:3, partition id= 1, key = 8, value = message-2, offset = 26
consumer id:3, partition id= 1, key = 11, value = message-3, offset = 27
consumer id:2, partition id= 2, key = 3, value = message-0, offset = 24
consumer id:2, partition id= 2, key = 6, value = message-1, offset = 25
consumer id:2, partition id= 2, key = 9, value = message-2, offset = 26
consumer id:2, partition id= 2, key = 12, value = message-3, offset = 27
consumer id:2, partition id= 0, key = 1, value = message-0, offset = 24
consumer id:2, partition id= 0, key = 4, value = message-1, offset = 25
consumer id:2, partition id= 0, key = 7, value = message-2, offset = 26
consumer id:2, partition id= 0, key = 10, value = message-3, offset = 27
consumer id:2, partition id= 1, key = 2, value = message-0, offset = 24
consumer id:2, partition id= 1, key = 5, value = message-1, offset = 25
consumer id:2, partition id= 1, key = 8, value = message-2, offset = 26
consumer id:2, partition id= 1, key = 11, value = message-3, offset = 27
consumer id:1, partition id= 2, key = 3, value = message-0, offset = 24
consumer id:1, partition id= 2, key = 6, value = message-1, offset = 25
consumer id:1, partition id= 2, key = 9, value = message-2, offset = 26
consumer id:1, partition id= 2, key = 12, value = message-3, offset = 27
consumer id:1, partition id= 0, key = 1, value = message-0, offset = 24
consumer id:1, partition id= 0, key = 4, value = message-1, offset = 25
consumer id:1, partition id= 0, key = 7, value = message-2, offset = 26
consumer id:1, partition id= 0, key = 10, value = message-3, offset = 27
consumer id:1, partition id= 1, key = 2, value = message-0, offset = 24
consumer id:1, partition id= 1, key = 5, value = message-1, offset = 25
consumer id:1, partition id= 1, key = 8, value = message-2, offset = 26
consumer id:1, partition id= 1, key = 11, value = message-3, offset = 27

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Consumer Group Example Select All Download
  • Kafka-consumer-group-example
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • ConsumerGroupExample.java

    See Also