Kafka - Introduction to Kafka Admin API

[Last Updated: Aug 11, 2020]

The Admin API supports managing and inspecting topics, brokers, ACLs, and other Kafka objects.

In this tutorial, we will go through getting-started examples of how to use the Kafka Admin API.

Example

Start the Kafka server as described here.
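
Each example below follows the same pattern: build a Properties object holding the bootstrap server address, create an AdminClient from it, and close the client when done. Here is that skeleton on its own (a minimal sketch; the class name AdminClientSkeleton is just for illustration, assuming a broker running at localhost:9092):

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

public class AdminClientSkeleton {
  public static void main(String[] args) {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      //AdminClient implements AutoCloseable; try-with-resources closes it for us
      try (AdminClient admin = AdminClient.create(config)) {
          //call admin operations here, e.g. admin.listTopics()
      }
  }
}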

How to list Kafka broker configuration?

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingConfigs {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(config)) {
          for (Node node : admin.describeCluster().nodes().get()) {
              System.out.println("-- node: " + node.id() + " --");
              //describe the configs of the current node, not a hardcoded broker id
              ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
              DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));
              dcr.all().get().forEach((resource, conf) ->
                      conf.entries().forEach(entry ->
                              System.out.println(entry.name() + "= " + entry.value())));
          }
      }
  }
}
-- node: 0 --
advertised.host.name= null
log.cleaner.min.compaction.lag.ms= 0
metric.reporters=
quota.producer.default= 9223372036854775807
offsets.topic.num.partitions= 50
log.flush.interval.messages= 9223372036854775807
controller.socket.timeout.ms= 30000
auto.create.topics.enable= true
log.flush.interval.ms= null
principal.builder.class= null
replica.socket.receive.buffer.bytes= 65536
min.insync.replicas= 1
replica.fetch.wait.max.ms= 500
num.recovery.threads.per.data.dir= 1
ssl.keystore.type= JKS
password.encoder.iterations= 4096
sasl.mechanism.inter.broker.protocol= GSSAPI
default.replication.factor= 1
ssl.truststore.password= null
log.preallocate= false
sasl.kerberos.principal.to.local.rules= DEFAULT
fetch.purgatory.purge.interval.requests= 1000
ssl.endpoint.identification.algorithm= https
replica.socket.timeout.ms= 30000
message.max.bytes= 1000012
transactional.id.expiration.ms= 604800000
transaction.state.log.replication.factor= 1
control.plane.listener.name= null
num.io.threads= 8
sasl.login.refresh.buffer.seconds= 300
connections.max.reauth.ms= 0
connection.failed.authentication.delay.ms= 100
offsets.commit.required.acks= -1
log.flush.offset.checkpoint.interval.ms= 60000
delete.topic.enable= true
quota.window.size.seconds= 1
ssl.truststore.type= JKS
offsets.commit.timeout.ms= 5000
quota.window.num= 11
zookeeper.connect= localhost:2181
authorizer.class.name=
password.encoder.secret= null
log.cleaner.max.compaction.lag.ms= 9223372036854775807
num.replica.fetchers= 1
alter.log.dirs.replication.quota.window.size.seconds= 1
log.retention.ms= null
alter.log.dirs.replication.quota.window.num= 11
log.roll.jitter.hours= 0
password.encoder.old.secret= null
log.cleaner.enable= true
offsets.load.buffer.size= 5242880
log.cleaner.delete.retention.ms= 86400000
ssl.client.auth= none
controlled.shutdown.max.retries= 3
offsets.topic.replication.factor= 1
queued.max.requests= 500
transaction.state.log.min.isr= 1
log.cleaner.threads= 1
ssl.secure.random.implementation= null
sasl.kerberos.service.name= null
sasl.kerberos.ticket.renew.jitter= 0.05
socket.request.max.bytes= 104857600
ssl.trustmanager.algorithm= PKIX
zookeeper.session.timeout.ms= 6000
log.retention.bytes= -1
sasl.jaas.config= null
log.message.timestamp.type= CreateTime
sasl.kerberos.min.time.before.relogin= 60000
zookeeper.set.acl= false
connections.max.idle.ms= 600000
offsets.retention.minutes= 10080
max.connections= 2147483647
delegation.token.expiry.time.ms= 86400000
transaction.state.log.num.partitions= 50
replica.fetch.backoff.ms= 1000
inter.broker.protocol.version= 2.4-IV1
kafka.metrics.reporters=
listener.security.protocol.map= PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.retention.hours= 168
num.partitions= 1
client.quota.callback.class= null
broker.id.generation.enable= true
listeners= null
ssl.provider= null
ssl.enabled.protocols= TLSv1.2,TLSv1.1,TLSv1
inter.broker.listener.name= null
delete.records.purgatory.purge.interval.requests= 1
log.roll.ms= null
alter.config.policy.class.name= null
delegation.token.expiry.check.interval.ms= 3600000
ssl.cipher.suites=
zookeeper.max.in.flight.requests= 10
log.flush.scheduler.interval.ms= 9223372036854775807
log.index.size.max.bytes= 10485760
ssl.keymanager.algorithm= SunX509
sasl.login.callback.handler.class= null
security.inter.broker.protocol= PLAINTEXT
replica.fetch.max.bytes= 1048576
sasl.server.callback.handler.class= null
advertised.port= null
log.cleaner.dedupe.buffer.size= 134217728
replica.high.watermark.checkpoint.interval.ms= 5000
replication.quota.window.size.seconds= 1
log.cleaner.io.buffer.size= 524288
sasl.kerberos.ticket.renew.window.factor= 0.8
create.topic.policy.class.name= null
zookeeper.connection.timeout.ms= 6000
metrics.recording.level= INFO
password.encoder.cipher.algorithm= AES/CBC/PKCS5Padding
controlled.shutdown.retry.backoff.ms= 5000
security.providers= null
log.roll.hours= 168
log.cleanup.policy= delete
log.flush.start.offset.checkpoint.interval.ms= 60000
ssl.principal.mapping.rules= DEFAULT
host.name=
replica.selector.class= null
log.roll.jitter.ms= null
transaction.state.log.segment.bytes= 104857600
max.connections.per.ip= 2147483647
offsets.topic.segment.bytes= 104857600
background.threads= 10
quota.consumer.default= 9223372036854775807
request.timeout.ms= 30000
group.initial.rebalance.delay.ms= 0
log.message.format.version= 2.4-IV1
sasl.login.class= null
log.index.interval.bytes= 4096
log.dir= /tmp/kafka-logs
log.segment.bytes= 1073741824
log.cleaner.backoff.ms= 15000
offset.metadata.max.bytes= 4096
ssl.truststore.location= null
replica.fetch.response.max.bytes= 10485760
group.max.session.timeout.ms= 1800000
ssl.keystore.password= null
port= 9092
zookeeper.sync.time.ms= 2000
log.retention.minutes= null
log.segment.delete.delay.ms= 60000
log.dirs= /tmp/kafka-logs
controlled.shutdown.enable= true
compression.type= producer
max.connections.per.ip.overrides=
log.message.timestamp.difference.max.ms= 9223372036854775807
sasl.login.refresh.min.period.seconds= 60
password.encoder.key.length= 128
sasl.login.refresh.window.factor= 0.8
kafka.metrics.polling.interval.secs= 10
transaction.abort.timed.out.transaction.cleanup.interval.ms= 60000
sasl.kerberos.kinit.cmd= /usr/bin/kinit
log.cleaner.io.max.bytes.per.second= 1.7976931348623157E308
auto.leader.rebalance.enable= true
leader.imbalance.check.interval.seconds= 300
log.cleaner.min.cleanable.ratio= 0.5
replica.lag.time.max.ms= 10000
max.incremental.fetch.session.cache.slots= 1000
delegation.token.master.key= null
num.network.threads= 3
ssl.key.password= null
reserved.broker.max.id= 1000
sasl.client.callback.handler.class= null
metrics.num.samples= 2
transaction.remove.expired.transaction.cleanup.interval.ms= 3600000
socket.send.buffer.bytes= 102400
log.message.downconversion.enable= true
ssl.protocol= TLS
password.encoder.keyfactory.algorithm= null
transaction.state.log.load.buffer.size= 5242880
socket.receive.buffer.bytes= 102400
ssl.keystore.location= null
replica.fetch.min.bytes= 1
broker.rack= null
unclean.leader.election.enable= false
num.replica.alter.log.dirs.threads= null
sasl.enabled.mechanisms= GSSAPI
group.min.session.timeout.ms= 6000
offsets.retention.check.interval.ms= 600000
log.cleaner.io.buffer.load.factor= 0.9
transaction.max.timeout.ms= 900000
producer.purgatory.purge.interval.requests= 1000
metrics.sample.window.ms= 30000
group.max.size= 2147483647
broker.id= 0
offsets.topic.compression.codec= 0
delegation.token.max.lifetime.ms= 604800000
replication.quota.window.num= 11
log.retention.check.interval.ms= 300000
advertised.listeners= null
leader.imbalance.per.broker.percentage= 10
sasl.login.refresh.window.jitter= 0.05
queued.max.request.bytes= -1
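
describeConfigs() works for topics too, not just brokers; with ConfigResource.Type.TOPIC the resource is addressed by topic name instead of broker id. A minimal sketch along the same lines (the class name ListingTopicConfigs is illustrative, and it assumes a topic named test-topic-1 already exists):

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingTopicConfigs {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(config)) {
          //TOPIC resources are addressed by topic name instead of broker id
          ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic-1");
          admin.describeConfigs(Collections.singleton(cr)).all().get()
               .forEach((resource, conf) -> conf.entries()
                       .forEach(entry -> System.out.println(entry.name() + "= " + entry.value())));
      }
  }
}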

How to list all topics?

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicListing;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingTopics {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(config)) {
          //listTopics() returns the non-internal topics by default
          for (TopicListing topicListing : admin.listTopics().listings().get()) {
              System.out.println(topicListing);
          }
      }
  }
}
(name=test-topic-1, internal=false)
(name=test-topic-2, internal=false)
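
Note that listTopics() excludes internal topics such as __consumer_offsets by default. To include them, pass a ListTopicsOptions with listInternal(true). A small variation of the example above (the class name ListingAllTopics is illustrative):

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingAllTopics {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(config)) {
          //listInternal(true) includes internal topics like __consumer_offsets
          ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
          for (TopicListing topicListing : admin.listTopics(options).listings().get()) {
              System.out.println(topicListing);
          }
      }
  }
}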

How to create topics?

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopic {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(config)) {
          //creating a new topic with 1 partition and a replication factor of 1
          System.out.println("-- creating --");
          NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
          //createTopics() is asynchronous; block on the returned future so the
          //topic exists before we list
          admin.createTopics(Collections.singleton(newTopic)).all().get();

          //listing
          System.out.println("-- listing --");
          admin.listTopics().names().get().forEach(System.out::println);
      }
  }
}
-- creating --
-- listing --
my-new-topic
test-topic-1
test-topic-2
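
To undo the example above, topics can be deleted through the same API. A minimal sketch (the class name DeleteTopic is illustrative; like createTopics(), deleteTopics() returns futures, so get() blocks until the deletion completes):

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DeleteTopic {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      try (AdminClient admin = AdminClient.create(config)) {
          //block until the topic has actually been deleted
          admin.deleteTopics(Collections.singleton("my-new-topic")).all().get();
          System.out.println("-- listing after delete --");
          admin.listTopics().names().get().forEach(System.out::println);
      }
  }
}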

Example Project

Dependencies and Technologies Used:

  • kafka-clients 2.4.1 Apache Kafka
  • JDK 8
  • Maven 3.5.4

Apache Kafka Admin API Examples
  • kafka-admin-getting-started
    • src
      • main
        • java
          • com
            • logicbig
              • example
• ListingConfigs.java
                • ListingTopics.java
                • CreateTopic.java