zl程序教程

您现在的位置是:首页 >  其他

当前栏目

在项目中使用Celery

项目 Celery 使用
2023-09-11 14:19:18 时间

项目结构

项目的结构如下在这里插入图片描述

celery.py

from celery import Celery

# 创建celery实例
app = Celery('demo')
app.config_from_object('project.celeryconfig')

# 自动搜索任务
app.autodiscover_tasks(['project'])

celeryconfig.py

broker_url = 'redis://127.0.0.1:6379/5'
result_backend = 'redis://127.0.0.1:6379/6'

tasks.py

from project.celery import app as celery_app


# 创建任务函数
@celery_app.task
def my_task1():
    print("任务函数(my_task1)正在执行....")


@celery_app.task
def my_task2():
    print("任务函数(my_task2)正在执行....")


@celery_app.task
def my_task3():
    print("任务函数(my_task3)正在执行....")

启动项目

点击
在这里插入图片描述

说明

  1. 搜寻到的任务 在这里插入图片描述
  2. 配置信息在这里插入图片描述

路由

官网链接

  1. 当任务较多的时候我们可以指定某个队列专门处理特定的事情
  2. 比如一个队列处理发送邮件,一个队列处理上传文件

启动队列

我们启动两个队列Queue1Queue2,进入Linux系统,注意我们要启动两个终端,分别输入以下命令

celery -A project worker --loglevel=info -Q queue1
celery -A project worker --loglevel=info -Q queue2

在这里插入图片描述

  • 这样我们就启动了两个队列来干活,这样相当于启动了两个worker服务器,一个worker服务器负责一个队列(本次演示使用方式)
  • 我们还可以用一个worker服务器处理两个队列的事,输入以下命令
celery -A project worker --loglevel=info -Q queue1,queue2

路由配置

所谓路由配置,也就是配置哪个任务到哪个队列(什么队列干什么活)

  1. 打开celeryconfig.py文件,进行如下配置
task_routes=({
    'project.tasks.my_task1': {'queue': 'queue1'},
    'project.tasks.my_task2': {'queue': 'queue1'}
    })
  1. 在项目下新建test.py写上如下代码

调度任务

delay

  1. 我们另起一个终端,可以使用delay来进行任务的调度
  2. 输入以下语句
from project.tasks import my_task1
my_task1.delay()

我们就会发现任务执行了在这里插入图片描述

apply_async

使用此方法进行任务调度,可以指定队列和任务执行的延迟时间
在这里插入图片描述

my_task2.apply_async((),queue="queue1",countdown=10)
  • 当前函数指的就是将任务发送到queue1队列,10s后执行

周期任务

这里会使用到celery beat调度器,他可以使任务周期执行,官网的相关说明,点击

普通周期任务

假设每5S执行一次任务3,那么我们可以再celeryconfig.py文件下增加如下配置

beat_schedule = {
    "every-5-seconds": {
        "task": "project.tasks.my_task3",
        "schedule": 5.0
    }
}

启动Linux系统,输入命令

celery -A project worker --loglevel=info --beat

当我们启动之后,就会发现任务开始每隔5s自动执行了在这里插入图片描述

定时周期任务

如果我们想每天或者每周的几点执行一次任务,那么这个时候就用到了cron任务,可以在celeryconfig.py中增加如下配置信息

from celery.schedules import crontab
beat_schedule = {
    "add-every-monday-morning": {
        "task": "project.tasks.my_task2",
        "schedule": crontab(hour=7, minute=30, day_of_week=1)
    }
}

那么这里就表示的是每周一早晨7:30执行任务3

任务时间

celerybeat-schedule任务执行时间的结果保存在celerybeat-schedule系列文件中
在这里插入图片描述