Let's Build a Smart Task Router in Python with Kombu

Akram Chauhan
Akram Chauhan
8 min read201 views
Let's Build a Smart Task Router in Python with Kombu

Have you ever built an application that started simple but slowly became a tangled mess? You add one feature, like processing an uploaded video. Then another, like sending a notification. Before you know it, a single user request is triggering a dozen different things, and your whole app grinds to a halt waiting for everything to finish.

It’s a classic growing pain. We try to make our code do too much all at once, synchronously. But what if we could just… not?

What if, instead of handling everything right now, we could just fire off a message—like a digital sticky note—and trust that some other part of our system would pick it up and handle it when it's ready? That’s the magic of asynchronous messaging, and it's the secret sauce behind almost every large-scale tech product you use. Today, we're going to pull back the curtain and build a mini version of one of these systems ourselves using a fantastic Python library called Kombu.

Don't worry, this isn't some dry, academic exercise. We’re going to build a real, working system right here. You’ll see messages fly from one place to another, get routed to the right workers, and processed in the background. By the end, you'll get that "aha!" moment and see how this pattern can make your own applications way more powerful and resilient.

Ready? Let's get started.

First Things First: Setting Up Our Workshop

Before we can build our messaging system, we need to lay the groundwork. Think of this as clearing the workbench and grabbing our tools. We only need a few things to get going.

First, we need to install Kombu. It’s the star of our show.

pip install kombu

Next, we’ll write our initial Python script. We'll import all the necessary pieces from Kombu and set up some basic logging. Logging is super important here—it's how we'll actually see our messages moving through the system. Without it, we'd be flying blind.

We’re also going to define a BROKER_URL. Normally, this would point to a real message broker like RabbitMQ or Redis. But for this little experiment, Kombu has a neat trick up its sleeve: an in-memory broker. This lets us run everything locally without any extra setup. It's perfect for learning and testing.

Here’s what that initial setup looks like:

import threading
import time
import logging
import uuid
import sys
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin

logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True
)
logger = logging.getLogger(__name__)

BROKER_URL = "memory://localhost/"

Simple enough, right? We've got our tools, we've turned on the lights (logging), and we've cleared a space to work (our in-memory broker). Now for the fun part.

The Grand Central Station: Designing Our Exchange and Queues

At the heart of any good messaging system is a traffic controller. In our world, this is called an exchange. An exchange is like a central post office. It receives all incoming mail (messages) and, based on some rules, decides which mailboxes (queues) to put them in.

We're going to use a special kind of exchange called a topic exchange. I love topic exchanges because they’re incredibly flexible. They let us route messages based on patterns, a bit like using wildcards in a file search.

Let’s define our exchange and a couple of queues:

media_exchange = Exchange('media_exchange', type='topic', durable=True)

task_queues = [
    Queue('video_queue', media_exchange, routing_key='video.#'),
    Queue('audit_queue', media_exchange, routing_key='#'),
]

So, what did we just do?

  1. media_exchange: We created our central post office and named it media_exchange. We set its type to topic.
  2. video_queue: This is our first mailbox. We told it to listen to the media_exchange for any message with a "routing key" that starts with video.. The .# is a wildcard that means "and anything that follows." So, a message with the key video.upload or video.processing.failed would land here.
  3. audit_queue: This is our "catch-all" mailbox. The routing key # is a special wildcard that matches everything. This queue will get a copy of every single message that passes through our exchange. It’s perfect for logging, auditing, or debugging.

See how powerful that is? We’ve already designed a system where video-related tasks go to a dedicated place, while a separate audit trail logs everything without getting in the way.

Meet the Worker: The One Who Does the Actual Work

Okay, we have mailboxes, but who’s going to check the mail? We need a worker (or in Kombu terms, a consumer). This is a background process that just sits there, listening to a queue, and whenever a new message arrives, it grabs it and does something with it.

We'll build our own Worker class. It looks a little intimidating at first, but it's pretty straightforward once you break it down.

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.should_stop = False

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=self.queues,
                     callbacks=[self.on_message],
                     accept=['json'],
                     prefetch_count=1)
        ]

    def on_message(self, body, message):
        routing_key = message.delivery_info['routing_key']
        payload_id = body.get('id', 'unknown')
        logger.info(f"\n RECEIVED MSG via key: [{routing_key}]")
        logger.info(f" Payload ID: {payload_id}")

        try:
            if 'video' in routing_key:
                self.process_video(body)
            # We can check for 'audit' here, but our logger already shows it's received.
            # For this example, we'll just log it.
            elif routing_key: # Catches any message, including audit
                logger.info(" [Audit] Logging event...")

            message.ack()
            logger.info(f" ACKNOWLEDGED")

        except Exception as e:
            logger.error(f" ERROR: {e}")
            # In a real app, you might want to reject or requeue the message
            # message.requeue()

    def process_video(self, body):
        logger.info(" [Processor] Transcoding video (Simulating work...)")
        time.sleep(0.5)

Let's walk through this. Our Worker listens to the queues we give it.

The most important part is the on_message method. This is the function that gets called every single time a message arrives. Inside, we:

  • Inspect the message: We grab the routing_key to see what kind of message it is.
  • Route internally: We use a simple if statement. If the key has "video" in it, we call our process_video function. Otherwise, we just log it as an audit event.
  • Do the work: The process_video function simulates doing some heavy lifting, like transcoding a video file. We just make it sleep for half a second.
  • Acknowledge it: This is critical. message.ack() is us telling the broker, "Okay, I've successfully handled this message. You can safely delete it now." If our worker crashed before this line, the message would be re-delivered to another worker to ensure the job gets done. It's a built-in safety net.

We now have a smart, resilient worker ready to process tasks.

Sending the Mail: Our Message Producer

Our system is all set up to receive and process messages, but someone needs to send them in the first place! That's the job of the producer.

The producer is any part of your application that needs to kick off a background task. In our example, we'll just write a simple function that sends a couple of different messages.

def publish_messages(connection):
    producer = Producer(connection)
    tasks = [
        ('video.upload', {'file': 'movie.mp4'}),
        ('user.login', {'user': 'admin'}),
    ]

    logger.info("\n PRODUCER: Starting to publish messages...")
    for r_key, data in tasks:
        data['id'] = str(uuid.uuid4())[:8]
        logger.info(f" SENDING: {r_key} -> {data}")
        producer.publish(
            data,
            exchange=media_exchange,
            routing_key=r_key,
            serializer='json'
        )
        time.sleep(1.5)
    logger.info(" PRODUCER: Done.")

Notice how clean this is? The producer doesn't know or care who is listening. It just creates a message (a Python dictionary in this case), gives it a routing key (video.upload or user.login), and publishes it to our media_exchange.

The exchange handles the rest. It’s a beautiful example of decoupling—the part of your code that sends the work is completely separate from the part that does the work.

Showtime! Let's Run the Whole Thing

We've built all the pieces: the exchange, the queues, the worker, and the producer. Now let's connect them all and watch the show.

We'll start our worker in a background thread so it can listen for messages continuously. Then, in the main thread, we'll call our publish_messages function.

def run_example():
    with Connection(BROKER_URL) as conn:
        worker = Worker(conn, task_queues)
        worker_thread = threading.Thread(target=worker.run)
        worker_thread.daemon = True
        worker_thread.start()
        logger.info(" SYSTEM: Worker thread started.")
        time.sleep(1)

        try:
            publish_messages(conn)
            time.sleep(2)  # Give worker time to process last message
        except KeyboardInterrupt:
            pass
        finally:
            worker.should_stop = True
            logger.info("\n SYSTEM: Execution complete.")

if __name__ == "__main__":
    run_example()

When you run this script, you'll see a flurry of activity in your console. You’ll see the producer sending messages, and almost instantly, you'll see the worker picking them up. You'll see the video.upload message get routed to both the video queue and the audit queue. And you'll see the user.login message go only to the audit queue, just as we designed.

It’s a complete, end-to-end flow of a distributed system, running right on your machine. Pretty cool, huh?

What we've built here is more than just a toy example. It's the blueprint for building incredibly scalable and robust applications. Imagine hundreds of workers processing thousands of video files, or millions of notifications being sent, all orchestrated by this simple, elegant pattern. You’ve just taken your first step into a much larger, and much more powerful, world of system design.

Tags

AI Engineering Performance Optimization Python Software Development Distributed Systems Task Queue Message Broker Asynchronous Messaging Kombu Python Kombu Scalable Architecture Microservices Architecture Backend Development System Design Concurrent Programming Application Scaling Queueing Systems

Stay Updated

Get the latest articles and insights delivered straight to your inbox.

We respect your privacy. Unsubscribe at any time.

Aicosoft

AI & Technology News, Insights & Innovation

AICOSOFT delivers cutting-edge AI news, technology breakthroughs, and innovation insights. Stay informed about artificial intelligence, machine learning, robotics, and the latest tech trends shaping tomorrow.

Connect With Us

© 2026 Aicosoft. All rights reserved.