
Kafka Manual Commit - CommitAsync() Example

[Last Updated: Jun 21, 2020]

By setting enable.auto.commit=false (tutorial), offsets will only be committed when the application explicitly chooses to do so.

To commit offsets asynchronously, we can use the following method of KafkaConsumer:

public void commitAsync()

This method commits the offsets returned on the last poll(Duration) for all the subscribed topics and partitions.
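commitAsync() sends the commit request and returns immediately; if the commit fails, the client does not retry it on its own. To observe the result, KafkaConsumer also provides an overload that takes an OffsetCommitCallback. A minimal sketch, which could replace the plain commitAsync() call in the consumer loop further below (the error handling here is just a placeholder):

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // the asynchronous commit failed; 'offsets' holds the offsets we attempted to commit
        System.err.println("commit failed for " + offsets + ": " + exception);
    }
});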

Example

Run the Kafka server as described here

Example config

package com.logicbig.example;

import java.util.Properties;

public class ExampleConfig {
  public static final String BROKERS = "localhost:9092";

  public static Properties getProducerProps() {
      Properties props = new Properties();
      props.put("bootstrap.servers", BROKERS);
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      return props;
  }

  public static Properties getConsumerProps() {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", BROKERS);
      props.setProperty("group.id", "testGroup");
      props.setProperty("enable.auto.commit", "false");
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      return props;
  }
}

Creating example topic

Let's create a topic with one partition:

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic-2020-5-28", 1);
  }

  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
      AdminClient admin = AdminClient.create(config);

      //checking if topic already exists
      boolean alreadyExists = admin.listTopics().names().get().stream()
                                   .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
      if (alreadyExists) {
          System.out.printf("topic already exits: %s%n", topicName);
      } else {
          //creating new topic
          System.out.printf("creating topic: %s%n", topicName);
          NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
          admin.createTopics(Collections.singleton(newTopic)).all().get();
      }

      //describing
      System.out.println("-- describing topic --");
      admin.describeTopics(Collections.singleton(topicName)).all().get()
           .forEach((topic, desc) -> {
               System.out.println("Topic: " + topic);
               System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                       desc.partitions()
                           .stream()
                           .map(p -> Integer.toString(p.partition()))
                           .collect(Collectors.joining(",")));
           });

      admin.close();
  }
}
creating topic: example-topic-2020-5-28
-- describing topic --
Topic: example-topic-2020-5-28
Partitions: 1, partition ids: 0

Using CommitAsync()

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class CommitAsyncExample {
  private static String TOPIC_NAME = "example-topic-2020-5-28";
  private static KafkaConsumer<String, String> consumer;
  private static TopicPartition topicPartition;

  public static void main(String[] args) throws Exception {
      Properties consumerProps = ExampleConfig.getConsumerProps();
      consumer = new KafkaConsumer<>(consumerProps);
      topicPartition = new TopicPartition(TOPIC_NAME, 0);
      consumer.assign(Collections.singleton(topicPartition));
      printOffsets("before consumer loop", consumer, topicPartition);
      sendMessages();
      startConsumer();
  }

  private static void startConsumer() {

      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                      record.key(), record.value(), record.partition(), record.offset());
          }
          if (records.isEmpty()) {
              System.out.println("-- terminating consumer --");
              break;
          }
          printOffsets("before commitAsync() call", consumer, topicPartition);
          consumer.commitAsync();
          printOffsets("after commitAsync() call", consumer, topicPartition);
      }
      printOffsets("after consumer loop", consumer, topicPartition);
  }

  private static void printOffsets(String message, KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
      Map<TopicPartition, OffsetAndMetadata> committed = consumer
              .committed(new HashSet<>(Arrays.asList(topicPartition)));
      OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
      long position = consumer.position(topicPartition);
      System.out
              .printf("Offset info %s, Committed: %s, current position %s%n", message,
                      offsetAndMetadata == null ? null : offsetAndMetadata
                      .offset(), position);
  }

  private static void sendMessages() {
      Properties producerProps = ExampleConfig.getProducerProps();
      KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
      for (int i = 0; i < 4; i++) {
          String value = "message-" + i;
          System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
          producer.send(new ProducerRecord<>(TOPIC_NAME, value));
      }
      producer.flush();
      producer.close();
  }
}
Offset info before consumer loop, Committed: null, current position 0
Sending message topic: example-topic-2020-5-28, value: message-0
Sending message topic: example-topic-2020-5-28, value: message-1
Sending message topic: example-topic-2020-5-28, value: message-2
Sending message topic: example-topic-2020-5-28, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Offset info before commitAsync() call, Committed: null, current position 4
Offset info after commitAsync() call, Committed: 4, current position 4
-- terminating consumer --
Offset info after consumer loop, Committed: 4, current position 4

Running one more time:

Offset info before consumer loop, Committed: 4, current position 4
Sending message topic: example-topic-2020-5-28, value: message-0
Sending message topic: example-topic-2020-5-28, value: message-1
Sending message topic: example-topic-2020-5-28, value: message-2
Sending message topic: example-topic-2020-5-28, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
consumed: key = null, value = message-3, partition id= 0, offset = 7
Offset info before commitAsync() call, Committed: 4, current position 8
Offset info after commitAsync() call, Committed: 8, current position 8
-- terminating consumer --
Offset info after consumer loop, Committed: 8, current position 8
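As the two runs show, the consumer resumes from the last committed offset. Because an asynchronous commit may fail without the application noticing, a common pattern (not part of the example above, just a sketch) is to commit asynchronously inside the loop and perform one blocking commitSync() when the consumer is about to close, so the final position is committed reliably:

private static void startConsumer() {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            if (records.isEmpty()) {
                break;
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("consumed: value = %s, offset = %s%n", record.value(), record.offset());
            }
            // non-blocking commit on every iteration
            consumer.commitAsync();
        }
    } finally {
        // blocking commit before closing; commitSync() retries until it
        // succeeds or an unrecoverable error occurs
        consumer.commitSync();
        consumer.close();
    }
}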

Example Project

Dependencies and Technologies Used:

  • kafka_2.12 2.5.0 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka - CommitAsync() Example
  • kafka-manual-commit-async-example
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • CommitAsyncExample.java
