Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。 Celery 有广泛、多样的用户与贡献者社区,可以通过IRC 或是 邮件列表 加入到 Celery 的开发和维护中。 Celery 是开源的,使用 BSD 许可证 授权。 下面就以两个 Celery 非常常用的例子:异步任务执行定时任务 为例,给大家介绍一下如何使用 Celery。

安装 Celery

安装 Celery 非常简单,因为我都是通过 pip 来管理 python 的依赖的,所以直接就使用 pip 进行安装:

  1. $ pip install Celery

预备环境

Celery 原理简介

为了说明为什么需要预备环境,这里需要对 Celery 的原理进行一个简单的介绍,这里介绍的原理就是 Celery 是如何实现异步任务的。 Celery 的异步实现原理是将 任务执行单元任务派发单元 分开,从而达到异步的效果;

任务派发单元 将需要执行的任务丢到一个 消息队列 中,然后由 任务执行单元 根据自身的情况从消息队列中获取任务执行,这样就实现了异步的效果。 所以,这里我们可以发现 3 个关键的单位,分别是 任务执行单元(worker/consumer)、任务派发单元(producer)和 消息队列(queue)。 consumerproducer 我们稍后再说,因为都是在 Celery 里面的,但是我们的 Celery 并不带 Queue,所以需要我们自己准备 Queue,Celery 官方推荐的 Queue 有 RabbitMQ、Redis,当然还有很多组件可以选择,但是 Celery 表示并不保证完美支持。这里就以 RabbitMQ 为例进行演示

RabbitMQ 安装

因为我都是用 Docker 来安装各种组件的,所以我这里也是直接用 Docker 来安装 RabbitMQ:

  1. $ docker pull rabbitmq
  2. $ docker run -d --name rabbit -p 15671:5671 -p 15672:5672 rabbitmq

这样,我就可以访问 docker 的 15672 端口来使用 RabbitMQ 了。

异步队列 Demo

下面就演示一个很简单的异步队列,将以下代码保存为 tasks.py

  1. from celery import Celery
  2. app = Celery('tasks', broker='amqp://[email protected]:15672//')
  3. @app.task
  4. def add(x, y):
  5. result = x + y
  6. return result

然后使用下面这个命令启动 worker:

  1. $ celery -A tasks worker -l info

你应该看到类似的提示:

图 1:运行 Worker

这个时候,worker 已经启动了, queue 也已经存在了,就差 producer 了,我们这里决定使用 python shell 来作为 producer。 打开另外一个 shell 终端,切换到 tasks.py 所在的目录,然后运行以下命令:

  1. $ python
  2. > from tasks import add
  3. > add.delay(1, 2)

我们可以在启动 worker 的终端上看到类似的输出:

  1. 2016-02-11 15:29:26,174: INFO/MainProcess] Received task: tasks.add[c95addcf-2ea3-4e58-bfb9-fa6d3a774419]
  2. [2016-02-11 15:29:26,176: INFO/MainProcess] Task tasks.add[c95addcf-2ea3-4e58-bfb9-fa6d3a774419] succeeded in 0.000506637006765s: 3

这就说明我们的异步任务执行成功了。 这里有几点需要解释一下的: tasks.py 中 初始化了一个 Celery 对象,里面有一个 broker 的参数,这个就是用来指定 queue 的,我在这里使用的就是 docker 中的 RabbitMQ 的 URL。

add 函数的定义有 @app.task 装饰器,这个装饰器表示这个函数是 worker 的执行函数,异步函数都需要使用这个装饰器。

在 python shell 中,调用 add 方法的方式是: add.delay(1, 2) 和调用普通方法 add(1, 2) 不一样之处在于加了一层 delay 封装,表示我们要异步执行这个函数。

定时任务

其实,当我们知道异步任务之后,定时任务也就简单了,定时任务的工作方法就是定时执行一个函数,再套入上面的异步任务执行流程中,我们应该发现其实我们现在就只缺少一个定时器了。

在 Celery 中,提供了一个定时器叫做 Crontab,他提供强大的定时配置能力,让我们可以随心所欲的设置定时时间。

下面编写一个简单的例子来演示定时任务,保持上面的 worker 不要关闭,另开一个 shell 终端,将以下代码保存为 schedule.py:

  1. from celery.schedules import crontab
  2. from celery import Celery
  3. app = Celery('tasks', broker='amqp://[email protected]:15672//')
  4. app.conf.update(
  5. # 定时配置要放在这里,f**k...
  6. CELERYBEAT_SCHEDULE={
  7. 'perminute': {
  8. 'task': 'tasks.add',
  9. 'schedule': crontab(minute='*/1'),
  10. 'args': (1, 2)
  11. }
  12. }
  13. )

然后执行以下命令:

  1. $ celery beat -A schedule -l info

然后你会发现 worker 的 shell 中每隔一分钟就会打印两句日志,这表示我们的定时任务运行正常了。

参考资料