对接海康ISC和钉钉机器人发送巡检通知
1、对接海康平台获取资源信息
2、对接钉钉企业自建应用机器人上传巡检文件并发送群通知
3、对接企业内部APEX应用上传巡检记录
import requests
import urllib3
import json
import cx_Oracle
import datetime
import os
from openpyxl import Workbook
# 强制取消证书验证的级别出发不同的警告
urllib3.disable_warnings()
# 海康ISC AK/SK
appKey = "26799904"
appSecret = "6bfUp43a2wY4gumN0hbo"
URL = "https://172.30.64.51:1443/artemis"
# 获取根区域信息
def regions_root_mode():
# API请求证书信息
regions_root = "GH3hedWffIf7IlpfHhFyAhoFZoOTBSWwupTy/D0pj9c="
# 根区域API请求地址
URL_regions_root = URL + "/api/resource/v1/regions/root"
# 请求body
json_regions_root = {}
# 请求头需按此顺序编写,X-Ca-Signature根据请求URL后缀地址通过isc openAPI签名工具生成
headers = {'Content-Type': 'application/json', 'X-Ca-Key': appKey, 'X-Ca-Signature-Headers': 'X-Ca-Key',
'X-Ca-Signature': regions_root}
# 请求数据
resources = requests.post(url=URL_regions_root, json=json_regions_root, headers=headers, verify=False)
# 连接数据库,需要提前安装Oracle Instant Client
connection = cx_Oracle.connect(user="hik", password="Jiaen#2020", dsn="172.30.64.45/ORCLPDB1")
# 创建游标
cursor = connection.cursor()
# 查询表中数据量
SQL = "select count(indexCode) as num from regions_root where indexCode = :indexCode"
# 执行查询
cursor.execute(SQL, [resources.json()['data']['indexCode']])
# 判断数据唯一性,并更新数据
row = cursor.fetchone()
# 元组转字符串
result = ', '.join(str(x) for x in row)
# print(result)
# 0为新数据直接写入
if int(result) == 0:
sql = "INSERT INTO regions_root (indexCode,name) VALUES (:indexCode,:name)"
cursor.execute(sql, [resources.json()['data']['indexCode'], resources.json()['data']['name']])
connection.commit()
# print("数据写入成功:", resources.json()['data']['indexCode'], resources.json()['data']['name'])
# 1为已存数据,更新历史数据
elif int(result) == 1:
sql = "update regions_root set name = :name where indexCode = :indexCode"
cursor.execute(sql, [resources.json()['data']['name'], resources.json()['data']['indexCode']])
connection.commit()
# print("更新数据成功:", resources.json()['data']['indexCode'], resources.json()['data']['name'])
# 关闭游标和连接
cursor.close()
connection.close()
return resources.json()['data']['indexCode']
root_result_value = regions_root_mode()
def regions_subRegions_mode(root_result):
# 根据区域编号获取下一级区域列表v2
URL_regions_subRegions = URL + "/api/resource/v2/regions/subRegions"
# 根据区域编号获取下一级区域列表v2
regions_subRegions = "TZAY1W91PHvcz7LVGawEOd1z1f8P5FdKp/hX15z6jZY="
# 根据区域编号获取下一级区域列表v2
json_regions_subRegions = {
"parentIndexCode": root_result,
"resourceType": "camera",
"pageNo": 1,
"pageSize": 1000,
"cascadeFlag": 0
}
# 请求头需按此顺序编写,X-Ca-Signature根据请求URL后缀地址通过isc openAPI签名工具生成
headers = {'Content-Type': 'application/json', 'X-Ca-Key': appKey, 'X-Ca-Signature-Headers': 'X-Ca-Key',
'X-Ca-Signature': regions_subRegions}
# 请求数据
resources = requests.post(url=URL_regions_subRegions, json=json_regions_subRegions, headers=headers, verify=False)
# print(resources.json()["msg"])
# 输出所有的区域信息
# 连接数据库,需要提前安装Oracle Instant Client
connection = cx_Oracle.connect(user="hik", password="Jiaen#2020", dsn="172.30.64.45/ORCLPDB1")
# 创建游标
cursor = connection.cursor()
if resources.json()["msg"] == "success":
for key in resources.json()["data"]["list"]:
# print(key['indexCode'], key['name'])
# 查询表中数据量
SQL = "select count(indexCode) as num from regions_subRegions where indexCode = :indexCode"
# 执行查询
cursor.execute(SQL, [key['indexCode']])
# 判断数据唯一性,并更新数据
# 将元组转换成字符串
row = cursor.fetchone()
result = ', '.join(str(x) for x in row)
# print(result)
# 0为新数据直接写入
if result == '0':
sql = "INSERT INTO regions_subRegions (indexCode,name,parentIndexCode,SEE) VALUES \
(:indexCode,:name,:parentIndexCode,:SEE)"
cursor.execute(sql, [key['indexCode'], key['name'], key['parentIndexCode'], '1'])
connection.commit()
# print("数据写入成功:", key['indexCode'], key['name'])
# # 1为已存数据,更新历史数据
elif result == '1':
sql = "update regions_subRegions set name = :name,parentIndexCode = :parentIndexCode,SEE = :SEE \
where indexCode = :indexCode"
cursor.execute(sql, [key['indexCode'], key['name'], key['parentIndexCode'], '1'])
connection.commit()
# print("更新数据成功:", key['indexCode'], key['name'])
# 遍历所有值
regions_subRegions_mode(key['indexCode'])
# 关闭游标和连接
cursor.close()
connection.close()
regions_subRegions_mode(root_result_value)
def nms_record_mode():
# 根据区域编号获取当前层级的资源信息,包括门禁,监控,但不包含子层集
URL_resource_subResources = URL + "/api/irds/v2/resource/subResources"
# 根据区域编号获取当前层级的资源信息,包括门禁,监控,但不包含子层集
resource_subResources = "6mXT/YXF7gcLFwsQ0aoY2M-ZfgDp54KR2uUk93WlFLM="
# 连接数据库,需要提前安装Oracle Instant Client
connection = cx_Oracle.connect(user="hik", password="Jiaen#2020", dsn="172.30.64.45/ORCLPDB1")
# 创建游标
cursor = connection.cursor()
# 查询表中数据量
SQL = "select indexCode as num from regions_subRegions where SEE = '1'"
# 执行查询
cursor.execute(SQL)
# 文件名称命名
now = datetime.datetime.today()
formatted_date = now.strftime('%Y%m%d%H%M%S')
path = f'd:/{formatted_date}.xlsx'
# 创建日志文件
wb = Workbook()
ws = wb.active
data_list = ["状态码", "巡检状态", "摄像头名称", "巡检区域", "巡检时间"]
ws.append(data_list)
# 记录数值
ok = 0
ng = 0
for row in cursor.fetchall():
# 将元组转换成字符串
result = ', '.join(str(x) for x in row)
# print("元组转换:" + result)
# 根据区域编号获取当前层级的资源信息,包括门禁,监控,但不包含子层集
json_resource_subResources = {
"regionIndexCode": result,
"pageNo": 1,
"authCodes": [
"view"
],
"pageSize": 1000,
"resourceType": "camera"
}
# print('result')
# 请求头需按此顺序编写,X-Ca-Signature根据请求URL后缀地址通过isc openAPI签名工具生成
headers = {'Content-Type': 'application/json', 'X-Ca-Key': appKey, 'X-Ca-Signature-Headers': 'X-Ca-Key',
'X-Ca-Signature': resource_subResources}
# 请求数据
resources = requests.post(url=URL_resource_subResources, json=json_resource_subResources, headers=headers,
verify=False)
# print(resources.json())
# 确认层级下的监控摄像头,并调取录像状态
if resources.json()["msg"] == "success":
# print(resources.json()["data"]["list"])
for key in resources.json()["data"]["list"]:
# 打印区域下的监控清单
# print(key['indexCode'], key['name'], key['regionPathName'])
# 判断是否有录像
# 根据监控点编号、开始时间、结束时间,分页获取录像录像完整性记录
resource_nms_record_list = "4rcC1XxoK1AB3dOvd9CyTLSRz+3SRqjkdlnlCn58kiU="
# 根据监控点编号、开始时间、结束时间,分页获取录像录像完整性记录
URL_nms_record_list = URL + "/api/nms/v1/record/list"
# 根据监控点编号、开始时间、结束时间,分页获取录像录像完整性记录
today = datetime.datetime.today()
today = datetime.datetime.date(today)
today = f'{today}T00:00:00.000+08:00'
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
yesterday = yesterday.strftime('%Y-%m-%d')
yesterday = f'{yesterday}T00:00:00.000+08:00'
json_nms_record_list = {
"pageNo": 1,
"pageSize": 1000,
"beginTime": yesterday,
"endTime": today,
"indexCodes": [
key['indexCode']
]
}
# print(key['indexCode'])
headers = {'Content-Type': 'application/json', 'X-Ca-Key': appKey, 'X-Ca-Signature-Headers': 'X-Ca-Key',
'X-Ca-Signature': resource_nms_record_list}
# 请求数据
record_result = requests.post(url=URL_nms_record_list, json=json_nms_record_list,
headers=headers,
verify=False)
# print(record_result.json())
# print(record_result.json()["data"]["total"])
if record_result.json()["msg"] == "success" and record_result.json()["data"]["total"] != 0:
for indexCode_result in record_result.json()["data"]["list"]:
# print(record_result.json())
# print(indexCode_result)
# 巡检结果(31-正常;32-异常;33-巡检失败;34-未配置)
xjTime = str(datetime.datetime.now())
# 巡检判断状态
status = ""
if indexCode_result['result'] == 31:
status = "正常"
ok = ok + 1
elif indexCode_result['result'] == 32:
status = "异常"
ng = ng + 1
# 发送钉钉文件
data_list = [str(indexCode_result['result']), status, str(key['name']),
str(key['regionPathName']), xjTime]
ws.append(data_list)
elif indexCode_result['result'] == 33:
status = "巡检失败"
ng = ng + 1
# 发送钉钉文件
data_list = [str(indexCode_result['result']), status, str(key['name']),
str(key['regionPathName']), xjTime]
ws.append(data_list)
elif indexCode_result['result'] == 34:
status = "未配置"
ng = ng + 1
# 发送钉钉文件
data_list = [str(indexCode_result['result']), status, str(key['name']),
str(key['regionPathName']), xjTime]
ws.append(data_list)
# 巡检录像状态打印
print(indexCode_result['result'], status, key['name'], key['regionPathName'], xjTime,
key['indexCode'])
# 写入巡检数据
sql = "INSERT INTO nms_record_list (regionPathName,name,xjtime,xjresult,xjresultn) VALUES " \
"(:regionPathName,:name,:xjtime,:xjresult,:xjresultn)"
cursor.execute(sql, [key['regionPathName'], key['name'], datetime.datetime.now(), \
indexCode_result['result'], status])
connection.commit()
else:
xjTime = str(datetime.datetime.now())
print("未配置录像", "null", key['name'], key['regionPathName'], xjTime, key['indexCode'])
ng = ng + 1
# 发送钉钉文件
data_list = [0, "未配置录像", str(key['name']), str(key['regionPathName']), xjTime]
ws.append(data_list)
# 写入巡检数据
sql = "INSERT INTO nms_record_list (regionPathName,name,xjtime,xjresult,xjresultn) VALUES " \
"(:regionPathName,:name,:xjtime,:xjresult,:xjresultn)"
cursor.execute(sql, [key['regionPathName'], key['name'], datetime.datetime.now(), '0',
'未配置录像'])
connection.commit()
# 关闭文件
wb.save(path)
# 关闭游标和连接
cursor.close()
connection.close()
# 巡检总数
all_num = ok + ng
return path, ok, ng, all_num
# 接收返回值
receive = nms_record_mode()
def push_ding_mode():
# 计算周
now = datetime.datetime.today()
year = int(now.strftime('%Y'))
month = int(now.strftime('%m'))
day = int(now.strftime('%d'))
week = datetime.date(year, month, day).isocalendar().week
# 调用机器人前置工作
AgentId = "1596248912"
AppKey = "dinglqfibvlfa3yidm7z"
AppSecret = "VwxBxWxJwhT8BafWqY5xsdfAO6rTv1BKXkJYUCn8mvl3a6UysfhtcMFUX0255SK5"
# 请求地址
URL = "https://oapi.dingtalk.com/gettoken?appkey=" + AppKey + "&appsecret=" + AppSecret
# 发送请求获取access_token
access_token = requests.get(URL)
print(access_token.json()["access_token"])
# 上传文件到钉钉云盘
upload_url = "https://oapi.dingtalk.com/media/upload?access_token=" + access_token.json()["access_token"]
# 指定上传文件路径
print(receive[0])
files = {'media': open(f'{receive[0]}', 'rb')}
data = {'type': 'file'}
# 上传文件
response = requests.post(upload_url, files=files, data=data)
# 空文件不许云上传钉钉云盘
if response.json()["errcode"] != 34010:
print(response.text)
file_media = response.json()["media_id"]
# 发送文件
now = datetime.datetime.today()
formatted_date = now.strftime('%Y%m%d%H%M%S')
headers = {'Content-Type': 'application/json',
'x-acs-dingtalk-access-token': str(access_token.json()["access_token"])}
URL = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'
json = {
"msgParam": "{"
f"\"mediaId\":\"{file_media}\","
f"\"fileName\":\"W{week}周监控巡检日志.xlsx\","
"\"fileType\":\"xlsx\","
"}",
"msgKey": "sampleFile",
"openConversationId": "cidUgclNaLEUqRlOss+KoI+Ig==",
"robotCode": "dinglqfitvlfa3tidm6z",
"coolAppCode": "COOLAPP-1-102494B95F9F210538590085"
}
sucmsg = requests.post(url=URL, json=json, headers=headers)
print(sucmsg.text)
print(sucmsg.status_code)
# 连接数据库,需要提前安装Oracle Instant Client
connection = cx_Oracle.connect(user="hik", password="Jiaen#2020", dsn="172.30.64.45/ORCLPDB1")
# 创建游标
cursor = connection.cursor()
# 查询表中数据量
SQL = "insert into ding_upload (createtime,media_id,file_type) values (:createtime,:media_id,:file_type)"
# 执行查询
# print(now)
# print(file_media)
# print(data['type'])
cursor.execute(SQL, [now, file_media, str(data['type'])])
connection.commit()
# 发送文字版本巡检报告
headers = {'Content-Type': 'application/json',
'x-acs-dingtalk-access-token': str(access_token.json()["access_token"])}
URL = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'
text = "{"f'"title": "W{week}周监控自动化巡检报告","text": \
" \
\n##### W{week}周监控自动化巡检报告\n- 摄像头总数:{receive[3]}支\n- 视频流取值正常:{receive[1]}支\n- 视频流取值异常:{receive[2]}支"'"}"
json = {
"msgParam": text,
"msgKey": "sampleMarkdown",
"openConversationId": "cidUgclNaLEyqRbOss+KiI+Ig==",
"robotCode": "dinglqfbbvlfa4tidm6z",
"coolAppCode": "COOLAPP-1-102494B85F9F210537590005"
}
requests.post(url=URL, json=json, headers=headers)
push_ding_mode()