In the last post, we started from a Consumer sample, looked at some of Kombu’s internal implementation, and learned a bit about how messages are received. Now let’s look at how a message is sent. Again, we’ll start with the sample from the first article and see what is really going on.
The first step is to create a Connection, no doubt about it, because there must be a connection before a message can be sent. We already went through this code in the last article, so let’s skip it and move on to the next step.
On Line 29 we explicitly declare a Producer. This is different from what we saw in the second article, where we simply constructed a SimpleQueue, which creates a Consumer and a Producer for us. Since the Producer is declared explicitly here, we can explore what happens in the code at kombu/messaging.py Line 61:
You can see a lot of familiar terms here, such as channel and exchange, but we don’t pass most of these parameters, so they default to None. The only one we pass is channel, and what we pass for it is the connection. With that in mind, let’s see what happens when we call publish.
The docstring of publish is very long, so I have left it out and kept only the code. We pass quite a few arguments when we call it, so let’s see what they do.
The only parts that matter are Lines 138-140, which compress the data, and Line 148, which applies the retry policy. Finally, Line 149 calls the internal _publish method directly.
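To make those steps concrete, here is a minimal sketch of the idea, not Kombu’s actual code: the body is serialized and optionally compressed, and the send function is wrapped in a retry loop before it is called. The names prepare_body, with_retry, and the send callback are made up for this illustration, and only json and zlib from the standard library are used.

```python
import json
import zlib


def prepare_body(body, compression=None):
    """Serialize body to JSON bytes and optionally compress it.

    Returns (payload, headers). Hypothetical helper, not a Kombu API.
    """
    payload = json.dumps(body).encode("utf-8")
    headers = {}
    if compression == "zlib":
        payload = zlib.compress(payload)
        headers["compression"] = "zlib"
    return payload, headers


def with_retry(func, max_retries=3, errors=(ConnectionError,)):
    """Wrap func so it is retried up to max_retries times on the given errors."""
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries + 1):
            try:
                return func(*args, **kwargs)
            except errors:
                if attempt == max_retries:
                    raise
    return wrapper


def publish(body, send, compression=None, retry=False, max_retries=3):
    """Prepare the body, then hand it to send (standing in for _publish)."""
    payload, headers = prepare_body(body, compression)
    if retry:
        send = with_retry(send, max_retries=max_retries)
    return send(payload, headers)
```

The point of the sketch is the ordering: everything that changes the payload happens first, and the retry wrapper only surrounds the final send.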
Here’s where it gets interesting: Line 188 wraps the message in a Message, Line 194 declares the queue on the MQ, and finally the message is sent through the Channel.
By the way, at this point we still haven’t seen how the Exchange and the routing key work, so we’re hoping that basic_publish will give us some insight. Following the base class of Channel, we arrive at virtual/base.py Line 600.
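A rough sketch of that dispatch, assuming a toy in-memory channel: when an exchange name is given, the channel looks up the exchange’s type and lets it deliver. Otherwise the routing key is treated as the queue name. ToyChannel and BroadcastType are hypothetical names for this illustration and are not Kombu classes.

```python
from collections import defaultdict


class BroadcastType:
    """Hypothetical exchange type: copies the message to every bound queue."""

    def __init__(self, channel):
        self.channel = channel

    def deliver(self, message, exchange, routing_key):
        for queue in self.channel.bindings[exchange]:
            self.channel._put(queue, message)


class ToyChannel:
    """Toy channel that keeps its queues in memory."""

    def __init__(self):
        self.queues = defaultdict(list)    # queue name -> messages
        self.bindings = defaultdict(list)  # exchange name -> queue names
        self.exchanges = {}                # exchange name -> type name
        self.exchange_types = {"broadcast": BroadcastType(self)}

    def exchange_declare(self, exchange, type_name):
        self.exchanges[exchange] = type_name

    def typeof(self, exchange):
        return self.exchange_types[self.exchanges[exchange]]

    def _put(self, queue, message):
        self.queues[queue].append(message)

    def basic_publish(self, message, exchange, routing_key):
        if exchange:
            # A named exchange: its type decides which queues get the message.
            return self.typeof(exchange).deliver(message, exchange, routing_key)
        # No exchange: the routing key is used directly as the queue name.
        return self._put(routing_key, message)
```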
As you can see, delivering the message is actually delegated to the Exchange, so it’s time to take a look at the Exchange implementation. Jumping to kombu/transport/virtual/exchange.py, we find a lot of interesting things, for example at Line 155.
We can see that Kombu supports 3 different Exchanges by default and defines an implementation for each of them, so let’s take a look at DirectExchange specifically.
So the key is how _lookup is implemented. In fact, we don’t need to care about the details of _lookup itself. Just look a little further up at lookup and you will get the general idea.
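The idea behind a direct lookup can be sketched in a few lines. This is a simplified model, not the library’s code: I am assuming the table is a list of (routing_key, pattern, queue) tuples, and direct_lookup and direct_deliver are names invented for the example.

```python
def direct_lookup(table, routing_key):
    """Return the set of queues whose binding key equals routing_key exactly.

    table is a list of (routing_key, pattern, queue) tuples, an assumed layout.
    """
    return {queue for rkey, _pattern, queue in table if rkey == routing_key}


def direct_deliver(table, queues, message, routing_key):
    """Put message on every queue selected by direct_lookup."""
    for queue in sorted(direct_lookup(table, routing_key)):
        queues.setdefault(queue, []).append(message)
```

For example, with bindings for "info" and "error", a message published with the routing key "error" ends up only on the queue bound with that exact key, and a key with no binding matches nothing.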
The question here is what the table is. It would be much clearer if we knew how the table is built, but if you trace the code you’ll find that getting to the table is fairly complicated, so here are some typical pieces of code that show where the table comes from. They are all in virtual/base.py.
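As a condensed model of that flow (my own simplification, not the code in virtual/base.py): binding a queue asks the exchange type to prepare a binding entry, and that entry is appended to the exchange’s table, which a lookup later reads back. BindingState is a hypothetical class, and the tuple layout is an assumption.

```python
class BindingState:
    """Toy model of per-exchange routing tables (not Kombu's real classes)."""

    def __init__(self):
        self.tables = {}  # exchange name -> list of binding tuples

    def prepare_bind(self, queue, exchange, routing_key):
        # For a direct exchange the binding needs no pattern, only the key.
        return routing_key, None, queue

    def queue_bind(self, queue, exchange, routing_key):
        table = self.tables.setdefault(exchange, [])
        meta = self.prepare_bind(queue, exchange, routing_key)
        if meta not in table:  # binding the same queue twice has no extra effect
            table.append(meta)

    def get_table(self, exchange):
        return self.tables.get(exchange, [])
```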
From these code snippets, we get a general idea of how Exchange routing works. The key point is the prepare_bind of each Exchange. Next, let’s see how the put operation works for the Direct Exchange, at kombu/transport/redis.py Line 762.
There is nothing special here: since we already know which queue the message belongs to, it is simply put there. Note that there is also a q_for_pri operation here, which we may look at later.
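Here is a small in-memory sketch of what such a put might look like, assuming the transport pushes the serialized message onto a Redis list and that q_for_pri derives the list name from the queue name and the priority. FakeRedis, the ":" separator, and the priority handling are all assumptions for this illustration. The real transport’s separator and priority rules may differ.

```python
import json
from collections import defaultdict, deque

SEP = ":"  # assumed separator for this sketch only


def q_for_pri(queue, pri):
    """Name of the list that holds messages of priority pri for queue."""
    return f"{queue}{SEP}{pri}" if pri else queue


class FakeRedis:
    """In-memory stand-in exposing only an LPUSH-like operation."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def lpush(self, key, value):
        self.lists[key].appendleft(value)
        return len(self.lists[key])


def put(client, queue, message, priority=0):
    """Serialize message and push it onto the list chosen by q_for_pri."""
    client.lpush(q_for_pri(queue, priority), json.dumps(message))
```

In this model each priority level gets its own list, so a consumer could poll the lists in priority order.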
OK, that is the basic process of producing a message. It was a simple walk-through, but we learned quite a bit along the way:
- Producer contains a lot of things, including Exchange, routing_key, channel, and more!
- Message is just a container for some fields; it doesn’t do anything itself.
- Exchange just converts the routing_key of the sent message to the name of the queue.
- The actual sending has to be done by the channels, and each different Transport has its own corresponding channel.
- [Kombu Code](https://github.com/celery/kombu)