zl程序教程

您现在的位置是:首页 >  后端

当前栏目

PYTHON 连接钉钉传输工作数据监控

Python监控连接数据 工作 传输
2023-06-13 09:14:31 时间

“我报名参加金石计划1期挑战——瓜分10万奖池,这是我的第1篇文章,点击查看活动详情

先导

工作中运维工作经常会遇到一些数据汇报,数据监控, 作为一个新人真心感觉这些数据没有什么意思(当然也许是我菜),有句话怎么说,懒惰是人类进步的阶梯.这里使用 python 连接数据把数据 传到钉钉, 这样可以进行数据监控 ,看看是哪家小可爱又 搞事情了 ╭(╯^╰)╮

钉钉接口

钉钉提供了群机器人接口等很多很多接口,网上 demo 一大堆.这里只制作简单的表述

钉钉官网:

[https://developers.dingtalk.com/document/app/overview-of-group-robots]

可以给群里增加一个机器人,通过 @固定人 ,或者所有人,广播等方式发送信息

创建钉钉机器人

首先你得建立个群

增加群机器人

完成必要的安全设置,勾选我已阅读并同意《自定义机器人服务及免责条款》,然后单击完成

目前有 3 种安全设置方式,请根据需要选择一种:

  • 自定义关键词:最多可以设置 10 个关键词,消息中至少包含其中 1 个关键词才可以发送成功。 例如添加了一个自定义关键词:监控报警,则这个机器人所发送的消息,必须包含监控报警这个词,才能发送成功。
  • 加签: 把timestamp+"\n"+密钥当做签名字符串,使用 HmacSHA256 算法计算签名,然后进行 Base64 encode,最后再把签名参数再进行 urlEncode,得到最终的签名(需要使用 UTF-8 字符集)。
https://oapi.dingtalk.com/robot/send?access_token=XXXXXX&timestamp=XXX&sign=XXX

测试机器人

python 代码版本

这里直接使用 加签版本的,因为这种时间判定的才是最常用的

import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

timestamp = str(round(time.time() * 1000))   #毫秒级时间戳
secret = '你的secret'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
#往post接口里面仍数据

def dingmessage():

# 请求的URL,WebHook地址

    webhook = "https://oapi.dingtalk.com/robot/send?access_token=你的token&timestamp="+str(timestamp)+"&sign="+str(sign)
    print(webhook)
#构建请求头部
    header = {
        "Content-Type": "application/json",
        "Charset": "UTF-8"
}
#构建请求数据
    tex = "能力越大,责任越大。"
    message ={

        "msgtype": "text",
        "text": {
            "content": tex
        },
        "at": {

            "isAtAll": False
        }

    }
#对请求的数据进行json封装
    message_json = json.dumps(message)
#发送请求
    info = requests.post(url=webhook,data=message_json,headers=header)
#打印返回的结果
    print(info.text)

if __name__=="__main__":
    dingmessage()

打印结果

如果显示 ok 那么就是说数据传输成功了

这时候接入钉钉即可 查看数据

好了现在 py 已经能够给钉钉发送信息了,那么怎么做监控呢

创建时间监控 实时发送信息

我这里以 apscheduler 框架 进行定时巡回

一、安装 APScheduler

pip install apscheduler

复制代码

二、基本概念

APScheduler 有四大组件:

1、触发器 triggers :触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。除了初始配置之外,触发器是完全无状态的。有三种内建的 trigger:

(1)date: 特定的时间点触发

(2)interval: 固定时间间隔触发

(3)cron: 在特定时间周期性地触发

2、任务储存器 job stores:用于存放任务,把任务存放在内存(为默认 MemoryJobStore)或数据库中。3、执行器 executors: 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

4、调度器 schedulers: 把上方三个组件作为参数,通过创建调度器实例来运行根据开发需求选择相应的组件,下面是不同的调度器组件:BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。AsyncIOScheduler AsyncIO 调度器,适用于应用使用 AsnycIO 的情况。GeventScheduler Gevent 调度器,适用于应用通过 Gevent 的情况。TornadoScheduler Tornado 调度器,适用于构建 Tornado 应用。TwistedScheduler Twisted 调度器,适用于构建 Twisted 应用。QtScheduler Qt 调度器,适用于构建 Qt 应用。

三、使用步骤

1、新建一个调度器 schedulers

2、添加调度任务

3、运行调度任务

比如写 3 就是说 冯 3 触发

*/3 就是说 每隔 3 单位触发

假设触发条件

我们这里判定如果当前时间 4 个数据加起来大于 12 那么 就发送消息

每分钟发送一次

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
# 输出时间
import pymssql
import copy
import time
import requests
import json
import hmac
import hashlib
import base64
import urllib.parse

timestamp = str(round(time.time() * 1000))
secret = '你的secret'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

# now = time.time() #返回float数据
#
#
# #毫秒级时间戳
# print(int(round(now * 1000)))
# now1=int(round(now * 1000))




def dingmessage(num):

# 请求的URL,WebHook地址

    webhook = "https://oapi.dingtalk.com/robot/send?access_token=你的token&timestamp="+str(timestamp)+"&sign="+str(sign)
    print(webhook)
#构建请求头部
    header = {
        "Content-Type": "application/json",
        "Charset": "UTF-8"
}
#构建请求数据
    tex = "能力越大,责任越大。"+str(num)
    message ={

        "msgtype": "text",
        "text": {
            "content": tex
        },
        "at": {

            "isAtAll": False
        }

    }
#对请求的数据进行json封装
    message_json = json.dumps(message)
#发送请求
    info = requests.post(url=webhook,data=message_json,headers=header)
#打印返回的结果
    print(info.text)


def job():
    hour = datetime.datetime.now().hour
    min = datetime.datetime.now().minute
    f1 = str(hour)[0]
    f2 = str(hour)[1]
    f3 = str(min)[0]
    f4 = str(min)[1]
    print(f1)
    print(f2)
    print(f3)
    print(f4)
    print(int(f1) + int(f2) + int(f3) + int(f4))
    if int(f1) + int(f2) + int(f3) + int(f4) > 12:
        dingmessage(int(f1) + int(f2) + int(f3) + int(f4) )


# BlockingScheduler
scheduler = BlockingScheduler()
#1-23小时内,每一分钟触发一次, 后面那个意思是说如果3600 秒触发 那么讲重新执行该任务
scheduler.add_job(job, 'cron', hour='1-23', minute='*/1',misfire_grace_time=3600)#1
scheduler.start()

结果

运行结果

钉钉结果

如图 23.18 分 2+3+1+8=14 >12 触发条件 就给钉钉 发送信息了

致此一个见得 发送文字功能实现,有问题即可发送钉钉

还有一点就是钉钉一定要调成前台允许显示, 然后在录音个对应的提示. 懒人必备,