Overview

After several articles, the last one finally got to something interesting, but what it found did not live up to our expectations for Kombu: messages are fetched by polling each queue in turn. How frustrating. Is Kombu really that unsophisticated? That may well have been the case early on, but as it evolved, Kombu was not satisfied with that and introduced some more advanced-looking techniques, so today let's look at some additional implementations.

eventloop

This is another consumer example. From our earlier analysis, the producer is very simple, with little technical depth. The consumer is different: it carries all the technical difficulty. The polling traversal we complained about earlier comes from the consumer, for example. Today we look at a different Consumer.

The setup is basically the same: we still create a Connection and a Consumer. The Consumer is created slightly differently from before, when it came from SimpleQueue. And instead of calling something like consumer.get(), the code contains a line like this:

    for _ in eventloop(connection):
        pass

Something surprising is going on here, so let's go to eventloop itself, in kombu/common.py at Line 177.

At first glance it looks like just another loop, but on closer inspection it is a little different. It is a generator, and on each iteration it yields the result of conn.drain_events(). That call is an old acquaintance from queue.get(); it has simply been moved into eventloop.
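To make the generator idea concrete, here is a minimal sketch of how such a loop could look. It is my own simplification, not a copy of kombu/common.py: FakeConnection and eventloop_sketch are hypothetical names, and the fake connection only hands out canned events so the snippet runs without a broker.

```python
import socket
from itertools import count


class FakeConnection:
    """Hypothetical stand-in for a kombu Connection, for illustration only."""

    def __init__(self, events):
        self._events = list(events)

    def drain_events(self, timeout=None):
        # Return the next pending event, or time out if nothing is waiting.
        if not self._events:
            raise socket.timeout()
        return self._events.pop(0)


def eventloop_sketch(conn, limit=None, timeout=None, ignore_timeouts=False):
    """Yield the result of conn.drain_events() once per iteration."""
    for _ in (range(limit) if limit else count()):
        try:
            yield conn.drain_events(timeout=timeout)
        except socket.timeout:
            # Swallow timeouts only when the caller asked for that.
            if timeout and not ignore_timeouts:
                raise


conn = FakeConnection(["msg-1", "msg-2"])
drained = list(eventloop_sketch(conn, limit=2, timeout=1))
```

The caller's for loop does nothing by itself; each step of the iteration is what triggers one drain_events call.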

Hub

The eventloop implementation holds no surprises, so let's look at another, more interesting one. We start with a sample.

The difference from the earlier eventloop example is at Line 17: eventloop took the Connection object directly as an argument, whereas this line passes the hub as an argument instead. Let's see what is going on here.

The Connection simply hands this over to its Transport, so let's go straight to the Redis Transport and see what happens there.

The first step checks whether the current channel is already registered with the epoll poller, and registers it if not. If the channel was not in the poller before, it is now, but nothing has been sent on its connection yet for epoll to react to, so a _brpop_start is issued (Line 301).

As you can see, it issues one listening request that covers all queues, so a message arriving on any of them produces a response. We also have to go back to kombu/transport/redis.py Line 1034 and see what the add_reader function does.
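As a rough illustration of why one request can cover every queue: the Redis BRPOP command accepts several keys plus a timeout, and answers with the first listed key that has an item. The sketch below builds such a command and models the "first non-empty key wins" rule in memory. Both helpers are hypothetical, the model does not block, and the real _brpop_start may do more than this.

```python
from collections import deque


def build_brpop_command(queues, timeout=1):
    """Return the arguments of a single BRPOP that covers every queue."""
    # Redis syntax: BRPOP key [key ...] timeout
    return ["BRPOP", *queues, timeout]


def fake_brpop(store, keys):
    """Hypothetical non-blocking model of BRPOP over in-memory lists."""
    for key in keys:
        items = store.get(key)
        if items:
            # Pop from the tail of the first non-empty list, like RPOP.
            return key, items.pop()
    return None


command = build_brpop_command(["emails", "reports"], timeout=1)
store = {"emails": deque(), "reports": deque(["r1"])}
reply = fake_brpop(store, ["emails", "reports"])
```

Here the reply names the reports queue even though emails was listed first, because emails is empty.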


It jumps back to where we were before; the thing to focus on now is what the on_readable function does.

In other words: once notified that Redis has data ready, it reads the response and parses it, and on success the message has to be processed, which brings us back to redis.py.

Recap

Looking back, the section above was an exciting dig down to the roots, but we never stopped to admire the view along the way, so let's slow down and go over a few things worth remembering.

What is Hub?

We have gone all the way around without clarifying what Hub actually is. The comment on Hub tells us it is responsible for the event loop. In that case I would guess there is an epoll model inside it, which already gives a rough idea of what it is.

Seeing _create_poller and _close_poller in the code confirms my guess. It is a bit more complicated than that, because it is compatible with several models: gevent, kqueue, epoll and select. I like epoll, so I will treat it as epoll here, although more people seem to prefer gevent.

Since it is an event loop, we should pay attention to the file descriptors (FDs) it watches, and the add function is a great help in walking through that.

The implementation of run_forever, however, makes me a little uneasy, because it differs from the epoll model I am familiar with.

So I had to find out what the loop is and how it is implemented, which led me to the epoll implementation in eventio.py.

Compare that with create_loop, and it all becomes clear.

That answers some of the questions: Hub is an event loop!
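To tie these pieces together, here is a small sketch of an fd-based event loop in the same spirit. It is not Kombu's Hub: MiniHub is a hypothetical class built on the standard selectors module (which picks epoll, kqueue or select depending on the platform), and a socketpair stands in for the broker connection.

```python
import selectors
import socket


class MiniHub:
    """Hypothetical, heavily simplified model of an fd-based event loop."""

    def __init__(self):
        # DefaultSelector chooses the best poller available on this platform.
        self.poller = selectors.DefaultSelector()

    def add_reader(self, fileobj, callback, *args):
        # Remember which callback to run when this fd becomes readable.
        self.poller.register(fileobj, selectors.EVENT_READ, (callback, args))

    def create_loop(self, timeout=1.0):
        # Generator: each step polls once, fires callbacks, then yields.
        while True:
            for key, _events in self.poller.select(timeout):
                callback, args = key.data
                callback(*args)
            yield

    def close(self):
        self.poller.close()


received = []
left, right = socket.socketpair()
hub = MiniHub()
hub.add_reader(right, lambda sock: received.append(sock.recv(1024)), right)

left.sendall(b"hello")
loop = hub.create_loop()
next(loop)  # one poll iteration; the callback runs inside it

hub.close()
left.close()
right.close()
```

The point to notice is that the loop itself only knows about fds and callbacks; everything message-related happens inside the callback.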

Registering the Connection with the Hub

Since Hub is an event loop, it makes sense to register the connection with it, but how does that affect the Consumer? A quick walk through the Redis Connection code shows that register_with_event_loop does not do much registering itself, but one thing is clear: a Connection corresponds to a Hub, and the link between them is MultiChannelPoller, which is responsible for finding out which channels are ready. All of those channels come from the same Connection.

How Consumer and Connection are connected

We can guess that the Consumer is bound to the message-handling function, so all we need is to match each Channel with the corresponding Consumer callback.

As the beginning of the sample already shows, the link between them is the Queue!

The Consumer consumes messages through the Queue, which in turn passes the job on to the Channel.

Unsurprisingly, the Channel passes the job on to the Connection once more, and once the Connection is involved, epoll comes into play.

What is Hub's run_forever doing?

Since it is called run_forever and it blocks, we can assume it is polling epoll internally.

We have seen this loop before: in Hub it is create_loop(). It is a generator that hands back nothing when next is called on it, and Line 187 shows that the return value is not used at all; the call simply advances the generator by one iteration.
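The pattern of a generator whose value is thrown away can look odd at first, so here is a tiny sketch of it. The names create_loop_sketch and run_once are hypothetical, and the list of batches stands in for whatever the poller would report as ready.

```python
def create_loop_sketch(ready_batches, handled):
    """Hypothetical loop generator: do the work, then yield nothing."""
    for batch in ready_batches:
        for event in batch:
            handled.append(event)  # side effect stands in for a callback
        yield  # bare yield: the caller's next() receives None


def run_once(loop):
    # Advance the loop by one iteration and ignore the yielded value.
    try:
        next(loop)
        return True
    except StopIteration:
        return False


handled = []
loop = create_loop_sketch([["a"], ["b", "c"]], handled)
first = next(loop)      # the yielded value is None
while run_once(loop):   # stand-in for run_forever
    pass
```

All the useful work happens as a side effect before each yield, which is why the caller never looks at what next gives back.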

Internally, the loop polls the connections, matches the ready ones with their callbacks, and calls the callback when a message arrives. Everything seems to fit together, but the relationship between connection and callback is still unclear.

callback in Connection

As we saw, the callback is originally passed to the Consumer by specifying on_message, but how does it end up in the Connection? Let's take a look.
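The chain described so far (Consumer to Queue to Channel to Connection) can be sketched with a few toy classes. Everything here is hypothetical and only mirrors the shape of the hand-off; the real Kombu classes have different signatures and do far more.

```python
class SketchConnection:
    def __init__(self):
        self.callbacks = {}  # queue name -> callback

    def deliver(self, queue, message):
        # Look up the callback registered for this queue and invoke it.
        self.callbacks[queue](message)


class SketchChannel:
    def __init__(self, connection):
        self.connection = connection

    def basic_consume(self, queue, callback):
        # The channel passes the callback down to its connection.
        self.connection.callbacks[queue] = callback


class SketchQueue:
    def __init__(self, name, channel):
        self.name = name
        self.channel = channel

    def consume(self, callback):
        # The queue delegates to the channel it is bound to.
        self.channel.basic_consume(self.name, callback)


class SketchConsumer:
    def __init__(self, queue, on_message):
        self.queue = queue
        self.on_message = on_message

    def consume(self):
        # The consumer hands its on_message callback to the queue.
        self.queue.consume(self.on_message)


seen = []
connection = SketchConnection()
queue = SketchQueue("tasks", SketchChannel(connection))
SketchConsumer(queue, on_message=seen.append).consume()
connection.deliver("tasks", "payload")
```

After consume() the callback lives on the connection, keyed by queue name, which is the kind of place a delivery path can find it again.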

The callback in the Hub is keyed only by fd; the Hub does not know which connection and channel that fd belongs to. That bookkeeping is handled by MultiChannelPoller, which maps each fd to its channel.
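A minimal sketch of that fd-to-channel bookkeeping might look like the following. PolledChannel and SketchMultiChannelPoller are hypothetical names, the fd numbers are arbitrary, and the real MultiChannelPoller tracks considerably more state.

```python
class PolledChannel:
    def __init__(self, name):
        self.name = name
        self.handled = []

    def handle(self, kind):
        # Stand-in for reading and parsing the response of this command.
        self.handled.append(kind)


class SketchMultiChannelPoller:
    def __init__(self):
        self.fd_to_chan = {}  # fd -> (channel, command kind)

    def register(self, fd, channel, kind):
        self.fd_to_chan[fd] = (channel, kind)

    def on_readable(self, fd):
        # The hub only reports an fd; translate it back to its channel.
        channel, kind = self.fd_to_chan[fd]
        channel.handle(kind)
        return channel


poller = SketchMultiChannelPoller()
chan_a, chan_b = PolledChannel("a"), PolledChannel("b")
poller.register(7, chan_a, "BRPOP")
poller.register(9, chan_b, "LISTEN")
ready = poller.on_readable(9)
```

Only the channel that owns fd 9 is touched; the other channel never hears about the event.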


What is the relationship between Channel and Connection?