Celery 中文是”芹菜”, 同时也是一个分布式任务队列.
什么是分布式任务队列呢?
任务队列是一种在线程或机器间分发任务的机制。
消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。
Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。
以上引自官方网站译文。
如果不理解,没关系,今天我们全部跳过理论基础,直接采用案例去理解Celery如何工作。
1、准备工作:安装Celery (pip install Celery) 和 RabbitMQ(yum install rabbitmq-server)
2、安装完毕后先启动RabbitMQ:
rabbitmq-server start
3、新建一个普通用户,否则Celery无法启动,su 到这个用户下,新建文件tasks.py,内容如下:
import requests
from celery import Celery
app = Celery(‘tasks’, broker=’amqp://guest@localhost//’)
app.conf.CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’
@app.task
def fetch_url(url):
r = requests.get(url)
return r.status_code
新建client.py文件,编辑内容如下:
from tasks import fetch_url
tasks = []
f = open(‘topsites.txt’)
for url in f:
tasks.append(fetch_url.delay(‘http://’ + url.strip()))
f.close()
topsites.txt文件内容为:
www.mindg.cn
www.qq.com
www.163.com
以上就完成了所有的程序文件的设置,往下我们要启动Celery和看看如何运行。
在普通用户下,在当前目录下运行:
celery worker -A tasks -l INFO
正常情况下会有如下提示,我去掉了不必要的行,截取了最后几行内容:
[tasks]
. tasks.fetch_url
[2016-02-22 23:37:57,272: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-02-22 23:37:57,296: INFO/MainProcess] mingle: searching for neighbors
[2016-02-22 23:37:58,307: INFO/MainProcess] mingle: sync with 1 nodes
[2016-02-22 23:37:58,308: INFO/MainProcess] mingle: sync complete
[2016-02-22 23:37:58,431: WARNING/MainProcess] celery@test ready.
新建一个终端,在普通用户下运行,python client.py
然后回到刚才显示日志的终端,将会看到运行的结果,另外会在当前目录下生成一个results.sqlite文件,这个是sqlite 库,可以用sqlite results.sqlite命令行进行打开,库里有2个表,如下:
sqlite> .table
celery_taskmeta celery_tasksetmeta
程序运行的状态存储在celery_taskmeta表中了。
demo到这就结束了,想深入了解的兄弟可以去官网,本篇还是实际例子操作为主,如果您觉得还不错,请分享给您身边的朋友,在此谢过。