
Java Spring Boot Kafka Producer and Consumer

🕓 45 minutes

What you’ll learn

  • Connect to Kafka and create a new topic
  • Create a Kafka producer and produce messages to a Kafka topic
  • Create a Kafka consumer and consume messages from a Kafka topic
  • Create multiple Kafka consumers with different group IDs

In this tutorial, we will create a simple Java component using the Java Spring Boot scaffolder. We expose a single REST endpoint for producing a new event (in this case, creating a new order). When a new order is received, other services should be notified of this event. Kafka is used to distribute the event among the other services.

We send the message to a specific Kafka topic (create-order). The message is then received by the consumers listening to this topic. Each consumer group processes the message exactly once, so we put NotificationService in one group and all instances of Service in another. When a service receives the event (consumes the message from the Kafka topic), it logs the information.

Check this to learn more about consumer groups:


The services would normally be split into three separate components (three separate git repositories), but for our purposes we put all of them into one base component and differentiate the beans by name. Instance1 and Instance2 represent two scaled pods of the same service.
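These delivery semantics can be sketched in plain Java. This is a toy model, not the Kafka API (all names below are made up for illustration): every group receives each message once, and within a group the members take turns, here via simple round-robin.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of consumer-group delivery: every group sees each message once;
// inside a group, only one member processes it (round-robin here; Kafka
// actually assigns partitions, but the "once per group" effect is the same).
public class GroupDemo {
    static Map<String, List<String>> groups = new LinkedHashMap<>();
    static Map<String, Integer> next = new LinkedHashMap<>();
    static List<String> deliveries = new ArrayList<>();

    static void subscribe(String group, String member) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(member);
        next.putIfAbsent(group, 0);
    }

    static void publish(String message) {
        // Each group gets the message once; pick one member per group.
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            List<String> members = e.getValue();
            int i = next.merge(e.getKey(), 1, Integer::sum) - 1;
            deliveries.add(members.get(i % members.size()) + " got " + message);
        }
    }

    public static void main(String[] args) {
        subscribe("notification", "NotificationService");
        subscribe("service", "Instance1");
        subscribe("service", "Instance2");
        publish("order-1");
        publish("order-2");
        deliveries.forEach(System.out::println);
    }
}
```

Each published order is delivered to NotificationService every time, while Instance1 and Instance2 alternate within their shared group.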

Project source and deployed application

If you want to see the application in action for yourself, it is deployed in CodeNOW. Just click on this link: and sign in with the following credentials:

  • username: public-user
  • password: Public-user123.

This example project can be cloned from:


Run Kafka

  • with Docker or locally

For more information about how to run Kafka, check the example project.
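As a sketch, a single-node broker can be started with the official Apache Kafka Docker image (image tag, container name, and script paths are assumptions; adjust to your setup):

```shell
# Start a single-node Kafka broker (KRaft mode) listening on localhost:9092
docker run -d --name kafka -p 9092:9092 apache/kafka:latest

# Create the topic used in this tutorial
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic create-order --bootstrap-server localhost:9092
```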


Open your IDE, import the created component and start coding:

  • Define the message payload. Here is an example of Order, a simple POJO with basic order data:
    • generate getters and setters with your IDE or use Lombok annotations

The Order attributes themselves are not important. We just want to demonstrate how to produce and consume messages that carry objects.

  package coding.example;

import lombok.Getter;
import lombok.Setter;

import java.util.Date;

@Getter
@Setter
public class Order {

    private String orderID;

    private Date dateOfCreation;

    private String content;
}

Maven dependency

  • Add the spring-kafka Maven dependency to your pom.xml

  • For more details about spring-kafka, see:
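With the Spring Boot parent POM, the spring-kafka version is typically managed for you, so a minimal dependency entry looks like this:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```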

Application properties

  • Add these properties to your /codenow/config/application.yaml and set up this file as a run configuration for your project

    spring:
      kafka:
        bootstrap-servers: ${KAFKA_RESERVATION_BOOTSTRAP_SERVERS:localhost:9092}
        order:
          topic:
            create-order: create-order
          group:
            notification: notification
            service: service
  • The values of these properties are read in the code using the @Value annotation


  • Create a configuration for the Kafka template

    package coding.example.orderService.kafka;

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class CreateOrderProducerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Bean
        public <K, V> ProducerFactory<K, V> createOrderProducerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            config.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            config.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(config);
        }

        @Bean
        public <K, V> KafkaTemplate<K, V> createOrderKafkaTemplate() {
            return new KafkaTemplate<>(createOrderProducerFactory());
        }
    }
  • And now create the CreateOrderProducer service

    • Spring Boot will automatically inject the @Bean createOrderKafkaTemplate and the @Value createOrderTopic where they are needed

To learn more about inversion of control and dependency injection, check out this link:

package coding.example.orderService.kafka;

import coding.example.Order;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Service
public class CreateOrderProducer {

    private static final Logger log = LoggerFactory.getLogger(CreateOrderProducer.class);

    private final KafkaTemplate<String, Order> createOrderKafkaTemplate;

    private final String createOrderTopic;

    public CreateOrderProducer(KafkaTemplate<String, Order> createOrderKafkaTemplate,
                               @Value("${spring.kafka.order.topic.create-order}") String createOrderTopic) {
        this.createOrderKafkaTemplate = createOrderKafkaTemplate;
        this.createOrderTopic = createOrderTopic;
    }

    public boolean sendCreateOrderEvent(Order order) throws ExecutionException, InterruptedException {
        // Blocking send: wait for the broker acknowledgement before returning
        SendResult<String, Order> sendResult = createOrderKafkaTemplate.send(createOrderTopic, order).get();
        log.info("Create order {} event sent via Kafka", order);
        return true;
    }
}

This step needs to be repeated for every service that wants to consume messages from the Kafka topic.

  • Create consumer configuration

Don't forget to specify the right value of groupId for the specific service

package coding.example.notificationService;

import coding.example.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@EnableKafka
@Configuration
public class CreateOrderConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.order.group.notification}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, Order> createOrderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Order.class));
    }

    @Bean("NotificationContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Order> createOrderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createOrderConsumerFactory());
        // Manual ack mode matches the disabled auto-commit above
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
  • And now create the CreateOrderConsumer service with a createOrderListener Kafka listener

    • The Kafka listener is created from the container factory @Bean named NotificationContainerFactory
    • The Kafka listener listens to the topic specified by the ${spring.kafka.order.topic.create-order} configuration value
    package coding.example.notificationService;

    import coding.example.Order;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Service;

    @Service
    public class CreateOrderConsumer {

        private static final Logger log = LoggerFactory.getLogger(CreateOrderConsumer.class);

        @KafkaListener(topics = "${spring.kafka.order.topic.create-order}", containerFactory = "NotificationContainerFactory")
        public void createOrderListener(@Payload Order order, Acknowledgment ack) {
            log.info("Notification service received order {} ", order);
            // Acknowledge manually because auto-commit is disabled
            ack.acknowledge();
        }
    }

REST Controller

Finally, we create a REST controller so that we can generate events by sending requests to it.

  • For more details about the Spring REST controller, see:

    package coding.example.orderService.controller;

    import coding.example.orderService.kafka.CreateOrderProducer;
    import coding.example.Order;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import java.util.concurrent.ExecutionException;

    @RestController
    @RequestMapping("/order") // example mapping; choose a path that fits your API
    public class OrderController {

        private final CreateOrderProducer createOrderProducer;

        public OrderController(CreateOrderProducer createOrderProducer) {
            this.createOrderProducer = createOrderProducer;
        }

        @PostMapping
        public ResponseEntity<?> createOrder(@RequestBody Order order) throws ExecutionException, InterruptedException {
            if (createOrderProducer.sendCreateOrderEvent(order)) {
                return new ResponseEntity<>(HttpStatus.OK);
            }
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

Build and Run

Try to build and run the application in your IDE. After startup, you should be able to access your new controller's Swagger UI at http://localhost:8080/swagger-ui/index.html
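Once the application is running, you can trigger the producer from the command line with a request like this (the /order path and the JSON field values are assumptions; adjust them to your controller mapping and Order fields):

```shell
# Post a new order; the controller forwards it to Kafka on the create-order topic
curl -X POST http://localhost:8080/order \
  -H "Content-Type: application/json" \
  -d '{"orderID": "order-1", "content": "2x coffee"}'
```

If everything is wired up, the producer logs the sent event and the consumer logs the received order.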