Celery入门demo

Celery 中文是”芹菜”, 同时也是一个分布式任务队列.

什么是分布式任务队列呢?

任务队列是一种在线程或机器间分发任务的机制。

消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。

Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。

以上引自官方网站译文。

如果不理解,没关系,今天我们全部跳过理论基础,直接采用案例去理解Celery如何工作。

1、准备工作:安装Celery (pip install Celery)  和 RabbitMQ(yum install rabbitmq-server)

2、安装完毕后先启动RabbitMQ:

rabbitmq-server start

3、新建一个普通用户,否则Celery无法启动,su 到这个用户下,新建文件tasks.py,内容如下:

import  requests

from celery import Celery

app = Celery(‘tasks’,  broker=’amqp://guest@localhost//’)

app.conf.CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’

@app.task

def fetch_url(url):

r = requests.get(url)

return r.status_code

新建client.py文件,编辑内容如下:

from tasks import fetch_url

tasks = []

f = open(‘topsites.txt’)

for url in f:

tasks.append(fetch_url.delay(‘http://’ + url.strip()))

f.close()

topsites.txt文件内容为:

www.mindg.cn

www.qq.com

www.163.com

 

以上就完成了所有的程序文件的设置,往下我们要启动Celery和看看如何运行。

在普通用户下,在当前目录下运行:

celery worker -A tasks -l INFO

正常情况下会有如下提示,我去掉了不必要的行,截取了最后几行内容:

[tasks]

. tasks.fetch_url

[2016-02-22 23:37:57,272: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

[2016-02-22 23:37:57,296: INFO/MainProcess] mingle: searching for neighbors

[2016-02-22 23:37:58,307: INFO/MainProcess] mingle: sync with 1 nodes

[2016-02-22 23:37:58,308: INFO/MainProcess] mingle: sync complete

[2016-02-22 23:37:58,431: WARNING/MainProcess] celery@test ready.

新建一个终端,在普通用户下运行,python client.py

然后回到刚才显示日志的终端,将会看到运行的结果,另外会在当前目录下生成一个results.sqlite文件,这个是sqlite 库,可以用sqlite results.sqlite命令行进行打开,库里有2个表,如下:

sqlite> .table

celery_taskmeta     celery_tasksetmeta

程序运行的状态存储在celery_taskmeta表中了。

demo到这就结束了,想深入了解的兄弟可以去官网,本篇还是实际例子操作为主,如果您觉得还不错,请分享给您身边的朋友,在此谢过。