zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Python分布式任务队列Celery,Django中如何实现异步任务和定时任务

2023-02-18 16:23:11 时间

问题引入

个人的某Django项目需要实现在后台发送邮件,发送邮件时间比较长,需要在后台做大量的数据运算,包括去做深度学习生成报告,以及做大量数据的处理。由于Python中GIL全局锁的限制,单是使用多线程threading,无法充分利用CPU,这里需要一个工具实现异步方式来进行分配管理任务。

Celery简介

celery是一个分布式的任务队列,把大量任务分布到不同的机器上去,通过集群来运行大量的任务。celery由Python编写,可通过暴露HTTP方式进行任务交互以及与其他语言集成开发。

安装celery

$ pip install -U Celery

安装需要的依赖包

$ pip install "celery[librabbitmq]"
$ pip install "celery[librabbitmq,redis,auth,msgpack]"
  • celery[librabbitmq]:使用librabbitmq库
  • celery[redis]:使用Redis进行消息传输或后端结果存储
  • celery[auth]:使用auth进行鉴权
  • celery[msgpack]:使用msgpack序列化

更多依赖参考官网https://docs.celeryproject.org/en/latest/getting-started/introduction.html

到此,celery的环境已搭建起来,下一步需要选择一个消息队列或者消息代理,项目中使用的是redis。

启动redis

$ docker run -d -p 6379:6379 redis

创建celery任务

在项目根目录下创建celery目录,新建tasks.py脚本。

from celery import Celery
 
# 第一个参数是当前脚本的名称,第二个参数是broker的服务地址
app = Celery('tasks', backend='redis://127.0.0.1', broker='redis://127.0.0.1')
 
 
@app.task
def add(x, y):
    return x + y

celery的实例名称叫做tasks,broker传递了两个参数:backend存储,把每一个异步任务运行的结果存储在什么地方,可以使用redis、数据库,也可以使用RPC的消息队列去传到外部消息队列中存储;broker为存储任务系统的代理,也是个消息队列。这里我都选用了redis。

运行Celery Worker服务器

$ celery -A tasks worker --loglevel=INFO

win10下需要在后面加上-P eventlet。

创建测试脚本run_tasks.py

from tasks import add
 
# 任务提交后变为ready状态
result = add.delay(4, 4)
print(f'Is task ready: {result.ready()}')
 
# 等待一秒钟,一秒内运行完就会取到结果,超过则返回超时状态
run_result = result.get(timeout=1)
print(f'task result: {run_result}')

运行脚本:

监控和管理

  • 使用命令行监控:

status:列出此集群中的活动节点

$ celery -A proj status

inspect:查看任务执行情况

如:列出活动任务

$ celery -A proj inspect active
  • 使用Flower web监控工具

可以方便看到任务的执行进展、执行历史和执行结果,还可以远程控制。

pip安装:(安装时先断开celery服务)

$ pip install flower

启动celery的flower,默认5555端口:

celery -A tasks flower --broker=redis://localhost:6379/0

访问网址可以看到worker节点的任务运行的详细信息。

Django与Celery集成:异步任务

Celery4之后的版本已支持Django,不需要安装额外的package。

整体架构如图,首先从用户侧发起请求到Django,Django产生任务并将任务发给Celery,Celery中有对应的消息队列和代理Broker去接受这个任务并将它存起来。Celery中的Worker会去检索队列中的任务,将任务一个个执行,执行完后存下来,这时我们也能在系统中拿到结果,包括在Flower中能够监控到任务的状态。

在项目的主应用下创建创建一个celery.py,由于将开发和生产配置做了分离,environ.setdefault使用settings.base,app为应用名称。

import os
 
from celery import Celery
 
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.base')
 
app = Celery('recruitment')
 
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
 
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
 
 
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

在主应用的__init__.py下配置初始化。

from __future__ import absolute_import, unicode_literals
# 防止导入的包有命名冲突
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
 
__all__ = ('celery_app',)

再到项目的settings的本地配置下加上celery配置。

CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = "Asia/Shanghai"
CELERYD_MAX_TASKS_PRE_CHILD = 10
CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")

新建发送消息tasks.py,这里以使用钉钉发送群消息为例:

from __future__ import absolute_import, unicode_literals
 
from celery import shared_task
from .dingtalk import send
 
@shared_task
def send_dingtalk_message(message):
    send(message)

在后台管理员admin.py中修改send方法为异步任务,即改为send_dingtalk_message.delay()。

启动Celery、Flower和redis来管理异步任务。

Django与Celery集成:定时任务

在Celery中用一个叫做Beat的调度进程来管理和调度定时任务。调用关系为:Beat首先调用Scheduler去找到任务,然后检测任务的执行状态,如果这个任务到了它的执行时间就会去执行,执行完会将任务的状态存储下来。存储方式有两种:一种是直接把任务执行状态存储到文件中,这个是默认的Default PersistentStorage(Scheduler);另一种方式是将执行的状态和任务信息存在数据库里。建议使用数据库,便于维护。

这里将上述服务的启动编写为shell脚本形式:

启动Django项目.bat:

# django-admin compilemessages
## if local config file does not exist, clond one:
test -f settings/local.py || echo "=== warning: local.py does not exist, will initialize the file, please update the configs ==="
test -f settings/local.py || cp settings/production.py settings/local.py
test -f settings/local.py && sed -i '' 's/DEBUG = False/DEBUG = True/g' settings/local.py 2> /dev/null
 
# synchronous web server for development:
# --settings=settings.local
python3 manage.py runserver 0.0.0.0:8000 $server_params
 
# for async web server:
# export DJANGO_SETTINGS_MODULE=settings.local
# uvicorn recruitment.asgi:application --workers 3

启动celery.sh:

# 启动 recruitment 这个 package 的时候,会运行 __init__.py
# __init__.py 里面初始化了 django 的配置
DJANGO_SETTINGS_MODULE=settings.production celery -A recruitment worker -l INFO

启动flower.sh:

DJANGO_SETTINGS_MODULE=settings.production celery -A recruitment flower
  • 安装beat:
pip install django-celery-beat
  • 注册django_celery_beat到settings下的APP中
  • 数据库迁移
python manage.py makemigrations
python manage.py migrate
  • 使用DatabaseScheduler启动beat或者在配置中设置beat_scheduler,脚本代码如下
# DJANGO_SETTINGS_MODULE=settings.local celery -A recruitment beat
DJANGO_SETTINGS_MODULE=settings.local celery -A recruitment beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

管理定时任务的几种方法:

  • 在Admin后台添加管理定时任务Periodic Tasks
  • 系统启动时自动注册定时任务

在主应用下新建celery.py

from celery.schedules import crontab
from recruitment.tasks import add
 
 
# 使用Django信号
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='hello every 10')
 
    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
 
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )
 
 
@app.task
def test(arg):
    print(arg)
 
 
app.conf.timezone = "Asia/Shanghai"
  • 直接设置应用的beat_schedule
from celery.schedules import crontab
from recruitment.tasks import add
 
 
app.conf.beat_schedule = {
    'add-every-10-seconds': {
        'task': 'recruitment.tasks.add',
        'schedule': 10.0,
        'args': (16, 4,)
    },
}
  • 运行时添加定时任务
# 运行时动态添加定时任务
import json
from django_celery_beat.models import PeriodicTask, IntervalSchedule
 
# 先创建定时策略,每10秒钟运行一次
schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS, )
 
# 再创建任务
task = PeriodicTask.objects.create(interval=schedule, name='say welcome 2021', task='recruitment.celery.test',
                                   args=json.dumps(['welcome']), )