Django配置websocket
2023-09-14 08:57:24 时间
# setting.py 配置
INSTALLED_APPS = [
'···',
'channels',
'···',
]
ASGI_APPLICATION = 'face_safe.asgi.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [(os.getenv('REDIS_SERVER_HOST', '192.168.2.175'), int(os.getenv('REDIS_SERVER_PORT', '6379')))],
},
},
}
# setting.py 同级新增asgi.py
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from apps.telecom.routing import websocket_urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'face_safe.settings')
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(
URLRouter(
websocket_urlpatterns
)
)
})
# routing.py
# -*- coding: utf-8 -*-
"""
@author: Mr_zhang
@software: PyCharm
@file: routing.py
@time: 2021/11/23 下午2:06
"""
from django.urls import path
from apps.telecom import consumer
websocket_urlpatterns = [
path('v1/ws/telecom/roll_call/', consumer.RollCallConsumer.as_asgi()),
]
# consumer.py
# -*- coding: utf-8 -*-
"""
@author: Mr_zhang
@software: PyCharm
@file: consumer.py
@time: 2021/11/23 下午2:06
"""
import json
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
class RollCallConsumer(AsyncJsonWebsocketConsumer):
group_name = "face_safe"
async def connect(self):
await self.channel_layer.group_add(
self.group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)
async def receive(self, text_data=None, bytes_data=None, **kwargs):
await self.send(text_data)
async def send(self, text_data=None, bytes_data=None, close=False):
await super(RollCallConsumer, self).send(text_data, bytes_data, close)
async def push_messages(self, message):
message_type = message['message_type']
message = message['message']
data = {
'message_type': message_type,
'message': message
}
await self.send(json.dumps(data, ensure_ascii=False))
def send_message(group_name="face_safe", message=None, message_type=None):
"""
WS广播: 外部函数调用
:param message_type: 消息类型
:param group_name: 组名称
:param message: 消息内容
:return:
"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'push.messages', # push_messages函数
'message': message,
'message_type': message_type
}
)
# 函数调用广播推送
consumer.send_message(
message_type='record',
message=monitor_serializers.MonitorDiscoverSerializers(instance).data
)
可通过在线websocket测试: http://www.jsons.cn/websocket/
相关文章
- Django之setting文件
- Django-配置celery
- Django-media配置
- 第四百零二节,Django+Xadmin打造上线标准的在线教育平台—生产环境部署,uwsgi安装和启动,nginx的安装与启动,uwsgi与nginx的配置文件+虚拟主机配置
- 第三百八十一节,Django+Xadmin打造上线标准的在线教育平台—xadmin全局配置
- 第三百七十八节,Django+Xadmin打造上线标准的在线教育平台—django自带的admin后台管理介绍
- Mac下django简单安装配置步骤
- Django 的 CSRF 保护机制(转)
- django框架进阶-分页-长期维护
- Django管理工具django-admin.py创建项目
- 转一篇:文档笔记之Django QuerySet
- python-django项目基础-haystack&whoosh&jieba_20191124
- python-django-linux上mysql的安装和配置_20191124
- python的前后端分离(一):django+原生js实现get请求
- django基础-02:虚拟环境
- 基于django的自定义简单session功能
- django中配置使用celery
- Django 3.2.5博客开发教程:基础配置
- Python Django 基于通用视图实现图片的显示功能代码示例
- Python Django 配置URL的方式(url传参方式)
- Python:Django开发环境与生产环境的配置
- Django学习14 -- 文件(文本/图像)上传
- Django===django工作流
- Python-Django使用MemcachedCache缓存
- Django 发送email配置详解及各种错误类型
- django 多数据库配置
- Django Nginx+uwsgi 安装配置
- Django(1)快速体验
- Django(18)-ORM常用的查询函数详解及实例演示
- Django实现导航栏二级目录
- 17 - vulhub - Django GIS SQL注入漏洞(CVE-2020-9402)
- Django报错:No ‘Access-Control-Allow-Origin‘ header is present on the requested resource.