The producer is responsible for choosing the topic partition for the record.
This can be done by explicitly assigning the partition id (tutorial) or by assigning a key (tutorial).
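If the producer doesn't assign a partition or a key, the default partitioner balances the messages among the partitions: a round-robin approach in older clients and, since kafka-clients 2.4, a "sticky" approach in which all records of one batch go to the same partition before another partition is chosen (this is why consecutive messages share a partition in the output below).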
Example
Start the Kafka server as described here.
Creating a topic
Let's create a topic with 4 partitions:
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
/**
 * Creates the example topic on the broker(s) listed in {@code ExampleHelper.BROKERS}
 * (unless it already exists) and then prints the topic's partition count and ids.
 */
public class TopicCreator {

    public static void main(String[] args) throws Exception {
        createTopic("example-topic-2020-4-29", 4);
    }

    /**
     * Creates {@code topicName} with {@code numPartitions} partitions if no topic of that
     * name is listed by the broker, then describes the topic on standard output.
     *
     * @param topicName     name of the topic to create and describe
     * @param numPartitions number of partitions requested when the topic is created
     * @throws Exception if any of the blocking admin calls fails
     */
    private static void createTopic(String topicName, int numPartitions) throws Exception {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);

        // try-with-resources: the admin client is closed even when one of the calls below throws
        try (AdminClient admin = AdminClient.create(config)) {
            // checking if topic already exists
            boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
            if (alreadyExists) {
                System.out.printf("topic already exits: %s%n", topicName);
            } else {
                // creating new topic; the third constructor argument is the replication factor
                System.out.printf("creating topic: %s%n", topicName);
                NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
                admin.createTopics(Collections.singleton(newTopic)).all().get();
            }

            // describing
            System.out.println("-- describing topic --");
            admin.describeTopics(Collections.singleton(topicName)).all().get()
                 .forEach((topic, desc) -> {
                     System.out.println("Topic: " + topic);
                     System.out.printf("Partitions: %s, partition ids: %s%n",
                             desc.partitions().size(),
                             desc.partitions()
                                 .stream()
                                 .map(p -> Integer.toString(p.partition()))
                                 .collect(Collectors.joining(",")));
                 });
        }
    }
}
creating topic: example-topic-2020-4-29
-- describing topic --
Topic: example-topic-2020-4-29
Partitions: 4, partition ids: 0,1,2,3
A helper class
package com.logicbig.example;
import java.util.Properties;
/**
 * Shared client settings for the examples: the broker address and ready-made
 * producer/consumer configuration with String (de)serializers.
 */
public class ExampleHelper {

    /** Bootstrap server list used by every client in the examples. */
    public static final String BROKERS = "localhost:9092";

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Builds the producer configuration: bootstrap servers, {@code acks=all} and
     * String serializers for both key and value.
     *
     * @return a new, independent {@link Properties} instance on every call
     */
    public static Properties getProducerProps() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", BROKERS);
        producerConfig.setProperty("acks", "all");
        producerConfig.setProperty("key.serializer", STRING_SERIALIZER);
        producerConfig.setProperty("value.serializer", STRING_SERIALIZER);
        return producerConfig;
    }

    /**
     * Builds the consumer configuration: bootstrap servers, group id {@code testGroup},
     * auto-commit disabled and String deserializers for both key and value.
     *
     * @param topicName not used by this method; kept so existing callers keep compiling
     * @return a new, independent {@link Properties} instance on every call
     */
    public static Properties getConsumerProps(String topicName) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", BROKERS);
        consumerConfig.setProperty("group.id", "testGroup");
        consumerConfig.setProperty("enable.auto.commit", "false");
        consumerConfig.setProperty("key.deserializer", STRING_DESERIALIZER);
        consumerConfig.setProperty("value.deserializer", STRING_DESERIALIZER);
        return consumerConfig;
    }
}
Producing records with no key or partition assigned
package com.logicbig.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Sends {@code MSG_COUNT} records without a key or partition id and, in parallel,
 * consumes them, printing the partition each record was written to.
 */
public class NullKeyPartitionExample {
    private static final String TOPIC_NAME = "example-topic-2020-4-29";
    private static final int MSG_COUNT = 12;

    /**
     * Runs the consumer and the producer on two pool threads and waits
     * (up to 10 minutes) for both to finish.
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(NullKeyPartitionExample::startConsumer);
        executorService.execute(NullKeyPartitionExample::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    /**
     * Subscribes to the topic and polls until at least {@code MSG_COUNT} records
     * have been printed, committing offsets synchronously after every poll.
     */
    private static void startConsumer() {
        Properties consumerProps = ExampleHelper.getConsumerProps(TOPIC_NAME);
        // try-with-resources: the consumer is closed when the loop ends or a call throws
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton(TOPIC_NAME));
            int numMsgReceived = 0;
            // '<' rather than an exact '==' exit test: a single poll may return more records
            // than expected (e.g. uncommitted leftovers), which would otherwise loop forever
            while (numMsgReceived < MSG_COUNT) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    numMsgReceived++;
                    System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
                consumer.commitSync();
            }
        }
    }

    /**
     * Sends {@code MSG_COUNT} records that carry only a value, so the producer's
     * partitioner chooses the partition for each of them.
     */
    private static void sendMessages() {
        Properties producerProps = ExampleHelper.getProducerProps();
        // Closing the producer on exit releases its resources; per the KafkaProducer
        // documentation close() also waits for previously sent records to complete.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < MSG_COUNT; i++) {
                String value = "message-" + i;
                System.out.printf("Sending message topic: %s, key: %s, value: %s, partition id: %s%n",
                        TOPIC_NAME, "not-specified", value, "not-specified");
                producer.send(new ProducerRecord<>(TOPIC_NAME, value));
            }
        }
    }
}
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-4, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-5, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-6, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-7, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-8, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-9, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-10, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-11, partition id: not-specified
consumed: key = null, value = message-4, partition id= 0, offset = 2
consumed: key = null, value = message-5, partition id= 0, offset = 3
consumed: key = null, value = message-6, partition id= 0, offset = 4
consumed: key = null, value = message-7, partition id= 0, offset = 5
consumed: key = null, value = message-8, partition id= 0, offset = 6
consumed: key = null, value = message-9, partition id= 0, offset = 7
consumed: key = null, value = message-10, partition id= 0, offset = 8
consumed: key = null, value = message-11, partition id= 0, offset = 9
consumed: key = null, value = message-0, partition id= 2, offset = 1
consumed: key = null, value = message-1, partition id= 2, offset = 2
consumed: key = null, value = message-2, partition id= 2, offset = 3
consumed: key = null, value = message-3, partition id= 2, offset = 4
Running more times:
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-0, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-1, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-2, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-3, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-4, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-5, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-6, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-7, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-8, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-9, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-10, partition id: not-specified
Sending message topic: example-topic-2020-4-29, key: not-specified, value: message-11, partition id: not-specified
consumed: key = null, value = message-6, partition id= 0, offset = 16
consumed: key = null, value = message-7, partition id= 0, offset = 17
consumed: key = null, value = message-8, partition id= 0, offset = 18
consumed: key = null, value = message-9, partition id= 0, offset = 19
consumed: key = null, value = message-10, partition id= 0, offset = 20
consumed: key = null, value = message-0, partition id= 1, offset = 22
consumed: key = null, value = message-1, partition id= 1, offset = 23
consumed: key = null, value = message-2, partition id= 1, offset = 24
consumed: key = null, value = message-3, partition id= 1, offset = 25
consumed: key = null, value = message-4, partition id= 1, offset = 26
consumed: key = null, value = message-5, partition id= 1, offset = 27
consumed: key = null, value = message-11, partition id= 2, offset = 20
Example Project
Dependencies and Technologies Used:
- kafka-clients 2.4.1 Apache Kafka
- JDK 8
- Maven 3.5.4
|
|