Close

Kafka - Getting Started

[Last Updated: Mar 16, 2020]

What is Kafka?

Apache Kafka is a distributed streaming platform.
Streams of records are published to named topics, and consumers subscribe to those topics to receive the records.

Installing and Running Kafka

Follow the instructions here to install and run the Kafka server. By default, the Kafka server runs at localhost:9092.

Example

pom.xml

<project .....>
<modelVersion>4.0.0</modelVersion>

<!-- Maven coordinates of this example project. -->
<groupId>com.logicbig.example</groupId>
<artifactId>kafka-getting-started</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<!-- Kafka Java client library; supplies the org.apache.kafka.clients
     producer and consumer classes imported by the example code. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Compile with Java 8 source/target levels and treat source files as UTF-8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

The Producer

package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * Thin wrapper around a {@link KafkaProducer} that publishes String key/value
 * records to the broker at {@code localhost:9092}.
 *
 * <p>The wrapper owns the underlying producer, so it implements
 * {@link AutoCloseable}: callers should invoke {@link #close()} (or use
 * try-with-resources) once they are done sending, otherwise the producer's
 * network resources are never released and buffered records may not be
 * delivered before the application ends.
 */
public class ExampleProducer implements AutoCloseable {

  private final KafkaProducer<String, String> producer;

  /**
   * Creates a producer configured with String serializers for keys and values
   * and {@code acks=all}.
   */
  ExampleProducer() {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      // Strongest acknowledgement setting offered by the producer configuration.
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      producer = new KafkaProducer<>(props);
  }

  /**
   * Logs the record to standard output and hands it to the producer.
   *
   * <p>{@code KafkaProducer.send} is asynchronous, so this method may return
   * before the record has actually reached the broker.
   *
   * @param topic the topic to publish to
   * @param key   the record key
   * @param value the record value
   */
  public void sendMessage(String topic, String key, String value) {
      System.out.printf("Sending message topic: %s, key: %s, value: %s%n", topic, key, value);
      producer.send(new ProducerRecord<>(topic, key, value));
  }

  /**
   * Closes the underlying producer, which waits for previously sent records to
   * complete and releases the producer's resources.
   */
  @Override
  public void close() {
      producer.close();
  }
}

The Consumer

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.function.BiFunction;

/**
 * Subscribes to a single topic on the broker at {@code localhost:9092} and
 * feeds every received record to a callback until the callback asks to stop.
 *
 * <p>All of the work happens inside the constructor, which blocks the calling
 * thread until the callback returns {@code true}.
 */
public class ExampleConsumer {

  /**
   * Polls the given topic and passes each record's key and value to
   * {@code biFunction}. Consumption stops, and the constructor returns, as soon
   * as the callback returns {@code true}.
   *
   * <p>The consumer is opened in a try-with-resources block so that it is
   * always closed, whether the loop ends normally or an exception is thrown.
   * Per the KafkaConsumer documentation, closing with auto-commit enabled also
   * attempts a final offset commit.
   *
   * @param topic      the topic to subscribe to
   * @param biFunction callback receiving (key, value); return {@code true} to
   *                   stop consuming. Must not return {@code null}, because the
   *                   result is unboxed.
   */
  public ExampleConsumer(String topic, BiFunction<String, String, Boolean> biFunction) {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");
      props.setProperty("group.id", "test");
      props.setProperty("enable.auto.commit", "true");
      props.setProperty("auto.commit.interval.ms", "1000");
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      // NOTE(review): auto.offset.reset is not set, so the client default applies;
      // for a group without committed offsets, records published before this
      // consumer gets its partitions may be skipped - confirm against the intended usage.

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Arrays.asList(topic));
          while (true) {
              // Wait up to 30 seconds for the next batch; an empty batch just loops again.
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
              for (ConsumerRecord<String, String> record : records) {
                  if (biFunction.apply(record.key(), record.value())) {
                      // Leaving the try block closes the consumer.
                      return;
                  }
              }
          }
      }
  }
}

Main class

package com.logicbig.example;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Runs the example end to end: one pool thread consumes from the test topic
 * while another publishes a fixed number of messages to it.
 */
public class KafkaExampleMain {

  /**
   * Starts the consumer and the producer on a two-thread pool and waits up to
   * three minutes for both tasks to finish.
   *
   * @param args unused
   * @throws InterruptedException if the main thread is interrupted while waiting
   */
  public static void main(String[] args) throws InterruptedException {
      final String topic = "my-test-topic";
      int count = 5;
      // Counts consumed records so the consumer knows when to stop.
      AtomicInteger atomicInteger = new AtomicInteger(0);

      ExecutorService executorService = Executors.newFixedThreadPool(2);

      //start consumer
      executorService.execute(() -> {
          new ExampleConsumer(topic, (key, value) -> {
              System.out.printf("consumed: key = %s, value = %s%n", key, value);
              // Returning true tells the consumer to stop after 'count' records.
              return atomicInteger.incrementAndGet() == count;
          });
      });

      //start producer
      executorService.execute(() -> {
          ExampleProducer exampleProducer = new ExampleProducer();
          for (int i = 0; i < count; i++) {
              exampleProducer.sendMessage(topic, Integer.toString(i), "message - " + i);
          }
      });

      // No further tasks are accepted; the two submitted tasks keep running.
      executorService.shutdown();
      if (!executorService.awaitTermination(3, TimeUnit.MINUTES)) {
          // The pool threads are non-daemon, so a task that is still running
          // (for example a consumer that never received all records) would keep
          // the JVM alive. Interrupt the remaining tasks so the program can exit.
          System.err.println("Timed out waiting for the example to finish; cancelling remaining tasks");
          executorService.shutdownNow();
      }
  }
}

Output

Sending message topic: my-test-topic, key: 0, value: message - 0
Sending message topic: my-test-topic, key: 1, value: message - 1
Sending message topic: my-test-topic, key: 2, value: message - 2
Sending message topic: my-test-topic, key: 3, value: message - 3
Sending message topic: my-test-topic, key: 4, value: message - 4
consumed: key = 0, value = message - 0
consumed: key = 1, value = message - 1
consumed: key = 2, value = message - 2
consumed: key = 3, value = message - 3
consumed: key = 4, value = message - 4

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Kafka - Quick Example Select All Download
  • kafka-getting-started
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • KafkaExampleMain.java

    See Also