博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Celery-分布式任务队列
阅读量:6137 次
发布时间:2019-06-21

本文共 7189 字,大约阅读时间需要 23 分钟。

一、介绍

官方文档:

pip3 install celery

Celery是一个专注于实时处理和任务调度的分布式任务队列,通过它可以轻松的实现任务的异步处理。

使用Celery的常见场景:

  • Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
  • 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
  • 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。

Celery包含如下组件:

  • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
  • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  • Result Backend:任务处理完后保存状态信息和结果,以供查询。

二、简单示例

创建一个tasks.py:

from celery import Celeryapp = Celery(    "tasks",     broker="amqp://pd:123456@localhost:5672//",    backend="redis://:123456@localhost:6379/0")@app.taskdef add(x, y):    return x+y

启动Celery Worker来开始监听并执行任务:

celery -A tasks worker -l info

更多有关命令:

celery worker --help

再打开一个终端, 进行命令行模式,调用任务:

>>> from tasks import add>>> relt = add.delay(10, 10)>>> relt.ready()  # 检查任务是否已经完成True>>> relt.get()    # 获取任务结果,可设置timeout超时20>>> relt
>>> relt.id'470d5f45-42eb-4b0c-bd38-06b85fa5599b'>>> relt.result20>>> relt.status'SUCCESS'
from celery import Celeryfrom celery.result import AsyncResultapp = Celery(    "tasks",     broker="amqp://pd:123456@localhost:5672/pdvhost",    backend="redis://:123456@localhost:6379/0")result = AsyncResult(id="470d5f45-42eb-4b0c-bd38-06b85fa5599b", app=app)print(result.get())  # 20
View Code

三、配置

官方文档,配置:

像上面简单示例中,要想添加配置,则可以直接在应用程序设置配置:

app.conf.task_serializer = "json"

如果您一次配置多个设置,则:

app.conf.update(    task_serializer="json",    accept_content=["json"],    result_serializer="json",    timezone="Europe/Oslo",    enable_utc=True,)

对于大型项目,建议使用专用配置模块。因为项目复杂,最好做到程序的解耦,所以将配置保存在集中位置是一个非常好的选择,一般默认 celeryconfig.py 模块是用来保存配置的,你也可以使用自己定义的名字,然后通过调用 app.config_from_object() 方法告诉 Celery 实例使用配置模块:

app.config_from_object("celeryconfig")# 或者from . import celeryconfigapp.config_from_object(celeryconfig)

四、在项目中使用Celery

项目布局:

方案选择:

  1. RabbitMQ作为消息代理。不选Redis是因为如果Redis发生意外,会造成数据丢失等后果。
  2. Msgpack做序列化。Msgpack是一个二进制的类json的序列化方案,它比json的数据结构更小,传输更快。
  3. Redis做结果存储。
pip3 install msgpack
########## celeryapp.py ##########from celery import Celeryfrom . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"])app.config_from_object(celeryconfig)if __name__ == "__main__":    app.start()########## tasks.py ##########from .celeryapp import app@app.taskdef add(x, y):    return x+y@app.taskdef mul(x, y):    return x*y########## celeryconfig.py ########### 使用RabbitMQ作为消息代理broker_url = "amqp://pd:123456@114.116.50.214:5672//"# # 把任务结果存在了Redisresult_backend = "redis://:123456@114.116.50.214:6379/0"# 任务序列化和反序列化使用msgpack方案task_serializer = "msgpack"# 读取任务结果一般性能要求不高,所以使用了可读性更好的jsonresult_serializer = "json"# 任务过期时间result_expires = 60*60*24# 指定接受的内容类型accept_content = ["json", "msgpack"]
代码示例

五、在后台运行worker

在生产中,我们需要在后台运行worker,官方文档中有详细描述。

守护程序脚本使用celery multi命令在后台启动一个或多个worker:

# 启动worker后台运行celery multi start w1 -A proj.celeryapp -l infocelery multi start w2 -A proj.celeryapp -l infoPS:如果使用的是默认的celery.py,那么直接proj即可# 重启celery multi restart w1 -A proj -l info# 停止celery multi stop w1 -A proj -l info# 确保退出之前完成所有当前正在执行的任务celery multi stopwait w1 -A proj -l info

默认情况下,它会在当前目录下创建的pid和日志文件,为了防止多个worker在彼此之上启动,最好将这些文件放在专用目录中:

mkdir /var/run/celerymkdir /var/log/celerycelery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

六、指定队列传送任务

官方文档:

在 celeryconfig.py 中加入以下配置:

# 路由键以 task. 开头的消息都进default队列# 路由键以 web. 开头的消息都进web_tasks队列task_queues = (    Queue("default", routing_key="task.#"),    Queue("web_tasks", routing_key="web.#"),)# 默认的交换机名字为taskstask_default_exchange = "tasks"# 设置默认交换类型为topictask_default_exchange_type = "topic"# 默认的路由键是 task.defaulttask_default_routing_key = "task.default"# 要将任务路由到web_tasks队列,可以在task_routes设置中添加条目task_routes = {    # tasks.add的消息会进入web_tasks队列    "proj.tasks.add": {        "queue": "web_tasks",        "routing_key": "web.add",    },}

其他代码与上面 四 中的相同。

启动worker,指定该worker工作于哪个队列:

# 该worker只会执行web_tasks队列中的任务celery -A proj.celeryapp worker -Q web_tasks -l info

七、定时任务

官方文档:

Celery支持定时任务,设定好任务的执行时间,Celery就会定时自动帮你执行, 这个定时任务模块叫 celery beat。

函数版tasks.py:

from celery import Celeryfrom celery.schedules import crontabapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//", backend="redis://:123456@localhost:6379/0")app.conf.timezone = "Asia/Shanghai"@app.on_after_configure.connectdef setup_periodic_tasks(sender, **kwargs):    # 每5秒执行一次 test("Hello")    sender.add_periodic_task(5.0, test.s("Hello"), name="every-5s")    # 每10秒执行一次 test("World")    sender.add_periodic_task(10.0, test.s("World"), name="every-10s", expires=5)    # 每周一早上 7:30 执行一次 test("Happy Mondays!")    sender.add_periodic_task(        crontab(hour=7, minute=30, day_of_week=1),        test.s("Happy Mondays!"),    )@app.taskdef test(arg):    print(arg)
View Code
celery -A tasks worker -l infocelery -A tasks beat -l info

配置版:

########## celeryapp.py ##########from celery import Celeryfrom . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"])app.config_from_object(celeryconfig)if __name__ == "__main__":    app.start()########## celeryconfig.py ##########broker_url = "amqp://pd:123456@114.116.50.214:5672//"result_backend = "redis://:123456@114.116.50.214:6379/0"task_serializer = "msgpack"result_serializer = "json"result_expires = 60*60*24accept_content = ["json", "msgpack"]timezone = "Asia/Shanghai"from celery.schedules import crontabbeat_schedule = {    "every-10s": {        "task": "proj.tasks.add",        "schedule": 10.0,        "args": (10, 10)    },    "every-monday-morning-7:30": {        "task": "proj.tasks.mul",        "schedule": crontab(hour=7, minute=30, day_of_week=1),        "args": (10, 10)    }}########## tasks.py ##########from .celeryapp import app@app.taskdef add(x, y):    return x+y@app.taskdef mul(x, y):    return x*y
View Code
celery -A proj.celeryapp worker -l infocelery -A proj.celeryapp beat -l info 

八、在Django中使用celery

发布任务

项目布局:

import osfrom celery import Celeryos.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")app = Celery("mysite")app.config_from_object("django.conf:settings", namespace="CELERY")app.autodiscover_tasks()@app.task(bind=True)def debug_task(self):    print("Request: {0!r}".format(self.request))
celeryapp.py
from .celeryapp import app as celery_app__all__ = ["celery_app"]
__init__.py

settings.py,更多设置参考:

#for celeryCELERY_BROKER_URL = "amqp://pd:123456@114.116.50.214:5672//"CELERY_RESULT_BACKEND = "redis://:123456@114.116.50.214:6379/0"

在app里的tasks.py里编写任务:

from celery import shared_task@shared_taskdef add(x, y):    return x+y@shared_taskdef mul(x, y):    return x*y

在views里调用celery task:

from django.shortcuts import HttpResponsefrom app01 import tasksdef test(request):    result = tasks.add.delay(100, 100)    return HttpResponse(result.get())

定时任务

1、安装 django-celery-beat

pip3 install django-celery-beat

2、在settings.py中设置

INSTALLED_APPS = [    ...,    'django_celery_beat',]

3、进行数据库迁移,以便创建定时任务所需的表

python3 manage.py migrate

4、开始监测定时任务

celery -A mysite.celeryapp beat -l info -S django

5、在django-admin界面设置定时任务

 

转载于:https://www.cnblogs.com/believepd/p/10643392.html

你可能感兴趣的文章
《众妙之门——网页设计专业之道》——2.4 总结
查看>>
MySQL sql_mode 说明(及处理一起 sql_mode 引发的问题)
查看>>
Java 注解详解 (annotation)
查看>>
鹰眼跟踪、限流降级,EDAS的微服务解决之道
查看>>
秘籍:程序猿该如何实力撩妹
查看>>
网络编程socket基本API详解
查看>>
API接口设计 OAuth2.0认证
查看>>
Mysql5.6的1755错误解决办法
查看>>
在命令行中运行“mvn compile”因为中文报错
查看>>
Docker的技术不再局限于测试和开发
查看>>
技术干货:工欲善其事,必先利其器 阿里云数据库系列谈之一
查看>>
禁用ViewState
查看>>
深入理解Java HashMap实现原理
查看>>
阿里云备案获取服务号
查看>>
深入理解Python中的__builtin__和__builtins__
查看>>
YII AJAX registerScript
查看>>
ARC forbids explicit message send of 'retainCount'
查看>>
redis单机安装
查看>>
golang内存分配
查看>>
手把手教你----使用Nuget管理自己的项目库
查看>>