海康视频取流自动化巡检
如上环境配置
针对该脚本机型优化,加快数据处理速度和模块化拆解
import requests
import urllib3
import json
import cx_Oracle
import datetime
import os
from openpyxl import Workbook
# 强制取消证书验证的级别出发不同的警告
urllib3.disable_warnings()
# 创建日志文件
if os.path.exists("d:/hik.log"):
f = open("d:/hik.log", "a+")
f.write("\n" + "--" * 30)
else:
f = open("d:/hik.log", "w+")
try:
"""
调取信息存储在172.30.64.45数据库的技术数据信息
"""
# 连接数据库,需要提前安装Oracle Instant Client
connection = cx_Oracle.connect(user="hik", password="Jiaen#2020", dsn="172.30.64.45/ORCLPDB1")
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(connection))
# 创建游标
cursor = connection.cursor()
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(cursor))
"""
获取海康ISC AK/SK信息
"""
SQL = "select appkey,appsecret,url from hik_akas where id ='1'"
# 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
for hik_isc_values in cursor.execute(SQL):
appKey = hik_isc_values[0]
appSecret = hik_isc_values[1]
URL = hik_isc_values[2]
# 执行完成语句,定义变量接收给到的是元组数据,此时需要转成字符串再转成LIST再给值
# cursor.execute(SQL)
# hik_isc_values = cursor.fetchall()
# hik_isc_values = json.loads(json.dumps(hik_isc_values[0]))
# print(hik_isc_values[1])
"""
获取ISC根路径API和签名信息
"""
SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/resource/v1/regions/root'"
# 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
for hik_ssl_values in cursor.execute(SQL):
URL_regions_root = URL + hik_ssl_values[0]
regions_root = hik_ssl_values[1]
"""
获取ISC根路径根据区域编号获取下一级区域列表API和签名信息
"""
SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/resource/v2/regions/subRegions'"
# 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
for hik_ssl_values in cursor.execute(SQL):
URL_resource_subResources = URL + hik_ssl_values[0]
regions_subRegions = hik_ssl_values[1]
"""
获取ISC区域编号获取当前层级的资源信息列表API和签名信息
"""
SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/irds/v2/resource/subResources'"
# 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
for hik_ssl_values in cursor.execute(SQL):
URL_regions_subRegions = URL + hik_ssl_values[0]
resource_subResources = hik_ssl_values[1]
"""
获取ISC监控点编号列表API和签名信息
"""
SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/nms/v1/record/list'"
# 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
for hik_ssl_values in cursor.execute(SQL):
URL_nms_record_list = URL + hik_ssl_values[0]
resource_nms_record_list = hik_ssl_values[1]
"""
获取钉钉授信息
"""
SQL = "select appkey,appsecret,agentid,robotcode,coolappcode from DING_AKAS"
# 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
for ding_values in cursor.execute(SQL):
AppKey = ding_values[0]
AppSecret = ding_values[1]
AgentId = ding_values[2]
robotCode = ding_values[3]
coolAppCode = ding_values[4]
"""
钉钉消息推送群
"""
SQL = "select openconversationid from DING_GROUPS_PUSH"
ding_push_groups = []
i = 0
for col_value in cursor.execute(SQL):
ding_push_groups.append(col_value)
# for ix in range(len(ding_push_groups)):
# print(json.loads(json.dumps(ding_push_groups))[ix][0])
except cx_Oracle.DatabaseError as e:
error_obj, = e.args
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(error_obj))
print(error_obj)
def http_request_mode(url_path, request_json, regions_ssl):
try:
# 请求头需按此顺序编写,X-Ca-Signature根据请求URL后缀地址通过isc openAPI签名工具生成
headers = {'Content-Type': 'application/json', 'X-Ca-Key': appKey, 'X-Ca-Signature-Headers': 'X-Ca-Key',
'X-Ca-Signature': regions_ssl}
# 请求数据
resources = requests.post(url=url_path, json=request_json, headers=headers, verify=False)
# print(resources.status_code)
return resources
except TimeoutError:
print(TimeoutError.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(TimeoutError.args))
except urllib3.exceptions.NewConnectionError:
print(urllib3.exceptions.NewConnectionError.args)
except requests.exceptions.ConnectTimeout as ConnectTimeout:
print(ConnectTimeout.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(ConnectTimeout.args))
except requests.exceptions.ReadTimeout as ReadTimeout:
print(ReadTimeout.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(ReadTimeout.args))
except requests.exceptions.BaseHTTPError as BaseHTTPError:
print(BaseHTTPError.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(BaseHTTPError.args))
except requests.exceptions.HTTPError as HTTPError:
print(HTTPError.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(HTTPError.args))
except requests.exceptions.ConnectionError as ConError:
print(ConError.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(ConError.args))
except requests.exceptions.RequestsWarning as RequestsWarning:
print(RequestsWarning.args)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(RequestsWarning.args))
"""
海康根信息值请求
"""
def regions_root_mode():
# 请求body
json_regions_root = {}
# 请求获取根节点信息
resources = http_request_mode(URL_regions_root, json_regions_root, regions_root)
try:
# 执行查询
ROOT_SQL = "select count(indexCode) as num from regions_root where indexCode = :indexCode"
cursor.execute(ROOT_SQL, [resources.json()['data']['indexCode']])
# 判断数据唯一性,并更新数据
row = cursor.fetchone()
# 元组转字符串
result = ', '.join(str(x) for x in row)
# print(result)
# 0为新数据直接写入
if int(result) == 0:
sql = "INSERT INTO regions_root (indexCode,name) VALUES (:indexCode,:name)"
cursor.execute(sql, [resources.json()['data']['indexCode'], resources.json()['data']['name']])
connection.commit()
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "根节点数据写入regions_root表成功")
print("数据写入成功:", resources.json()['data']['indexCode'], resources.json()['data']['name'])
# 1为已存数据,更新历史数据
elif int(result) == 1:
sql = "update regions_root set name = :name where indexCode = :indexCode"
cursor.execute(sql, [resources.json()['data']['name'], resources.json()['data']['indexCode']])
connection.commit()
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "根节点数据在regions_root表更新成功")
print("更新数据成功:", resources.json()['data']['indexCode'], resources.json()['data']['name'])
# 提前更新资源区域NEWVALUE,用于判断有没有新增的层级标识
update_regions_subregions = "update regions_subRegions set newValue = 0"
cursor.execute(update_regions_subregions)
connection.commit()
except cx_Oracle.DatabaseError as root_e:
ROOT_ERR, = root_e.args
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(ROOT_ERR))
print(ROOT_ERR)
return resources.json()['data']['indexCode']
root_result_value = regions_root_mode()
"""
海康请求根区域下一级区域列表
"""
def regions_subregions_mode(root_result):
# 根据区域编号获取下一级区域列表v2
json_regions_subRegions = {
"parentIndexCode": root_result,
"resourceType": "camera",
"pageNo": 1,
"pageSize": 1000,
"cascadeFlag": 0
}
resources = http_request_mode(URL_resource_subResources, json_regions_subRegions, regions_subRegions)
f.write(
"\n" + str(datetime.datetime.now()) + ' 层级列表数据获取' + resources.text)
if resources.json()["msg"] == "success":
for key in resources.json()["data"]["list"]:
# print(key['indexCode'], key['name'])
# 查询表中数据量
resource_subResources_SQL = "select count(indexCode) as num from regions_subRegions where indexCode = :indexCode"
# 执行查询
cursor.execute(resource_subResources_SQL, [key['indexCode']])
# 判断数据唯一性,并更新数据
# 将元组转换成字符串
row = cursor.fetchone()
result = ', '.join(str(x) for x in row)
# print(result)
# 0为新数据直接写入
try:
if result == '0':
sql = "INSERT INTO regions_subRegions (indexCode,name,parentIndexCode,SEE,newValue) VALUES \
(:indexCode,:name,:parentIndexCode,:SEE,:newValue)"
cursor.execute(sql, [key['indexCode'], key['name'], key['parentIndexCode'], '1', '1'])
connection.commit()
print("数据写入成功:", key['indexCode'], key['name'])
# f.write(
# "\n" + str(datetime.datetime.now()) + ' ' + "区域列表数据写入成功:" + key['indexCode'] + ' ' +
# key['name'])
# # 1为已存数据,更新历史数据
elif result == '1':
sql = "update regions_subRegions set name = :name,parentIndexCode = :parentIndexCode, \
newValue = :newValue where indexCode = :indexCode"
cursor.execute(sql, [key['name'], key['parentIndexCode'], '1', key['indexCode']])
connection.commit()
print("更新数据成功:", key['indexCode'], key['name'])
# f.write(
# "\n" + str(datetime.datetime.now()) + ' ' + "区域列表更新数据成功:" + key['indexCode'] + ' ' +
# key['name'])
except cx_Oracle.DatabaseError as regions_subRegions_e:
regions_subRegions_ERR, = regions_subRegions_e.args
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + str(regions_subRegions_ERR))
print(regions_subRegions_ERR)
# 遍历所有值
regions_subregions_mode(key['indexCode'])
regions_subregions_mode(root_result_value)
"""
海康请求根区域下区的资源信息
"""
def nms_record_mode():
# 清理已经失效层级
delete_regions_subregions = "delete from regions_subRegions where newValue = 0"
cursor.execute(delete_regions_subregions)
connection.commit()
# 查询表中数据量
regions_subRegions_SQL = "select indexCode as num from regions_subRegions where SEE = '1'"
cursor.execute(regions_subRegions_SQL)
# 文件名称命名
now = datetime.datetime.today()
formatted_date = now.strftime('%Y%m%d%H%M%S')
path = f'd:/{formatted_date}.xlsx'
# 创建日志文件
wb = Workbook()
ws = wb.active
data_list = ["状态码", "巡检状态", "摄像头名称", "巡检区域", "巡检时间"]
ws.append(data_list)
# 记录数值
ok = 0
ng = 0
for row in cursor.fetchall():
# 将元组转换成字符串
result = ', '.join(str(x) for x in row)
# print("元组转换:" + result)
# 根据区域编号获取当前层级的资源信息,包括门禁,监控,但不包含子层集
json_resource_subResources = {
"regionIndexCode": result,
"pageNo": 1,
"authCodes": [
"view"
],
"pageSize": 1000,
"resourceType": "camera"
}
resources = http_request_mode(URL_regions_subRegions, json_resource_subResources, resource_subResources)
# print(resources.json())
# 确认层级下的监控摄像头,并调取录像状态
if resources.json()["msg"] == "success":
# print(resources.json()["data"]["list"])
for key in resources.json()["data"]["list"]:
# 打印区域下的监控清单
# print(key['indexCode'], key['name'], key['regionPathName'])
# 根据监控点编号、开始时间、结束时间,分页获取录像录像完整性记录
today = datetime.datetime.today()
today = datetime.datetime.date(today)
today = f'{today}T00:00:00.000+08:00'
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
yesterday = yesterday.strftime('%Y-%m-%d')
yesterday = f'{yesterday}T00:00:00.000+08:00'
json_nms_record_list = {
"pageNo": 1,
"pageSize": 1000,
"beginTime": yesterday,
"endTime": today,
"indexCodes": [
key['indexCode']
]
}
record_result = http_request_mode(URL_nms_record_list, json_nms_record_list, resource_nms_record_list)
if record_result.json()["msg"] == "success" and record_result.json()["data"]["total"] != 0:
for indexCode_result in record_result.json()["data"]["list"]:
# print(record_result.json())
# print(indexCode_result)
# 巡检结果(31-正常;32-异常;33-巡检失败;34-未配置)
xjTime = str(datetime.datetime.now())
# 巡检判断状态
status = ""
if indexCode_result['result'] == 31:
status = "正常"
ok = ok + 1
elif indexCode_result['result'] == 32:
status = "异常"
ng = ng + 1
# 发送钉钉文件
data_list = [str(indexCode_result['result']), status, str(key['name']),
str(key['regionPathName']), xjTime]
ws.append(data_list)
elif indexCode_result['result'] == 33:
status = "巡检失败"
ng = ng + 1
# 发送钉钉文件
data_list = [str(indexCode_result['result']), status, str(key['name']),
str(key['regionPathName']), xjTime]
ws.append(data_list)
elif indexCode_result['result'] == 34:
status = "未配置"
ng = ng + 1
# 发送钉钉文件
data_list = [str(indexCode_result['result']), status, str(key['name']),
str(key['regionPathName']), xjTime]
ws.append(data_list)
# 巡检录像状态打印
print(indexCode_result['result'], status, key['name'], key['regionPathName'], xjTime,
key['indexCode'])
# 写入巡检数据
sql = "INSERT INTO nms_record_list (regionPathName,name,xjtime,xjresult,xjresultn) VALUES " \
"(:regionPathName,:name,:xjtime,:xjresult,:xjresultn)"
cursor.execute(sql, [key['regionPathName'], key['name'], datetime.datetime.now(), \
indexCode_result['result'], status])
connection.commit()
else:
xjTime = str(datetime.datetime.now())
print("未配置录像", "null", key['name'], key['regionPathName'], xjTime, key['indexCode'])
ng = ng + 1
# 发送钉钉文件
data_list = [0, "未配置录像", str(key['name']), str(key['regionPathName']), xjTime]
ws.append(data_list)
# 写入巡检数据
sql = "INSERT INTO nms_record_list (regionPathName,name,xjtime,xjresult,xjresultn) VALUES " \
"(:regionPathName,:name,:xjtime,:xjresult,:xjresultn)"
cursor.execute(sql, [key['regionPathName'], key['name'], datetime.datetime.now(), '0',
'未配置录像'])
connection.commit()
# 关闭文件
wb.save(path)
# 巡检总数
all_num = ok + ng
camera = round((all_num - ng) / all_num * 100, 2)
return path, ok, ng, all_num, camera
# 接收返回值
receive = nms_record_mode()
"""
钉钉文件日志信息推送
"""
def push_ding_mode():
# 计算周
now = datetime.datetime.today()
year = int(now.strftime('%Y'))
month = int(now.strftime('%m'))
day = int(now.strftime('%d'))
week = datetime.date(year, month, day).isocalendar().week
# 请求地址
ding_token_URL = "https://oapi.dingtalk.com/gettoken?appkey=" + AppKey + "&appsecret=" + AppSecret
# 发送请求获取access_token
access_token = requests.get(ding_token_URL)
print(access_token.json()["access_token"])
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "钉钉access_token:" + access_token.text)
# 上传文件到钉钉云盘
upload_url = "https://oapi.dingtalk.com/media/upload?access_token=" + access_token.json()["access_token"]
# 指定上传文件路径
# print(receive[0])
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "钉钉上传源文件:" + str(receive[0]))
files = {'media': open(f'{receive[0]}', 'rb')}
data = {'type': 'file'}
# 上传文件
response = requests.post(upload_url, files=files, data=data)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "钉钉上传文件:" + response.text)
# 空文件不许云上传钉钉云盘
if response.json()["errcode"] != 34010:
# print(response.text)
for ding_send_file in range(len(ding_push_groups)):
# print(json.loads(json.dumps(ding_push_groups))[ding_send_log][0])
file_media = response.json()["media_id"]
# 发送文件
now = datetime.datetime.today()
formatted_date = now.strftime('%Y%m%d%H%M%S')
headers = {'Content-Type': 'application/json',
'x-acs-dingtalk-access-token': str(access_token.json()["access_token"])}
ding_upload_URL = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'
ding_upload_json = {
"msgParam": "{"
f"\"mediaId\":\"{file_media}\","
f"\"fileName\":\"W{week}周监控巡检日志.xlsx\","
"\"fileType\":\"xlsx\","
"}",
"msgKey": "sampleFile",
"openConversationId": json.loads(json.dumps(ding_push_groups))[ding_send_file][0],
"robotCode": robotCode,
"coolAppCode": coolAppCode
}
ding_upload_msg = requests.post(url=ding_upload_URL, json=ding_upload_json, headers=headers)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "钉钉发送文件:" + ding_upload_msg.text)
# 查询表中数据量
ding_upload_SQL = "insert into ding_upload (createtime,media_id,file_type) \
values (:createtime,:media_id,:file_type)"
cursor.execute(ding_upload_SQL, [now, file_media, str(data['type'])])
connection.commit()
# 发送钉钉巡检报告日志,支持多群推送
for ding_send_log in range(len(ding_push_groups)):
# print(json.loads(json.dumps(ding_push_groups))[ding_send_log][0])
# 发送文字版本巡检报告
headers = {'Content-Type': 'application/json',
'x-acs-dingtalk-access-token': str(access_token.json()["access_token"])}
send_log_URL = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'
text = "{"f'"title": "W{week}周监控自动化巡检报告","text": \
" \
\n##### W{week}周监控自动化巡检报告\n- 摄像头总数:{receive[3]}支\
\n- 录像完整:{receive[1]}支\n- 录像异常:{receive[2]}支\n- 录像完整率:{receive[4]}%"'"}"
send_log_json = {
"msgParam": text,
"msgKey": "sampleMarkdown",
"openConversationId": json.loads(json.dumps(ding_push_groups))[ding_send_log][0],
"robotCode": robotCode,
"coolAppCode": coolAppCode
}
ding_send_msg = requests.post(url=send_log_URL, json=send_log_json, headers=headers)
f.write(
"\n" + str(datetime.datetime.now()) + ' ' + "钉钉发送巡检报告:" + ding_send_msg.text)
# 关闭游标和连接
cursor.close()
connection.close()
push_ding_mode()
# 关闭日志文件
f.close()