Have you ever built an application that started simple but slowly became a tangled mess? You add one feature, like processing an uploaded video. Then another, like sending a notification. Before you know it, a single user request is triggering a dozen different things, and your whole app grinds to a halt waiting for everything to finish.
It’s a classic growing pain. We try to make our code do too much all at once, synchronously. But what if we could just… not?
What if, instead of handling everything right now, we could just fire off a message, like a digital sticky note, and trust that some other part of our system would pick it up and handle it when it's ready? That’s the magic of asynchronous messaging, and it's a pattern behind many of the large-scale products you use every day. Today, we're going to pull back the curtain and build a mini version of one of these systems ourselves using a fantastic Python library called Kombu.
Don't worry, this isn't some dry, academic exercise. We’re going to build a real, working system right here. You’ll see messages fly from one place to another, get routed to the right workers, and processed in the background. By the end, you'll get that "aha!" moment and see how this pattern can make your own applications way more powerful and resilient.
Ready? Let's get started.
First Things First: Setting Up Our Workshop
Before we can build our messaging system, we need to lay the groundwork. Think of this as clearing the workbench and grabbing our tools. We only need a few things to get going.
First, we need to install Kombu. It’s the star of our show.
pip install kombu
Next, we’ll write our initial Python script. We'll import all the necessary pieces from Kombu and set up some basic logging. Logging is super important here—it's how we'll actually see our messages moving through the system. Without it, we'd be flying blind.
We’re also going to define a BROKER_URL. Normally, this would point to a real message broker like RabbitMQ or Redis. But for this little experiment, Kombu has a neat trick up its sleeve: an in-memory broker. This lets us run everything locally without any extra setup. It's perfect for learning and testing.
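If you later want to point the same script at a real broker, one low-friction option is to read the URL from the environment and fall back to the in-memory transport. The sketch below assumes an environment variable named BROKER_URL and a helper called resolve_broker_url, both made up here for illustration. The example URLs follow the usual amqp:// and redis:// forms, but the hosts, ports, and credentials are placeholders.

```python
import os

# Placeholder URLs for illustration only: hosts, ports, and credentials are assumptions.
EXAMPLE_BROKER_URLS = {
    "memory": "memory://localhost/",
    "rabbitmq": "amqp://guest:guest@localhost:5672//",
    "redis": "redis://localhost:6379/0",
}


def resolve_broker_url(environ=None):
    """Return BROKER_URL from the environment, or the in-memory default."""
    # Accept a plain dict so the helper is easy to try without touching os.environ.
    environ = os.environ if environ is None else environ
    return environ.get("BROKER_URL", EXAMPLE_BROKER_URLS["memory"])
```

With nothing set, this hands back the same memory:// URL we use in this post, so the rest of the script would not need to change. Keep in mind that the Redis transport also needs the redis client package installed.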
Here’s what that initial setup looks like:
import threading
import time
import logging
import uuid
import sys

from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin

logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True
)
logger = logging.getLogger(__name__)

BROKER_URL = "memory://localhost/"
Simple enough, right? We've got our tools, we've turned on the lights (logging), and we've cleared a space to work (our in-memory broker). Now for the fun part.
The Grand Central Station: Designing Our Exchange and Queues
At the heart of any good messaging system is a traffic controller. In our world, this is called an exchange. An exchange is like a central post office. It receives all incoming mail (messages) and, based on some rules, decides which mailboxes (queues) to put them in.
We're going to use a special kind of exchange called a topic exchange. I love topic exchanges because they’re incredibly flexible. They let us route messages based on patterns, a bit like using wildcards in a file search.
Let’s define our exchange and a couple of queues:
media_exchange = Exchange('media_exchange', type='topic', durable=True)

task_queues = [
    Queue('video_queue', media_exchange, routing_key='video.#'),
    Queue('audit_queue', media_exchange, routing_key='#'),
]
So, what did we just do?
- media_exchange: We created our central post office and named it media_exchange. We set its type to topic.
- video_queue: This is our first mailbox. We told it to listen to the media_exchange for any message with a "routing key" that starts with video followed by a dot. The # after the dot is a wildcard that means "and anything that follows." So, a message with the key video.upload or video.processing.failed would land here.
- audit_queue: This is our "catch-all" mailbox. The routing key # is a special wildcard that matches everything. This queue will get a copy of every single message that passes through our exchange. It’s perfect for logging, auditing, or debugging.
See how powerful that is? We’ve already designed a system where video-related tasks go to a dedicated place, while a separate audit trail logs everything without getting in the way.
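If the wildcard rules feel abstract, here is a small standard-library sketch of AMQP-style topic matching, where * stands for exactly one dot-separated word and # stands for zero or more. The names topic_matches, queues_for, and BINDINGS are hypothetical helpers written for this post. This is not Kombu's own implementation, just a way to poke at the rules.

```python
def topic_matches(pattern, routing_key):
    """AMQP-style topic match: '*' is one word, '#' is zero or more words."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i, j):
        if i == len(p):
            # Pattern used up: only a match if the key is used up too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


# The same bindings we declared on media_exchange.
BINDINGS = {"video_queue": "video.#", "audit_queue": "#"}


def queues_for(routing_key, bindings=BINDINGS):
    """Names of every queue whose binding pattern matches the key."""
    return sorted(
        name for name, pattern in bindings.items()
        if topic_matches(pattern, routing_key)
    )


print(queues_for("video.upload"))
print(queues_for("user.login"))
```

Trying a few keys by hand is a quick way to sanity-check a binding before you wire it into a real exchange.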
Meet the Worker: The One Who Does the Actual Work
Okay, we have mailboxes, but who’s going to check the mail? We need a worker (or in Kombu terms, a consumer). This is a background process that just sits there, listening to a queue, and whenever a new message arrives, it grabs it and does something with it.
We'll build our own Worker class. It looks a little intimidating at first, but it's pretty straightforward once you break it down.
class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.should_stop = False

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=self.queues,
                     callbacks=[self.on_message],
                     accept=['json'],
                     prefetch_count=1)
        ]

    def on_message(self, body, message):
        routing_key = message.delivery_info['routing_key']
        payload_id = body.get('id', 'unknown')
        logger.info(f"\n RECEIVED MSG via key: [{routing_key}]")
        logger.info(f" Payload ID: {payload_id}")
        try:
            if 'video' in routing_key:
                self.process_video(body)
            # We can check for 'audit' here, but our logger already shows it's received.
            # For this example, we'll just log it.
            elif routing_key:  # Catches any message, including audit
                logger.info(" [Audit] Logging event...")
            message.ack()
            logger.info(" ACKNOWLEDGED")
        except Exception as e:
            logger.error(f" ERROR: {e}")
            # In a real app, you might want to reject or requeue the message
            # message.requeue()

    def process_video(self, body):
        logger.info(" [Processor] Transcoding video (Simulating work...)")
        time.sleep(0.5)
Let's walk through this. Our Worker listens to the queues we give it.
The most important part is the on_message method. This is the function that gets called every single time a message arrives. Inside, we:
- Inspect the message: We grab the routing_key to see what kind of message it is.
- Route internally: We use a simple if statement. If the key has "video" in it, we call our process_video function. Otherwise, we just log it as an audit event.
- Do the work: The process_video function simulates doing some heavy lifting, like transcoding a video file. We just make it sleep for half a second.
- Acknowledge it: This is critical. message.ack() is us telling the broker, "Okay, I've successfully handled this message. You can safely delete it now." With a real broker such as RabbitMQ, if our worker crashed before this line, the unacknowledged message would be redelivered (to this worker or another one) so the job still gets done. It's a built-in safety net.
We now have a smart, resilient worker ready to process tasks.
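To make the acknowledgement idea concrete, here is a toy, standard-library model of what a broker does with unacknowledged messages. ToyQueue and its methods are invented for this post and are not part of Kombu. A real broker tracks this per channel and per connection, but the bookkeeping is the same in spirit.

```python
from collections import deque


class ToyQueue:
    """Hypothetical stand-in for a broker queue that tracks unacked messages."""

    def __init__(self):
        self._ready = deque()   # messages waiting to be delivered
        self._unacked = {}      # delivery tag -> message handed out but not acked
        self._next_tag = 0

    def put(self, body):
        self._ready.append(body)

    def get(self):
        """Hand out the next message with a delivery tag, or None if empty."""
        if not self._ready:
            return None
        self._next_tag += 1
        body = self._ready.popleft()
        self._unacked[self._next_tag] = body
        return self._next_tag, body

    def ack(self, tag):
        """The consumer finished the job, so the message can be forgotten."""
        del self._unacked[tag]

    def recover(self):
        """The consumer went away: put every unacked message back in line."""
        for tag in sorted(self._unacked, reverse=True):
            self._ready.appendleft(self._unacked.pop(tag))


q = ToyQueue()
q.put({"id": "a1", "file": "movie.mp4"})
tag, body = q.get()      # worker takes the message...
q.recover()              # ...and "crashes" before calling ack()
tag, body = q.get()      # the same message is delivered again
q.ack(tag)               # this time the work completes
```

The takeaway is that a message only truly disappears once someone acks it, which is why message.ack() comes after the work in our worker, not before.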
Sending the Mail: Our Message Producer
Our system is all set up to receive and process messages, but someone needs to send them in the first place! That's the job of the producer.
The producer is any part of your application that needs to kick off a background task. In our example, we'll just write a simple function that sends a couple of different messages.
def publish_messages(connection):
    producer = Producer(connection)
    tasks = [
        ('video.upload', {'file': 'movie.mp4'}),
        ('user.login', {'user': 'admin'}),
    ]
    logger.info("\n PRODUCER: Starting to publish messages...")
    for r_key, data in tasks:
        data['id'] = str(uuid.uuid4())[:8]
        logger.info(f" SENDING: {r_key} -> {data}")
        producer.publish(
            data,
            exchange=media_exchange,
            routing_key=r_key,
            serializer='json'
        )
        time.sleep(1.5)
    logger.info(" PRODUCER: Done.")
Notice how clean this is? The producer doesn't know or care who is listening. It just creates a message (a Python dictionary in this case), gives it a routing key (video.upload or user.login), and publishes it to our media_exchange.
The exchange handles the rest. It’s a beautiful example of decoupling—the part of your code that sends the work is completely separate from the part that does the work.
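If you want to feel that decoupling without any broker at all, the same shape can be sketched with nothing but the standard library: one side drops work into a queue and moves on, the other side picks it up on its own schedule. The function run_decoupled_demo and the None sentinel are choices made for this illustration, not anything Kombu requires.

```python
import queue
import threading


def run_decoupled_demo():
    """The producer enqueues and moves on; a worker thread does the work."""
    tasks = queue.Queue()
    handled = []

    def worker():
        while True:
            item = tasks.get()
            if item is None:      # sentinel: the producer has nothing more to send
                break
            handled.append(f"processed {item['file']}")

    thread = threading.Thread(target=worker)
    thread.start()

    # The "producer" side: it never calls the processing code directly.
    tasks.put({"file": "movie.mp4"})
    tasks.put({"file": "clip.mp4"})
    tasks.put(None)

    thread.join()
    return handled


print(run_decoupled_demo())
```

A broker plays the role of that queue across processes and machines, and adds routing and acknowledgements on top.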
Showtime! Let's Run the Whole Thing
We've built all the pieces: the exchange, the queues, the worker, and the producer. Now let's connect them all and watch the show.
We'll start our worker in a background thread so it can listen for messages continuously. Then, in the main thread, we'll call our publish_messages function.
def run_example():
    with Connection(BROKER_URL) as conn:
        worker = Worker(conn, task_queues)
        worker_thread = threading.Thread(target=worker.run)
        worker_thread.daemon = True
        worker_thread.start()
        logger.info(" SYSTEM: Worker thread started.")
        time.sleep(1)
        try:
            publish_messages(conn)
            time.sleep(2)  # Give worker time to process last message
        except KeyboardInterrupt:
            pass
        finally:
            worker.should_stop = True
        logger.info("\n SYSTEM: Execution complete.")


if __name__ == "__main__":
    run_example()
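When you run this script, you'll see a flurry of activity in your console. You’ll see the producer sending messages, and almost instantly, you'll see the worker picking them up. You'll see the video.upload message get routed to both the video queue and the audit queue. And you'll see the user.login message go only to the audit queue, just as we designed. One thing to watch for: because our single worker consumes both queues and on_message looks only at the routing key, the video.upload message shows up twice and is "transcoded" both times. In a real system you would usually give the audit queue its own consumer or callback.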
It’s a complete, end-to-end flow of a distributed system, running right on your machine. Pretty cool, huh?
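One detail in run_example is worth a second look: it flips should_stop but never waits for the worker thread, relying on the daemon flag to let the process exit. Here is a small standard-library sketch of the "set a flag, then join" pattern, in the same spirit as that flag. The names run_and_stop_worker, inbox, and stop are hypothetical, and the timeouts are arbitrary values picked for the demo.

```python
import queue
import threading
import time


def run_and_stop_worker():
    """Start a polling worker, feed it one message, then stop it cleanly."""
    inbox = queue.Queue()
    stop = threading.Event()
    seen = []

    def worker_loop():
        # Poll with a short timeout so the stop flag is checked regularly.
        while not stop.is_set():
            try:
                seen.append(inbox.get(timeout=0.05))
            except queue.Empty:
                continue

    thread = threading.Thread(target=worker_loop)
    thread.start()
    inbox.put("video.upload")

    # Wait (with a deadline) until the worker has picked the message up.
    deadline = time.monotonic() + 2
    while not seen and time.monotonic() < deadline:
        time.sleep(0.01)

    stop.set()               # ask the worker to finish...
    thread.join(timeout=2)   # ...and wait for it to actually exit
    return seen, thread.is_alive()


print(run_and_stop_worker())
```

Joining the thread gives you a clear moment when the worker is really done, which matters once the work is something you cannot afford to cut off halfway.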
What we've built here is more than just a toy example. It's the blueprint for building incredibly scalable and robust applications. Imagine hundreds of workers processing thousands of video files, or millions of notifications being sent, all orchestrated by this simple, elegant pattern. You’ve just taken your first step into a much larger, and much more powerful, world of system design.




