Close

Kafka - Publishing records With null keys and no assigned partitions

[Last Updated: Aug 11, 2020]

The producer is responsible for choosing the topic partition for the record.

This can be done by explicitly assigning the partition id (tutorial) or by assigning key (tutorial).

If producer doesn't assign partition or key then a round-robin approach is used to balance the messages among the partitions.

Example

Start Kafka server as described here.

Creating a topic

Let's create a topic with 4 partition:

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic-2020-4-29", 4);
  }

  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);
      AdminClient admin = AdminClient.create(config);

      //checking if topic already exists
      boolean alreadyExists = admin.listTopics().names().get().stream()
                                   .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
      if (alreadyExists) {
          System.out.printf("topic already exits: %s%n", topicName);
      } else {
          //creating new topic
          System.out.printf("creating topic: %s%n", topicName);
          NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
          admin.createTopics(Collections.singleton(newTopic)).all().get();
      }

      //describing
      System.out.println("-- describing topic --");
      admin.describeTopics(Collections.singleton(topicName)).all().get()
           .forEach((topic, desc) -> {
               System.out.println("Topic: " + topic);
               System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                       desc.partitions()
                           .stream()
                           .map(p -> Integer.toString(p.partition()))
                           .collect(Collectors.joining(",")));
           });

      admin.close();
  }
}
creating topic: example-topic-2020-4-29
-- describing topic --
Topic: example-topic-2020-4-29
Partitions: 4, partition ids: 0,1,2,3

A helper class

package com.logicbig.example;

import java.util.Properties;

public class ExampleHelper {
  public static final String BROKERS = "localhost:9092";

  public static Properties getProducerProps() {
      Properties props = new Properties();
      props.put("bootstrap.servers", BROKERS);
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      return props;
  }

  public static Properties getConsumerProps(String topicName) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", BROKERS);
      props.setProperty("group.id", "testGroup");
      props.setProperty("enable.auto.commit", "false");
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      return props;
  }
}

Producing records with no key or partition assigned

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NullKeyPartitionExample {
  private static String TOPIC_NAME = "example-topic-2020-4-29";
  private static int MSG_COUNT = 12;

  public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.execute(NullKeyPartitionExample::startConsumer);
      executorService.execute(NullKeyPartitionExample::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  private static void startConsumer() {
      Properties consumerProps = ExampleHelper.getConsumerProps(TOPIC_NAME);
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

      consumer.subscribe(Collections.singleton(TOPIC_NAME));
      int numMsgReceived = 0;
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
          for (ConsumerRecord<String, String> record : records) {
              numMsgReceived++;
              System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                      record.key(), record.value(), record.partition(), record.offset());
          }
          consumer.commitSync();
          if (numMsgReceived == MSG_COUNT) {
              break;
          }
      }
  }

  private static void sendMessages() {
      Properties producerProps = ExampleHelper.getProducerProps();
      KafkaProducer producer = new KafkaProducer<>(producerProps);
      ;
      for (int i = 0; i < MSG_COUNT; i++) {
          String value = "message-" + i;
          System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                  TOPIC_NAME, "not-specified", value, "not-specified");
          producer.send(new ProducerRecord<>(TOPIC_NAME, value));
      }
  }
}
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-4, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-5, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-6, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-7, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-8, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-9, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-10, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-11, partition id: not-specified
consumed: key = null, value = message-4, partition id= 0, offset = 2
consumed: key = null, value = message-5, partition id= 0, offset = 3
consumed: key = null, value = message-6, partition id= 0, offset = 4
consumed: key = null, value = message-7, partition id= 0, offset = 5
consumed: key = null, value = message-8, partition id= 0, offset = 6
consumed: key = null, value = message-9, partition id= 0, offset = 7
consumed: key = null, value = message-10, partition id= 0, offset = 8
consumed: key = null, value = message-11, partition id= 0, offset = 9
consumed: key = null, value = message-0, partition id= 2, offset = 1
consumed: key = null, value = message-1, partition id= 2, offset = 2
consumed: key = null, value = message-2, partition id= 2, offset = 3
consumed: key = null, value = message-3, partition id= 2, offset = 4

Running more times:

Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-4, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-5, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-6, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-7, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-8, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-9, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-10, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-11, partition id: not-specified
consumed: key = null, value = message-6, partition id= 0, offset = 16
consumed: key = null, value = message-7, partition id= 0, offset = 17
consumed: key = null, value = message-8, partition id= 0, offset = 18
consumed: key = null, value = message-9, partition id= 0, offset = 19
consumed: key = null, value = message-10, partition id= 0, offset = 20
consumed: key = null, value = message-0, partition id= 1, offset = 22
consumed: key = null, value = message-1, partition id= 1, offset = 23
consumed: key = null, value = message-2, partition id= 1, offset = 24
consumed: key = null, value = message-3, partition id= 1, offset = 25
consumed: key = null, value = message-4, partition id= 1, offset = 26
consumed: key = null, value = message-5, partition id= 1, offset = 27
consumed: key = null, value = message-11, partition id= 2, offset = 20

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka - Records With null key And no assigned partitions Select All Download
  • kafka-producing-records-with-null-partition-and-key
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • NullKeyPartitionExample.java

    See Also