Skip to main content

Event-Driven Architectures with Kafka and Java Spring-Boot

·6 mins

Everything You Need To Get Started #

Event-driven architectures have become the thing over the last years with Kafka being the de-facto standard when it comes to tooling.

This post provides a complete example for an event-driven architecture, implemented with two Java Spring-Boot services that communicate via Kafka.

The main goal for this tutorial has been to provide a working example without getting too much into any details, which, in my opinion, unnecessarily distract from the main task of getting “something” up and running as quick as possible.

We have a couple of building blocks, mainly

  • Infrastructure (Kafka, Zookeeper)
  • Producer (Java Spring-Boot Service)
  • Consumer (Java Spring-Boot Service)

The producer has the only task of periodically sending out an event to Kafka. This event just carries a timestamp. The consumers job is to listen for this event and print the timestamp.

Kafka

The whole implementation resulted in the following project structure.

Project Structure

The complete project code can be downloaded from here.

This can be build on command line as explained below, or imported into an IDE like IntelliJ for example.

IntelliJ-Project

Infrastructure #

Only two components, despite the services, are needed to get an event-based architecture up-and-running: Kafka and Zookeeper.

Check the resources-section at the end of the tutorial for links to both.

Whereas Kafka is the “main”-part of handling the events, Zookeeper is needed for several reasons. From the Zookeeper website:

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Below is the docker-compose.yml to get both up-and-running:

version: '3'

services:

    kafka:
      image: wurstmeister/kafka
      container_name: kafka
      ports:
        - "9092:9092"
      environment:
        - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
        - KAFKA_ADVERTISED_PORT=9092
        - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      depends_on:
        - zookeeper

    zookeeper:
      image: wurstmeister/zookeeper
      ports:
        - "2181:2181"
      environment:
        - KAFKA_ADVERTISED_HOST_NAME=zookeeper

When this is in place, only the two Java-services are needed that implement the “business domain”. Well, a very simple one: Sending and receiving a timestamp.

Code Setup #

There is a very practical website where one can create and initialise a Spring-project with all the required dependencies: Spring Initializr.

First, I have created the producer application:

Producer Initialisation

Next, was the consumer application:

Consumer Initialisation

Note, that I have already added the dependency for “Spring for Apache Kafka”.

After downloading and unpacking the project files it was time to start implementing.

For the producer and the consumer 4 files are needed for each:

  • The Application
  • The Configuration
  • The Producer, respectively the Consumer
  • The Properties-File

What goes into those files is explained in the next two chapters. I am not going into the details here, because this tutorial is not meant to be an in-depth Kafka tutorial.

Producer #

As mentioned above, the producer is “producing” timestamps and sending them out via Kafka to everyone who is interested in receiving them.

Everything starts with the application class in ProducerApplication.java, which has been left more or less untouched. Only the @EnableScheduling-annotation has been added which is needed in the producer itself.

package net.wissmueller.kafkatutorial.producer;

// imports ...

@SpringBootApplication
@EnableScheduling
public class ProducerApplication {

  public static void main(String[] args) {
    SpringApplication.run(ProducerApplication.class, args);
  }

}

Some configuration is needed which I have put into ProducerConfiguration.java.

We need to tell the producer where to find Kafka and what serialisers for the events to use. This is done in producerConfigs().

An event has a key and a value. For both we are using the String-class. This is specified in kafkaTemplate().

We also need the topic on which to send the events. Therefore we have timestampTopic() which returns NewTopic.

package net.wissmueller.kafkatutorial.producer;

// imports ...

public class ProducerConfiguration {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  @Bean
  public NewTopic timestampTopic() {
    return TopicBuilder.name("timestamp")
                       .build();
  }

}

This class needs some properties which are in the usual place: application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup

The producer itself is in KafkaProducer.java:

package net.wissmueller.kafkatutorial.producer;

// imports ...

@Component
public class KafkaProducer {
  private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

  private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

  private KafkaTemplate<String, String> kafkaTemplate;

  KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  void sendMessage(String message, String topicName) {
    kafkaTemplate.send(topicName, message);
  }

  @Scheduled(fixedRate = 5000)
  public void reportCurrentTime() {
    String timestamp = dateFormat.format(new Date());
    sendMessage(timestamp, "timestamp");
    log.info("Sent: {}", timestamp);
  }
}

This class gets initialised with the KafkaTemplate .

In reportCurrentTime() the timestamp is being sent to Kafka every 5 seconds, which is implemented via the @Scheduled-annotation. This only works when the @EnableScheduling-annotation has been set in the application class.

That is all for the producer. On to the consumer …

Consumer #

It is time to receive the timestamps that have been sent out by the producer.

As with the producer, the entry-point is the application class in ConsumerApplication.java. This time completely unchanged, just as it has been generated by the Spring Initializr.

package net.wissmueller.kafkatutorial.consumer;

// imports ...

@SpringBootApplication
public class ConsumerApplication {

  public static void main(String[] args) {
    SpringApplication.run(ConsumerApplication.class, args);
  }

}

The configuration is in ConsumerConfiguration.java where we, analog to the producer,

  • specify how to reach Kafka
  • which serialisers to use
  • the format of the Kafka events
package net.wissmueller.kafkatutorial.consumer;

// imports ...

public class ConsumerConfiguration {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  @Bean
  public NewTopic timestampTopic() {
    return TopicBuilder.name("timestamp")
                       .build();
  }

}

Also here, we need to set some properties in application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup

Last but not least, we have the consumer in KafkaConsumer.java. We only have to specify a listener on a topic by using the @KafkaListener-topic and the action. In this case, the timestamp just gets logged.

package net.wissmueller.kafkatutorial.consumer;

// imports ...

@Component
public class KafkaConsumer {
  private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

  @KafkaListener(topics = "timestamp")
  void listener(String timestamp) {
    log.info("Received: {}", timestamp);
  }
}

Run Example Code #

It is time to run everything. Remember the following project structure:

code/
- docker-compose.yml
- producer/
-- pom.xml
-- ...
- consumer/
-- pom.xml
-- ...

In directory code, Kafka and Zookeeper is being started via docker-compose:

docker-compose up -d

Changing into the producer-directory, the service is being started with:

mvn spring-boot:run

Finally, in a new terminal window, change into the consumer directory to start the service the same way:

mvn spring-boot:run

Now, you should be able to see something similar like this. On the left is the log output of the producer and on the right is the log output of the consumer.

Terminal Output

This concludes this introductory-tutorial on how to create an event-driven architecture by using Kafka and Java Spring-Boot.

The complete project code can be downloaded from here.

Resources #