Event-Driven Architectures with Kafka and Java Spring-Boot

Everything You Need To Get Started #
Event-driven architectures have gained a lot of traction over the last few years, with Kafka being the de-facto standard on the tooling side.
This post provides a complete example of an event-driven architecture, implemented with two Java Spring-Boot services that communicate via Kafka.
The main goal of this tutorial is to provide a working example without going into too much detail, which, in my opinion, would only distract from the main task of getting "something" up and running as quickly as possible.
There are three main building blocks:
- Infrastructure (Kafka, Zookeeper)
- Producer (Java Spring-Boot Service)
- Consumer (Java Spring-Boot Service)
The producer's only task is to periodically send an event to Kafka. This event just carries a timestamp. The consumer's job is to listen for this event and print the timestamp.
The whole implementation results in the project structure shown below in the Run Example Code section.
The complete project code can be downloaded from here.
It can be built on the command line as explained below, or imported into an IDE such as IntelliJ.
Infrastructure #
Besides the services themselves, only two components are needed to get an event-driven architecture up and running: Kafka and Zookeeper.
Check the resources section at the end of the tutorial for links to both.
While Kafka does the main part of handling the events, Zookeeper is needed for several reasons. From the Zookeeper website:
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
Below is the docker-compose.yml to get both up and running:
version: '3'
services:
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=zookeeper
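Once both containers are up, it is worth a quick check that Kafka is actually reachable before starting the services. A minimal sketch, assuming the Kafka command-line scripts shipped with the wurstmeister image are on the container's PATH:

# Both containers should show as "Up"
docker-compose ps

# List topics from inside the Kafka container
docker exec -t kafka kafka-topics.sh --list --bootstrap-server localhost:9092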
With this in place, all that is missing are the two Java services that implement the "business domain". Well, a very simple one: sending and receiving a timestamp.
Code Setup #
There is a very practical website where one can create and initialise a Spring project with all the required dependencies: Spring Initializr.
First, I created the producer application, then the consumer application the same way.
Note that I already added the dependency for "Spring for Apache Kafka" to both.
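If you prefer the command line over the website, the Initializr also offers an HTTP API. A minimal sketch for the producer ("kafka" is the Initializr identifier for "Spring for Apache Kafka"; the other parameter values are assumptions based on the package names used below, so adjust them as needed):

# Generate the producer skeleton via the Spring Initializr API
curl https://start.spring.io/starter.zip \
  -d type=maven-project \
  -d groupId=net.wissmueller.kafkatutorial \
  -d artifactId=producer \
  -d dependencies=kafka \
  -o producer.zip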
After downloading and unpacking the project files, it was time to start implementing.
Four files are needed for each of the two services:
- The Application
- The Configuration
- The Producer or the Consumer, respectively
- The Properties-File
What goes into those files is explained in the next two sections. I am not going into the details here, because this tutorial is not meant to be an in-depth Kafka tutorial.
Producer #
As mentioned above, the producer is “producing” timestamps and sending them out via Kafka to everyone who is interested in receiving them.
Everything starts with the application class in ProducerApplication.java, which has been left more or less untouched. Only the @EnableScheduling annotation has been added, which is needed in the producer itself.
package net.wissmueller.kafkatutorial.producer;

// imports ...

@SpringBootApplication
@EnableScheduling
public class ProducerApplication {
  public static void main(String[] args) {
    SpringApplication.run(ProducerApplication.class, args);
  }
}
Some configuration is needed, which I have put into ProducerConfiguration.java.
We need to tell the producer where to find Kafka and which serialisers to use for the events. This is done in producerConfigs().
An event has a key and a value. For both we are using the String class. This is specified in kafkaTemplate().
We also need the topic on which to send the events. Therefore we have timestampTopic(), which returns a NewTopic.
package net.wissmueller.kafkatutorial.producer;

// imports ...

@Configuration // required so that the @Bean methods below are picked up
public class ProducerConfiguration {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  @Bean
  public NewTopic timestampTopic() {
    return TopicBuilder.name("timestamp")
                       .build();
  }
}
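Built like this, the topic uses the broker defaults for partitions and replicas. TopicBuilder can also set them explicitly; a minimal sketch (the values here are just examples, sensible for a local single-broker setup):

@Bean
public NewTopic timestampTopic() {
  return TopicBuilder.name("timestamp")
                     .partitions(1) // a single partition is enough for this demo
                     .replicas(1)   // only one broker is running locally
                     .build();
}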
This class needs some properties, which are in the usual place, application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup
The producer itself is in KafkaProducer.java:
package net.wissmueller.kafkatutorial.producer;

// imports ...

@Component
public class KafkaProducer {
  private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
  private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

  private KafkaTemplate<String, String> kafkaTemplate;

  KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  void sendMessage(String message, String topicName) {
    kafkaTemplate.send(topicName, message);
  }

  @Scheduled(fixedRate = 5000)
  public void reportCurrentTime() {
    String timestamp = dateFormat.format(new Date());
    sendMessage(timestamp, "timestamp");
    log.info("Sent: {}", timestamp);
  }
}
This class gets initialised with the KafkaTemplate.
In reportCurrentTime() the timestamp is sent to Kafka every 5 seconds, implemented via the @Scheduled annotation. This only works when the @EnableScheduling annotation has been set in the application class.
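The send() call above passes only a topic and a value. KafkaTemplate can also take a key, which determines the partition a record ends up in; records with the same key keep their relative order. A minimal sketch of a keyed variant (this method is an illustration, not part of the tutorial code):

void sendMessage(String key, String message, String topicName) {
  // Records sharing a key are routed to the same partition
  kafkaTemplate.send(topicName, key, message);
}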
That is all for the producer. On to the consumer …
Consumer #
It is time to receive the timestamps that have been sent out by the producer.
As with the producer, the entry point is the application class in ConsumerApplication.java. This time it is completely unchanged, just as it has been generated by the Spring Initializr.
package net.wissmueller.kafkatutorial.consumer;

// imports ...

@SpringBootApplication
public class ConsumerApplication {
  public static void main(String[] args) {
    SpringApplication.run(ConsumerApplication.class, args);
  }
}
The configuration is in ConsumerConfiguration.java where, analogous to the producer, we
- specify how to reach Kafka
- specify which deserialisers to use
- specify the format of the Kafka events
package net.wissmueller.kafkatutorial.consumer;

// imports ...

@Configuration // required so that the @Bean methods below are picked up
public class ConsumerConfiguration {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Value("${spring.kafka.consumer.group-id}")
  private String groupId;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // group.id and the value deserialiser are required for the listener to start
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  @Bean
  public NewTopic timestampTopic() {
    return TopicBuilder.name("timestamp")
                       .build();
  }
}
Also here, we need to set some properties in application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup
Last but not least, we have the consumer in KafkaConsumer.java. We only have to specify a listener on a topic by using the @KafkaListener annotation, giving it the topic and the action to perform. In this case, the timestamp just gets logged.
package net.wissmueller.kafkatutorial.consumer;

// imports ...

@Component
public class KafkaConsumer {
  private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

  @KafkaListener(topics = "timestamp")
  void listener(String timestamp) {
    log.info("Received: {}", timestamp);
  }
}
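If the payload alone is not enough, the listener method can instead take the full ConsumerRecord, which exposes the record's metadata. A minimal sketch, assuming the String key/value types used throughout this tutorial:

@KafkaListener(topics = "timestamp")
void listener(ConsumerRecord<String, String> record) {
  // Topic, partition, offset and key are available alongside the value
  log.info("Received: {} (partition {}, offset {})",
      record.value(), record.partition(), record.offset());
}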
Run Example Code #
It is time to run everything. Remember the following project structure:
code/
  - docker-compose.yml
  - producer/
    - pom.xml
    - ...
  - consumer/
    - pom.xml
    - ...
In the directory code, Kafka and Zookeeper are started via docker-compose:
docker-compose up -d
Changing into the producer directory, the service is started with:
mvn spring-boot:run
Finally, in a new terminal window, change into the consumer directory and start the service the same way:
mvn spring-boot:run
Now you should be able to see something like this: on the left is the log output of the producer, on the right the log output of the consumer.
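When you are done, stop both services with Ctrl-C and tear down the infrastructure again:

docker-compose down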
This concludes this introductory tutorial on how to create an event-driven architecture using Kafka and Java Spring-Boot.
The complete project code can be downloaded from here.