Celery Analysis Series
Many Python developers have heard of Celery or even used it; it is arguably the most popular asynchronous task queue in the Python world. But those who use it heavily also know the feeling of running into inexplicable pitfalls from time to time, ones that can only be fixed by restarting because the cause is so hard to find.
As one of the victims, I wanted to understand what is going on inside these pitfalls, so I once combed through Celery's internal implementation. Celery is fairly large, though, and I did not think much of the code quality, so I only got through part of it, but even that part was very rewarding. This time, for various reasons, I am going to dig deeper, so there will be a series of articles on how Celery is implemented.
Kombu bills itself as an AMQP-compatible message queue abstraction, but that is only the official description; what it actually does is best judged from the code. So I decided to start with Kombu's code, and this article is the first one!
This series covers Kombu 4.1.0 and Celery 4.1.0. I am stating the versions up front so that readers who follow along do not end up on the wrong one.
Download Kombu code
Kombu's code is easy to find: it is tightly coupled to Celery, so it lives under the Celery organization on GitHub and is easy to download:
git clone https://github.com/celery/kombu.git
cd kombu
git checkout v4.1.0
That gives you the same Kombu source code used throughout this series; all source files and line numbers I refer to later correspond to this version.
With the code in hand, I'll set aside everything that isn't code and look at the kombu directory as a whole.
As you can see, the kombu directory contains mostly files and only three folders:
- async: functions and classes for asynchronous operations.
- transport: classes that adapt Kombu to the various MQs.
- utils: auxiliary functions and classes.
We found some problems with this directory.
Simple add/get message
OK, with the overview done, let's look at something practical and write a producer/consumer pair with Kombu. The demo is split into two parts:
- simple_receive.py: a simple consumer
- simple_send.py: a simple producer
Let's look at how each of them is written.
As usual, you can find both pieces of code on my GitHub. Start with the consumer: its code is relatively simple. We first establish a connection to the MQ, then take messages out of the designated queue and process them.
The producer also has to establish a connection to the MQ, but it differs in that it creates an exchange and sends a message to the queue, passing a number of parameters along the way.
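To make the flow described above concrete, here is a minimal sketch in plain Python. It deliberately does not use Kombu: `ToyBroker`, `ToyMessage`, `drain` and `on_message` are hypothetical names made up for illustration, and the "MQ" is just an in-memory dictionary. Only the callback shape, `(body, message)` followed by `message.ack()`, is borrowed from the way Kombu consumers are written.

```python
from collections import deque


class ToyMessage:
    """A delivered message: the payload plus an acknowledgement flag."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        # Tell the broker that this message was processed successfully.
        self.acknowledged = True


class ToyBroker:
    """Stands in for the MQ: maps a queue name to a FIFO of messages."""

    def __init__(self):
        self.queues = {}

    def publish(self, queue_name, body):
        # Producer side: append the message to the named queue.
        self.queues.setdefault(queue_name, deque()).append(ToyMessage(body))

    def drain(self, queue_name, callback):
        # Consumer side: hand every pending message to the callback.
        pending = self.queues.get(queue_name, deque())
        unacked = deque()
        while pending:
            message = pending.popleft()
            callback(message.body, message)
            if not message.acknowledged:
                unacked.append(message)
        pending.extend(unacked)  # unacknowledged messages stay queued


broker = ToyBroker()
received = []


def on_message(body, message):
    # The consumer's processing step: record the body, then acknowledge.
    received.append(body)
    message.ack()


# Producer: send two messages to the "test" queue.
broker.publish("test", {"hello": "world"})
broker.publish("test", {"hello": "again"})

# Consumer: take everything out of the "test" queue and process it.
broker.drain("test", on_message)
print(received)
```

In the real demo, the connection, the queue and the exchange are separate Kombu objects; the sketch collapses them into one class to keep the producer and consumer roles visible.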
This is one of the interesting things about Kombu: it aims to abstract every MQ and expose a consistent API through one interface, so we don't have to care about the underlying MQ and can switch between Redis, RabbitMQ, MongoDB, or whatever else at will.
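The idea of one API over interchangeable backends can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not Kombu's implementation: `ToyTransport`, `ListTransport`, `LogTransport`, `ToyConnection`, and the `list://` and `log://` URL schemes are all invented for this example.

```python
from abc import ABC, abstractmethod
from collections import deque
from urllib.parse import urlparse


class ToyTransport(ABC):
    """The interface every backend must implement."""

    @abstractmethod
    def put(self, queue_name, body):
        ...

    @abstractmethod
    def get(self, queue_name):
        ...


class ListTransport(ToyTransport):
    """Hypothetical backend that keeps one FIFO per queue."""

    def __init__(self):
        self._queues = {}

    def put(self, queue_name, body):
        self._queues.setdefault(queue_name, deque()).append(body)

    def get(self, queue_name):
        return self._queues[queue_name].popleft()


class LogTransport(ToyTransport):
    """Hypothetical backend: one append-only log plus a read cursor per queue."""

    def __init__(self):
        self._log = []
        self._cursor = {}

    def put(self, queue_name, body):
        self._log.append((queue_name, body))

    def get(self, queue_name):
        start = self._cursor.get(queue_name, 0)
        for index in range(start, len(self._log)):
            name, body = self._log[index]
            if name == queue_name:
                self._cursor[queue_name] = index + 1
                return body
        raise LookupError(f"queue {queue_name!r} is empty")


# URL scheme -> backend class (made-up schemes, for illustration only).
TRANSPORTS = {"list": ListTransport, "log": LogTransport}


class ToyConnection:
    """Picks a transport from the URL; callers only ever see send/receive."""

    def __init__(self, url):
        self.transport = TRANSPORTS[urlparse(url).scheme]()

    def send(self, queue_name, body):
        self.transport.put(queue_name, body)

    def receive(self, queue_name):
        return self.transport.get(queue_name)


def round_trip(url):
    # The calling code is identical whichever backend the URL selects.
    connection = ToyConnection(url)
    connection.send("test", "hello")
    connection.send("test", "world")
    return [connection.receive("test"), connection.receive("test")]


print(round_trip("list://"))
print(round_trip("log://"))
```

Switching backends means changing only the URL passed to `round_trip`; the sending and receiving code stays the same, which is the property the paragraph above describes.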
Kombu’s MQ Model
Since Kombu is an abstraction over AMQP, it must have an abstract model of its own. That model is roughly the same as RabbitMQ's, but not exactly; there are some differences. What follows is an introduction to Kombu's abstract model.
Kombu has several concepts, some of which we have already seen in the simple producer/consumer example above:
- Message: the basic unit of production and consumption; it is simply what we call a message.
- Connection: the abstraction of an MQ connection; one Connection corresponds to one connection to an MQ.
- Transport: the real MQ connection, that is, the instance that actually connects to the MQ (Redis/RabbitMQ).
- Producer: the abstract class that sends messages.
- Consumer: the abstract class that receives messages.
- Exchange: MQ routing, similar to RabbitMQ's, with 5 types supported.
- Queue: the corresponding queue abstraction, which is really a wrapper around a string.
How messages are sent to a Queue
Suppose we want to send a message to a queue named 'test' in Redis. How does Kombu do that? This is where the concept of Exchange comes in. Currently, Kombu's support for different MQs looks like this.
Assuming we use a Direct exchange, the Producer only needs to specify Queue=test when publishing, and the message will be delivered to the test queue. For more on Exchange, see: AMQP 0-9-1 Model Explained
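The routing rule of a direct exchange is simple enough to sketch in plain Python: a message goes to every queue whose binding key exactly matches the message's routing key. The following is a toy model, not Kombu's code; `ToyDirectExchange` and its `bind` and `publish` methods are hypothetical names, and the queues are plain `deque` objects.

```python
from collections import defaultdict, deque


class ToyDirectExchange:
    """Toy direct exchange: exact match between routing key and binding key."""

    def __init__(self, name):
        self.name = name
        self._bindings = defaultdict(list)  # routing key -> bound queues

    def bind(self, queue, routing_key):
        # Register a queue as a destination for this routing key.
        self._bindings[routing_key].append(queue)

    def publish(self, body, routing_key):
        # Copy the message into every queue bound with exactly this key
        # and return how many queues received it.
        targets = self._bindings.get(routing_key, [])
        for queue in targets:
            queue.append(body)
        return len(targets)


test_queue = deque()
other_queue = deque()

exchange = ToyDirectExchange("demo")
exchange.bind(test_queue, routing_key="test")
exchange.bind(other_queue, routing_key="other")

# Routed to test_queue only, because the key matches its binding exactly.
delivered = exchange.publish("hello", routing_key="test")

# No queue is bound with this key, so the message goes nowhere.
dropped = exchange.publish("nobody home", routing_key="missing")

print(list(test_queue), list(other_queue), delivered, dropped)
```

The other exchange types differ only in how `publish` picks its targets, which is why the exchange is a natural place for the routing logic to live.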
That's it for this article. Next, we'll get down to the code level and see how the various entities in the model are implemented.
- [AMQP 0-9-1 Model Explained](https://www.rabbitmq.com/tutorials/amqp-concepts.html)
- [Kombu Docs](http://docs.celeryproject.org/projects/kombu/en/latest/reference/kombu.html)