OK, now that we’re really getting into the source code analysis of Kombu, let’s start with the example from Kombu Source Code Analysis I and look at a simple use of the How Kombu is implemented internally starts with this very first piece of code.
Let’s focus on what’s happening with
conn.SimpleQueue, then look at what the queue is, and finally look at the content of the message. conn should be of type
Connection, and we’re using Redis, so it should be . Redis‘s
Connection is correct, and we’ll follow up with the code to see how (kombu/connection.py line 49).
Tracing the constructor of
Connection we can see that in both Line 174 and Line 187 we only remember the type of
transport and then initialize it with a parameter in Line 189, and if we look at the code we can see that in Line 246 we assign it directly to
transport_cls, which will be of great use later on.
OK, a little bit of an interlude, coming back to
SimpleQueue, at Line 712 we can see a very simple item.
We can see that the code is as simple as that, look at the comment above and note the meaning of the parameters inside the comment, you can see that several default parameters are provided here.
- channel: This is the connection.
- queue: the parameter we pass.
- no_ack: None
- queue_opts: None
- exchange_opts: None
Okay, let’s follow along and see what this
SimpleQueue is, which should be located at kombu/simple.py, and we can see the code defined at Line 118, and to be honest, it’s quite nice to see this, because it feels like we’re seeing the head: kombu/simple.py.
You can see that the Consumer and Producer have already been created in this SimpleQueue, so let’s not focus on their code for now, but rather on what I described in my last post. We can focus on the parameters of the Consumer and Producer, which are set for us by default, and then we’ll be done with them.
Next, it’s time to look at the implementation of getting the message, again in this file, but we can see that the code of SimpleQueue is really simple, it doesn’t override the
get method itself, so we can find it in its parent class
SimpleBase, which should be in Line 35.
At Line 36 thankfully, it’s blocked, so we don’t need to look at more code, and then here’s a very desirable
_consume, go in and look at.
It’s very simple, it’s actually a call to the Consumer consume and then return, notice that this is non-blocking, so how do we get the message, keep reading, you can see that there’s a loop below, and then focus on the comment, if a message comes in then it’s put into
self.buffer Inside, no message here will block, at the same time, if the blocking exceeds the timeout time we set, then it will run an exception.
Then we go according to the normal logic, in Line 45 is to get the data normally, and then go back, we look at what we got, back to the original code, in fact, that is, we write our own Sample in.
What is returned is a Message, but we didn’t see what type this message is in the code we traced earlier, so what type is it? Actually, you’ve searched the Kombu code and found that there’s only one Message type in kombu/message.py, which, as mentioned before, is the basic unit of production/consumption in Kombu, so let’s take a quick look at the code.
In fact, it is nothing more than the encapsulation of the message content, however, the content is still relatively rich, because there is no need to talk about it, so, let’s not talk about the content, let’s go back to the latter sentence of the processing of the message:
message.ack and see what happened here:
Actually you’ll find that it doesn’t do anything, it just determines if you need to confirm or not, if you’ve confirmed or not, and then this is the important part:
self.channel.basic_ack where the way to confirm is given to the channel (Connection) to execute, and then that’s it.
This is one way of tracing a simple implementation. The whole thing is fairly straightforward, but we already know a few things.
- Connection contains Transport, and it’s an abstraction, so depending on our parameters we can determine what Transport is.
- SimpleQueue contains not only queue, but also connection, consumer and producer.
- Consumer’s consume is non-blocking and is really fetched in
drain_eventsand inserted into the member variable, but we haven’t seen how it’s inserted yet!
- The message is confirmed by a connection, but we don’t see any persistence or anything like that.
- [Kombu Code] (https://github.com/celery/kombu)
Translated with www.DeepL.com/Translator (free version)