Close

Kafka - Using Keys For Partition Assignment

[Updated: Apr 28, 2020, Created: Apr 21, 2020]

As seen in previous examples, when we send a message (a ProducerRecord), we can specify both a key and a value.

When a message is sent without an explicitly specified partition, its key can be used to decide which partition the message goes to.

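All messages with the same key will go to the same partition, because the default partitioner hashes the serialized key and takes the result modulo the number of partitions. This guarantee therefore holds only as long as the topic's partition count does not change.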

Example

Run the Kafka server as described here.

Creating topic

Using the Kafka Admin API to create the example topic with 4 partitions.

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Creates the example topic (unless it already exists) and prints its partition layout.
 */
public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic-2020-4-21", 4);
  }

  /**
   * Creates {@code topicName} with {@code numPartitions} partitions and replication factor 1,
   * unless a topic with that name already exists, and then prints the topic's partition ids.
   *
   * @param topicName     name of the topic to create and describe
   * @param numPartitions partition count used only when the topic is newly created
   * @throws Exception if any of the blocking admin requests ({@code get()}) fails
   */
  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);

      // try-with-resources: the admin client is closed even when one of the
      // blocking get() calls below throws.
      try (AdminClient admin = AdminClient.create(config)) {
          // checking if topic already exists
          boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
          if (alreadyExists) {
              System.out.printf("topic already exists: %s%n", topicName);
          } else {
              //creating new topic
              System.out.printf("creating topic: %s%n", topicName);
              // replication factor 1: no redundancy, which is enough for the single
              // local broker configured in ExampleHelper.BROKERS
              NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
              admin.createTopics(Collections.singleton(newTopic)).all().get();
          }

          //describing
          System.out.println("-- describing topic --");
          admin.describeTopics(Collections.singleton(topicName)).all().get()
               .forEach((topic, desc) -> {
                   System.out.println("Topic: " + topic);
                   System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                           desc.partitions()
                               .stream()
                               .map(p -> Integer.toString(p.partition()))
                               .collect(Collectors.joining(",")));
               });
      }
  }
}
creating topic: example-topic-2020-4-21
-- describing topic --
Topic: example-topic-2020-4-21
Partitions: 4, partition ids: 0,1,2,3

A helper class

package com.logicbig.example;

import java.util.Properties;

/**
 * Shared client settings for the Kafka examples: broker address plus String
 * (de)serializers for keys and values.
 */
public class ExampleHelper {
  /** Bootstrap server address used by every example client. */
  public static final String BROKERS = "localhost:9092";

  /**
   * Returns a fresh set of producer properties: the {@link #BROKERS} bootstrap address,
   * {@code acks} set to {@code "all"}, and String serializers for both key and value.
   *
   * @return a new, mutable {@link Properties} instance
   */
  public static Properties getProducerProps() {
      final String serializer = "org.apache.kafka.common.serialization.StringSerializer";
      Properties producerConfig = new Properties();
      producerConfig.setProperty("bootstrap.servers", BROKERS);
      producerConfig.setProperty("acks", "all");
      producerConfig.setProperty("key.serializer", serializer);
      producerConfig.setProperty("value.serializer", serializer);
      return producerConfig;
  }

  /**
   * Returns a fresh set of consumer properties: the {@link #BROKERS} bootstrap address,
   * consumer group {@code "testGroup"}, auto-commit disabled, and String deserializers
   * for both key and value.
   *
   * @param topicName currently not used by this method; kept so existing call sites compile
   * @return a new, mutable {@link Properties} instance
   */
  public static Properties getConsumerProps(String topicName) {
      final String deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
      Properties consumerConfig = new Properties();
      consumerConfig.setProperty("bootstrap.servers", BROKERS);
      consumerConfig.setProperty("group.id", "testGroup");
      // offsets are not committed automatically; the consumer must commit them itself
      consumerConfig.setProperty("enable.auto.commit", "false");
      consumerConfig.setProperty("key.deserializer", deserializer);
      consumerConfig.setProperty("value.deserializer", deserializer);
      return consumerConfig;
  }
}

Using keys for partition assignments

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates key-based partition assignment. Several keys are sent repeatedly
 * without an explicit partition. The consumer prints the partition each record
 * landed on, so it is visible that a given key always maps to the same partition.
 */
public class PartitionAssignmentExample {
  private static final int PARTITION_COUNT = 4;
  private static final String TOPIC_NAME = "example-topic-2020-4-21";
  private static final int MSG_COUNT = 4;

  public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.execute(PartitionAssignmentExample::startConsumer);
      executorService.execute(PartitionAssignmentExample::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  /**
   * Subscribes to {@link #TOPIC_NAME} and prints every consumed record (key, value, partition,
   * offset). It commits offsets synchronously after each poll and stops once at least
   * {@code MSG_COUNT * PARTITION_COUNT} records have been received.
   */
  private static void startConsumer() {
      Properties consumerProps = ExampleHelper.getConsumerProps(TOPIC_NAME);
      int expectedCount = MSG_COUNT * PARTITION_COUNT;
      // try-with-resources: leaves the consumer group and releases the client on exit
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          consumer.subscribe(Collections.singleton(TOPIC_NAME));
          int numMsgReceived = 0;
          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
              for (ConsumerRecord<String, String> record : records) {
                  numMsgReceived++;
                  System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                          record.key(), record.value(), record.partition(), record.offset());
              }
              consumer.commitSync();
              // '>=' rather than '==': the count is only checked after a whole batch, so a
              // batch that overshoots the target (e.g. records left over from an earlier run)
              // must still end the loop instead of spinning forever.
              if (numMsgReceived >= expectedCount) {
                  break;
              }
          }
      }
  }

  /**
   * Sends {@link #MSG_COUNT} distinct keys ("key-0", "key-1", ...), each repeated
   * {@link #PARTITION_COUNT} times. No partition is set on the records, so the
   * producer's partitioner chooses the partition from the key.
   */
  private static void sendMessages() {
      Properties producerProps = ExampleHelper.getProducerProps();
      // try-with-resources: close() blocks until previously sent records complete,
      // so nothing is left unsent in the producer's buffer.
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
          for (int i = 0; i < MSG_COUNT; i++) {
              String value = "message-" + i;
              String key = "key-" + i;
              // Repeat the same key/value; this is a repetition count, not a partition id.
              for (int repeat = 0; repeat < PARTITION_COUNT; repeat++) {
                  System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                          TOPIC_NAME, key, value, "not-specified");
                  // send() is asynchronous; report failures instead of silently dropping them
                  producer.send(new ProducerRecord<>(TOPIC_NAME, key, value), (metadata, exception) -> {
                      if (exception != null) {
                          System.err.printf("failed to send key %s: %s%n", key, exception);
                      }
                  });
              }
          }
      }
  }
}
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
consumed: key = key-2, value = message-2, partition id= 2, offset = 4
consumed: key = key-2, value = message-2, partition id= 2, offset = 5
consumed: key = key-2, value = message-2, partition id= 2, offset = 6
consumed: key = key-2, value = message-2, partition id= 2, offset = 7
consumed: key = key-3, value = message-3, partition id= 3, offset = 0
consumed: key = key-3, value = message-3, partition id= 3, offset = 1
consumed: key = key-3, value = message-3, partition id= 3, offset = 2
consumed: key = key-3, value = message-3, partition id= 3, offset = 3
consumed: key = key-1, value = message-1, partition id= 0, offset = 4
consumed: key = key-1, value = message-1, partition id= 0, offset = 5
consumed: key = key-1, value = message-1, partition id= 0, offset = 6
consumed: key = key-1, value = message-1, partition id= 0, offset = 7
consumed: key = key-0, value = message-0, partition id= 1, offset = 4
consumed: key = key-0, value = message-0, partition id= 1, offset = 5
consumed: key = key-0, value = message-0, partition id= 1, offset = 6
consumed: key = key-0, value = message-0, partition id= 1, offset = 7

As seen above, key-0 is always assigned to partition 1, key-1 to partition 0, key-2 to partition 2, and key-3 to partition 3.

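When the key is null and the default partitioner is used, the key cannot determine the partition. Before Kafka 2.4, the default partitioner spread null-key records across the available partitions in round-robin fashion. Since Kafka 2.4 (KIP-480, the "sticky partitioner", which applies to the kafka-clients 2.4.1 used here), it instead sticks to one partition until the current batch is full or sent, and then switches to another. This still balances messages across partitions over time, while producing larger batches.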

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka - Using Keys For Partition Assignment Select All Download
  • kafka-using-keys-for-partition-assignment
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • PartitionAssignmentExample.java

    See Also