海康视频取流自动化巡检

如上环境配置

针对该脚本机型优化,加快数据处理速度和模块化拆解

import requests
import urllib3
import json
import cx_Oracle
import datetime
import os
from openpyxl import Workbook

# 强制取消证书验证的级别出发不同的警告
urllib3.disable_warnings()

# 创建日志文件
if os.path.exists("d:/hik.log"):
    f = open("d:/hik.log", "a+")
    f.write("\n" + "--" * 30)
else:
    f = open("d:/hik.log", "w+")

try:

    """
    调取信息存储在172.30.64.45数据库的技术数据信息
    """
    # 连接数据库,需要提前安装Oracle Instant Client
    connection = cx_Oracle.connect(user="hik", password="Jiaen#2020", dsn="172.30.64.45/ORCLPDB1")
    f.write(
        "\n" + str(datetime.datetime.now()) + '  ' + str(connection))
    # 创建游标
    cursor = connection.cursor()
    f.write(
        "\n" + str(datetime.datetime.now()) + '  ' + str(cursor))
    """
    获取海康ISC AK/SK信息
    """

    SQL = "select appkey,appsecret,url from hik_akas where id ='1'"
    # 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
    for hik_isc_values in cursor.execute(SQL):
        appKey = hik_isc_values[0]
        appSecret = hik_isc_values[1]
        URL = hik_isc_values[2]
    # 执行完成语句,定义变量接收给到的是元组数据,此时需要转成字符串再转成LIST再给值
    # cursor.execute(SQL)
    # hik_isc_values = cursor.fetchall()
    # hik_isc_values = json.loads(json.dumps(hik_isc_values[0]))
    # print(hik_isc_values[1])

    """
    获取ISC根路径API和签名信息
    """
    SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/resource/v1/regions/root'"
    # 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
    for hik_ssl_values in cursor.execute(SQL):
        URL_regions_root = URL + hik_ssl_values[0]
        regions_root = hik_ssl_values[1]

    """
    获取ISC根路径根据区域编号获取下一级区域列表API和签名信息
    """
    SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/resource/v2/regions/subRegions'"
    # 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
    for hik_ssl_values in cursor.execute(SQL):
        URL_resource_subResources = URL + hik_ssl_values[0]
        regions_subRegions = hik_ssl_values[1]

    """
    获取ISC区域编号获取当前层级的资源信息列表API和签名信息
    """
    SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/irds/v2/resource/subResources'"
    # 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
    for hik_ssl_values in cursor.execute(SQL):
        URL_regions_subRegions = URL + hik_ssl_values[0]
        resource_subResources = hik_ssl_values[1]

    """
    获取ISC监控点编号列表API和签名信息
    """
    SQL = "select HIK_API,REGIONS_SSL from hik_api_ssl where hik_api ='/api/nms/v1/record/list'"
    # 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
    for hik_ssl_values in cursor.execute(SQL):
        URL_nms_record_list = URL + hik_ssl_values[0]
        resource_nms_record_list = hik_ssl_values[1]

    """
    获取钉钉授信息
    """
    SQL = "select appkey,appsecret,agentid,robotcode,coolappcode from DING_AKAS"
    # 第一种取值方法 直接带查询执行语句,返回LIST值,直接调用
    for ding_values in cursor.execute(SQL):
        AppKey = ding_values[0]
        AppSecret = ding_values[1]
        AgentId = ding_values[2]
        robotCode = ding_values[3]
        coolAppCode = ding_values[4]

    """
    钉钉消息推送群
    """
    SQL = "select openconversationid from DING_GROUPS_PUSH"
    ding_push_groups = []
    i = 0
    for col_value in cursor.execute(SQL):
        ding_push_groups.append(col_value)

    # for ix in range(len(ding_push_groups)):
    #     print(json.loads(json.dumps(ding_push_groups))[ix][0])


except cx_Oracle.DatabaseError as e:
    error_obj, = e.args
    f.write(
        "\n" + str(datetime.datetime.now()) + '  ' + str(error_obj))
    print(error_obj)


def http_request_mode(url_path, request_json, regions_ssl):
    try:
        # 请求头需按此顺序编写,X-Ca-Signature根据请求URL后缀地址通过isc openAPI签名工具生成
        headers = {'Content-Type': 'application/json', 'X-Ca-Key': appKey, 'X-Ca-Signature-Headers': 'X-Ca-Key',
                   'X-Ca-Signature': regions_ssl}
        # 请求数据
        resources = requests.post(url=url_path, json=request_json, headers=headers, verify=False)
        # print(resources.status_code)
        return resources
    except TimeoutError:
        print(TimeoutError.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(TimeoutError.args))
    except urllib3.exceptions.NewConnectionError:
        print(urllib3.exceptions.NewConnectionError.args)
    except requests.exceptions.ConnectTimeout as ConnectTimeout:
        print(ConnectTimeout.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(ConnectTimeout.args))
    except requests.exceptions.ReadTimeout as ReadTimeout:
        print(ReadTimeout.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(ReadTimeout.args))
    except requests.exceptions.BaseHTTPError as BaseHTTPError:
        print(BaseHTTPError.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(BaseHTTPError.args))
    except requests.exceptions.HTTPError as HTTPError:
        print(HTTPError.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(HTTPError.args))
    except requests.exceptions.ConnectionError as ConError:
        print(ConError.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(ConError.args))
    except requests.exceptions.RequestsWarning as RequestsWarning:
        print(RequestsWarning.args)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(RequestsWarning.args))


"""
海康根信息值请求
"""


def regions_root_mode():
    # 请求body
    json_regions_root = {}

    # 请求获取根节点信息
    resources = http_request_mode(URL_regions_root, json_regions_root, regions_root)

    try:
        # 执行查询
        ROOT_SQL = "select count(indexCode) as num from regions_root where indexCode = :indexCode"
        cursor.execute(ROOT_SQL, [resources.json()['data']['indexCode']])

        # 判断数据唯一性,并更新数据
        row = cursor.fetchone()
        # 元组转字符串
        result = ', '.join(str(x) for x in row)
        # print(result)
        # 0为新数据直接写入
        if int(result) == 0:
            sql = "INSERT INTO regions_root (indexCode,name) VALUES (:indexCode,:name)"
            cursor.execute(sql, [resources.json()['data']['indexCode'], resources.json()['data']['name']])
            connection.commit()
            f.write(
                "\n" + str(datetime.datetime.now()) + '  ' + "根节点数据写入regions_root表成功")
            print("数据写入成功:", resources.json()['data']['indexCode'], resources.json()['data']['name'])
        # 1为已存数据,更新历史数据
        elif int(result) == 1:
            sql = "update regions_root set name = :name where indexCode = :indexCode"
            cursor.execute(sql, [resources.json()['data']['name'], resources.json()['data']['indexCode']])
            connection.commit()
            f.write(
                "\n" + str(datetime.datetime.now()) + '  ' + "根节点数据在regions_root表更新成功")
            print("更新数据成功:", resources.json()['data']['indexCode'], resources.json()['data']['name'])

        # 提前更新资源区域NEWVALUE,用于判断有没有新增的层级标识
        update_regions_subregions = "update regions_subRegions set newValue = 0"
        cursor.execute(update_regions_subregions)
        connection.commit()

    except cx_Oracle.DatabaseError as root_e:
        ROOT_ERR, = root_e.args
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + str(ROOT_ERR))
        print(ROOT_ERR)

    return resources.json()['data']['indexCode']


root_result_value = regions_root_mode()

"""
海康请求根区域下一级区域列表
"""


def regions_subregions_mode(root_result):
    # 根据区域编号获取下一级区域列表v2
    json_regions_subRegions = {
        "parentIndexCode": root_result,
        "resourceType": "camera",
        "pageNo": 1,
        "pageSize": 1000,
        "cascadeFlag": 0
    }

    resources = http_request_mode(URL_resource_subResources, json_regions_subRegions, regions_subRegions)
    f.write(
        "\n" + str(datetime.datetime.now()) + '  层级列表数据获取' + resources.text)

    if resources.json()["msg"] == "success":

        for key in resources.json()["data"]["list"]:
            # print(key['indexCode'], key['name'])

            # 查询表中数据量
            resource_subResources_SQL = "select count(indexCode) as num from regions_subRegions where indexCode = :indexCode"
            # 执行查询
            cursor.execute(resource_subResources_SQL, [key['indexCode']])

            # 判断数据唯一性,并更新数据
            # 将元组转换成字符串
            row = cursor.fetchone()
            result = ', '.join(str(x) for x in row)
            # print(result)
            # 0为新数据直接写入
            try:
                if result == '0':
                    sql = "INSERT INTO regions_subRegions (indexCode,name,parentIndexCode,SEE,newValue) VALUES \
                    (:indexCode,:name,:parentIndexCode,:SEE,:newValue)"
                    cursor.execute(sql, [key['indexCode'], key['name'], key['parentIndexCode'], '1', '1'])
                    connection.commit()
                    print("数据写入成功:", key['indexCode'], key['name'])
                    # f.write(
                    #     "\n" + str(datetime.datetime.now()) + '  ' + "区域列表数据写入成功:" + key['indexCode'] + ' ' +
                    #     key['name'])
                # # 1为已存数据,更新历史数据
                elif result == '1':
                    sql = "update regions_subRegions set name = :name,parentIndexCode = :parentIndexCode, \
                    newValue = :newValue where indexCode = :indexCode"
                    cursor.execute(sql, [key['name'], key['parentIndexCode'], '1', key['indexCode']])
                    connection.commit()
                    print("更新数据成功:", key['indexCode'], key['name'])
                    # f.write(
                    #     "\n" + str(datetime.datetime.now()) + '  ' + "区域列表更新数据成功:" + key['indexCode'] + ' ' +
                    #     key['name'])
            except cx_Oracle.DatabaseError as regions_subRegions_e:
                regions_subRegions_ERR, = regions_subRegions_e.args
                f.write(
                    "\n" + str(datetime.datetime.now()) + '  ' + str(regions_subRegions_ERR))
                print(regions_subRegions_ERR)

            # 遍历所有值
            regions_subregions_mode(key['indexCode'])


regions_subregions_mode(root_result_value)


"""
海康请求根区域下区的资源信息
"""


def nms_record_mode():

    # 清理已经失效层级
    delete_regions_subregions = "delete from regions_subRegions where newValue = 0"
    cursor.execute(delete_regions_subregions)
    connection.commit()

    # 查询表中数据量
    regions_subRegions_SQL = "select indexCode as num from regions_subRegions where SEE = '1'"
    cursor.execute(regions_subRegions_SQL)

    # 文件名称命名
    now = datetime.datetime.today()
    formatted_date = now.strftime('%Y%m%d%H%M%S')
    path = f'd:/{formatted_date}.xlsx'

    # 创建日志文件
    wb = Workbook()
    ws = wb.active
    data_list = ["状态码", "巡检状态", "摄像头名称", "巡检区域", "巡检时间"]
    ws.append(data_list)

    # 记录数值
    ok = 0
    ng = 0

    for row in cursor.fetchall():
        # 将元组转换成字符串
        result = ', '.join(str(x) for x in row)
        # print("元组转换:" + result)

        # 根据区域编号获取当前层级的资源信息,包括门禁,监控,但不包含子层集
        json_resource_subResources = {
            "regionIndexCode": result,
            "pageNo": 1,
            "authCodes": [
                "view"
            ],
            "pageSize": 1000,
            "resourceType": "camera"
        }

        resources = http_request_mode(URL_regions_subRegions, json_resource_subResources, resource_subResources)

        # print(resources.json())
        # 确认层级下的监控摄像头,并调取录像状态
        if resources.json()["msg"] == "success":
            # print(resources.json()["data"]["list"])
            for key in resources.json()["data"]["list"]:
                # 打印区域下的监控清单
                # print(key['indexCode'], key['name'], key['regionPathName'])

                # 根据监控点编号、开始时间、结束时间,分页获取录像录像完整性记录
                today = datetime.datetime.today()
                today = datetime.datetime.date(today)
                today = f'{today}T00:00:00.000+08:00'

                yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
                yesterday = yesterday.strftime('%Y-%m-%d')
                yesterday = f'{yesterday}T00:00:00.000+08:00'

                json_nms_record_list = {
                    "pageNo": 1,
                    "pageSize": 1000,
                    "beginTime": yesterday,
                    "endTime": today,
                    "indexCodes": [
                        key['indexCode']
                    ]
                }

                record_result = http_request_mode(URL_nms_record_list, json_nms_record_list, resource_nms_record_list)

                if record_result.json()["msg"] == "success" and record_result.json()["data"]["total"] != 0:
                    for indexCode_result in record_result.json()["data"]["list"]:
                        # print(record_result.json())
                        # print(indexCode_result)

                        # 巡检结果(31-正常;32-异常;33-巡检失败;34-未配置)
                        xjTime = str(datetime.datetime.now())

                        # 巡检判断状态
                        status = ""
                        if indexCode_result['result'] == 31:
                            status = "正常"
                            ok = ok + 1
                        elif indexCode_result['result'] == 32:
                            status = "异常"
                            ng = ng + 1

                            # 发送钉钉文件
                            data_list = [str(indexCode_result['result']), status, str(key['name']),
                                         str(key['regionPathName']), xjTime]
                            ws.append(data_list)
                        elif indexCode_result['result'] == 33:
                            status = "巡检失败"
                            ng = ng + 1

                            # 发送钉钉文件
                            data_list = [str(indexCode_result['result']), status, str(key['name']),
                                         str(key['regionPathName']), xjTime]
                            ws.append(data_list)
                        elif indexCode_result['result'] == 34:
                            status = "未配置"
                            ng = ng + 1

                            # 发送钉钉文件
                            data_list = [str(indexCode_result['result']), status, str(key['name']),
                                         str(key['regionPathName']), xjTime]
                            ws.append(data_list)

                        # 巡检录像状态打印
                        print(indexCode_result['result'], status, key['name'], key['regionPathName'], xjTime,
                              key['indexCode'])
                        # 写入巡检数据
                        sql = "INSERT INTO nms_record_list (regionPathName,name,xjtime,xjresult,xjresultn) VALUES " \
                              "(:regionPathName,:name,:xjtime,:xjresult,:xjresultn)"
                        cursor.execute(sql, [key['regionPathName'], key['name'], datetime.datetime.now(), \
                                             indexCode_result['result'], status])
                        connection.commit()
                else:
                    xjTime = str(datetime.datetime.now())
                    print("未配置录像", "null", key['name'], key['regionPathName'], xjTime, key['indexCode'])
                    ng = ng + 1

                    # 发送钉钉文件
                    data_list = [0, "未配置录像", str(key['name']), str(key['regionPathName']), xjTime]
                    ws.append(data_list)

                    # 写入巡检数据
                    sql = "INSERT INTO nms_record_list (regionPathName,name,xjtime,xjresult,xjresultn) VALUES " \
                          "(:regionPathName,:name,:xjtime,:xjresult,:xjresultn)"
                    cursor.execute(sql, [key['regionPathName'], key['name'], datetime.datetime.now(), '0',
                                         '未配置录像'])
                    connection.commit()

    # 关闭文件
    wb.save(path)

    # 巡检总数
    all_num = ok + ng
    camera = round((all_num - ng) / all_num * 100, 2)

    return path, ok, ng, all_num, camera


# 接收返回值
receive = nms_record_mode()

"""
钉钉文件日志信息推送
"""


def push_ding_mode():
    # 计算周
    now = datetime.datetime.today()
    year = int(now.strftime('%Y'))
    month = int(now.strftime('%m'))
    day = int(now.strftime('%d'))

    week = datetime.date(year, month, day).isocalendar().week

    # 请求地址
    ding_token_URL = "https://oapi.dingtalk.com/gettoken?appkey=" + AppKey + "&appsecret=" + AppSecret

    # 发送请求获取access_token
    access_token = requests.get(ding_token_URL)
    print(access_token.json()["access_token"])
    f.write(
        "\n" + str(datetime.datetime.now()) + '  ' + "钉钉access_token:" + access_token.text)

    # 上传文件到钉钉云盘
    upload_url = "https://oapi.dingtalk.com/media/upload?access_token=" + access_token.json()["access_token"]

    # 指定上传文件路径
    # print(receive[0])
    f.write(
        "\n" + str(datetime.datetime.now()) + '  ' + "钉钉上传源文件:" + str(receive[0]))

    files = {'media': open(f'{receive[0]}', 'rb')}
    data = {'type': 'file'}

    # 上传文件
    response = requests.post(upload_url, files=files, data=data)
    f.write(
        "\n" + str(datetime.datetime.now()) + '  ' + "钉钉上传文件:" + response.text)

    # 空文件不许云上传钉钉云盘
    if response.json()["errcode"] != 34010:
        # print(response.text)
        for ding_send_file in range(len(ding_push_groups)):
            # print(json.loads(json.dumps(ding_push_groups))[ding_send_log][0])

            file_media = response.json()["media_id"]

            # 发送文件
            now = datetime.datetime.today()
            formatted_date = now.strftime('%Y%m%d%H%M%S')

            headers = {'Content-Type': 'application/json',
                       'x-acs-dingtalk-access-token': str(access_token.json()["access_token"])}
            ding_upload_URL = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'
            ding_upload_json = {
                "msgParam": "{"
                            f"\"mediaId\":\"{file_media}\","
                            f"\"fileName\":\"W{week}周监控巡检日志.xlsx\","
                            "\"fileType\":\"xlsx\","
                            "}",
                "msgKey": "sampleFile",
                "openConversationId": json.loads(json.dumps(ding_push_groups))[ding_send_file][0],
                "robotCode": robotCode,
                "coolAppCode": coolAppCode
            }

            ding_upload_msg = requests.post(url=ding_upload_URL, json=ding_upload_json, headers=headers)
            f.write(
                "\n" + str(datetime.datetime.now()) + '  ' + "钉钉发送文件:" + ding_upload_msg.text)

            # 查询表中数据量
            ding_upload_SQL = "insert into ding_upload (createtime,media_id,file_type) \
            values (:createtime,:media_id,:file_type)"

            cursor.execute(ding_upload_SQL, [now, file_media, str(data['type'])])
            connection.commit()

    # 发送钉钉巡检报告日志,支持多群推送
    for ding_send_log in range(len(ding_push_groups)):
        # print(json.loads(json.dumps(ding_push_groups))[ding_send_log][0])
        # 发送文字版本巡检报告
        headers = {'Content-Type': 'application/json',
                   'x-acs-dingtalk-access-token': str(access_token.json()["access_token"])}
        send_log_URL = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'

        text = "{"f'"title": "W{week}周监控自动化巡检报告","text": \
        "![](https://www.jasolar.com/statics/gaiban/images/er_03_banner04.jpg) \
        \n##### W{week}周监控自动化巡检报告\n- 摄像头总数:{receive[3]}支\
        \n- 录像完整:{receive[1]}支\n- 录像异常:{receive[2]}支\n- 录像完整率:{receive[4]}%"'"}"

        send_log_json = {
            "msgParam": text,
            "msgKey": "sampleMarkdown",
            "openConversationId": json.loads(json.dumps(ding_push_groups))[ding_send_log][0],
            "robotCode": robotCode,
            "coolAppCode": coolAppCode
        }

        ding_send_msg = requests.post(url=send_log_URL, json=send_log_json, headers=headers)
        f.write(
            "\n" + str(datetime.datetime.now()) + '  ' + "钉钉发送巡检报告:" + ding_send_msg.text)

    #  关闭游标和连接
    cursor.close()
    connection.close()


push_ding_mode()

# 关闭日志文件
f.close()