For a consumer, we can enable or disable auto commit by setting enable.auto.commit to true or false (the default is true).
When set to true, the consumer's offsets are committed periodically in the background.
When set to false, no offsets are committed automatically; the application has to commit them itself, for example with commitSync() or commitAsync().
The property auto.commit.interval.ms specifies the frequency in milliseconds that the consumer offsets are auto-committed to Kafka. This only applies if enable.auto.commit is set to true.
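With auto commit enabled, the consumer commits the offsets of the records that its earlier poll() calls have already returned. The commit is triggered from inside a later poll() call (or from close()) once the interval has elapsed, so the committed offset can lag behind the current position.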
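To make the timing concrete, here is a minimal plain-Java model of that behaviour. It is a sketch only: AutoCommitTimingModel and its methods are hypothetical names, it does not use the Kafka client, and the rule "check the commit deadline at the start of poll()" is an assumption chosen because it matches the output shown later in this tutorial.

```java
import java.util.OptionalLong;

/** Plain-Java model of auto-commit timing; this is NOT the Kafka client API. */
public class AutoCommitTimingModel {
    private final long intervalMs;   // models auto.commit.interval.ms
    private long position;           // offset of the next record to fetch
    private OptionalLong committed = OptionalLong.empty();
    private long nextCommitAtMs;     // deadline for the next auto commit

    public AutoCommitTimingModel(long intervalMs, long startPosition, long nowMs) {
        this.intervalMs = intervalMs;
        this.position = startPosition;
        this.nextCommitAtMs = nowMs + intervalMs;
    }

    /** Models one poll() call at time nowMs that returns `fetched` records. */
    public void poll(long nowMs, int fetched) {
        // The commit check runs before new records are handed out, so it can
        // only cover records returned by earlier polls.
        if (nowMs >= nextCommitAtMs) {
            committed = OptionalLong.of(position);
            nextCommitAtMs = nowMs + intervalMs;
        }
        position += fetched;
    }

    public long position() {
        return position;
    }

    public OptionalLong committed() {
        return committed;
    }

    public static void main(String[] args) {
        AutoCommitTimingModel consumer = new AutoCommitTimingModel(1000, 0, 0);
        consumer.poll(200, 4);   // first poll returns offsets 0..3
        System.out.println("committed=" + consumer.committed() + ", position=" + consumer.position());
        consumer.poll(5200, 0);  // next poll, 5 seconds later, returns nothing
        System.out.println("committed=" + consumer.committed() + ", position=" + consumer.position());
    }
}
```

After the first modelled poll nothing is committed yet although the position is already 4; the commit of offset 4 only appears after the second poll, which is the same pattern as "Committed: null, current position 4" followed by "Committed: 4, current position 4" in the auto commit example.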
KafkaConsumer#position() method
public long position(TopicPartition partition)
Gets the offset of the next record that will be fetched (if a record with that offset exists). This method simply returns the current-offset.
KafkaConsumer#committed() method
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions)
Gets the last committed offsets for the given partitions (whether the commit happened by this process or another).
Examples
Start a Kafka server first; the examples below expect a broker at localhost:9092.
Creating Topics
Let's create two topics, each with 1 partition.
package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        createTopic("example-topic-2020-5-7a", 1);
        createTopic("example-topic-2020-5-7b", 1);
    }

    private static void createTopic(String topicName, int numPartitions) throws Exception {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
        // try-with-resources closes the admin client even if a call below fails
        try (AdminClient admin = AdminClient.create(config)) {
            //checking if topic already exists
            boolean alreadyExists = admin.listTopics().names().get().stream()
                    .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
            if (alreadyExists) {
                System.out.printf("topic already exists: %s%n", topicName);
            } else {
                //creating new topic with replication factor 1
                System.out.printf("creating topic: %s%n", topicName);
                NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
                admin.createTopics(Collections.singleton(newTopic)).all().get();
            }
            //describing
            System.out.println("-- describing topic --");
            admin.describeTopics(Collections.singleton(topicName)).all().get()
                    .forEach((topic, desc) -> {
                        System.out.println("Topic: " + topic);
                        System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                                desc.partitions()
                                        .stream()
                                        .map(p -> Integer.toString(p.partition()))
                                        .collect(Collectors.joining(",")));
                    });
        }
    }
}
creating topic: example-topic-2020-5-7a
-- describing topic --
Topic: example-topic-2020-5-7a
Partitions: 1, partition ids: 0
creating topic: example-topic-2020-5-7b
-- describing topic --
Topic: example-topic-2020-5-7b
Partitions: 1, partition ids: 0
Example config class
package com.logicbig.example;

import java.util.Properties;

public class ExampleConfig {
    public static final String BROKERS = "localhost:9092";

    public static Properties getProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKERS);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static Properties getConsumerProps(boolean autoCommit, Long autoCommitMillisInterval) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKERS);
        props.setProperty("group.id", "testGroup");
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            props.setProperty("auto.commit.interval.ms", Long.toString(autoCommitMillisInterval));
        }
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
Example for enable.auto.commit=true
package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AutoCommitExample {
    private static final String TOPIC_NAME = "example-topic-2020-5-7a";
    private static final int MSG_COUNT = 4;

    public static void main(String[] args) throws Exception {
        // consumer and producer run concurrently on two threads
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(AutoCommitExample::startConsumer);
        executorService.execute(AutoCommitExample::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void startConsumer() {
        // auto commit enabled, interval 1000 ms
        Properties consumerProps = ExampleConfig.getConsumerProps(true, 1000L);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        consumer.assign(Collections.singleton(topicPartition));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
            // stop once a poll returns nothing within 5 seconds
            if (records.isEmpty()) {
                System.out.println("-- terminating consumer --");
                break;
            }
            printOffsets(consumer, topicPartition);
        }
        printOffsets(consumer, topicPartition);
    }

    private static void printOffsets(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer
                .committed(new HashSet<>(Arrays.asList(topicPartition)));
        OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
        long position = consumer.position(topicPartition);
        System.out.printf("Committed: %s, current position %s%n",
                offsetAndMetadata == null ? null : offsetAndMetadata.offset(), position);
    }

    private static void sendMessages() {
        Properties producerProps = ExampleConfig.getProducerProps();
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        for (int i = 0; i < MSG_COUNT; i++) {
            String value = "message-" + i;
            System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
            producer.send(new ProducerRecord<>(TOPIC_NAME, value));
        }
        producer.flush();
        producer.close();
    }
}
Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Committed: null, current position 4
-- terminating consumer --
Committed: 4, current position 4
Running one more time
Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
consumed: key = null, value = message-3, partition id= 0, offset = 7
Committed: 4, current position 8
-- terminating consumer --
Committed: 8, current position 8
Example for enable.auto.commit=false
Let's use the same topic example-topic-2020-5-7a first:
package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AutoCommitFalseExample {
    private static final String TOPIC_NAME = "example-topic-2020-5-7a";
    private static final int MSG_COUNT = 4;

    public static void main(String[] args) throws Exception {
        // consumer and producer run concurrently on two threads
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(AutoCommitFalseExample::startConsumer);
        executorService.execute(AutoCommitFalseExample::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void startConsumer() {
        // auto commit disabled, so no commit interval is needed
        Properties consumerProps = ExampleConfig.getConsumerProps(false, null);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        consumer.assign(Collections.singleton(topicPartition));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
            // stop once a poll returns nothing within 5 seconds
            if (records.isEmpty()) {
                System.out.println("-- terminating consumer --");
                break;
            }
            printOffsets(consumer, topicPartition);
        }
        printOffsets(consumer, topicPartition);
    }

    private static void printOffsets(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer
                .committed(new HashSet<>(Arrays.asList(topicPartition)));
        OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
        long position = consumer.position(topicPartition);
        System.out.printf("Committed: %s, current position %s%n",
                offsetAndMetadata == null ? null : offsetAndMetadata.offset(), position);
    }

    private static void sendMessages() {
        Properties producerProps = ExampleConfig.getProducerProps();
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        for (int i = 0; i < MSG_COUNT; i++) {
            String value = "message-" + i;
            System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
            producer.send(new ProducerRecord<>(TOPIC_NAME, value));
        }
        producer.flush();
        producer.close();
    }
}
Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 8
consumed: key = null, value = message-1, partition id= 0, offset = 9
consumed: key = null, value = message-2, partition id= 0, offset = 10
consumed: key = null, value = message-3, partition id= 0, offset = 11
Committed: 8, current position 12
-- terminating consumer --
Committed: 8, current position 12
As the output shows, the consumer started from the last committed offset (8, committed in the previous example). The committed offset also stays at 8 instead of moving to 12, because nothing is auto-committed in this case.
Running again:
Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 8
consumed: key = null, value = message-1, partition id= 0, offset = 9
consumed: key = null, value = message-2, partition id= 0, offset = 10
consumed: key = null, value = message-3, partition id= 0, offset = 11
consumed: key = null, value = message-0, partition id= 0, offset = 12
consumed: key = null, value = message-1, partition id= 0, offset = 13
consumed: key = null, value = message-2, partition id= 0, offset = 14
consumed: key = null, value = message-3, partition id= 0, offset = 15
Committed: 8, current position 16
-- terminating consumer --
Committed: 8, current position 16
As seen above, the consumer does not start from offset 12 but from 8 again. That's because the offsets consumed in the previous run were never committed, so offsets 8 to 11 are delivered a second time.
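The arithmetic behind this replay can be written down in a few lines of plain Java. ReplayWindow is a hypothetical helper used only for illustration (it is not a Kafka class), and the numbers are taken from the output above.

```java
/** Plain-Java illustration of which offsets are re-delivered after a restart. */
public class ReplayWindow {

    /** Number of records delivered again when the consumer restarts at `committed`. */
    public static long replayedCount(long committed, long positionAtShutdown) {
        return Math.max(0, positionAtShutdown - committed);
    }

    public static void main(String[] args) {
        long committed = 8;            // last committed offset
        long positionAtShutdown = 12;  // position when the first run ended
        long replayed = replayedCount(committed, positionAtShutdown);
        System.out.printf("restart at offset %d, %d records delivered again (offsets %d..%d)%n",
                committed, replayed, committed, positionAtShutdown - 1);
    }
}
```

With a committed offset of 8 and a final position of 12, four records (offsets 8 to 11) are read twice, which is why the second run prints eight consumed records instead of four.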
Now let's use the second topic, example-topic-2020-5-7b, for which no offset has been committed yet:
// package and imports are the same as in AutoCommitFalseExample
public class AutoCommitFalseExample2 {
    private static final String TOPIC_NAME = "example-topic-2020-5-7b";
    private static final int MSG_COUNT = 4;

    public static void main(String[] args) throws Exception {
        // consumer and producer run concurrently on two threads
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(AutoCommitFalseExample2::startConsumer);
        executorService.execute(AutoCommitFalseExample2::sendMessages);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    }

    private static void startConsumer() {
        // auto commit disabled, so no commit interval is needed
        Properties consumerProps = ExampleConfig.getConsumerProps(false, null);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        consumer.assign(Collections.singleton(topicPartition));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
            // stop once a poll returns nothing within 5 seconds
            if (records.isEmpty()) {
                System.out.println("-- terminating consumer --");
                break;
            }
            printOffsets(consumer, topicPartition);
        }
        printOffsets(consumer, topicPartition);
    }

    private static void printOffsets(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer
                .committed(new HashSet<>(Arrays.asList(topicPartition)));
        OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
        long position = consumer.position(topicPartition);
        System.out.printf("Committed: %s, current position %s%n",
                offsetAndMetadata == null ? null : offsetAndMetadata.offset(), position);
    }

    private static void sendMessages() {
        Properties producerProps = ExampleConfig.getProducerProps();
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        for (int i = 0; i < MSG_COUNT; i++) {
            String value = "message-" + i;
            System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
            producer.send(new ProducerRecord<>(TOPIC_NAME, value));
        }
        producer.flush();
        producer.close();
    }
}
Sending message topic: example-topic-2020-5-7b, value: message-0
Sending message topic: example-topic-2020-5-7b, value: message-1
Sending message topic: example-topic-2020-5-7b, value: message-2
Sending message topic: example-topic-2020-5-7b, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Committed: null, current position 4
-- terminating consumer --
Committed: null, current position 4
Running again
Sending message topic: example-topic-2020-5-7b, value: message-0
Sending message topic: example-topic-2020-5-7b, value: message-1
Sending message topic: example-topic-2020-5-7b, value: message-2
Sending message topic: example-topic-2020-5-7b, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
Committed: null, current position 7
consumed: key = null, value = message-3, partition id= 0, offset = 7
Committed: null, current position 8
-- terminating consumer --
Committed: null, current position 8
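In this case there is no committed offset to start from (it's null), so the consumer falls back to the auto.offset.reset policy, which defaults to latest. It therefore starts at the end of the partition and only sees messages produced after it resolved its starting position: offsets 0 to 3 in the first run and 4 to 7 in the second.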
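The rule for picking the starting offset can be summarised as a small plain-Java sketch. StartingOffset and ResetPolicy are hypothetical names for this illustration, not Kafka classes; the three policies mirror the earliest, latest and none values of the consumer's auto.offset.reset setting, and the case of a committed offset that is out of range is deliberately left out.

```java
/** Plain-Java model of how a consumer picks its first offset; NOT the Kafka client API. */
public class StartingOffset {

    /** Mirrors the values of the auto.offset.reset consumer setting. */
    public enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed last committed offset, or null if nothing was ever committed
     * @param logStart  first offset available in the partition
     * @param logEnd    offset that the next produced record will get
     */
    public static long resolve(Long committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed != null) {
            return committed;            // a committed offset always wins
        }
        switch (policy) {
            case EARLIEST:
                return logStart;         // read everything still in the log
            case LATEST:
                return logEnd;           // read only records produced from now on
            default:
                throw new IllegalStateException("no committed offset and reset policy is NONE");
        }
    }

    public static void main(String[] args) {
        // topic "a": committed offset 8, log end 12 -> the consumer resumes at 8
        System.out.println(resolve(8L, 0, 12, ResetPolicy.LATEST));
        // topic "b", second run: nothing committed, log end 4 -> the consumer starts at 4
        System.out.println(resolve(null, 0, 4, ResetPolicy.LATEST));
    }
}
```

Under the same model, switching the policy to EARLIEST for the second topic would make the second run start at offset 0 and read the four messages of the first run again.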
Example Project
Dependencies and Technologies Used:
- kafka_2.12 2.5.0: Apache Kafka
- JDK 8
- Maven 3.5.4