As seen in previous examples, when we send messages (ProducerRecord) we can specify a key and a value.
When sending messages, if the partition is not explicitly specified, the key is used to decide which partition a message goes to.
All messages with the same key go to the same partition (as long as the number of partitions of the topic does not change).
Example
Run the Kafka server as described here.
Creating the topic
Using the Kafka Admin API to create the example topic with 4 partitions.
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
/**
 * Creates the topic used by the partition-assignment example (if it does not exist yet)
 * and prints its partition ids.
 */
public class TopicCreator {

    /**
     * Ensures {@code example-topic-2020-4-21} exists with 4 partitions, then describes it.
     *
     * @param args ignored
     * @throws Exception if an admin operation fails or the calling thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        createTopic("example-topic-2020-4-21", 4);
    }

    /**
     * Creates {@code topicName} with {@code numPartitions} partitions and a replication factor
     * of 1 unless a topic with that name is already listed, then prints the topic name, its
     * partition count and its comma-separated partition ids.
     *
     * @param topicName     name of the topic to create/describe
     * @param numPartitions partition count used only when the topic has to be created
     * @throws Exception if listing, creating or describing the topic fails
     */
    private static void createTopic(String topicName, int numPartitions) throws Exception {
        Properties config = new Properties();
        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);
        // try-with-resources: the admin client is closed even when one of the get() calls throws
        try (AdminClient admin = AdminClient.create(config)) {
            // checking if topic already exists
            boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
            if (alreadyExists) {
                System.out.printf("topic already exists: %s%n", topicName);
            } else {
                // creating new topic; replication factor 1 suits a single local broker
                System.out.printf("creating topic: %s%n", topicName);
                NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
                admin.createTopics(Collections.singleton(newTopic)).all().get();
            }
            // describing
            System.out.println("-- describing topic --");
            admin.describeTopics(Collections.singleton(topicName)).all().get()
                    .forEach((topic, desc) -> {
                        System.out.println("Topic: " + topic);
                        System.out.printf("Partitions: %s, partition ids: %s%n",
                                desc.partitions().size(),
                                desc.partitions()
                                        .stream()
                                        .map(p -> Integer.toString(p.partition()))
                                        .collect(Collectors.joining(",")));
                    });
        }
    }
}
creating topic: example-topic-2020-4-21
-- describing topic --
Topic: example-topic-2020-4-21
Partitions: 4, partition ids: 0,1,2,3
A helper class
package com.logicbig.example;
import java.util.Properties;
/**
 * Shared broker address and client configuration used by the Kafka examples.
 */
public class ExampleHelper {

    /** Bootstrap server list every example client connects to. */
    public static final String BROKERS = "localhost:9092";

    /**
     * Builds producer settings: the example brokers, {@code acks=all}, and String serializers
     * for both keys and values.
     *
     * <p>Uses {@link Properties#setProperty} rather than the inherited {@code Hashtable.put},
     * matching {@link #getConsumerProps(String)} and keeping every entry a String.
     *
     * @return a newly created {@link Properties} instance on every call
     */
    public static Properties getProducerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKERS);
        props.setProperty("acks", "all");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Builds consumer settings: the example brokers, consumer group {@code testGroup},
     * auto-commit disabled (callers commit offsets themselves), and String deserializers for
     * both keys and values.
     *
     * @param topicName not used by this method; it is kept so existing callers keep compiling
     * @return a newly created {@link Properties} instance on every call
     */
    public static Properties getConsumerProps(String topicName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKERS);
        props.setProperty("group.id", "testGroup");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
Using keys for partition assignments
package com.logicbig.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Shows that records sent without an explicit partition but with the same key always land in
 * the same partition. A producer sends {@code MSG_COUNT} distinct keys, each repeated
 * {@code PARTITION_COUNT} times, while a consumer on another thread prints the partition each
 * record arrived on.
 */
public class PartitionAssignmentExample {
    private static final int PARTITION_COUNT = 4;
    private static final String TOPIC_NAME = "example-topic-2020-4-21";
    private static final int MSG_COUNT = 4;

    /**
     * Runs the consumer and the producer concurrently and waits up to 10 minutes for both.
     *
     * @param args ignored
     * @throws Exception if interrupted while waiting for the worker threads
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(PartitionAssignmentExample::startConsumer);
        executorService.execute(PartitionAssignmentExample::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    /**
     * Polls {@code TOPIC_NAME}, printing key, value, partition and offset of every record and
     * committing after each poll, until at least {@code MSG_COUNT * PARTITION_COUNT} records
     * have been consumed.
     */
    private static void startConsumer() {
        Properties consumerProps = ExampleHelper.getConsumerProps(TOPIC_NAME);
        int expectedCount = MSG_COUNT * PARTITION_COUNT;
        // try-with-resources: the consumer leaves its group and releases its sockets on exit
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton(TOPIC_NAME));
            int numMsgReceived = 0;
            // '<' rather than '!=': one poll may return several records and overshoot the
            // target, which with an equality check would keep this thread polling forever
            while (numMsgReceived < expectedCount) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    numMsgReceived++;
                    System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
                consumer.commitSync();
            }
        }
    }

    /**
     * Sends {@code PARTITION_COUNT} copies of each of {@code MSG_COUNT} key/value pairs
     * ({@code key-i}/{@code message-i}) without specifying a partition, so the partitioner
     * chooses one from the key.
     */
    private static void sendMessages() {
        Properties producerProps = ExampleHelper.getProducerProps();
        // close() (via try-with-resources) blocks until all previously sent records complete
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < MSG_COUNT; i++) {
                String value = "message-" + i;
                String key = "key-" + i;
                // the loop variable only counts repeats; no partition is passed to the record
                for (int repeat = 0; repeat < PARTITION_COUNT; repeat++) {
                    System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                            TOPIC_NAME, key, value, "not-specified");
                    producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));
                }
            }
        }
    }
}
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-0, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-1, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-2, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-21, key: key-3, value: message-3, partition id: not-specified
consumed: key = key-2, value = message-2, partition id= 2, offset = 4
consumed: key = key-2, value = message-2, partition id= 2, offset = 5
consumed: key = key-2, value = message-2, partition id= 2, offset = 6
consumed: key = key-2, value = message-2, partition id= 2, offset = 7
consumed: key = key-3, value = message-3, partition id= 3, offset = 0
consumed: key = key-3, value = message-3, partition id= 3, offset = 1
consumed: key = key-3, value = message-3, partition id= 3, offset = 2
consumed: key = key-3, value = message-3, partition id= 3, offset = 3
consumed: key = key-1, value = message-1, partition id= 0, offset = 4
consumed: key = key-1, value = message-1, partition id= 0, offset = 5
consumed: key = key-1, value = message-1, partition id= 0, offset = 6
consumed: key = key-1, value = message-1, partition id= 0, offset = 7
consumed: key = key-0, value = message-0, partition id= 1, offset = 4
consumed: key = key-0, value = message-0, partition id= 1, offset = 5
consumed: key = key-0, value = message-0, partition id= 1, offset = 6
consumed: key = key-0, value = message-0, partition id= 1, offset = 7
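As seen above, every record with key-0 was assigned to partition 1, key-1 to partition 0, key-2 to partition 2 and key-3 to partition 3. This is because, for a non-null key, the default partitioner computes a hash (murmur2) of the serialized key and takes it modulo the number of partitions, so the same key always maps to the same partition.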
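When the key is null and the default partitioner is used, the partition cannot be derived from the key. Before kafka-clients 2.4, such records were spread over the available partitions in a round-robin fashion. Since 2.4 (KIP-480, which applies to the 2.4.1 client used here), the default partitioner uses a "sticky" strategy instead: it keeps sending to one partition until the current batch is completed, then switches to another partition, which still balances records across partitions over time.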
Example Project
Dependencies and Technologies Used:
- kafka-clients 2.4.1 Apache Kafka
- JDK 8
- Maven 3.5.4
|
|