Close

Kafka - Auto Committing Offsets

[Last Updated: Jun 21, 2020]

For a consumer, we can enable/disable auto commit by setting enable.auto.commit = true/false

When set to true consumer's offset will be periodically committed in the background.

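If this property is set to false then no offsets are committed automatically; the application has to commit them itself (for example with commitSync() or commitAsync()).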

The property auto.commit.interval.ms specifies the frequency in milliseconds that the consumer offsets are auto-committed to Kafka. This only applies if enable.auto.commit is set to true.

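With auto commit enabled, the Kafka consumer commits the offsets of the messages it has received in response to its poll() calls. The committed value is the offset of the last received message plus one, i.e. the offset of the next message to read.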

KafkaConsumer#position() method

public long position(TopicPartition partition)

Gets the offset of the next record that will be fetched (if a record with that offset exists). This method simply returns the current-offset.

KafkaConsumer#committed() method

public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) 

Gets the last committed offsets for the given partitions (whether the commit happened by this process or another).

Examples

Run Kafka server as described here

Creating Topics

Let's create two topics, each with 1 partition.

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Creates the topics used by the examples (one partition each) and prints a short
 * description of every topic after making sure it exists.
 */
public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic-2020-5-7a", 1);
      createTopic("example-topic-2020-5-7b", 1);
  }

  /**
   * Creates the given topic with replication factor 1 unless a topic with that name
   * already exists, then prints its partition count and partition ids.
   *
   * @param topicName     name of the topic to create/describe
   * @param numPartitions number of partitions to use when the topic has to be created
   * @throws Exception if any of the blocking admin calls fails or is interrupted
   */
  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);

      // try-with-resources: the client is closed even when one of the get() calls below throws
      try (AdminClient admin = AdminClient.create(config)) {
          //checking if topic already exists
          boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
          if (alreadyExists) {
              System.out.printf("topic already exists: %s%n", topicName);
          } else {
              //creating new topic
              System.out.printf("creating topic: %s%n", topicName);
              NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
              admin.createTopics(Collections.singleton(newTopic)).all().get();
          }

          //describing
          System.out.println("-- describing topic --");
          admin.describeTopics(Collections.singleton(topicName)).all().get()
               .forEach((topic, desc) -> {
                   System.out.println("Topic: " + topic);
                   System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                           desc.partitions()
                               .stream()
                               .map(p -> Integer.toString(p.partition()))
                               .collect(Collectors.joining(",")));
               });
      }
  }
}
creating topic: example-topic-2020-5-7a
-- describing topic --
Topic: example-topic-2020-5-7a
Partitions: 1, partition ids: 0
creating topic: example-topic-2020-5-7b
-- describing topic --
Topic: example-topic-2020-5-7b
Partitions: 1, partition ids: 0

Example config class

package com.logicbig.example;

import java.util.Properties;

/**
 * Shared client settings for the examples: the broker address plus ready-made
 * producer and consumer {@link Properties}.
 */
public class ExampleConfig {
  public static final String BROKERS = "localhost:9092";

  /**
   * Builds the producer configuration: String key/value serializers and acks=all.
   *
   * @return a new, independent Properties instance on every call
   */
  public static Properties getProducerProps() {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", BROKERS);
      props.setProperty("acks", "all");
      props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      return props;
  }

  /**
   * Builds the consumer configuration for group "testGroup" with String key/value deserializers.
   *
   * @param autoCommit               value for enable.auto.commit
   * @param autoCommitMillisInterval value for auto.commit.interval.ms; only used when
   *                                 {@code autoCommit} is true. May be null, in which case the
   *                                 property is left unset and the client's own default applies.
   * @return a new, independent Properties instance on every call
   */
  public static Properties getConsumerProps(boolean autoCommit, Long autoCommitMillisInterval) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", BROKERS);
      props.setProperty("group.id", "testGroup");
      props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
      // Guard against null: unboxing a null Long here used to fail with a NullPointerException.
      if (autoCommit && autoCommitMillisInterval != null) {
          props.setProperty("auto.commit.interval.ms", Long.toString(autoCommitMillisInterval));
      }
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      return props;
  }
}

Example for enable.auto.commit=true

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Runs a consumer with enable.auto.commit=true (interval 1000 ms) and a producer side by side
 * on the same topic, printing the committed offset and the current position after each batch.
 */
public class AutoCommitExample {
  private static final String TOPIC_NAME = "example-topic-2020-5-7a";
  private static final int MSG_COUNT = 4;

  public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.execute(AutoCommitExample::startConsumer);
      executorService.execute(AutoCommitExample::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  /**
   * Polls partition 0 of the topic until a poll (5 second timeout) returns nothing,
   * printing every record and the offsets after each non-empty batch and once more at the end.
   */
  private static void startConsumer() {
      Properties consumerProps = ExampleConfig.getConsumerProps(true, 1000L);
      TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
      // try-with-resources: the consumer is closed on normal termination and on failure
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          consumer.assign(Collections.singleton(topicPartition));
          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
              for (ConsumerRecord<String, String> record : records) {
                  System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                          record.key(), record.value(), record.partition(), record.offset());
              }
              if (records.isEmpty()) {
                  System.out.println("-- terminating consumer --");
                  break;
              }

              printOffsets(consumer, topicPartition);
          }
          // final report is taken before the consumer is closed
          printOffsets(consumer, topicPartition);
      }
  }

  /**
   * Prints the last committed offset (or null when nothing has been committed yet)
   * and the consumer's current position for the given partition.
   */
  private static void printOffsets(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
      Map<TopicPartition, OffsetAndMetadata> committed = consumer
              .committed(Collections.singleton(topicPartition));
      OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
      Long committedOffset = offsetAndMetadata == null ? null : offsetAndMetadata.offset();
      long position = consumer.position(topicPartition);
      System.out.printf("Committed: %s, current position %s%n", committedOffset, position);
  }

  /**
   * Sends MSG_COUNT key-less messages ("message-0" ...) to the topic.
   */
  private static void sendMessages() {
      Properties producerProps = ExampleConfig.getProducerProps();
      // typed producer instead of the raw type; closed even if a send fails
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
          for (int i = 0; i < MSG_COUNT; i++) {
              String value = "message-" + i;
              System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
              producer.send(new ProducerRecord<>(TOPIC_NAME, value));
          }
          producer.flush();
      }
  }
}
Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Committed: null, current position 4
-- terminating consumer --
Committed: 4, current position 4

Running one more time

Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
consumed: key = null, value = message-3, partition id= 0, offset = 7
Committed: 4, current position 8
-- terminating consumer --
Committed: 8, current position 8

Example for enable.auto.commit=false

Let's use the same topic example-topic-2020-5-7a first:

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Runs a consumer with enable.auto.commit=false and a producer side by side on the topic
 * already used by the auto-commit example, printing the committed offset and the current
 * position after each batch.
 */
public class AutoCommitFalseExample {
  private static final String TOPIC_NAME = "example-topic-2020-5-7a";
  private static final int MSG_COUNT = 4;

  public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.execute(AutoCommitFalseExample::startConsumer);
      executorService.execute(AutoCommitFalseExample::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  /**
   * Polls partition 0 of the topic until a poll (5 second timeout) returns nothing,
   * printing every record and the offsets after each non-empty batch and once more at the end.
   * This method never commits offsets itself.
   */
  private static void startConsumer() {
      Properties consumerProps = ExampleConfig.getConsumerProps(false, null);
      TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
      // try-with-resources: the consumer is closed on normal termination and on failure
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          consumer.assign(Collections.singleton(topicPartition));
          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
              for (ConsumerRecord<String, String> record : records) {
                  System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                          record.key(), record.value(), record.partition(), record.offset());
              }
              if (records.isEmpty()) {
                  System.out.println("-- terminating consumer --");
                  break;
              }

              printOffsets(consumer, topicPartition);
          }
          // final report is taken before the consumer is closed
          printOffsets(consumer, topicPartition);
      }
  }

  /**
   * Prints the last committed offset (or null when nothing has been committed yet)
   * and the consumer's current position for the given partition.
   */
  private static void printOffsets(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
      Map<TopicPartition, OffsetAndMetadata> committed = consumer
              .committed(Collections.singleton(topicPartition));
      OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
      Long committedOffset = offsetAndMetadata == null ? null : offsetAndMetadata.offset();
      long position = consumer.position(topicPartition);
      System.out.printf("Committed: %s, current position %s%n", committedOffset, position);
  }

  /**
   * Sends MSG_COUNT key-less messages ("message-0" ...) to the topic.
   */
  private static void sendMessages() {
      Properties producerProps = ExampleConfig.getProducerProps();
      // typed producer instead of the raw type; closed even if a send fails
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
          for (int i = 0; i < MSG_COUNT; i++) {
              String value = "message-" + i;
              System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
              producer.send(new ProducerRecord<>(TOPIC_NAME, value));
          }
          producer.flush();
      }
  }
}
Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 8
consumed: key = null, value = message-1, partition id= 0, offset = 9
consumed: key = null, value = message-2, partition id= 0, offset = 10
consumed: key = null, value = message-3, partition id= 0, offset = 11
Committed: 8, current position 12
-- terminating consumer --
Committed: 8, current position 12

As seen in the output, the consumer started consuming from the last committed offset (committed in the previous example). Also, the committed offset is not updated from 8 to 12 because no auto commit takes place in this case.

Running again:

Sending message topic: example-topic-2020-5-7a, value: message-0
Sending message topic: example-topic-2020-5-7a, value: message-1
Sending message topic: example-topic-2020-5-7a, value: message-2
Sending message topic: example-topic-2020-5-7a, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 8
consumed: key = null, value = message-1, partition id= 0, offset = 9
consumed: key = null, value = message-2, partition id= 0, offset = 10
consumed: key = null, value = message-3, partition id= 0, offset = 11
consumed: key = null, value = message-0, partition id= 0, offset = 12
consumed: key = null, value = message-1, partition id= 0, offset = 13
consumed: key = null, value = message-2, partition id= 0, offset = 14
consumed: key = null, value = message-3, partition id= 0, offset = 15
Committed: 8, current position 16
-- terminating consumer --
Committed: 8, current position 16

As seen above, the consumer does not start from offset 12 but from 8 again. That's because the offsets consumed in the previous run were never committed (auto commit is disabled).

Let's use the new topic example-topic-2020-5-7b

/**
 * Same as the enable.auto.commit=false example, but on a topic for which the consumer group
 * has never committed an offset. Prints the committed offset and the current position after
 * each batch.
 */
public class AutoCommitFalseExample2 {
  private static final String TOPIC_NAME = "example-topic-2020-5-7b";
  private static final int MSG_COUNT = 4;

  public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      executorService.execute(AutoCommitFalseExample2::startConsumer);
      executorService.execute(AutoCommitFalseExample2::sendMessages);
      executorService.shutdown();
      executorService.awaitTermination(10, TimeUnit.MINUTES);
  }

  /**
   * Polls partition 0 of the topic until a poll (5 second timeout) returns nothing,
   * printing every record and the offsets after each non-empty batch and once more at the end.
   * This method never commits offsets itself.
   */
  private static void startConsumer() {
      Properties consumerProps = ExampleConfig.getConsumerProps(false, null);
      TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
      // try-with-resources: the consumer is closed on normal termination and on failure
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          consumer.assign(Collections.singleton(topicPartition));
          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
              for (ConsumerRecord<String, String> record : records) {
                  System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                          record.key(), record.value(), record.partition(), record.offset());
              }
              if (records.isEmpty()) {
                  System.out.println("-- terminating consumer --");
                  break;
              }

              printOffsets(consumer, topicPartition);
          }
          // final report is taken before the consumer is closed
          printOffsets(consumer, topicPartition);
      }
  }

  /**
   * Prints the last committed offset (or null when nothing has been committed yet)
   * and the consumer's current position for the given partition.
   */
  private static void printOffsets(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
      Map<TopicPartition, OffsetAndMetadata> committed = consumer
              .committed(Collections.singleton(topicPartition));
      OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
      Long committedOffset = offsetAndMetadata == null ? null : offsetAndMetadata.offset();
      long position = consumer.position(topicPartition);
      System.out.printf("Committed: %s, current position %s%n", committedOffset, position);
  }

  /**
   * Sends MSG_COUNT key-less messages ("message-0" ...) to the topic.
   */
  private static void sendMessages() {
      Properties producerProps = ExampleConfig.getProducerProps();
      // typed producer instead of the raw type; closed even if a send fails
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
          for (int i = 0; i < MSG_COUNT; i++) {
              String value = "message-" + i;
              System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
              producer.send(new ProducerRecord<>(TOPIC_NAME, value));
          }
          producer.flush();
      }
  }
}
Sending message topic: example-topic-2020-5-7b, value: message-0
Sending message topic: example-topic-2020-5-7b, value: message-1
Sending message topic: example-topic-2020-5-7b, value: message-2
Sending message topic: example-topic-2020-5-7b, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Committed: null, current position 4
-- terminating consumer --
Committed: null, current position 4

Running again

Sending message topic: example-topic-2020-5-7b, value: message-0
Sending message topic: example-topic-2020-5-7b, value: message-1
Sending message topic: example-topic-2020-5-7b, value: message-2
Sending message topic: example-topic-2020-5-7b, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 4
consumed: key = null, value = message-1, partition id= 0, offset = 5
consumed: key = null, value = message-2, partition id= 0, offset = 6
Committed: null, current position 7
consumed: key = null, value = message-3, partition id= 0, offset = 7
Committed: null, current position 8
-- terminating consumer --
Committed: null, current position 8

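In this case the consumer always starts at the current end of the partition (offset 0 in the first run, 4 in the second). That's because there is no last committed offset (it's null) to start from, so the consumer falls back to the auto.offset.reset policy, whose default value is latest.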

Example Project

Dependencies and Technologies Used:

  • kafka_2.12 2.5.0 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka - Auto Committing Offsets Select All Download
  • kafka-auto-commit-example
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • AutoCommitExample.java

    See Also