
Java Spring Boot REST Server with Kafka Consumer

🕓 40 minutes

What you’ll learn#

How to set up your application for:

  • connecting to Kafka and consuming messages from its topic,
  • providing the data via a REST API.

In this tutorial, we will create a simple Java component with the Java Spring Boot scaffolder. We want to expose a single REST endpoint that returns client application logs. The logs are published to a Kafka topic, so we also need a Kafka client configuration. The ClientApplicationLogs component contains simple logic that stores the last 10 messages from Kafka and makes them available via the REST API.


Project source#

This example project can be cloned from:


Prerequisites#

  • Prepare your local development environment for CodeNOW with Java Spring Boot.
  • Run Apache Kafka locally.
  • Create a new component.
    • For details see the section Prerequisites of the Java Spring Boot Local Development tutorial.


Open your IDE, import the created component and start coding:

  • Define the message payload. Here is an example of the Log, which is a simple POJO with basic client data:

    • generate getters and setters with your IDE

      import java.util.Date;

      public class Log {

          private Date timestamp;
          private String key;
          private String payload;

          public Log(Date timestamp, String key, String payload) {
              super();
              this.timestamp = timestamp;
              this.key = key;
              this.payload = payload;
          }
      }
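With the getters generated, the class can be sanity-checked in isolation. Here is a minimal sketch; the getter names below are the usual IDE defaults, and the `LogDemo` wrapper class exists only for illustration:

```java
import java.util.Date;

public class LogDemo {

    // The tutorial's Log POJO with the IDE-generated getters filled in
    static class Log {
        private Date timestamp;
        private String key;
        private String payload;

        Log(Date timestamp, String key, String payload) {
            this.timestamp = timestamp;
            this.key = key;
            this.payload = payload;
        }

        Date getTimestamp() { return timestamp; }
        String getKey() { return key; }
        String getPayload() { return payload; }
    }

    public static void main(String[] args) {
        Log log = new Log(new Date(0L), "client-1", "startup complete");
        // prints: client-1: startup complete
        System.out.println(log.getKey() + ": " + log.getPayload());
    }
}
```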
  • Next, prepare the configuration for the Kafka logging client:

    • Go to the Kafka administration console (http://localhost:9000 if using Kafdrop from our Java Spring Boot Local Development manual) and create a new topic named client-logging.

    • Add the Maven dependency to your pom.xml:

      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
      </dependency>
    • For more details about spring-kafka, see:

    • Now let’s create a simple cache queue that stores only the last few messages. The size limit is configured with the Spring property log.cache.size (see below):

      import java.util.concurrent.ArrayBlockingQueue;

      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;

      @Component
      public class FixedSizeLogCache extends ArrayBlockingQueue<Log> {

          private static final long serialVersionUID = 3640159618214478205L;

          private final int limit;

          public FixedSizeLogCache(@Value("${log.cache.size}") int limit) {
              super(limit);
              this.limit = limit;
          }

          @Override
          public boolean add(Log item) {
              ensureLimit();
              return super.add(item);
          }

          @Override
          public boolean offer(Log e) {
              ensureLimit();
              return super.offer(e);
          }

          // Drop the oldest entry before inserting once the queue is full
          private void ensureLimit() {
              if (super.size() >= limit) {
                  super.remove();
              }
          }
      }
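Since @Value only supplies the constructor argument, the eviction behavior can be checked without Spring at all. Here is a standalone sketch of the same idea; FixedSizeCache is an illustrative stand-in, not the tutorial class:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class LogCacheDemo {

    // Same eviction logic as FixedSizeLogCache, minus the Spring wiring
    static class FixedSizeCache<T> extends ArrayBlockingQueue<T> {
        private final int limit;

        FixedSizeCache(int limit) {
            super(limit);
            this.limit = limit;
        }

        @Override
        public boolean add(T item) {
            // Drop the oldest entry once the queue is full
            if (size() >= limit) {
                remove();
            }
            return super.add(item);
        }
    }

    public static void main(String[] args) {
        FixedSizeCache<String> cache = new FixedSizeCache<>(3);
        for (int i = 1; i <= 5; i++) {
            cache.add("log-" + i);
        }
        // Only the last three messages remain
        System.out.println(cache); // prints [log-3, log-4, log-5]
    }
}
```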
  • The next step is to use the Spring Boot Kafka autoconfiguration and create a new Kafka listener for the logs:

    • The listener is configured via Spring properties.

      import java.util.Date;

      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.kafka.support.KafkaHeaders;
      import org.springframework.messaging.handler.annotation.Header;
      import org.springframework.messaging.handler.annotation.Payload;
      import org.springframework.stereotype.Component;

      @Component
      public class KafkaLogsListener {

          private final FixedSizeLogCache cache;

          public KafkaLogsListener(FixedSizeLogCache cache) {
              this.cache = cache;
          }

          @KafkaListener(topics = "${spring.kafka.topic.name}")
          public void logsListener(@Payload String payload,
                  @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey,
                  @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
              // The record timestamp arrives as epoch milliseconds
              cache.add(new Log(new Date(timestamp), messageKey, payload));
          }
      }
  • Next, create a new controller and put all the parts together:

    • For more detail about the spring REST controller, see:

      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;

      import reactor.core.publisher.Flux;

      @RestController
      @RequestMapping("/logs")
      public class ClientApplicationLogsController {

          private final FixedSizeLogCache cache;

          public ClientApplicationLogsController(FixedSizeLogCache cache) {
              this.cache = cache;
          }

          @GetMapping
          public Flux<Log> getLogs() {
              return Flux.fromIterable(cache);
          }
      }
  • Last but not least, replace the configuration in codenow/config/application.yaml with the following code:

    • Note that this configuration depends on your local development setup for Kafka and can differ case by case. For your local configuration, you will probably need to uncomment bootstrap-servers: localhost:29092 and comment out the line above it.

    • Make sure you follow YAML syntax (especially whitespace).

      server:
        port: 8080
      spring:
        main:
          banner-mode: off
        zipkin:
          enabled: false
        kafka:
          bootstrap-servers: client-logging-kafka-kafka-brokers.managed-components:9092
      #    bootstrap-servers: localhost:29092
          consumer:
            group-id: log-group
          topic:
            name: client-logging
      management:
        endpoints:
          web:
            exposure:
              include: health, prometheus
      log:
        cache:
          size: 10
  • Try to build and run the application in your IDE. After startup, you should be able to access your new controller’s Swagger UI at: http://localhost:8080/swagger/index.html
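Besides Swagger, the endpoint can also be exercised from plain Java using the JDK’s built-in HttpClient. This is only a sketch: the LogsClient class name is illustrative, and the send call assumes the application is running locally on port 8080:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LogsClient {

    // Target matches the controller's @RequestMapping("/logs") on the local port 8080
    static HttpRequest buildLogsRequest() {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/logs"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildLogsRequest();
        // With the application running, this prints the cached Log entries as JSON
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```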


What’s next?#

If your code works in local development, you are ready to push your changes to Git and build and deploy your new component version to the CodeNOW environment. For more information, see Application Deployment and Monitoring; just make sure to change the application.yaml properties from the local to the production setup.