zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

celery 异步队列

2023-09-27 14:25:19 时间

实现方法一

django项目应用celery,主要有两种任务方式,一是异步任务(发布者任务),一般是web请求,二是定时任务。

celery组成请看celery介绍_宠乖仪的博客-CSDN博客

Celery最好与经常被称为消息代理的存储解决方案一起结合使用。和Celery经常一起使用的消息代理是Redis,它是一个基于内存的持久化的键-值数据存储系统

Redis还用作Celery队列返回的结果的存储,以便Celery队列的消费者稍后进行检索,Redis用于存储应用程序代码生成的消息,这些消息描述了Celery任务队列中要完成的工作。

 

1、安装依赖库

pip3 install Celery

pip3 install redis

pip3 install eventlet

 

2、主项目下的添加:

celery.py

import os
import django
from celery import Celery
from django.conf import settings

# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
# django_rest 是当前项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_rest.settings')
django.setup()

celery_app = Celery('django_rest')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

 

在主项目的init.py中,添加如下代码:

# celery异步任务
from .celery import celery_app

__all__ = ['celery_app']

settings.py

在配置文件中配置对应的redis配置:

# 异步调用redis配置
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0'
# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER='json'
CELERY_ACEPT_CONTENT=['applocation/json']

 

3、应用下的添加

tasks.py

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from time import sleep
from django_rest.celery import celery_app

from celery.result import AsyncResult


@celery_app.task
def add(x,y):
    sleep(5)
    print("1111111111111111111111111111")
    return x;

@shared_task      ----说是用这个其他应用也可以调用??? 
def add3(x,y):
    sleep(5)
    print("1111111111111111111111111111")


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

 

调用任务和查询任务状态

views.py

from django.http import HttpResponse, JsonResponse
import json
from .tasks import *

//调用异步任务
def sendCelery(request):
    res=add3.delay("凯宾斯基",1)
    # print(res.info())
    return JsonResponse({'status':'successful','task_id':res.task_id})
 
from celery import result

//查询异步任务状态
def get_result_by_taskid(request):
    task_id = request.GET.get('task_id')
    ar = result.AsyncResult(task_id)
    if ar.ready():
        return JsonResponse({'status': ar.state, 'result': ar.get()})
    else:
        return JsonResponse({'status': ar.state, 'result': ''})

 

启动celery

//django_rest项目名--要cd要项目主目录下执行
celery  -A django_rest worker  -l debug -P eventlet

 

参考: https://blog.csdn.net/qq_53582111/article/details/120207740v  (里面还有教定时任务,用到再看)

 

 

再次测试验证

 

celery 中文手册 https://www.celerycn.io/

参考: https://www.bilibili.com/video/BV1Di4y1d7AD?spm_id_from=333.337.search-card.all.click&vd_source=caabcbd2a759a67e2a3de8acbaaf08ea  --做笔记

 

tasks.py

from celery_tasks.main import app
from time import sleep



@app.task
def send(a,b):
    print("调用开始")
    # sleep(1)
    print("---调用结束---")
    return "12312"

config.py

# 代理人 这里用redis
broker_url="redis://127.0.0.1:6379/9"

main.py

from celery import Celery
import celery_demo
import os


# 读取Django的配置
os.environ["DJANGO_SETTINGS_MODULE"]="celery_demo.settings"

# 创建celery对象,并指定配置
app=Celery('django_rest',backend="redis://127.0.0.1:6379/9")

# celery项目配置: 指定任务存储到哪里去
app.config_from_object('celery_tasks.config')

# 加载可用的任务-要告诉django,tsak的任务目录
app.autodiscover_tasks([
    "celery_tasks.sms"
])

 

views.py

from django.shortcuts import render
from django.views import View
from celery_tasks.sms.tasks import send
from django import http

# Create your views here.

class SendSMSView():
    def get(request):
        # # 加上delay才会异步调用方法
        # # 正常调用为send("张三",1212)
        result=send.delay("张三","1212")
        # print(result)
        return http.HttpResponse("任务id:%s"% result)

 

 

 

 

 

 

 

 

 

 

 

启动命令

celery  -A celery_tasks.main  worker  -l info

or
celery  -A celery_tasks.main   worker  -l debug -P eventlet

 

异步走的print信息, 只能在 celery 这个命令终端里面看,return的可以在redis里面看

 

https://www.cnblogs.com/zivli/p/11517797.html(还有监控)