通过钉钉发送工作通知

钉钉接口:https://open.dingtalk.com/document/orgapp/asynchronous-sending-of-enterprise-session-messages

需要在企业内部应用创建钉钉应用,并且赋予通讯录的权限

注意工作通知同一条消息一天只会出现一次

1、测试发送文本

import requests,os,datetime
import json

#钉钉开放平台获取企业内部应用的AgentId、AppKey和AppSecret
AgentId = "1596240811"
AppKey = "dinglqfibvlfa3tidm6a"
AppSecret = "VwxBxWvJwhT8BafWKY5xsdfAO6rTv1BKXkJYUCn8mvl3a6UysfhtcMFUX0255SKl"
mobile = "15661111158"
#请求地址
URL = "https://oapi.dingtalk.com/gettoken?appkey="+AppKey+"&appsecret="+AppSecret

#发送请求获取access_token
access_token =  requests.get(URL)

#创建日志文件
if os.path.exists("d:/dingtalk.log"):
    f=open("d:/dingtalk.log","a+")
    f.write("\n" + "--" * 30)
else:
    f=open("d:/dingtalk.log","w+")

#获取钉钉access_token判断,并根据事件创建日志
if access_token.json()["errcode"] == 0:
    # print(access_token.json()["access_token"])
    f.write("\n" + str(datetime.datetime.now()) + "    " + "sucmsg" + "    " + str(access_token.json()["access_token"]))
    f.close()

    # 根据手机号码获取钉钉userid
    headers = {'Content-Type': 'application/json'}
    URL = "https://oapi.dingtalk.com/topapi/v2/user/getbymobile?access_token=" + str(access_token.json()["access_token"])
    userid = requests.post(url=URL, json={'mobile': mobile}, headers=headers)
    # print(userid.text)
    # print(userid.json()["result"]["userid"])

    #发送工作通知
    if userid.json()["errcode"] == 0:
        URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token=" + str(access_token.json()["access_token"])
        json = {
            "agent_id":AgentId,
            "userid_list":str(userid.json()["result"]["userid"]),
            "msg":{
                "msgtype": "text",
                "text": {
                    "content": "测试钉钉工作通知!"
                }
            }
        }
        sucmsg = requests.post(url=URL,json=json,headers=headers)
        print(sucmsg.text)

    else:
        f.write("\n" + str(datetime.datetime.now()) + "    " + URL + "    " + "errmsg" + "    " + str(userid.json()["errmsg"]))
        f.close()

else:
    f.write("\n" + str(datetime.datetime.now()) + "    " + URL + "    " + "errmsg" + "    " + str(access_token.json()["errmsg"]))
    f.close()

2、卡片式通知

import requests,os,datetime
import json

#钉钉开放平台获取企业内部应用的AgentId、AppKey和AppSecret
AgentId = "1596241812"
AppKey = "dinglqfibvlfa4tidm6z"
AppSecret = "VwxBxWvJwhT8BafWKY6xsdfAO6rTv1BKXkJYUCn8mvl3a6UysfhtcMFUX0255SK5"
mobile = "16631922012"
#请求地址
URL = "https://oapi.dingtalk.com/gettoken?appkey="+AppKey+"&appsecret="+AppSecret

#发送请求获取access_token
access_token =  requests.get(URL)
# print(access_token.text)
#创建日志文件
if os.path.exists("d:/dingtalk.log"):
    f=open("d:/dingtalk.log","a+")
    f.write("\n" + "--" * 30)
else:
    f=open("d:/dingtalk.log","w+")

#获取钉钉access_token判断,并根据事件创建日志
if access_token.json()["errcode"] == 0:
    # print(access_token.json()["access_token"])
    f.write("\n" + str(datetime.datetime.now()) + "    " + "sucmsg" + "    " + str(access_token.json()["access_token"]))
    # f.close()

    # 根据手机号码获取钉钉userid
    # headers = {'Content-Type': 'multipart/form-data'}
    headers = {'Content-Type': 'application/json'}
    URL = "https://oapi.dingtalk.com/topapi/v2/user/getbymobile?access_token=" + str(access_token.json()["access_token"])
    userid = requests.post(url=URL, json={'mobile': mobile}, headers=headers)
    # print(userid.text)
    # print(userid.json()["result"]["userid"])

    #发送工作通知
    if userid.json()["errcode"] == 0:
        # headers = {'Content-Type': 'multipart/form-data'}
        URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token=" + str(access_token.json()["access_token"])
        json = {
            "agent_id":AgentId,
            "userid_list":str(userid.json()["result"]["userid"]),
            "msg":{
                "msgtype": "action_card",
                "action_card": {
                    "title": "本周运维巡检提醒!!!",
                    "markdown": "# 本周重点巡检项:   \n   ## 1、MES和EAP的日志文件大小   \n   * 检查命令du -h /var/log/messages   \n   ## 2、网络通讯设备主干链路   \n   ## 3、阿里云安全补丁修复情况   \n   ![例图1](https://img0.baidu.com/it/u=3012065904,4186157241&fm=253&fmt=auto&app=138&f=JPG?w=891&h=500)",
                    "single_title":"巡检链接",
                    "single_url": "http://172.30.9.95"
                }
            }
        }

        sucmsg = requests.post(url=URL,json=json,headers=headers)
        # print(sucmsg.text)
        # print(sucmsg.status_code)


    else:
        f.write("\n" + str(datetime.datetime.now()) + "    " + URL + "    " + "errmsg" + "    " + str(userid.json()["errmsg"]))
        f.close()

else:
    f.write("\n" + str(datetime.datetime.now()) + "    " + URL + "    " + "errmsg" + "    " + str(access_token.json()["errmsg"]))
    f.close()

模式single_url是侧边栏打开,如果想更为PC浏览器打开,修改如下pc_slide=true代表侧边栏,这里改false即可

"single_url": "dingtalk://dingtalkclient/page/link?url=http://172.30.9.95&pc_slide=false"

如上也可优化,读取指定的markdown文件作为消息的主体通知内容

    #发送工作通知
    if userid.json()["errcode"] == 0:
        # headers = {'Content-Type': 'multipart/form-data'}
        URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token=" + str(access_token.json()["access_token"])
        #读取markdown文件,作为消息主要内容通知
        with open('C:/Users/ja014690/Desktop/hello.md', 'r') as f:
            content=f.read()
        json = {
            "agent_id":AgentId,
            "userid_list":str(userid.json()["result"]["userid"]),
            "msg":{
                "msgtype": "action_card",
                "action_card": {
                    "title": "本周运维巡检提醒!!!",
                    "markdown": content,
                    "single_title":"巡检链接",
                    "single_url": "dingtalk://dingtalkclient/page/link?url=http://172.30.9.95&pc_slide=false"
                }
            }
        }

Markdown格式

标题
# 一级标题
## 二级标题
### 三级标题
#### 四级标题
##### 五级标题
###### 六级标题
 
引用
> A man who stands for nothing will fall for anything.
 
文字加粗、斜体
**bold**
*italic*
 
链接
[this is a link](http://name.com)
 
图片
![](http://name.com/pic.jpg)
 
无序列表
- item1
- item2
 
有序列表
1. item1
2. item2

换行
  \n  (建议\n前后分别加2个空格)