Kafka consumer as a Celery task

Today, I’ll be talking about an interesting combination: running a Kafka consumer as a scheduled Celery task. Read on if you find this interesting 😉

First of all, there are multiple ways to get a Kafka consumer running. Some of them are:

  1. Having a separate microservice – This is a good approach as it allows independent scaling of the consumers. However, there is the overhead of managing and monitoring this microservice, ensuring connectivity is fine, and so on. It is still ideal for large, complex setups where we want dedicated consumers capable of scaling in/out on demand.
  2. Running as a daemon/background task – The consumer can also run in the background or as a daemon process. This will be a forever-running process, as consumers are meant to be. However, troubleshooting and monitoring daemon processes can often be cumbersome.

We did not go with the above approaches, as ours is a lightweight application and we did not want a forever-running process alongside it just for the consumer. Enter Celery 🙂

Before I dive into the implementation specific details, below is the tech stack we use:

  1. Django (django-2.1.7)
  2. Celery (celery-4.3.0)
  3. Kafka (kafka-python-2.0.2)
  4. Python (python-3.2)

Celery is a distributed task queue that integrates easily with Python applications (in our case, a Django app) and provides task scheduling in a simple yet reliable manner. We decided to use Celery to schedule consumer polls at regular intervals instead of running the consumer forever; however, things like graceful message consumption and offset management had to be tuned to make this work. Let’s discuss how we went about it.
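The scheduling itself is plain Celery beat configuration. A minimal sketch of a 5-minute schedule in the Django settings is below; the task path myapp.tasks.kafka_consumer_task is a placeholder, not our actual module path:

```python
# settings.py sketch: have Celery beat fire the consumer-polling task periodically.
# 'myapp.tasks.kafka_consumer_task' is an assumed task path for illustration.
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'poll-kafka-consumer': {
        'task': 'myapp.tasks.kafka_consumer_task',
        'schedule': timedelta(minutes=5),  # a fresh consumer poll every 5 minutes
    },
}
```

With `app.config_from_object('django.conf:settings', namespace='CELERY')` in the Celery app setup, this schedule is picked up automatically by the beat process.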

We use the kafka-python module in our Django app to implement the consumer. Although kafka-python does not support transactional producers/consumers at the moment, this does not cause any issues per se. One caveat: there might be a lag in message consumption, as the control batch messages (commit/abort markers) will likely not get consumed.

Below are the steps we followed:

  1. Define a Celery task that polls the consumer every 5 minutes.
  2. The consumer has a poll timeout of 3 minutes (meaning it will stop polling and end the task if no messages become available within this duration).
  3. The Celery task is force-killed once the consumer times out, to prevent the task from running indefinitely.
  4. A new consumer runs every 5 minutes (in a new Celery task); even if an old one is still running, there is no impact, as the consumers are part of the same consumer group.
  5. The consumer has auto-commit enabled, which commits the offsets of consumed messages every second.
  6. In case the consumer goes down before, during, or after processing a message and the commit has not been done, the message will be re-consumed once a new consumer is up. Duplicate messages are also handled with some explicit checks (based on our use case).
  7. In case the consumer goes down before, during, or after processing a message and the commit has been done, the message will not be re-consumed.
  8. The auto_offset_reset flag is set to latest, meaning that if the offset is corrupted (due to a Kafka crash etc.), messages will not be re-consumed from the beginning, preventing duplicate message consumption.
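The interplay of the two durations above can be sanity-checked with simple arithmetic: the idle timeout (3 minutes) is shorter than the schedule interval (5 minutes), so an idle consumer always exits before the next task starts; overlap can only happen while messages are still flowing, which the shared consumer group handles.

```python
# Sanity check on the timing values from the steps above
SCHEDULE_INTERVAL_MS = 5 * 60 * 1000   # a new Celery task every 5 minutes
CONSUMER_TIMEOUT_MS = 180000           # consumer_timeout_ms: exit after 3 idle minutes

# An idle consumer exits before the next scheduled task starts,
# so idle tasks never pile up.
idle_overlap_possible = CONSUMER_TIMEOUT_MS >= SCHEDULE_INTERVAL_MS
print(idle_overlap_possible)  # → False
```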

Let’s have a look at the consumer implementation. I’ll not go over the Celery task setup, as that is readily available in the Celery getting-started guide.

import signal
import sys
import time

from kafka import KafkaConsumer

def kafka_consumer():
    consumer = None
    try:
        # Signal handler to kill the Celery task
        def handler(signum, frame):
            print("Hard exiting Celery task as consumer poller has timed out...")
            sys.exit(0)

        # Initialize Kafka consumer
        consumer = KafkaConsumer('topic_name',
                                 bootstrap_servers=['kafka_broker:port'],
                                 api_version=(0, 10),

                                 # SSL settings required in case your broker has mTLS configured, else skip
                                 security_protocol='SSL',
                                 ssl_check_hostname=True,
                                 ssl_cafile='ca.pem',
                                 ssl_certfile='cert.pem',
                                 ssl_keyfile='key.pem',

                                 consumer_timeout_ms=180000,
                                 group_id='consumer_group_name',
                                 auto_offset_reset='latest',  # No re-consumption from the beginning in case of a Kafka crash
                                 enable_auto_commit=True,
                                 auto_commit_interval_ms=1000)

        print("Polling for topic: {topic}".format(topic='topic_name'))

        for msg in consumer:
            print("Message key is {k} and value is {v}".format(k=msg.key, v=msg.value))

            # TODO: Process message
            # TODO: Handle duplicate consumption explicitly if required

            # Allow time for auto-commit before closing the consumer
            time.sleep(2)

    except Exception as e:
        print('Exception {err} while consuming message(s) from Kafka'.format(err=str(e)))

    finally:
        if consumer is not None:
            consumer.close()
        # Kill the Celery task
        signal.signal(signal.SIGALRM, handler)
        signal.alarm(1)

Some points to note in the implementation above:

  1. The Celery task is only killed once the consumer times out with no new messages to consume. This ensures the consumer is not killed mid-way while consuming/processing messages.
  2. A delay of 2 seconds is introduced, with the auto-commit interval set to 1 second. This ensures a processed message is always committed.
  3. Explicit handling of duplicate message processing is important: in case of a consumer/Kafka crash before the commit has been done, messages may be re-consumed.
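As a sketch of such an explicit duplicate check (assumptions: messages carry a unique key, and processed keys are tracked in some persistent store such as a Django model or Redis — a plain set stands in for that store here):

```python
# Sketch of an explicit duplicate check; processed_keys stands in for a
# persistent store (e.g. a Django model or Redis set) in a real deployment.
processed_keys = set()

def process_message(key, value):
    """Process a Kafka message at most once; return False for duplicates."""
    if key in processed_keys:
        # Already handled: the message was re-consumed after a lost commit
        return False
    # ... actual business logic would go here ...
    processed_keys.add(key)
    return True

assert process_message(b'order-1', b'payload') is True   # first delivery processed
assert process_message(b'order-1', b'payload') is False  # replayed message skipped
```

The in-memory set only survives one task run; a real store is needed so checks hold across the 5-minute task boundaries.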

The above setup is live for our application, deployed on K8s, and works exactly the way we want!
