
Event-Driven Architectures with Kafka and Python


Everything You Need to Get Started #

Welcome to this first revision of my article “Event-Driven Architectures with Kafka and Python”.

In the original article the event that was sent out was a String. While this is sufficient for many use cases, most of my use cases require sending an object of a custom type.

Therefore I have changed the code and the tutorial to use object serialisation and deserialisation with JSON.

Of course, there are other methods to serialise and deserialise data. A very popular one, for example, is Avro.

While many roads lead to Rome, let’s start with this one and continue with the tutorial.

Introduction #

Event-driven architectures have become increasingly popular over the last years, with Kafka being the de-facto standard when it comes to tooling.

This post provides a complete example for an event-driven architecture, implemented with two services written in Python that communicate via Kafka.

The main goal of this tutorial is to provide a working example without going into too much detail, which, in my opinion, would only distract from the main task of getting “something” up and running as quickly as possible.

We have a couple of building blocks, mainly

  • Infrastructure (Kafka, Zookeeper)
  • Producer (Python Service)
  • Consumer (Python Service)

The producer's only task is to periodically send out an event to Kafka. This event just carries a timestamp. The consumer's job is to listen for this event and print the timestamp.

Kafka

The whole implementation resulted in the following project structure.

Project Structure

The complete code can be downloaded from here.

Infrastructure #

Apart from the services themselves, only two components are needed to get an event-based architecture up and running: Kafka and Zookeeper.

Check the resources section at the end of the tutorial for links to both.

While Kafka is the main component for exchanging the events, Zookeeper is needed for several reasons. From the Zookeeper website:

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Below is the docker-compose.yml to get both up-and-running:

version: '3'

services:

    kafka:
      image: wurstmeister/kafka
      container_name: kafka
      ports:
        - "9092:9092"
      environment:
        - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
        - KAFKA_ADVERTISED_PORT=9092
        - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      depends_on:
        - zookeeper

    zookeeper:
      image: wurstmeister/zookeeper
      ports:
        - "2181:2181"
      environment:
        - KAFKA_ADVERTISED_HOST_NAME=zookeeper

Once this is in place, only the two services that implement the “business domain” are needed. Well, a very simple one: sending and receiving a timestamp.

Code Setup #

There are two Python projects, and each project's code lives in its main.py.

Each project has only one dependency, defined in its requirements.txt:

kafka-python==2.0.2

This dependency needs to be installed for each project, ideally inside a virtual environment (the run commands later on assume one named venv), with the following command:

python3 -m pip install -r requirements.txt

Producer #

As mentioned above, the producer is “producing” timestamps.

class TimestampEvent:
    def __init__(self, timestamp):
        self.timestamp = timestamp

These timestamps are being sent out via Kafka to everyone who is interested in receiving them.

import json
from kafka import KafkaProducer
from datetime import datetime
from time import sleep

from TimestampEvent import TimestampEvent

# The value_serializer turns each event's attributes into JSON and encodes them as UTF-8 bytes.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda x: json.dumps(x.__dict__).encode('utf-8'))

# Send a new timestamp event to the 'timestamp' topic every five seconds.
while True:
    timestampEvent = TimestampEvent(datetime.now().strftime("%H:%M:%S"))
    print("Sending: " + timestampEvent.timestamp)
    producer.send('timestamp', timestampEvent)
    sleep(5)

When creating the producer we provide two pieces of information:

  • bootstrap_servers: Where to find Kafka. This could have been left out, because localhost:9092 is the default value.
  • value_serializer: How messages will be encoded before they are sent (a small example follows below the list).
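To make the encoding step concrete: the serializer simply takes the event's attribute dictionary, dumps it to JSON, and encodes it as bytes. A minimal sketch of what happens to a single event (run on its own, independent of the producer; the printed value is only illustrative):

import json
from datetime import datetime

from TimestampEvent import TimestampEvent

event = TimestampEvent(datetime.now().strftime("%H:%M:%S"))

# This mirrors what the value_serializer lambda does before handing the message to Kafka.
payload = json.dumps(event.__dict__).encode('utf-8')

print(payload)  # e.g. b'{"timestamp": "14:37:05"}'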

Messages will be sent to the timestamp topic. This matters for the consumer, which will only listen to messages from this topic.
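With the default broker settings, Kafka creates the timestamp topic automatically the first time a message is sent to it, so no extra step is required here. If you prefer to create the topic explicitly, a sketch using kafka-python's admin client could look like this (partition and replication values are just examples for the single local broker):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create the 'timestamp' topic with a single partition and no replication.
admin.create_topics([NewTopic(name='timestamp', num_partitions=1, replication_factor=1)])
admin.close()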

That is all for the producer. On to the consumer …

Consumer #

The data structure for timestamps is identical to that of the producer.

class TimestampEvent:
    def __init__(self, timestamp):
        self.timestamp = timestamp
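Since this class is duplicated in both projects, it could also be moved into a small shared module or written as a dataclass. A sketch of the dataclass variant, which works with the same JSON round trip used above (just an alternative, not what the tutorial code uses):

from dataclasses import dataclass

@dataclass
class TimestampEvent:
    timestamp: str

# json.dumps(event.__dict__) on the producer side and TimestampEvent(**data)
# on the consumer side work unchanged with this variant.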

It is time to receive the timestamps that have been sent out by the producer.

from kafka import KafkaConsumer
import json

from TimestampEvent import TimestampEvent

# Subscribe to the 'timestamp' topic and decode each JSON payload back into a dict.
consumer = KafkaConsumer('timestamp',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# Rebuild the event object from the decoded dict and print its timestamp.
for message in consumer:
    timestampEvent = TimestampEvent(**(message.value))
    print("Received: " + timestampEvent.timestamp)

This time we have not provided bootstrap_servers as we did with the producer. It will default to localhost:9092.

The parameters that are provided are:

  • The topic the consumer will listen to: timestamp
  • value_deserializer: How messages will be decoded after they have been received (a variant with a few more options is sketched below).
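In practice a few optional settings are often added as well, for example a consumer group and where to start reading when no offset has been committed yet. A sketch with these optional kafka-python parameters (the values are only examples):

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'timestamp',
    bootstrap_servers='localhost:9092',   # explicit, same as the default
    group_id='timestamp-printer',         # consumers in the same group share the partitions
    auto_offset_reset='earliest',         # start from the oldest message if no offset is stored
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))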

Everything is in its place by now. Ready for some action.

Run Example Code #

It is time to run everything. Remember the following project structure:

event-driven-architectures
- docker-compose.yml
- python-tutorial
-- producer
--- main.py
-- consumer
--- main.py

In the event-driven-architectures directory, Kafka and Zookeeper are started via docker-compose:

docker-compose up -d

Change into the producer directory and start the service with:

$ source venv/bin/activate
(venv) $ python3 main.py

Finally, in a new terminal window, change into the consumer directory to start the service the same way:

$ source venv/bin/activate
(venv) $ python3 main.py

Now you should be able to see something similar to this. On the left is the log output of the producer and on the right is the log output of the consumer.

Terminal Output

Congratulations if you made it this far. It should have been fairly easy to follow along. If not, let me know how to improve this tutorial.

Cleanup #

When done, you can leave the Python environment by simply typing:

(venv) $ deactivate

The Docker services are also still running. They need to be stopped and cleaned up as well.

The command below does the following:

  • Stop all running Docker containers
  • Delete stopped Docker containers
  • Remove all volumes

$ docker-compose stop && docker-compose rm -f && docker volume prune -f
Stopping kafka                             ... done
Stopping kafka-python-tutorial_zookeeper_1 ... done
Going to remove kafka, kafka-python-tutorial_zookeeper_1
Removing kafka                             ... done
Removing kafka-python-tutorial_zookeeper_1 ... done
Deleted Volumes:
e4380413983bb36f914621dac4019565cd9ed130c04c5336c898874b648c2c92
120ab4ab7e227bdc5ee155d1cc61f29b1b0f8d7ed2fa9ee29deb05c90e33b8fe
0636bf46ec05cdda15deec280cdef672c68366a7d8d57ff424938069498e4063

Total reclaimed space: 67.13MB

Conclusion #

This concludes the tutorial on how to create an event-driven architecture using Kafka and Python.

The complete project code can be downloaded from here.

Resources #