超实用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流
2023-06-13 09:13:59 时间
异步任务,是 Web 开发中经常遇到的问题,比如说用户提交了一个请求,虽然这个请求对应的任务非常耗时,但是不能让用户等在这里,通常需要立即返回结果,告诉用户任务已提交。任务可以在后续慢慢完成,完成后再给用户发一个完成的通知。
今天分享一份代码,使用 Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流,你可以修改 task.py 来实现你自己的异步任务。
架构图如下:
其中 Celery 来执行异步任务,RabbitMQ 作为消息队列,MongoDB 存储任务执行结果,FastAPI 提供 Web 接口。
以上所有模块均可使用 Docker 一键部署。
下面为 Demo 使用方法:
1、确保本机已安装 Docker、Git
2、下载源代码:
git clone https://github.com/aarunjith/async-demo.git
3、部署并启动:
cd async-demo
docker compose up --build
4、启动一个异步任务:
$ curl -X POST http://localhost:8080/process
任务会发送到消息队列,同时会立即返回一个任务 id:
❯ curl -X POST http://localhost:8080/process
{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%
5、查询任务状态:
curl -X POST http://localhost:8080/check_progress/<task_id>
任务完成后的返回结果如下:
❯ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{"status":"SUCEESS","data":"\"hello\""}%
代码目录结构如下:
其中 app.py 如下:
from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn
# Lets create a connection to our backend where celery stores the results
client = MongoClient("mongodb://mongodb:27017")
# Default database and collection names that Celery create
db = client['task_results']
coll = db["celery_taskmeta"]
app = FastAPI()
@app.post('/process')
async def process_text_file():
'''
Process endpoint to trigger the start of a process
'''
try:
result = start_processing.delay()
logger.info(f'Started processing the task with id {result.id}')
return {
"status": result.state,
'id': result.id,
'error': ''
}
except Exception as e:
logger.info(f'Task Execution failed: {e}')
return {
"status": "FAILURE",
'id': None,
'error': e
}
@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
'''
Endpoint to check the task progress and fetch the results if the task is
complete.
'''
try:
result = AsyncResult(task_id)
if result.ready():
data = coll.find({'_id': task_id})[0]
return {'status': 'SUCEESS', 'data': data['result']}
else:
return {"status": result.state, "error": ''}
except Exception as e:
data = coll.find({'_id': task_id})[0]
if data:
return {'status': 'SUCEESS', 'data': data['result']}
return {'status': 'Task ID invalid', 'error': e}
if __name__ == "__main__":
uvicorn.run("app:app", host='0.0.0.0', port='8080')
如果要实现自己的任务队列,就修改 task.py 来添加自己的异步任务,可以整合到自己的项目中。
最后的话
Celery 是异步任务非常好用的工具,推荐阅读分布式异步任务队列神器之-Celery,一文搞定 celery 任务远程调用。RabbitMQ 消息队列可以确保服务重新启动时数据也不丢失,因此这个 Demo 有很强的实用价值,如果觉得有帮助,可以转发、关注、讨论。
相关文章
- mongodb 基本概念
- MongoDB下根据数组大小进行查询的方法
- MongoDB安装及其配置必备实操指南(mongodb安装和配置)
- MongoDB:计算时间差的利器(mongodb时间差)
- MongoDB查询超快:极速体验(mongodb查询速度)
- MongoDB副本集部署解决方案(mongodb副本集部署)
- MongoDB驱动:快速下载体验(mongodb驱动下载)
- 历史悠久的MongoDB:存储着未知历史记载(历史数据mongodb)
- 探索MongoDB:技术原理及应用(mongodb技术原理)
- 深入探究:MongoDB的特点与用途简介(mongodb是什么数据库)
- MongoDB的端口配置详解(mongodb端口配置)
- 编译MongoDB源代码编译:简单快速的实现方式(mongodb源代码)
- 深入浅出:MongoDB删除表数据实战(mongodb删除表数据)
- 云上搭建MongoDB平台,实现数据云存储(云平台mongodb)
- MongoDB与关系型数据库的比较分析(mongodb对比)
- MongoDB实现文件存储功能:高效、快速的文件存储方式(mongodb存储文件)
- Exploring the Architecture of MongoDB: A Visual Guide(mongodb架构图)
- 提升Mongodb集群性能,大幅度提升数据处理效率(mongodb集群性能)
- MongoDB数据安全:保护隐私加密数据(mongodb数据加密)
- 无法完成要求,mongodb 8 是什么意思需要进一步的信息才能给出标题。(mongodb8)
- 使用MongoDB实现高效集群管理(mongodb 集群管理)
- 【深入探索MongoDB的知识】(深入理解mongodb)