物联网视频监控服务(二)-硬件编码篇
2023-04-18 14:23:30 时间
概述
此篇文章主要描述 ESP32-CAM硬件上的软件开发部分;
功能点
- 采集视频数据,并通过UDP协议上传到服务器功能;
- 接收emqx的topic消息,动态调整 视频帧 获取及发送频率;
- WIFI连接成功 通过闪光灯提示;
- 通过watch dog 实现当程序宕机或假死自动重启;
使用技术
- 语言:python
- 软件或框架:micropython+umqtt(MQTT协议的实现)
- IDE(代码编译器): Thonny+Pycharm
- 应用层协议:MQTT
- 传输层协议:TCP,UDP
- 硬件:ESP32-CAM开发套件
准备工作
Thonny相关
概念
- 固件: 指写入到硬件中的程序,写入后就被固定,无法更改,除非重新写入新的固件; 固件被存储在硬件的内存中,包括后续的python脚本,注意文件大小不能超过内存限制;
- 固件和软件区别之一:软件是运行在操作系统之上,固件运行在硬件之上,如BIOS系统;
硬件开发流程
- 先为 硬件准备好运行环境,这里选择micropython语言作为运行环境,则先通过thonny安装对应固件即可; 固件地址请 点击链接 进行下载;
- 然后 开始写python脚本,通过 thonny将脚本在硬件上执行即可;
- 如上两步请 点击链接 按照教程即可完成;
MicroPython知识点
- 在固件安装到硬件后,通过thonny能看到 硬件上有个boot.py文件,此文件是 硬件通电后自动执行,也就是 程序的入口文件,其他的python脚本,只要在此文件中加入import语句 即可 通电自动执行; 注意在脚本功能完善后,再加入到此文件,否则要是脚本执行异常,可能导致连接thonny可能失败,需要 重新刷固件,较麻烦
- WDT: 俗称 看门狗,要在规定时间内喂狗,否则系统会自动重启;作用是 防止程序运行崩溃后,无法自启;
- umqtt: 是mqtt协议在MicroPython上的实现库; 这里主要用来连接到实现mqtt协议的emqx软件服务,然后尝试接收消息,再执行回调函数;
- camera:此库是摄像头操作的库,暂未找到相关文档链接;
- MicroPython基础知识中文版,基本语法和python3相同,只是标准库有所不同,如 json库,这里是ujson;
- MicroPython对应的ESP32开发板的相关知识
- micropython在开发时需要的第三方库(就是python包或脚本) 可以网上寻找,然后上传到硬件中即可使用,如 umqtt;
系统设计
概念
- 视频流传输:视频是由一帧一帧的图片组成,所以 这里的视频流传输其实就是 摄像头捕捉图片然后传输图片;到 服务端后再处理转为视频进行磁盘存储;
硬件的数据流处理
主要代码部分
cam_send.py
import socket
import time
import camera
import esp32
import network
from machine import WDT
from flash_light import control_flash
from log_action import LogWriter
from umqtt.simple import MQTTClientRobust
log = LogWriter()
class CamSend:
def __init__(self, wifi_name, wifi_password, udp_server_ip, udp_server_port, mqtt_server, mqtt_port, mqtt_user,
mqtt_password):
"""
:param wifi_name:
:param wifi_password:
:param udp_server_ip:
:param udp_server_port:
:param mqtt_server:
:param mqtt_port:
:param mqtt_user:
:param mqtt_password:
"""
# wifi配置
self.wifi_name = wifi_name
self.wifi_password = wifi_password
# udp传输图片配置
self.udp_server_ip = udp_server_ip
self.udp_server_port = udp_server_port
# mqtt配置
self.mqtt_server = mqtt_server
self.mqtt_port = mqtt_port
self.mqtt_user = mqtt_user
self.mqtt_password = mqtt_password
# 发送图片间隔时间
self.send_img_sleep_time = 0.1
# mqtt订阅的topic
self.topic = b"camera_frq"
def mqtt_client_init(self):
"""
mqtt客户端初始化,连接到mqtt服务器
:return:
"""
log.info("尝试连接mqtt")
self.mqtt_client = MQTTClientRobust("camera_client", self.mqtt_server, self.mqtt_port, self.mqtt_user,
self.mqtt_password)
self.mqtt_client.set_callback(self.topic_subscribe)
self.mqtt_client.connect(False)
log.info("连接成功")
self.mqtt_client.subscribe(self.topic)
def connect_wifi(self):
"""
连接wifi,注意只能连接2.4G HZ的wifi,否则会卡住,且需要拔插操作才能重新操作
:return:
"""
log.info('连接wifi')
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
is_connected = wlan.isconnected()
if not is_connected:
log.info('尚未连接wifi,现在尝试连接...')
# connect函数是异步的,所以要手动的等待一段时间
wlan.connect(self.wifi_name, self.wifi_password)
time.sleep(5)
# 循环3次,失败后不再连接,并报错
count = 0
while not wlan.isconnected():
control_flash(0.5)
count += 1
log.info(f"尝试连接第{count}次")
time.sleep(1)
if count > 1:
control_flash(0.1)
raise Exception("wifi 连接失败,请检测修改")
control_flash(1)
log.info('连接wifi成功!')
def topic_subscribe(self, topic, msg):
"""
消息订阅处理函数
:param topic:
:param msg:
:return:
"""
if topic != self.topic:
return
# 摄像头发送图片频率 0:高 1:低
if int(msg) == 1:
log.info("图片发送频率降低")
self.send_img_sleep_time = 0.5
else:
log.info("图片发送频率提高")
self.send_img_sleep_time = 0.1
def camera_init(self):
"""
摄像头初始化
:return:
"""
log.info("摄像头初始化...")
try:
camera.init(0, format=camera.JPEG)
except Exception:
camera.deinit()
camera.init(0, format=camera.JPEG)
def camera_config(self):
"""
摄像头参数配置
:return:
"""
log.info("摄像头参数配置")
# 上翻下翻
camera.flip(0)
# 左/右
camera.mirror(1)
# 分辨率
camera.framesize(camera.FRAME_HVGA) # 像素水平:480*320
# camera.framesize(camera.FRAME_VGA) # 像素水平:640*480
# camera.framesize(camera.FRAME_SVGA) # 像素水平:800*600
# 选项如下:
# FRAME_96X96 FRAME_QQVGA FRAME_QCIF FRAME_HQVGA FRAME_240X240
# FRAME_QVGA FRAME_CIF FRAME_HVGA FRAME_VGA FRAME_SVGA
# FRAME_XGA FRAME_HD FRAME_SXGA FRAME_UXGA FRAME_FHD
# FRAME_P_HD FRAME_P_3MP FRAME_QXGA FRAME_QHD FRAME_WQXGA
# FRAME_P_FHD FRAME_QSXGA
# 有关详细信息,请查看此链接:https://bit.ly/2YOzizz
# 特效
camera.speffect(camera.EFFECT_NONE)
# 选项如下:
# 效果无(默认)效果负效果 BW效果红色效果绿色效果蓝色效果复古效果
# EFFECT_NONE (default) EFFECT_NEG EFFECT_BW EFFECT_RED EFFECT_GREEN EFFECT_BLUE EFFECT_RETRO
# 白平衡
camera.whitebalance(camera.WB_HOME)
# 选项如下:
# WB_NONE (default) WB_SUNNY WB_CLOUDY WB_OFFICE WB_HOME
# 饱和
camera.saturation(0)
# -2,2(默认为0). -2灰度
# -2,2 (default 0). -2 grayscale
# 亮度
camera.brightness(0)
# -2,2(默认为0). 2亮度
# -2,2 (default 0). 2 brightness
# 对比度
camera.contrast(0)
# -2,2(默认为0).2高对比度
# -2,2 (default 0). 2 highcontrast
# 质量
camera.quality(2)
# 10-63数字越小质量越高
def udp_init(self):
"""
UDP连接创建对应socket
:return:
"""
log.info("UDP连接创建对应socket")
self.udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
def send_data(self):
"""
图片数据通过udp协议发送给服务器
:return:
"""
log.info("开始发送数据")
try:
count = 0
while True:
self.mqtt_client.check_msg()
# Then need to sleep
count += 1
# 每隔100s查看一下温度
if count == 100:
# 读取 MCU(微控制器)温度
temperature = esp32.raw_temperature()
# 华氏温度 对 摄氏度换算公式: (华氏温度-32)/1.8
log.info(f"MCU温度:{round((int(temperature) - 32) / 1.8, 2)}°C")
count = 0
# 获取图像数据
buf = camera.capture()
# 向服务器发送图像数据
self.udp_client.sendto(b'cameraSend' + buf, (self.udp_server_ip, self.udp_server_port))
# 根据实际情况调节摄像头发送图片频率,节约资源
time.sleep(self.send_img_sleep_time)
# 给看门狗喂食
self.wdt.feed()
except:
pass
finally:
camera.deinit()
def run(self):
"""
程序运行
:return:
"""
# 开门狗,设置10s不喂食,就停止服务,防止服务假死等情况
self.wdt = WDT(timeout=10000)
self.connect_wifi()
self.mqtt_client_init()
self.camera_init()
self.camera_config()
self.udp_init()
self.send_data()
flash_light.py
# -*- coding: utf-8 -*-
# """
#
# All rights reserved
# create time '2022/11/24 11:31'
#
# """
import time
from machine import Pin
def control_flash(times=1):
"""
闪光灯
:param times:闪烁时长(s)
:return:
"""
# 控制闪光灯
pin33 = Pin(4, Pin.OUT)
time.sleep(times)
print("设置值为1")
pin33.value(1)
time.sleep(times)
print("设置值为0")
pin33.value(0)
time.sleep(times)
项目代码地址
硬件通电即用方式
- 使用Thonny连接硬件后,硬件端的micropython会默认给出一个boot.py文件,将需要执行的入口脚本导入到此文件即可;
相关链接:
相关文章
- 谈谈网站的面包屑
- Linux为指定ip开放/关闭端口
- 强化学习如何做数据分析?新加坡国立等TKDE 2022综述论文
- 解决一个C#中定时任务被阻塞问题
- 斯坦福学生攻破约会软件!用GAN模型女扮男装骗过人脸识别系统
- Rockley Photonics的血糖监测硅光芯片(续)
- 教你查看 jupyter notebook 每个单元格运行时间
- 牛津大学最新调研:AI面临基准危机,NLP集中“攻关”推理测试
- 相关性矩阵图绘制方法大汇总!!
- 一文带你了解机器人是如何通过视觉实现目标跟踪的
- JVM-VisualVM:多合-故障处理工具
- quartus II编译报错:Error: Current license file does not support the XXX device 环境win10,Quartus2出现破解问题
- JVM-JConsole:Java监视与管理控制台(windows)
- 腾讯云服务器快速安装Odoo
- 我,爆肝17天用600行代码拍到400公里之外的国际空间站
- VHDL串口通信 在FPGA开发板上测试 并解决没有识别到下载接口USB_Blaster(No Hardware问题)
- 一文搞懂 | 内核的启动
- gitbook init出现TypeError: cb.apply is not a function解决办法
- 复旦博士写了130行代码,两分钟解决繁琐核酸报告核查
- 如何实时监测进程调度累计的runtime