Java Spring Boot REST Server with Kafka Consumer

🕓 40 minutes

What you’ll learn

How to set up your application for:

  • connecting to Kafka and consuming messages from its topic,
  • providing the data via a REST API.

In this tutorial, we will create a simple Java component with the Java Spring Boot scaffolder. We want to expose a single REST endpoint for getting client application logs. The logs are published to a Kafka topic, so we also need a Kafka client configuration. The ClientApplicationLogs component contains simple logic that stores the last 10 messages from Kafka and returns them via the REST API.

(Component diagram)

Project source

This example project can be cloned from: http://gitlab.factory.codenow-control.codenow.com/public-docs/client-authorization-demo/client-application-logs.git

Prerequisites

  • Prepare your local development environment for CodeNOW with Java Spring Boot.
  • Run Apache Kafka locally.
  • Create a new component.
    • For details see the section Prerequisites of the Java Spring Boot Local Development tutorial.

Steps

Open your IDE, import the created component and start coding:

  • Define the message payload. Here is an example of the Log, which is a simple POJO with basic client data:

    • Generate getters and setters with your IDE.

      package io.codenow.client.app.logs.service.model;

      import java.util.Date;

      public class Log {

          private Date timestamp;
          private String key;
          private String payload;

          public Log(Date timestamp, String key, String payload) {
              this.timestamp = timestamp;
              this.key = key;
              this.payload = payload;
          }

          // getters and setters omitted (generate them with your IDE)
      }
  • Next, prepare the configuration for the Kafka logging client:

    • Go to the Kafka administration console (http://localhost:9000 if using Kafdrop from our Java Spring Boot Local Development manual) and create a new topic client-logging

    • Add maven dependency to your pom.xml

      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
      </dependency>
    • For more details about spring-kafka, see: https://spring.io/projects/spring-kafka

    • Now let’s create a simple cache queue that stores only the last couple of messages. The field ‘limit’ is configured with the Spring property ‘log.cache.size’ (see below):

      package io.codenow.client.app.logs.service.kafka;

      import java.util.concurrent.ArrayBlockingQueue;

      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;

      import io.codenow.client.app.logs.service.model.Log;

      @Component
      public class FixedSizeLogCache extends ArrayBlockingQueue<Log> {

          private static final long serialVersionUID = 3640159618214478205L;
          private final int limit;

          public FixedSizeLogCache(@Value("${log.cache.size}") int limit) {
              super(limit);
              this.limit = limit;
          }

          @Override
          public boolean add(Log item) {
              ensureLimit();
              return super.add(item);
          }

          @Override
          public boolean offer(Log e) {
              ensureLimit();
              return super.offer(e);
          }

          // Drop the oldest entry once the queue is full so new messages always fit.
          private void ensureLimit() {
              if (size() >= limit) {
                  remove();
              }
          }
      }
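The eviction behaviour of this queue can be checked without any Spring or Kafka infrastructure. The following standalone sketch uses a simplified, generic version of the cache (`FixedSizeCache` and `CacheDemo` are illustrative names, not part of the project):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Simplified generic version of FixedSizeLogCache: once the queue holds
// `limit` elements, the oldest one is dropped before a new one is added.
class FixedSizeCache<E> extends ArrayBlockingQueue<E> {

    private final int limit;

    FixedSizeCache(int limit) {
        super(limit);
        this.limit = limit;
    }

    @Override
    public boolean add(E item) {
        if (size() >= limit) {
            remove(); // evict the head (oldest element)
        }
        return super.add(item);
    }
}

public class CacheDemo {
    public static void main(String[] args) {
        FixedSizeCache<String> cache = new FixedSizeCache<>(3);
        cache.add("a");
        cache.add("b");
        cache.add("c");
        cache.add("d"); // the queue is full, so "a" is evicted first
        System.out.println(cache); // prints [b, c, d]
    }
}
```

Running the demo confirms that the queue never grows beyond its limit and always keeps the most recent entries.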
  • The next step is to use the Spring Boot Kafka autoconfiguration and create a new Kafka listener for the logs:

    • The listened topic is configured via the Spring property ‘spring.kafka.topic.name’.

      package io.codenow.client.app.logs.service.kafka;

      import java.util.Date;

      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.kafka.support.KafkaHeaders;
      import org.springframework.messaging.handler.annotation.Header;
      import org.springframework.messaging.handler.annotation.Payload;
      import org.springframework.stereotype.Component;

      import io.codenow.client.app.logs.service.model.Log;

      @Component
      public class KafkaLogsListener {

          private final FixedSizeLogCache cache;

          public KafkaLogsListener(FixedSizeLogCache cache) {
              this.cache = cache;
          }

          @KafkaListener(topics = "${spring.kafka.topic.name}")
          public void logsListener(@Payload String payload,
                  @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey,
                  @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Date timestamp) {
              cache.add(new Log(timestamp, messageKey, payload));
          }
      }
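Because the listener body is plain Java, its logic can be exercised without a running broker. The sketch below uses simplified stand-ins for the Log model and the cache (`LogEntry`, `ListenerSimulation` and `onMessage` are illustrative names); in the real service, Spring injects the payload, key and timestamp from the consumed Kafka record:

```java
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;

// Simplified stand-in for the tutorial's Log model.
class LogEntry {
    final Date timestamp;
    final String key;
    final String payload;

    LogEntry(Date timestamp, String key, String payload) {
        this.timestamp = timestamp;
        this.key = key;
        this.payload = payload;
    }
}

public class ListenerSimulation {

    // Stand-in for the injected FixedSizeLogCache bean.
    static final ArrayBlockingQueue<LogEntry> cache = new ArrayBlockingQueue<>(10);

    // Mirrors KafkaLogsListener#logsListener: wrap the record data in a
    // log entry and hand it to the cache.
    static void onMessage(String payload, String messageKey, Date timestamp) {
        cache.add(new LogEntry(timestamp, messageKey, payload));
    }

    public static void main(String[] args) {
        onMessage("user logged in", "client-42", new Date());
        System.out.println(cache.peek().key); // prints client-42
    }
}
```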
  • Next, create a new controller and put all the parts together:

    • For more detail about the spring REST controller, see: https://spring.io/guides/gs/rest-service/

      package io.codenow.client.app.logs.service.controller;

      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;

      import io.codenow.client.app.logs.service.kafka.FixedSizeLogCache;
      import io.codenow.client.app.logs.service.model.Log;
      import reactor.core.publisher.Flux;

      @RestController
      @RequestMapping("/logs")
      public class ClientApplicationLogsController {

          private final FixedSizeLogCache cache;

          public ClientApplicationLogsController(FixedSizeLogCache cache) {
              this.cache = cache;
          }

          // Handler methods must be public so Spring can invoke them.
          @GetMapping
          public Flux<Log> getLogs() {
              return Flux.fromIterable(cache);
          }
      }
  • Last but not least, replace the configuration in config/application.yaml with the following code:

    • Note that this configuration depends on your local development setup for Kafka and can differ case by case. For your local configuration, you will probably need to uncomment ‘bootstrap-servers: localhost:29092’ and comment out the line above it.

    • Make sure you follow YAML syntax (especially whitespace and indentation).

      server:
        port: 8080
      spring:
        main:
          banner-mode: off
        zipkin:
          enabled: false
        kafka:
          bootstrap-servers: client-logging-kafka-kafka-brokers.managed-components:9092
          # bootstrap-servers: localhost:29092
          consumer:
            group-id: log-group
          topic:
            name: client-logging
      management:
        endpoints:
          web:
            exposure:
              include: health, prometheus
      log:
        cache:
          size: 10
  • Try to build and run the application in your IDE. After startup, you should be able to access your new controller’s Swagger UI: http://localhost:8080/swagger/index.html

(API documentation: Swagger UI screenshot)

What’s next?

If your code works in local development, you are ready to push your changes to Git and try to build and deploy the new version of your component to the CodeNOW environment. For more information, see Application Deployment and Monitoring; just make sure to change the application.yaml properties from the local to the production setup.