Spring Boot Kafka Producer and Consumer Example – Step By Step Guide

In this post we will build a Spring Boot Kafka producer and consumer example from scratch. We are going to cover the points below.

How to install Apache Kafka.
How to start ZooKeeper and Kafka and create a topic.
How to configure Spring and Apache Kafka.
How to create a Kafka producer and consumer to send/receive string messages (Hello World example).
How to create a Kafka producer and consumer to send/receive JSON messages.
How to create a Kafka consumer REST controller/endpoint.
Testing using Postman.

Installing Apache Kafka and creating a topic.

In this section, we will see how to install Kafka (run ZooKeeper and the Kafka server) and create a topic.

Installing Apache Kafka on Windows.

Download Kafka using the link below.

https://kafka.apache.org/downloads

Note (for beginners): Create the directory C:\data\kafka and copy everything from the extracted Kafka folder into it. Keeping the path short avoids errors when running Kafka commands (for example, an "input line is too long" error).

For example, I downloaded and extracted Apache Kafka, then copied all of its contents to C:\data\kafka


Running ZooKeeper and Kafka.

Use the command below to run ZooKeeper.

C:\data\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Use the command below to run Kafka.

C:\data\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

Creating topic.

C:\data\kafka>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic netsurfingzone-topic-1

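We have created the topic netsurfingzone-topic-1, which we are going to use later in this example. Note that the zookeeper option applies to older Kafka releases; from Kafka 3.0 onward, kafka-topics.bat expects --bootstrap-server localhost:9092 instead.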

Spring Boot Apache Kafka example – Producing and consuming string type message

Open Eclipse and create a Maven project. Don't forget to check 'Create a simple project (skip archetype selection)', then click Next. Fill in the details (GroupId: spring-boot-kafka-hello-world-example, ArtifactId: spring-boot-kafka-hello-world-example, Name: spring-boot-kafka-hello-world-example) and click Finish. Keep the packaging as jar.

Maven dependency

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>spring-boot-kafka-hello-world-example</groupId>
	<artifactId>spring-boot-kafka-hello-world-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-boot-kafka-hello-world-example</name>
	<description>spring-boot-kafka-hello-world-example</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
	</parent>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

	</dependencies>
</project>

Directory structure of spring-boot-kafka-hello-world-example.

Create ApplicationConstant.java class.


package com.netsurfingzone.constant;

public class ApplicationConstant {
	public static final String KAFKA_LOCAL_SERVER_CONFIG = "localhost:9092";
	public static final String GROUP_ID_STRING = "group-id-string-1";
	public static final String TOPIC_NAME = "netsurfingzone-topic-1";
	public static final String KAFKA_LISTENER_CONTAINER_FACTORY = "kafkaListenerContainerFactory";
}

Create configuration class SpringKafkaConfig.java.

package com.netsurfingzone.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.netsurfingzone.constant.ApplicationConstant;

@Configuration
@EnableKafka
public class SpringKafkaConfig {
	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);//line-23
		configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//line-24
		configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//line-25
		configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_STRING);//line-26
		return new DefaultKafkaConsumerFactory<>(configMap);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}
}

At line 23 we provide the local Apache Kafka server details. At lines 24 and 25 we configure which deserializers are used for the record key and value. At line 26 we set the consumer group id.
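To make the deserializer setting more concrete, here is a minimal sketch that uses only the JDK. It is not Kafka's code; the class name StringCodecSketch is made up for illustration. It mimics what a string serializer and deserializer pair does conceptually: the producer side turns a String into UTF-8 bytes, and the consumer side turns those bytes back into a String.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of Kafka or Spring.
final class StringCodecSketch {

    // Producer side: a record value is written to the topic as bytes.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: the bytes read from the topic are turned back into a String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello");
        System.out.println(wire.length + " bytes -> " + deserialize(wire));
    }
}
```

If the producer and consumer disagree on the format (for example JSON bytes read with a string deserializer), the consumer receives the raw text rather than an object, which is why both sides must be configured consistently.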

Create the message producer class, KafkaProducer. This class will be used to send the message.

package com.netsurfingzone.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.netsurfingzone.constant.ApplicationConstant;

@RestController
@RequestMapping("/produce")
public class KafkaProducer {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	@GetMapping("/{message}")
	public String sendMessage(@PathVariable String message) {

		try {
			kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "Message sent successfully";
	}

}

The KafkaTemplate class is available in org.springframework.kafka.core package and implements the KafkaOperations interface. The KafkaOperations interface contains the different overloaded versions of the send() method. See docs for more details.

Create the message consumer listener class, KafkaConsumer.java. This class is used to receive messages from the topic.

package com.netsurfingzone.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.netsurfingzone.constant.ApplicationConstant;

@Component
public class KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(groupId = ApplicationConstant.GROUP_ID_STRING, topics = ApplicationConstant.TOPIC_NAME, 
			containerFactory = ApplicationConstant.KAFKA_LISTENER_CONTAINER_FACTORY)
	public void receivedMessage(String message) {
		logger.info("Message Received using Kafka listener " + message);
	}
}

Define SpringMain.java class

package com.netsurfingzone.main;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.netsurfingzone.*")
public class SpringMain {
	public static void main(String[] args) {
		SpringApplication.run(SpringMain.class, args);
	}

}

application.yml

server:
   port: 9091

Run the SpringMain class to start the application.

Open Postman or any browser and send a message in the URL, for example http://localhost:9091/produce/hello.
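If you would rather script the request than use Postman, the following is a minimal sketch using the JDK's built-in HttpClient. It assumes Java 11 or later for the client program and that the application is running locally on port 9091 as configured above. The class name ProduceClient and the helper buildUri are made up for this illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for calling the /produce/{message} endpoint.
final class ProduceClient {

    // Base URL taken from this example: server.port 9091 and the /produce mapping.
    static final String BASE_URL = "http://localhost:9091/produce/";

    // Encodes the message so that it is safe to use as a single path segment.
    static URI buildUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8)
                .replace("+", "%20"); // URLEncoder writes spaces as '+', a path needs %20
        return URI.create(BASE_URL + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(buildUri("hello world")).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```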

Observe the console; our consumer should receive the message.

Note: We can also see the messages sent from Postman by running the console consumer from cmd.

Open cmd, go to the directory below, and run the following command.

C:\data\kafka>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic netsurfingzone-topic-1

Send a message from Postman. We should be able to see it in the console consumer output.

Now we will see how to produce and consume JSON messages using Apache Kafka and Spring Boot.

Spring Boot Apache Kafka example – Producing and consuming JSON type message

Open Eclipse and create a Maven project. Don't forget to check 'Create a simple project (skip archetype selection)', then click Next. Fill in the details (GroupId: spring-boot-kafka-json-message-example, ArtifactId: spring-boot-kafka-json-message-example, Name: spring-boot-kafka-json-message-example) and click Finish. Keep the packaging as jar.

The Maven dependencies are the same as in the previous example.

Directory structure of spring-boot-kafka-json-message-example.

In the ApplicationConstant.java class, define a new constant.

public static final String GROUP_ID_JSON = "group-id-json-1";

Create a new class, Student.java.

package com.netsurfingzone.dto;

public class Student {
	private Long id;
	private String name;
	private String rollNumber;

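	// Getters and setters (Jackson uses them to serialize and deserialize Student)
	public Long getId() { return id; }
	public void setId(Long id) { this.id = id; }
	public String getName() { return name; }
	public void setName(String name) { this.name = name; }
	public String getRollNumber() { return rollNumber; }
	public void setRollNumber(String rollNumber) { this.rollNumber = rollNumber; }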
}

We need to modify SpringKafkaConfig.java as shown below.

package com.netsurfingzone.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.netsurfingzone.constant.ApplicationConstant;
import com.netsurfingzone.dto.Student;

@Configuration
@EnableKafka
public class SpringKafkaConfig {

	@Bean
	public ProducerFactory<String, Object> producerFactory() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		return new DefaultKafkaProducerFactory<String, Object>(configMap);
	}

	@Bean
	public KafkaTemplate<String, Object> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

	@Bean
	public ConsumerFactory<String, Student> consumerFactory() {
		Map<String, Object> configMap = new HashMap<>();
		configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG);
		configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_JSON);
		configMap.put(JsonDeserializer.TRUSTED_PACKAGES, "com.netsurfingzone.dto");//line-50
		return new DefaultKafkaConsumerFactory<>(configMap);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Student> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<String, Student>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}
}

At line 50 we call configMap.put(JsonDeserializer.TRUSTED_PACKAGES, "com.netsurfingzone.dto"); to avoid the error below.

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition netsurfingzone-topic-1-0 at offset 26. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.netsurfingzone.dto.Student' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
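To see why the error appears, here is a simplified model of a trusted-packages check written with the JDK only. This is not Spring Kafka's actual implementation; the class TrustedPackagesSketch and the method isTrusted are invented for illustration. The idea is that a class is accepted only when its package is in the trusted list, or when everything is trusted with "*".

```java
import java.util.Set;

// Hypothetical, simplified model of a trusted-packages check.
final class TrustedPackagesSketch {

    // Returns true if "*" is configured or the class's package is listed exactly.
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        String student = "com.netsurfingzone.dto.Student";
        // Only the default packages are trusted: the class is rejected.
        System.out.println(isTrusted(student, Set.of("java.util", "java.lang"))); // false
        // After adding the DTO package, the class is accepted.
        System.out.println(isTrusted(student,
                Set.of("java.util", "java.lang", "com.netsurfingzone.dto"))); // true
    }
}
```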

We can define this configuration in the application.properties file too. See the docs for more details about how to configure Kafka trusted packages.
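As a rough sketch of what the properties-based variant could look like: the fragment below assumes that the hand-written consumerFactory and kafkaListenerContainerFactory beans are removed, so that Spring Boot's auto-configured consumer is used. With the Java beans shown above still in place, these properties would not be picked up by them.

```properties
# Assumption: Spring Boot auto-configures the consumer (no custom ConsumerFactory bean)
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Same effect as JsonDeserializer.TRUSTED_PACKAGES in the Java configuration
spring.kafka.consumer.properties.spring.json.trusted.packages=com.netsurfingzone.dto
```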

Modify KafkaProducer.java. This class will be used to send the JSON message.

package com.netsurfingzone.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.netsurfingzone.constant.ApplicationConstant;
import com.netsurfingzone.dto.Student;

@RestController
@RequestMapping("/produce")
public class KafkaProducer {

	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	@PostMapping("/message")
	public String sendMessage(@RequestBody Student message) {

		try {
			kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "json message sent successfully";
	}

}

Modify KafkaConsumer.java. This class is used to receive the JSON message from the topic.

package com.netsurfingzone.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netsurfingzone.constant.ApplicationConstant;
import com.netsurfingzone.dto.Student;

@Component
public class KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, topics = ApplicationConstant.TOPIC_NAME, containerFactory = ApplicationConstant.KAFKA_LISTENER_CONTAINER_FACTORY)
	public void receivedMessage(Student message) throws JsonProcessingException {
		ObjectMapper mapper = new ObjectMapper();
		String jsonString = mapper.writeValueAsString(message);
		logger.info("Json message received using Kafka listener " + jsonString);
	}
}

SpringMain.java remains the same.
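To try the JSON endpoint without Postman, the sketch below builds a POST request with the JDK's HttpClient. It assumes Java 11 or later for the client program and the application running on port 9091. The class name JsonProduceClient is hypothetical, and the sample body simply reuses the Student fields defined earlier.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class for calling the POST /produce/message endpoint.
final class JsonProduceClient {

    static final String ENDPOINT = "http://localhost:9091/produce/message";

    // Sample Student payload; field names match the Student DTO.
    static final String STUDENT_JSON =
            "{\"id\": 10, \"name\": \"john\", \"rollNumber\": \"0126CS01\"}";

    // Builds a POST request that carries the JSON body with the matching content type.
    static HttpRequest buildRequest(String json) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(STUDENT_JSON), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The Content-Type header matters here: without application/json, Spring would not know how to bind the request body to the Student parameter annotated with @RequestBody.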

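Send a Student as JSON in a POST request to http://localhost:9091/produce/message (for example from Postman) and observe the console.

Yes! We are able to read the JSON message using the Apache Kafka listener.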

Creating a Kafka consumer REST API – Defining a REST endpoint to consume messages from a Kafka topic.

We covered how to define Kafka consumers using the @KafkaListener annotation to read messages from a Kafka topic. In this section we will see how to create a REST endpoint to consume messages. Instead of using a Kafka listener, the endpoint creates a consumer and polls the topic itself.

Modify the KafkaConsumer.java class as below.

package com.netsurfingzone.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.netsurfingzone.constant.ApplicationConstant;
import com.netsurfingzone.dto.Student;

@RestController
@RequestMapping("/consume")
public class KafkaConsumer {

	@Autowired
	private ConcurrentKafkaListenerContainerFactory<String, Student> factory;

	@GetMapping("/message")
	public List<Student> receiveMessage() {
		List<Student> students = new ArrayList<>();
		ConsumerFactory<String, Student> consumerFactory = factory.getConsumerFactory();
		Consumer<String, Student> consumer = consumerFactory.createConsumer();
		try {
			consumer.subscribe(Arrays.asList(ApplicationConstant.TOPIC_NAME));
			ConsumerRecords<String, Student> consumerRecords = consumer.poll(10000);
			Iterable<ConsumerRecord<String, Student>> records = consumerRecords.records(ApplicationConstant.TOPIC_NAME);
			Iterator<ConsumerRecord<String, Student>> iterator = records.iterator();

			while (iterator.hasNext()) {
				students.add(iterator.next().value());
			}

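		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// Close the consumer so it leaves the group and releases its connections
			consumer.close();
		}
		return students;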
	}
}

The rest of the code (configuration and other classes) stays the same.

Let's try to consume messages from the Kafka topic.

http://localhost:9091/consume/message

We should be able to see the message in the response.

[
    {
        "id": 10,
        "name": "john",
        "rollNumber": "0126CS01"
    }
]

That’s all about Spring Boot Kafka Producer and Consumer Example.

Download the complete example from GitHub.

Spring kafka docs.

Summary: We have seen a Spring Boot Kafka producer and consumer example from scratch. We need to run both ZooKeeper and Kafka in order to send messages using Kafka.