zl程序教程

您现在的位置是:首页 >  其他

当前栏目

物联网视频监控服务(二)-硬件编码篇

2023-04-18 14:23:30 时间

概述

此篇文章主要描述 ESP32-CAM硬件上的软件开发部分;

功能点

  • 采集视频数据,并通过UDP协议上传到服务器功能;
  • 接收emqx的topic消息,动态调整 视频帧 获取及发送频率;
  • WIFI连接成功 通过闪光灯提示;
  • 通过watch dog 实现当程序宕机或假死自动重启;

使用技术

  • 语言:python
  • 软件或框架:micropython+umqtt(MQTT协议的实现)
  • IDE(代码编译器): Thonny+Pycharm
  • 应用层协议:MQTT
  • 传输层协议:TCP,UDP
  • 硬件:ESP32-CAM开发套件

准备工作

Thonny相关

概念

  • 固件: 指写入到硬件中的程序,写入后就被固定,无法更改,除非重新写入新的固件; 固件被存储在硬件的内存中,包括后续的python脚本,注意文件大小不能超过内存限制;
  • 固件和软件区别之一:软件是运行在操作系统之上,固件运行在硬件之上,如BIOS系统;

硬件开发流程

  • 先为 硬件准备好运行环境,这里选择micropython语言作为运行环境,则先通过thonny安装对应固件即可; 固件地址请 点击链接 进行下载;
  • 然后 开始写python脚本,通过 thonny将脚本在硬件上执行即可;
  • 如上两步请 点击链接 按照教程即可完成;

MicroPython知识点

  • 在固件安装到硬件后,通过thonny能看到 硬件上有个boot.py文件,此文件是 硬件通电后自动执行,也就是 程序的入口文件,其他的python脚本,只要在此文件中加入import语句 即可 通电自动执行; 注意在脚本功能完善后,再加入到此文件,否则要是脚本执行异常,可能导致连接thonny可能失败,需要 重新刷固件,较麻烦
  • WDT: 俗称 看门狗,要在规定时间内喂狗,否则系统会自动重启;作用是 防止程序运行崩溃后,无法自启;
  • umqtt: 是mqtt协议在MicroPython上的实现库; 这里主要用来连接到实现mqtt协议的emqx软件服务,然后尝试接收消息,再执行回调函数;
  • camera:此库是摄像头操作的库,暂未找到相关文档链接;
  • MicroPython基础知识中文版,基本语法和python3相同,只是标准库有所不同,如 json库,这里是ujson;
  • MicroPython对应的ESP32开发板的相关知识
  • micropython在开发时需要的第三方库(就是python包或脚本) 可以网上寻找,然后上传到硬件中即可使用,如 umqtt;

系统设计

概念

  • 视频流传输:视频是由一帧一帧的图片组成,所以 这里的视频流传输其实就是 摄像头捕捉图片然后传输图片;到 服务端后再处理转为视频进行磁盘存储;

硬件的数据流处理

在这里插入图片描述

主要代码部分

cam_send.py

import socket
import time

import camera
import esp32
import network
from machine import WDT

from flash_light import control_flash
from log_action import LogWriter
from umqtt.simple import MQTTClientRobust

log = LogWriter()


class CamSend:
    def __init__(self, wifi_name, wifi_password, udp_server_ip, udp_server_port, mqtt_server, mqtt_port, mqtt_user,
                 mqtt_password):
        """

        :param wifi_name:
        :param wifi_password:
        :param udp_server_ip:
        :param udp_server_port:
        :param mqtt_server:
        :param mqtt_port:
        :param mqtt_user:
        :param mqtt_password:
        """
        # wifi配置
        self.wifi_name = wifi_name
        self.wifi_password = wifi_password
        # udp传输图片配置
        self.udp_server_ip = udp_server_ip
        self.udp_server_port = udp_server_port
        # mqtt配置
        self.mqtt_server = mqtt_server
        self.mqtt_port = mqtt_port
        self.mqtt_user = mqtt_user
        self.mqtt_password = mqtt_password
        # 发送图片间隔时间
        self.send_img_sleep_time = 0.1
        # mqtt订阅的topic
        self.topic = b"camera_frq"

    def mqtt_client_init(self):
        """
        mqtt客户端初始化,连接到mqtt服务器
        :return:
        """
        log.info("尝试连接mqtt")
        self.mqtt_client = MQTTClientRobust("camera_client", self.mqtt_server, self.mqtt_port, self.mqtt_user,
                                            self.mqtt_password)
        self.mqtt_client.set_callback(self.topic_subscribe)
        self.mqtt_client.connect(False)
        log.info("连接成功")
        self.mqtt_client.subscribe(self.topic)

    def connect_wifi(self):
        """
        连接wifi,注意只能连接2.4G HZ的wifi,否则会卡住,且需要拔插操作才能重新操作
        :return:
        """
        log.info('连接wifi')
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        is_connected = wlan.isconnected()
        if not is_connected:
            log.info('尚未连接wifi,现在尝试连接...')
            # connect函数是异步的,所以要手动的等待一段时间
            wlan.connect(self.wifi_name, self.wifi_password)
            time.sleep(5)
            # 循环3次,失败后不再连接,并报错
            count = 0
            while not wlan.isconnected():
                control_flash(0.5)
                count += 1
                log.info(f"尝试连接第{count}次")
                time.sleep(1)
                if count > 1:
                    control_flash(0.1)
                    raise Exception("wifi 连接失败,请检测修改")
        control_flash(1)
        log.info('连接wifi成功!')

    def topic_subscribe(self, topic, msg):
        """
        消息订阅处理函数
        :param topic:
        :param msg:
        :return:
        """
        if topic != self.topic:
            return
        # 摄像头发送图片频率 0:高 1:低
        if int(msg) == 1:
            log.info("图片发送频率降低")
            self.send_img_sleep_time = 0.5
        else:
            log.info("图片发送频率提高")
            self.send_img_sleep_time = 0.1

    def camera_init(self):
        """
        摄像头初始化
        :return:
        """
        log.info("摄像头初始化...")
        try:
            camera.init(0, format=camera.JPEG)
        except Exception:
            camera.deinit()
            camera.init(0, format=camera.JPEG)

    def camera_config(self):
        """
        摄像头参数配置
        :return:
        """
        log.info("摄像头参数配置")

        # 上翻下翻
        camera.flip(0)
        # 左/右
        camera.mirror(1)

        # 分辨率
        camera.framesize(camera.FRAME_HVGA)  # 像素水平:480*320
        # camera.framesize(camera.FRAME_VGA) # 像素水平:640*480
        # camera.framesize(camera.FRAME_SVGA)  # 像素水平:800*600
        # 选项如下:
        # FRAME_96X96 FRAME_QQVGA FRAME_QCIF FRAME_HQVGA FRAME_240X240
        # FRAME_QVGA FRAME_CIF FRAME_HVGA FRAME_VGA FRAME_SVGA
        # FRAME_XGA FRAME_HD FRAME_SXGA FRAME_UXGA FRAME_FHD
        # FRAME_P_HD FRAME_P_3MP FRAME_QXGA FRAME_QHD FRAME_WQXGA
        # FRAME_P_FHD FRAME_QSXGA
        # 有关详细信息,请查看此链接:https://bit.ly/2YOzizz

        # 特效
        camera.speffect(camera.EFFECT_NONE)
        # 选项如下:
        # 效果无(默认)效果负效果 BW效果红色效果绿色效果蓝色效果复古效果
        # EFFECT_NONE (default) EFFECT_NEG EFFECT_BW EFFECT_RED EFFECT_GREEN EFFECT_BLUE EFFECT_RETRO

        # 白平衡
        camera.whitebalance(camera.WB_HOME)
        # 选项如下:
        # WB_NONE (default) WB_SUNNY WB_CLOUDY WB_OFFICE WB_HOME

        # 饱和
        camera.saturation(0)
        # -2,2(默认为0). -2灰度
        # -2,2 (default 0). -2 grayscale

        # 亮度
        camera.brightness(0)
        # -2,2(默认为0). 2亮度
        # -2,2 (default 0). 2 brightness

        # 对比度
        camera.contrast(0)
        # -2,2(默认为0).2高对比度
        # -2,2 (default 0). 2 highcontrast

        # 质量
        camera.quality(2)
        # 10-63数字越小质量越高

    def udp_init(self):
        """
        UDP连接创建对应socket
        :return:
        """
        log.info("UDP连接创建对应socket")
        self.udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)

    def send_data(self):
        """
        图片数据通过udp协议发送给服务器
        :return:
        """
        log.info("开始发送数据")
        try:
            count = 0
            while True:
                self.mqtt_client.check_msg()
                # Then need to sleep
                count += 1
                # 每隔100s查看一下温度
                if count == 100:
                    # 读取 MCU(微控制器)温度
                    temperature = esp32.raw_temperature()
                    # 华氏温度 对 摄氏度换算公式: (华氏温度-32)/1.8
                    log.info(f"MCU温度:{round((int(temperature) - 32) / 1.8, 2)}°C")
                    count = 0
                # 获取图像数据
                buf = camera.capture()
                # 向服务器发送图像数据
                self.udp_client.sendto(b'cameraSend' + buf, (self.udp_server_ip, self.udp_server_port))
                # 根据实际情况调节摄像头发送图片频率,节约资源
                time.sleep(self.send_img_sleep_time)
                # 给看门狗喂食
                self.wdt.feed()
        except:
            pass
        finally:
            camera.deinit()

    def run(self):
        """
        程序运行
        :return:
        """
        # 开门狗,设置10s不喂食,就停止服务,防止服务假死等情况
        self.wdt = WDT(timeout=10000)
        self.connect_wifi()
        self.mqtt_client_init()
        self.camera_init()
        self.camera_config()
        self.udp_init()
        self.send_data()

flash_light.py

# -*- coding: utf-8 -*-
# """
#
# All rights reserved
# create time '2022/11/24 11:31'
#

# """
import time

from machine import Pin


def control_flash(times=1):
    """
    闪光灯
    :param times:闪烁时长(s)
    :return:
    """
    # 控制闪光灯
    pin33 = Pin(4, Pin.OUT)
    time.sleep(times)
    print("设置值为1")
    pin33.value(1)
    time.sleep(times)
    print("设置值为0")
    pin33.value(0)
    time.sleep(times)

项目代码地址

video_hardware_script 点击链接

硬件通电即用方式

  • 使用Thonny连接硬件后,硬件端的micropython会默认给出一个boot.py文件,将需要执行的入口脚本导入到此文件即可;

相关链接: