Produce & Consume String Messages using Apache Kafka and Java


In this post, we will create a Kafka producer that produces string messages when a REST endpoint is called, and a consumer that consumes those messages.

Install/Setup Kafka

To download Kafka, refer to the Apache Kafka Download website.
Pick any of the available packages. For this post we chose 2.8.1.

NOTE: Version 2.13.3 did not work in our setup and threw a Connection Refused error.

Steps to Setup:

  1. Extract the .tgz file to a directory of your choice.
  2. Open the zookeeper.properties file under the config directory and set the dataDir value. This is the directory where ZooKeeper stores its data.
  3. Open the server.properties file under the config directory and set log.dirs to the directory where Kafka should store its logs (the topic data).
  4. Start ZooKeeper by running the following command from the Kafka directory:
    • On Windows:
      .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
    • On Unix:
      bin/zookeeper-server-start.sh config/zookeeper.properties

      NOTE:

      Problem:
            If you get the following errors on Windows:
                The input line is too long.
                The syntax of the command is incorrect.
      Reason:
            The Windows command prompt limits a command line to 8191 characters. When the above command runs, kafka-run-class.bat appends every jar file found under the libs directory to the CLASSPATH variable one at a time, and the resulting command exceeds that limit.

      Solution:
            > Open the kafka-run-class.bat file under the bin\windows directory.
            > Search for "rem Classpath addition for release".
            > Replace these lines:
                rem Classpath addition for release
                for %%i in ("%BASE_DIR%\libs\*") do (
                  call :concat "%%i"
                )

              with these:
                rem Classpath addition for release
                call :concat "%BASE_DIR%\libs\*;"

            The libs\* wildcard lets Java expand the jar list itself, so the command stays well under the limit.

Once ZooKeeper has started without any errors, start the Kafka server with the command below:
For Windows:
        .\bin\windows\kafka-server-start.bat .\config\server.properties

For Unix:
        bin/kafka-server-start.sh config/server.properties

To stop them, run the corresponding stop scripts (kafka-server-stop and zookeeper-server-stop, in the same bin or bin\windows directory), stopping the Kafka server before ZooKeeper.

Create Kafka Topic

A producer sends each message to a specific topic on the Kafka broker. A topic can be thought of as an address to which a message is delivered: consumers subscribe only to the topics they care about, so they receive the relevant messages and not everything else on the broker.
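The "topic as an address" idea can be illustrated with a toy, in-memory model. This is only an analogy under simplifying assumptions (a single process, no partitions, nothing written to disk) and not how the Kafka broker works internally; the ToyBroker class and the orders topic are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of topics as addresses. NOT the Kafka broker's implementation.
class ToyBroker {

    // Each topic name maps to its own append-only list of messages.
    private final Map<String, List<String>> topics = new HashMap<>();

    // A producer addresses every message to exactly one topic.
    void send(String topic, String message) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    // A consumer reads only the topic it subscribed to and never sees the others.
    // Reading returns a copy and does not remove anything from the topic.
    List<String> poll(String topic) {
        return List.copyOf(topics.getOrDefault(topic, List.of()));
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("mt", "hello from mt");
        broker.send("orders", "order #1"); // hypothetical second topic
        System.out.println(broker.poll("mt")); // prints [hello from mt]
    }
}
```

As in Kafka's log-based design, reading a topic here does not delete its messages.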

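We can create a Kafka topic named mt like this (run from the Kafka directory):
For Unix:
        bin/kafka-topics.sh --create --topic mt --zookeeper localhost:2181 --replication-factor 1 --partitions 1

For Windows:
        .\bin\windows\kafka-topics.bat --create --topic mt --zookeeper localhost:2181 --replication-factor 1 --partitions 1

The --zookeeper option is deprecated in 2.8 and was removed in Kafka 3.0; with newer versions, use --bootstrap-server localhost:9092 instead.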

Create Spring Project

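Using Spring Initializr, either from your IDE or the website, create a Spring Boot Maven project with the Spring Web and Spring for Apache Kafka dependencies, which the code below needs for the REST endpoints and the Kafka classes respectively.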

Creating Producer Based on Configurations

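The project has a producer that produces string messages. Its configuration class looks like this:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfigs {

    private static final String KAFKA_BROKER = "localhost:9092";

    // Template that application code uses to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Creates Kafka producers from the configuration below.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    // Broker address plus String serializers for message keys and values.
    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return config;
    }
}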
In the code above, the producer configuration states that the producer sends messages to the broker at the KAFKA_BROKER address, and that both the key and the value of each message are serialized as Strings.

The ProducerFactory creates Kafka producers from this configuration, and the KafkaTemplate wraps such a producer and provides convenient methods, such as send(), for publishing messages.
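The ProducerConfig constants used above stand for plain Kafka property keys: bootstrap.servers, key.serializer and value.serializer. The sketch below builds the same map with those raw keys. It passes the serializer as a fully qualified class name string (Kafka accepts either a Class or a class name for these settings) so that it compiles without kafka-clients on the classpath; the RawProducerConfig class is hypothetical and only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: the producer settings written with raw Kafka property keys.
class RawProducerConfig {

    static Map<String, Object> producerConfigurations(String broker) {
        Map<String, Object> config = new HashMap<>();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        config.put("bootstrap.servers", broker);
        // Same key as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    public static void main(String[] args) {
        // HashMap order is unspecified, so the lines may print in any order.
        producerConfigurations("localhost:9092")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Knowing the raw names helps when reading Kafka's configuration reference, which lists settings by these keys rather than by the Java constants.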

Creating Consumer Based on Configurations

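The project also has a consumer that consumes the string messages:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Class name assumed, mirroring ProducerConfigs.
@Configuration
public class ConsumerConfigs {

    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = "kafka-sandbox";

    // Factory behind @KafkaListener methods; it builds listener containers
    // that poll Kafka on background threads.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Creates Kafka consumers from the configuration below.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations());
    }

    // Broker address, consumer group, and String deserializers for keys and values.
    @Bean
    public Map<String, Object> consumerConfigurations() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return config;
    }
}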

In the code above, besides KAFKA_BROKER the consumer also needs a GROUP_ID. A consumer that subscribes to topics belongs to a consumer group, and consumers sharing the same group ID divide the topic's partitions among themselves, which is how Kafka consumes data in parallel. The ConcurrentKafkaListenerContainerFactory creates listener containers that can run several consumer threads (set with setConcurrency(); the default is a single thread).

So the consumer configuration contains the KAFKA_BROKER address, the GROUP_ID, and the key and value deserializer classes (here StringDeserializer).
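Why the number of partitions limits parallelism within a group can be shown with a simplified model: each partition is given to exactly one consumer of the group. The round-robin rule and the ToyGroupAssignment class are illustrative assumptions; Kafka's real assignors (RangeAssignor by default in 2.8) use their own rules, but they share the one-partition-one-consumer property.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one consumer group sharing a topic's partitions.
// Round-robin is for illustration only; it is not Kafka's assignment algorithm.
class ToyGroupAssignment {

    // Returns, for each consumer thread, the list of partitions it would read.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        // Every partition goes to exactly one consumer in the group.
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // One partition, three threads: two threads get nothing to read.
        System.out.println(assign(1, 3)); // prints [[0], [], []]
        // Three partitions, three threads: each thread gets one.
        System.out.println(assign(3, 3)); // prints [[0], [1], [2]]
    }
}
```

Since the mt topic was created with --partitions 1, raising setConcurrency() above 1 would, by this reasoning, leave the extra threads idle; parallel consumption needs a topic with more partitions.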

Create a Topic-specific Consumer

This is a topic-specific consumer in which you can perform actions for that topic. Here we listen for any new messages produced on the Kafka topic we created and store them in a list. We also add a getMessages() method that returns the list of messages.

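import java.util.ArrayList;
import java.util.List;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Class name assumed; the controller below refers to it as mtConsumer.
@Component
public class MtConsumer {
    private final List<String> messages = new ArrayList<>();

    @KafkaListener(topics = "mt", groupId = "kafka-sandbox")
    public void listen(String message) {
        synchronized (messages) { // lock the shared list, not the message
            messages.add(message);
        }
    }

    // Return a snapshot so HTTP threads never read the list mid-update.
    public List<String> getMessages() {
        synchronized (messages) {
            return new ArrayList<>(messages);
        }
    }
}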

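The @KafkaListener annotation subscribes the listen() method to the listed topics as a member of the given group; the groupId attribute takes precedence over the group.id set in the consumer factory configuration.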
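The listener is called on a Kafka container thread while getMessages() is called on HTTP request threads, and with setConcurrency() above 1 several container threads may call listen() at the same time. Locking on the incoming message gives no protection, because each call locks a different object. The stdlib-only sketch below (the ListenerThreadSafetyDemo class and the thread and message counts are hypothetical) locks the shared list instead, returns a copy to readers, and checks that no message is lost when several threads add concurrently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listener class: several threads call listen() at once,
// the way listener container threads might.
class ListenerThreadSafetyDemo {

    private final List<String> messages = new ArrayList<>();

    void listen(String message) {
        // Lock the shared list, since that is the object every thread mutates.
        synchronized (messages) {
            messages.add(message);
        }
    }

    List<String> getMessages() {
        // Copy under the same lock so readers never see a half-updated list.
        synchronized (messages) {
            return new ArrayList<>(messages);
        }
    }

    // Starts `threads` threads that each deliver `perThread` messages,
    // waits for all of them, and returns how many messages were stored.
    static int deliverConcurrently(int threads, int perThread) {
        ListenerThreadSafetyDemo listener = new ListenerThreadSafetyDemo();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    listener.listen("thread-" + id + "-msg-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return listener.getMessages().size();
    }

    public static void main(String[] args) {
        System.out.println(deliverConcurrently(4, 10_000)); // prints 40000
    }
}
```

With a plain unsynchronized ArrayList (or a lock on each message), concurrent add() calls can lose elements or fail with an exception, so the count would not be reliable.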

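Create a REST Controller to produce and consume messages

In the REST controller, we create a produce method that sends the given message to the mt topic through the KafkaTemplate. The controller gets the KafkaTemplate and the consumer injected by Spring as the fields kafkaTemplate and mtConsumer.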

    @GetMapping("/kafka/produce")
    public void produce(@RequestParam String message){
    kafkaTemplate.send("mt", message);
    }

We also create another method that returns all the messages the consumer has received.

    @GetMapping("/kafka/messages")
    public List<String> getMessages(){
    return mtConsumer.getMessages();
    }


Start the Spring application and call the following GET endpoints to send messages; here we send four messages to the Kafka topic mt:

http://localhost:8080/kafka/produce?message="Kafka with Java on DynamicallyBluntTech1"
http://localhost:8080/kafka/produce?message="Kafka with Java on DynamicallyBluntTech2"
http://localhost:8080/kafka/produce?message="Kafka with Java on DynamicallyBluntTech3"
http://localhost:8080/kafka/produce?message="Kafka with Java on DynamicallyBluntTech4"

Similarly, we can call the messages endpoint, which uses getMessages(), to display all the messages the consumer has consumed:

http://localhost:8080/kafka/messages

This displays the four messages that were produced:
["\"Kafka with Java on DynamicallyBluntTech1\"", "\"Kafka with Java on DynamicallyBluntTech2\"", "\"Kafka with Java on DynamicallyBluntTech3\"", "\"Kafka with Java on DynamicallyBluntTech4\""]

The escaped quotes (\") appear because the double quotes typed into the URLs became part of each message.

In this way we can produce and consume string messages. We can also produce and consume JSON objects: the main change is to use the JsonSerializer and JsonDeserializer classes instead of the String ones in the producer and consumer configurations, with the template and factories typed to the object class.
To get the complete code for the above example, refer to this GitHub link here.
