Spring Boot Kafka Integration



Kafka is an open source streaming platform that scales horizontally very well without compromising speed or efficiency.

The Kafka core is written in Scala, and Kafka Streams and KSQL are written in Java.


Kafka is a service bus

To connect heterogeneous applications, we need a message publication mechanism to send and receive messages among them. A message router is known as a message broker. Kafka is a message broker: a solution for routing messages among clients quickly.

Kafka architecture has two directives:

  1. The first is to not block the producers (to deal with back pressure).

  2. The second is to isolate producers and consumers. The producers should not know who their consumers are; hence Kafka follows the dumb broker and smart clients model.

Back pressure arises when a producer generates data at a higher rate than the consumer can consume it. Dealing with it makes applications robust against data surges.
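To make the two directives concrete, here is a minimal in-memory sketch (plain Java, no Kafka involved). The classes AppendOnlyLog and SlowConsumer are hypothetical stand-ins, not Kafka APIs: the producer appends to a log without waiting for anyone, and the consumer keeps its own read position and drains the backlog at its own pace.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: an append-only log decouples a fast producer from a slow consumer. */
final class BackPressureSketch {

    /** In-memory stand-in for a single partition (hypothetical, not a Kafka API). */
    static final class AppendOnlyLog {
        private final List<String> records = new ArrayList<>();

        /** Appends without waiting for any consumer and returns the record's offset. */
        synchronized long append(String record) {
            records.add(record);
            return records.size() - 1;
        }

        /** Returns up to maxRecords records starting at fromOffset. */
        synchronized List<String> read(long fromOffset, int maxRecords) {
            int from = (int) Math.min(fromOffset, (long) records.size());
            int to = Math.min(from + maxRecords, records.size());
            return new ArrayList<>(records.subList(from, to));
        }
    }

    /** Consumer that tracks its own position; the log knows nothing about it. */
    static final class SlowConsumer {
        private long offset = 0;

        List<String> poll(AppendOnlyLog log, int maxRecords) {
            List<String> batch = log.read(offset, maxRecords);
            offset += batch.size();
            return batch;
        }

        long position() {
            return offset;
        }
    }

    public static void main(String[] args) {
        AppendOnlyLog log = new AppendOnlyLog();
        // A surge: the producer writes ten records in one go and is never blocked.
        for (int i = 0; i < 10; i++) {
            log.append("event-" + i);
        }
        // The consumer drains the backlog three records at a time.
        SlowConsumer consumer = new SlowConsumer();
        List<String> batch;
        while (!(batch = consumer.poll(log, 3)).isEmpty()) {
            System.out.println("consumed " + batch + ", next offset " + consumer.position());
        }
    }
}
```

The point of the design is that the log, not the consumer, absorbs the surge, and the broker side stays "dumb" because each client remembers where it is.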

Kafka is a real-time messaging system

Kafka is a software solution based on a publish-subscribe model, with the following features:

  1. open source

  2. distributed

  3. partitioned

  4. replicated

  5. commit-log based

Kafka Terminology

Important Kafka Terminology
  • Broker: A Kafka server; the term also refers to the Kafka server process itself.

  • Cluster: Set of Kafka Brokers.

  • Zookeeper: Cluster coordinator.

  • Topic: A named queue of messages. A broker can host several topics.

  • Offset: An identifier for each message within a partition.

  • Partition: Immutable and ordered sequence of records continually appended to a structured commit log.

  • Producer: Program that publishes data to topics.

  • Consumer: Program that processes data from the topics.

  • Retention Period: Time to keep messages available for consumption.
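The relationship between topic, partition, and offset can be sketched with a small in-memory model. This is an illustration only: TopicSketch is a hypothetical class, and it picks the partition with String.hashCode, whereas Kafka's default partitioner hashes the serialized key with murmur2.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: a topic is a set of partitions; an offset identifies a record within one partition. */
final class TopicSketch {
    private final List<List<String>> partitions = new ArrayList<>();

    TopicSketch(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Simplified partition choice (assumption: String.hashCode instead of Kafka's murmur2). */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    /** Appends the value to the key's partition and returns {partition, offset}. */
    long[] send(String key, String value) {
        int partition = partitionFor(key);
        List<String> log = partitions.get(partition);
        log.add(value);
        return new long[] {partition, log.size() - 1};
    }

    /** Reads the record stored at the given offset of the given partition. */
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch(3);
        long[] first = topic.send("account-1", "payment A");
        long[] second = topic.send("account-1", "payment B");
        // Same key, same partition; the offset grows by one per appended record.
        System.out.println("partition " + first[0] + ", offsets " + first[1] + " and " + second[1]);
    }
}
```

Records with the same key land in the same partition, so their relative order is preserved there; order across partitions is not defined.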

Kafka Architecture

Role of Zookeeper

Most contemporary distributed orchestration systems, such as Kubernetes and Swarm, rely on a distributed key/value store for maintaining the global state of the cluster.

Consul, etcd, and even Redis are used for service discovery and cluster state management.

Apache Kafka was designed long before these lightweight services were built.


Kafka uses Apache ZooKeeper as the distributed configuration store.

It forms the backbone of a Kafka cluster and continuously monitors the health of the brokers.

When new brokers are added to the cluster, they register with ZooKeeper so that the cluster can start using them by placing topics and partitions on them.

Apart from cluster management, initial versions of Kafka used ZooKeeper for storing the partition and offset information of each consumer.

Starting from 0.10, that information has moved to an internal Kafka topic (__consumer_offsets).

Cluster Types

Kafka supports three types of clusters:

  1. Single node - single broker

  2. Single node - multiple brokers

  3. Multiple nodes - multiple brokers

Message Delivery Modes

In Kafka, there are three ways to deliver messages:

  1. Never redelivered (at most once): Messages may be lost because, once delivered, they are not sent again.

  2. May be redelivered (at least once): Messages are never lost; if a message is not received, it can be sent again.

  3. Delivered once (exactly once): Each message is delivered exactly once. This is the most difficult form of delivery.
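The difference between "never redelivered" and "may be redelivered" comes down to whether the consumer saves its position before or after processing a record. The following plain-Java simulation is a sketch under that assumption; the consume method and its single simulated crash are hypothetical and involve no Kafka client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: commit-before-process loses records, process-before-commit repeats them. */
final class DeliverySemanticsSketch {

    /**
     * Consumes the whole log, crashing once when it reaches crashAtOffset.
     * commitFirst = true saves the position before processing (never redelivered);
     * commitFirst = false processes before saving the position (may be redelivered).
     */
    static List<String> consume(List<String> log, boolean commitFirst, int crashAtOffset) {
        List<String> processed = new ArrayList<>();
        int committed = 0;
        boolean crashed = false;
        while (committed < log.size()) {
            int offset = committed; // after a restart, resume from the last saved position
            if (commitFirst) {
                committed = offset + 1;
                if (!crashed && offset == crashAtOffset) {
                    crashed = true; // crash before processing: the record is lost
                    continue;
                }
                processed.add(log.get(offset));
            } else {
                processed.add(log.get(offset));
                if (!crashed && offset == crashAtOffset) {
                    crashed = true; // crash before saving the position: the record is read again
                    continue;
                }
                committed = offset + 1;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2");
        System.out.println("never redelivered: " + consume(log, true, 1));
        System.out.println("may be redelivered: " + consume(log, false, 1));
    }
}
```

With a crash at offset 1, the first mode processes m0 and m2 only, while the second processes m1 twice. Delivering exactly once additionally requires that processing and saving the position happen atomically, or that reprocessing a record has no extra effect.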



The message log can be compacted in two ways:

  • Coarse-grained: Log compacted by time

  • Fine-grained: Log compacted by message
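As a rough illustration of the two approaches, the sketch below cleans an in-memory log in both ways. It assumes that "by message" refers to Kafka's key-based compaction, which keeps the latest record for each key; LogEntry and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: time-based retention versus key-based compaction on an in-memory log. */
final class LogCleanupSketch {

    static final class LogEntry {
        final String key;
        final String value;
        final long timestampMs;

        LogEntry(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    /** Coarse-grained: drops every entry older than retentionMs, whatever its key. */
    static List<LogEntry> retainByTime(List<LogEntry> log, long nowMs, long retentionMs) {
        List<LogEntry> kept = new ArrayList<>();
        for (LogEntry entry : log) {
            if (nowMs - entry.timestampMs <= retentionMs) {
                kept.add(entry);
            }
        }
        return kept;
    }

    /** Fine-grained: keeps only the most recent entry of each key, preserving log order. */
    static List<LogEntry> compactByKey(List<LogEntry> log) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndex.put(log.get(i).key, i);
        }
        List<LogEntry> kept = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastIndex.get(log.get(i).key) == i) {
                kept.add(log.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<LogEntry> log = new ArrayList<>();
        log.add(new LogEntry("k1", "a", 1000));
        log.add(new LogEntry("k2", "b", 2000));
        log.add(new LogEntry("k1", "c", 3000));
        System.out.println("by time: " + retainByTime(log, 3500, 2000));
        System.out.println("by key:  " + compactByKey(log));
    }
}
```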

Kafka Installation

There are three ways to install a Kafka environment

  1. Download executable files

  2. Homebrew or yum package managers

  3. Installing Confluent Platform

JDK 8 and Scala are prerequisites for installing Kafka.
Ensure you have at least 4 GB of RAM. The installation directory will be /usr/local/kafka/ for macOS users and /opt/kafka/ for Linux users.

Installing on macOS

  • Install sbt (the Scala build tool) with brew by executing the following:

$brew install sbt # (1)

$brew upgrade sbt # (2)
  • Install Scala with brew:

$brew install scala #(1)

$brew upgrade scala #(2)
  1. Installs the package

  2. Upgrades the package if it is already installed

Installing on Windows

This will explain installation of kafka on windows machine

Installing on Linux


Running Kafka

There are two ways to run Kafka, depending on whether we install it directly or through Confluent Platform.

If you installed Kafka with brew, the configuration files are located in /usr/local/etc/kafka.

There are two types of configurations possible with Kafka:

  • Standalone

  • Cluster

The real power of Kafka is unlocked when it runs with replication in cluster mode and all topics are correctly partitioned.

Cluster mode has two main advantages:

  • Parallelism: the capacity to run tasks simultaneously among the cluster members.

  • Redundancy: the guarantee that, when a Kafka node goes down, the cluster remains safe and accessible from the other running nodes.

Running Zookeeper

$zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

Running Kafka Server (Standalone Mode)

$kafka-server-start /usr/local/etc/kafka/server.properties &
Append & at the end of the command to run it as a separate background process.

Refer below link for detailed description of supported server properties

Zookeeper must be running on the machine before starting Kafka.

Running Kafka Server (Cluster Mode)

Refer to the server.properties file and create two broker configurations.

# The id of the broker. This must be set to a unique integer for each broker.

# Port on which the Kafka server (broker) will run

# A comma separated list of directories under which to store log files

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "host1:port1,host2:port2,host3:port3".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

Create two broker configurations as mentioned below

  • mark-1.properties

  • mark-2.properties

  • Run first broker

kafka-server-start mark-1.properties &
  • Run second broker

kafka-server-start mark-2.properties &
You can check the status of the Kafka brokers with the command below:
tvajjala$lsof -i:9093
java    43439 tvajjala  106u  IPv6 0x672b0fb3db715d83      0t0  TCP *:9093 (LISTEN)
java    43439 tvajjala  122u  IPv6 0x672b0fb3db717483      0t0  TCP tvajjala.home:57770->tvajjala.home:9093 (ESTABLISHED)
java    43439 tvajjala  123u  IPv6 0x672b0fb3db714683      0t0  TCP tvajjala.home:9093->tvajjala.home:57770 (ESTABLISHED)
tvajjala$lsof -i:9094
java    43439 tvajjala  127u  IPv6 0x672b0fb3e101e203      0t0  TCP tvajjala.home:57774->tvajjala.home:9094 (ESTABLISHED)
java    43708 tvajjala  106u  IPv6 0x672b0fb3db716903      0t0  TCP *:9094 (LISTEN)
java    43708 tvajjala  116u  IPv6 0x672b0fb3e101d683      0t0  TCP tvajjala.home:9094->tvajjala.home:57774 (ESTABLISHED)
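For reference, the two broker files, mark-1.properties and mark-2.properties, need to differ only in the broker id, the port, and the log directory. The sketch below renders one plausible pair. The ports 9093 and 9094 come from the lsof output above and the ids 1 and 2 are assumed; which id goes with which port, the /tmp/kafka-logs-N directories, and the use of the listeners property (older Kafka versions used port instead) are also assumptions, not the original files.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: renders the per-broker settings that must differ between two brokers on one machine. */
final class BrokerConfigSketch {

    /** Builds the settings for one broker; paths and the id-to-port mapping are assumptions. */
    static Map<String, String> brokerConfig(int brokerId, int port) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("broker.id", String.valueOf(brokerId));     // unique integer per broker
        config.put("listeners", "PLAINTEXT://:" + port);        // port the broker listens on
        config.put("log.dirs", "/tmp/kafka-logs-" + brokerId);  // hypothetical, one directory per broker
        config.put("num.partitions", "1");                      // default partitions per topic
        config.put("zookeeper.connect", "localhost:2181");      // same Zookeeper for both brokers
        return config;
    }

    /** Renders the settings in key=value form, one per line. */
    static String render(Map<String, String> config) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            out.append(entry.getKey()).append('=').append(entry.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println("# mark-1.properties");
        System.out.println(render(brokerConfig(1, 9093)));
        System.out.println("# mark-2.properties");
        System.out.println(render(brokerConfig(2, 9094)));
    }
}
```

Two brokers on the same machine cannot share a port or a log directory, which is why those are the values that change.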

Dealing With Topics(CLI)

Kafka, like almost all modern infrastructure projects, has three ways of building things:

  • Through the command line,

  • Through programming,

  • Through a web console (in this case the Confluent Control Center).

Kafka has pre-built utilities to manage brokers, as we already saw, and to manage topics.

Create Topic

$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BarCap

  • --partitions: parameter that controls the parallelism

  • --replication-factor: parameter that controls the redundancy

  • --topic: name of the topic to be created

  • --zookeeper: Zookeeper cluster host

Input Parameters
  • The --replication-factor parameter is fundamental, as it specifies on how many servers of the cluster the topic is going to be replicated. A broker can hold just one replica of a partition, so the replication factor cannot exceed the number of running brokers; otherwise topic creation fails with an error such as:

org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1

  • The --partitions parameter, as its name implies, specifies how many partitions the topic will have. The number determines the parallelism that can be achieved on the consumer side.

  • The --zookeeper parameter indicates where the Zookeeper cluster is running.
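Why the replication factor is bounded by the number of brokers becomes clearer with a small sketch of replica placement. The round-robin rule below is a deliberate simplification and not Kafka's actual assignment algorithm; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: places each partition's replicas on distinct brokers (simplified round-robin). */
final class ReplicaAssignmentSketch {

    /** Returns, for each partition, the list of broker ids that hold a replica of it. */
    static List<List<Integer>> assign(int partitions, int replicationFactor, List<Integer> brokerIds) {
        if (replicationFactor > brokerIds.size()) {
            // Each broker holds at most one replica of a partition, so this cannot be satisfied.
            throw new IllegalArgumentException("Replication factor: " + replicationFactor
                    + " larger than available brokers: " + brokerIds.size());
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokerIds.get((p + r) % brokerIds.size()));
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, two replicas each, on brokers 1 and 2.
        System.out.println(assign(3, 2, Arrays.asList(1, 2)));
    }
}
```

With two brokers, every partition gets one replica on each of them; asking for two replicas with a single broker is rejected, which mirrors the error message shown above.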

List Topics

$kafka-topics --list --zookeeper localhost:2181

Create Producer

$kafka-console-producer --broker-list localhost:9092 --topic BarCap
>This is first message to Barclay
>Second message to FraudTC

The parameters used above are:

  • --broker-list: This specifies the Kafka brokers as a comma-separated list in the form hostname:port.

  • --topic: This parameter is followed by the name of the target topic.

Other options of the console producer control the following:

  • Whether the messages should be sent synchronously.

  • The compression codec used to produce the messages. The possible options are none, gzip, snappy, or lz4. If the option is given without a value, gzip is used.

  • The batch size in bytes, which applies when messages are not sent synchronously but grouped into a single batch.

  • The number of retries before a producer gives up and drops the message, since brokers can fail to receive messages. This number must be a positive integer.

  • The time in milliseconds to wait before the producer retries after a leader election, since in case of failure the election of a new leader might take some time.

  • For a producer running in asynchronous mode, the maximum amount of time in milliseconds that a message will queue while waiting for a sufficient batch size.

  • For a producer running in asynchronous mode, the maximum number of messages that will queue while waiting for a sufficient batch size.

Create Consumer

$kafka-console-consumer --bootstrap-server localhost:9092 --topic BarCap --from-beginning
The --from-beginning parameter indicates that messages should be consumed from the beginning of the log instead of only the latest messages.

Other options of the console consumer control the following:

  • The amount of data to be fetched in a single request, in bytes. The default value is 1,024 x 1,024.

  • --socket-buffer-size: The size of the TCP RECV buffer, in bytes. The default value is 2 x 1024 x 1024.

  • The name of the class to use for formatting messages for display. The default value is NewlineMessageFormatter.

  • The time interval, in milliseconds, at which to save the current offset. The default value is 10,000.

  • The maximum number of messages to consume before exiting. If not set, consumption is continuous.

  • Whether to skip a message instead of halting when there is an error while processing it.

Describe Topic

$kafka-topics --describe --zookeeper localhost:2181 --topic BarCapTC
Topic:BarCapTC PartitionCount:1 ReplicationFactor:2 Configs:
 Topic: BarCapTC Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1

  • PartitionCount: Number of partitions on the topic (parallelism)

  • ReplicationFactor: Number of replicas on the topic (redundancy)

  • Leader: Node responsible for reading and writing operations of a given partition

  • Replicas: List of brokers replicating this topic data; some of these might even be dead

  • Isr: List of nodes that are currently in-sync replicas

Delete Topic

$kafka-topics --delete  --zookeeper localhost:2181 --topic=BarCap


Kafkacat is a generic command-line non-JVM utility used to test and debug Apache Kafka deployments.

Kafkacat can be used to produce, consume, and list topic and partition information for Kafka.

Kafkacat is netcat for Kafka, and it is a tool for inspecting and creating data in Kafka.

Kafkacat is similar to kafka-console-producer and kafka-console-consumer, but more powerful.

Kafkacat is an open source utility and it is available at https://github.com/edenhill/kafkacat

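brew install kafkacat      # macOS
apt-get install kafkacat   # Linux

Subscribe to topics BarCap and BarCapTC (subscribing to several topics requires the balanced consumer mode, -G; the group name mygroup is an arbitrary example):

kafkacat -b localhost:9093 -G mygroup BarCap BarCapTC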

Message Processing (Java API)

Message processing involves the following:

  • Message structure validation against a message schema

  • Given an event stream, filtering the messages from the stream

  • Message enrichment with additional data

  • Message aggregation (composition) from two or more messages to produce a new message
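The four steps above can be sketched as a small pipeline over an in-memory list, using plain Java streams instead of a Kafka client. The Payment class, its fields, and the region lookup table are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: validate, filter, enrich, and aggregate a batch of messages. */
final class MessageProcessingSketch {

    static final class Payment {
        final String id;
        final String customer;
        final double amount;
        final String region; // null until the message is enriched

        Payment(String id, String customer, double amount, String region) {
            this.id = id;
            this.customer = customer;
            this.amount = amount;
            this.region = region;
        }
    }

    /** Validation: a stand-in for a schema check (required fields, positive amount). */
    static boolean isValid(Payment p) {
        return p.id != null && !p.id.isEmpty() && p.customer != null && p.amount > 0;
    }

    /** Enrichment: adds the customer's region from a lookup table. */
    static Payment enrich(Payment p, Map<String, String> regionByCustomer) {
        return new Payment(p.id, p.customer, p.amount, regionByCustomer.getOrDefault(p.customer, "unknown"));
    }

    /** Runs all four steps and returns the total amount per region. */
    static Map<String, Double> totalsByRegion(List<Payment> payments,
                                              Map<String, String> regionByCustomer,
                                              double minAmount) {
        return payments.stream()
                .filter(MessageProcessingSketch::isValid)                  // validation
                .filter((Payment p) -> p.amount >= minAmount)              // filtering
                .map((Payment p) -> enrich(p, regionByCustomer))           // enrichment
                .collect(Collectors.groupingBy((Payment p) -> p.region,    // aggregation
                        Collectors.summingDouble((Payment p) -> p.amount)));
    }

    public static void main(String[] args) {
        Map<String, String> regions = new HashMap<>();
        regions.put("alice", "EU");
        List<Payment> payments = Arrays.asList(
                new Payment("1", "alice", 100.0, null),
                new Payment("2", "bob", 5.0, null),
                new Payment(null, "carol", 50.0, null),
                new Payment("4", "alice", 20.0, null));
        System.out.println(totalsByRegion(payments, regions, 10.0));
    }
}
```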

Message Formats

A message can be represented in several formats:

  • JSON Notation

  • Apache Avro

  • Apache Thrift

  • Protocol Buffers

JSON is easily read and written by both humans and machines; as a counterweight, binary representations are very fast and lightweight to process.
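To get a feel for the trade-off, the sketch below encodes the same payment once as JSON text and once in a fixed binary layout, then compares the sizes. The binary layout uses java.io.DataOutputStream as a simple stand-in and is not Avro, Thrift, or Protocol Buffers; the field names are hypothetical, and the JSON builder does no string escaping.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Sketch: the same record encoded as JSON text and as a compact binary layout. */
final class EncodingSizeSketch {

    /** Human-readable encoding: field names travel with every message. */
    static byte[] toJson(long id, String account, String name, double amount) {
        String json = String.format(Locale.ROOT,
                "{\"id\":%d,\"account\":\"%s\",\"name\":\"%s\",\"amount\":%.2f}",
                id, account, name, amount);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Binary encoding: only values are written, in an order both sides agree on. */
    static byte[] toBinary(long id, String account, String name, double amount) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id);       // 8 bytes
            out.writeUTF(account);   // 2-byte length prefix plus the encoded characters
            out.writeUTF(name);
            out.writeDouble(amount); // 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] json = toJson(1L, "12345", "Thiru", 1234.0);
        byte[] binary = toBinary(1L, "12345", "Thiru", 1234.0);
        System.out.println("JSON bytes:   " + json.length);
        System.out.println("binary bytes: " + binary.length);
    }
}
```

The binary form is smaller because it carries no field names, which is also why both sides need a shared schema to read it.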

Apache Avro

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.

Read more about Avro spec at https://avro.apache.org/docs/1.8.2/spec.html

Message Processing with Spring Boot

This Spring Boot application produces and consumes payment details as JSON messages.

Update the configuration under resources/application.yml:

    address: localhost:9092 #(1)

    payments: payments  #(2)
  1. Kafka Server(Broker) address

  2. Topic name where we send messages

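Create KafkaTopicConfig to create the topic in the Kafka broker

import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

/**
 * This configuration lets you create a topic on the specified broker.
 * @author ThirupathiReddy Vajjala
 */
@Configuration
public class KafkaTopicConfig {
    @Value("${kafka.address}") // assumed key; use the broker address key from application.yml
    private String kafkaBrokerAddress; // (1)

    /**
     * An admin that delegates to an {@link org.apache.kafka.clients.admin.AdminClient} to create topics defined
     * in the application context.
     * @return KafkaAdmin {@link KafkaAdmin}
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(singletonMap(BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress)); // (2)
    }

    /**
     * Creates the topic on the broker instance specified above.
     * @param topicName topic name from configuration
     * @return topic instance
     */
    @Bean
    public NewTopic paymentDetailsTopic(@Value("${message.topic.payments}") String topicName) {
        return new NewTopic(topicName, 1, (short) 1); // (3)
    }
}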

  1. Kafka Broker address which is referred from application.yml

  2. KafkaAdmin is an admin that delegates to an AdminClient to create topics defined in the application context.

  3. Creates Topic in the Kafka Broker with given name while bootstrapping the application

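Create KafkaTemplate to send (produce) messages to the topic

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * This class creates the {@link KafkaTemplate} instance,
 * which is used to send messages to a topic.
 * <p>
 * {@link KafkaTemplate} requires a {@link ProducerFactory} instance.
 * <p>
 * The {@link org.apache.kafka.clients.producer.Producer} instance and {@link KafkaTemplate} are thread-safe.
 * @author ThirupathiReddy Vajjala
 */
@Configuration
public class KafkaTemplateConfig {

    @Value("${kafka.address}") // assumed key; use the broker address key from application.yml
    private String kafkaBrokerAddress;

    @Bean
    public ProducerFactory<String, PaymentDetails> producerFactory() {
        Map<String, Object> configProperties = new HashMap<>();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // (1)
        configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        configProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
        return new DefaultKafkaProducerFactory<>(configProperties); // (2)
    }

    /**
     * KafkaTemplate used to produce messages into topics.
     * @param producerFactory producerFactory instance
     * @return kafkaTemplate {@link KafkaTemplate}
     */
    @Bean
    public KafkaTemplate<String, PaymentDetails> kafkaTemplate(ProducerFactory<String, PaymentDetails> producerFactory) {
        return new KafkaTemplate<>(producerFactory); // (3)
    }
}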

  1. JSON message serializer

  2. ProducerFactory is used to specify the producer configuration and the type of messages we are sending

  3. KafkaTemplate is a wrapper around the ProducerFactory that eases sending messages programmatically

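Create Consumer Configuration to receive messages

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
 * For consuming messages, we need to configure a {@link ConsumerFactory}
 * and a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}.
 * Once these beans are available, POJO-based consumers can be configured
 * using the {@link org.springframework.kafka.annotation.KafkaListener} annotation.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.address}") // assumed key; use the broker address key from application.yml
    private String kafkaBrokerAddress;

    private static final String GROUP_ID = "FRAUD";

    @Bean
    public ConsumerFactory<String, PaymentDetails> consumerFactory() {
        Map<String, Object> configProperties = new HashMap<>();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        return new DefaultKafkaConsumerFactory<>(configProperties,
                new StringDeserializer(),
                new JsonDeserializer<>(PaymentDetails.class)); // (1)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentDetails> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PaymentDetails> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory()); // (2)
        return factory;
    }
}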
  1. JSON message deserializer for PaymentDetails

  2. Listener container factory used by @KafkaListener methods to read messages

We are using PaymentDetails object as Kafka message.

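Create Producer and send messages to the topic

import static java.lang.System.currentTimeMillis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * This Spring component sends a message to the given topic every 5 seconds.
 * The send result is a ListenableFuture, to which we add a callback to read the status of our message.
 */
@Component
public class MessageProducer implements CommandLineRunner {

    @Autowired
    KafkaTemplate<String, PaymentDetails> kafkaTemplate;

    @Value("${message.topic.payments}")
    String paymentMessageTopic;

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProducer.class);

    @Override
    public void run(String... args) throws InterruptedException {
        do {
            sendMessage(new PaymentDetails(currentTimeMillis(), "12345", "Thiru", 1234.0));
            Thread.sleep(5000); // wait 5 seconds between messages
        } while (true);
    }

    public void sendMessage(PaymentDetails message) {
        ListenableFuture<SendResult<String, PaymentDetails>> future = kafkaTemplate.send(paymentMessageTopic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, PaymentDetails>>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.warn("Message sending failed", ex);
            }
            @Override
            public void onSuccess(SendResult<String, PaymentDetails> result) {
                LOGGER.info("Message [{}] sent to topic [{}]", result.getProducerRecord().value(), result.getRecordMetadata().topic());
            }
        });
    }
}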

From the terminal, use the command below to listen to that topic:

$kafka-console-consumer --bootstrap-server localhost:9092 --topic payments --from-beginning

Alternatively, we can create a Java client, as shown below, to read the messages:

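import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumer.class);

    @KafkaListener(topics = "payments", groupId = "FRAUD") // (1)
    public void receivePaymentDetails(PaymentDetails message) {
        LOGGER.info("Received Message {} ", message);
    }
}

  1. The @KafkaListener annotation can be placed on a method of any Spring bean to listen to a topic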
