Overview

We’ve covered all of Kombu’s incoming and outgoing messages in the previous articles, and it seems like there’s not much left to cover. But, in fact, in our quick walk-through of the code, we missed some important things, one of which is that when fetching a message, we said that Consumer’s consume is non-blocking, and then proceeded to a loop where we waited for a buffer to be filled and then returned that buffer, but. We don’t know how this works, so let’s take a look at the rest of the implementation.

Consumer

The first thing we need to do is to look at the non-blocking implementation of consume, after a few code walks we should be able to easily find the Consumer code: kombu/messaging.py, and then look at the consume operation: kombu/messaging.py.

As you can see, there doesn’t seem to be anything particularly bright, and if there is, it should be in two places.

Queue that cannot be unlisted unless cancel_by_queue is called.

  1. no_wait=True for all queues, leaving only one set to no_wait=False, what’s the point? Keep looking.

There is an additional concept of tag here, as we can see by looking at Kombu’s documentation.

The consumer tag is local to a connection, so two clients can use the same If this field is empty the server will generate a unique tag.

Okay, that’s how it works. Then look at the actual consume code, where you pass a couple of arguments.

Let’s not look at the specific callback is what, continue to look deeper into the consume, where the design to the queue, this structure we have not seen before, here is a look in passing: kombu/entity.py

The code is simple, it goes directly to the channel, which is already an old friend, so follow it directly.

A very simple paragraph, just set the various values, and then the key is the last sentence: _reset_cycle, what’s inside, look at it and you’ll see.

Then it seems to be over, call is over, nothing happens, go home.

Message Callback

Wait, wait, wait, there seems to be more to it than that… Here we’re just declaring the mission consumers out, but we don’t know where to put the message after we’ve received it or how to put it in. So go ahead and look at the code behind our sample.

  1. self.channel.connection.client.drain_events(timeout=remaining)

That’s the important one, so let’s find out what the code is here in kombu/connection.py.

And then into Transport, that can not be helped, can not be avoided.

The key to this implementation is the get in Line 961, which corresponds to kombu/utils/scheduling.py*.

The actual value should be kombu/transport/virtual/base.py: kombu/transport/virtual/base.py.

And then it goes to kombu/transport/redis.py

That’s really the end of it, and at this point we can realize that the so-called asynchrony is actually a training rotation, where Kombu queries all the queues that need to be listened to again, until the query is complete or a queue is encountered that can be used, and then it gets the content and calls back the corresponding callback!

Recall

In this tracking process, we can see that the circles go around, but all that has gone through are these documents.

Reference

  1. [Kombu Code] (https://github.com/celery/kombu)