• A producer is a user application that sends messages.
• A queue is a buffer that stores messages.
• A consumer is a user application that receives messages.

The core idea of the messaging model in RabbitMQ is that the producer never sends messages directly to a queue. Quite often the producer doesn’t even know whether a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing: on one side it receives messages from producers, and on the other side it pushes them to queues.

Routing in RabbitMQ

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routing_key parameter.

Direct exchange

An exchange of the fanout type broadcasts every message to all queues bound to it, and so to all of their consumers. We want to extend that to allow filtering messages based on an attribute. For example, we may want the script that writes log messages to disk to receive only critical errors, and not waste disk space on warning or info messages.

We were using a fanout exchange, which doesn’t give us too much flexibility — it’s only capable of mindless broadcasting.

We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple — a message goes to the queues whose binding key exactly matches the routing key of the message.
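The difference between the two exchange types can be sketched with a toy in-memory model. This is only an illustration of the matching rule, not how the broker is implemented; the function names and the sample bindings (`disk_log`, `console_log`) are assumptions made for the example.

```python
# Toy in-memory model of exchange routing; not the broker's implementation.
# A binding is a (queue_name, binding_key) pair attached to one exchange.


def route_fanout(bindings, routing_key):
    """Fanout: ignore the routing key and deliver to every bound queue."""
    return sorted({queue for queue, _ in bindings})


def route_direct(bindings, routing_key):
    """Direct: deliver only to queues whose binding key equals the routing key."""
    return sorted({queue for queue, key in bindings if key == routing_key})


# Hypothetical bindings for the logging example.
bindings = [
    ("disk_log", "error"),
    ("console_log", "info"),
    ("console_log", "warning"),
    ("console_log", "error"),
]

print(route_fanout(bindings, "info"))   # ['console_log', 'disk_log']
print(route_direct(bindings, "info"))   # ['console_log']
print(route_direct(bindings, "error"))  # ['console_log', 'disk_log']
print(route_direct(bindings, "debug"))  # [] (no matching binding)
```

In this model, a message whose routing key matches no binding is routed to no queue at all, which is why the last call returns an empty list.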

Example: Emitting logs

We’ll use this model for our logging system. Instead of fanout we’ll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving script will be able to select the severity it wants to receive. Let’s focus on emitting logs first.

As always, we need to create an exchange first:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

And we’re ready to send a message:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
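Put together, an emitter might look like the sketch below. It assumes the pika client and a broker on localhost; the helper names (`parse_args`, `emit_log`, `main`) and the defaults (`info`, `Hello World!`) are illustrative choices, not something the text above prescribes.

```python
import sys


def parse_args(argv):
    """Return (severity, message) from command-line style arguments.

    The defaults ('info', 'Hello World!') are illustrative assumptions.
    """
    severity = argv[1] if len(argv) > 1 else "info"
    message = " ".join(argv[2:]) or "Hello World!"
    return severity, message


def emit_log(channel, severity, message):
    """Declare the direct exchange and publish one message keyed by severity."""
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
    channel.basic_publish(exchange="direct_logs",
                          routing_key=severity,
                          body=message)


def main():
    # Assumes the pika client is installed and a broker runs on localhost.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost"))
    try:
        severity, message = parse_args(sys.argv)
        emit_log(connection.channel(), severity, message)
        print(f" [x] Sent {severity}:{message}")
    finally:
        connection.close()
```

Calling `main()` from a script run with arguments such as `error "Disk is full"` would publish that text with the routing key `error`. Because `emit_log` only needs a channel object, it can also be tried out with a stand-in channel without a running broker.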

Subscribing to the above exchange

We’re going to create a new binding for each severity we’re interested in.

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
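A receiving script built around these bindings could be sketched as follows. It again assumes the pika client and a broker on localhost; the function names (`bind_severities`, `format_log`, `on_message`, `main`) and the use of `auto_ack=True` are assumptions made for the example.

```python
def bind_severities(channel, severities):
    """Declare an exclusive, server-named queue and bind it once per severity."""
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    for severity in severities:
        channel.queue_bind(exchange="direct_logs",
                           queue=queue_name,
                           routing_key=severity)
    return queue_name


def format_log(routing_key, body):
    """Render a delivery as 'severity:message'; the body arrives as bytes."""
    return f"{routing_key}:{body.decode()}"


def on_message(ch, method, properties, body):
    # pika exposes the message's routing key on the delivery's `method` frame.
    print(f" [x] {format_log(method.routing_key, body)}")


def main(severities):
    # Assumes the pika client is installed and a broker runs on localhost.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    queue_name = bind_severities(channel, severities)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=on_message,
                          auto_ack=True)
    channel.start_consuming()  # blocks until interrupted
```

For instance, `main(["warning", "error"])` would print only messages published with one of those two routing keys. Messages with other severities are never routed to this queue.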
