一、釘釘代碼
import dingtalk.api import requests class DingDingAPI(): def __init__(self, appkey, appsecret): self.appkey = appkey self.appsecret = appsecret self.access_token = self.get_token() # 獲取token def get_token(self): params = { "appkey": self.appkey, "appsecret": self.appsecret } try: res = requests.get("https://oapi.dingtalk.com/gettoken", params=params) access_token = res.json().get("access_token") return access_token except Exception as e: print(e) # 發送消息 def send_msg(self, chatid="", content=""): req = dingtalk.api.OapiChatSendRequest("https://oapi.dingtalk.com/chat/send") req.chatid = chatid req.text = { "content": content } req.msgtype = "text" resp = req.getResponse(self.access_token) return resp # 發送文件 def send_file(self, chatid="", media_id=""): req = dingtalk.api.OapiChatSendRequest("https://oapi.dingtalk.com/chat/send") req.chatid = chatid req.file = { "media_id": media_id } req.msgtype = "file" resp = req.getResponse(self.access_token) return resp # 上傳文件 def upload_media(self,file_name="name.docx",file_path=""): req = dingtalk.api.OapiMediaUploadRequest("https://oapi.dingtalk.com/media/upload") req.type = "file" req.media = dingtalk.api.FileItem(file_name,open(file_path, 'rb')) resp = req.getResponse(self.access_token) return resp # 通過電話獲取userid def get_by_mobile(self,mobile): req = dingtalk.api.OapiUserGetByMobileRequest("https://oapi.dingtalk.com/user/get_by_mobile") req.mobile = mobile try: resp = req.getResponse(self.access_token) return resp except Exception as e: print(e) # 通過userid獲取信息,不能使用機器人appkey def get_user_msg(self,userid): req = dingtalk.api.OapiUserGetRequest("https://oapi.dingtalk.com/user/get") req.userid = userid try: resp = req.getResponse(self.access_token) print(resp) return resp except Exception as e: print(e) if __name__ == '__main__': appkey = "*******" appsecret = "**********" dd=DingDingAPI(appkey,appsecret)
官方的封裝模塊dingtalk是python2.7的,我已改成3.0的
https://pan.baidu.com/s/1tpnTm17h4kVq81DK71wlDQ
密碼:ivt4
釘釘的debug調試網址:https://wsdebug.dingtalk.com/,可以用來獲取chatid
二、釘釘機器人
需要一個公網ip,搭建django服務器來接收釘釘服務器的回調
首先是注冊一個釘釘機器人,重要的是將自己接收釘釘消息的服務器ip加路由填入到-‘消息接收地址’
#url部分代碼 from django.conf.urls import url from django.contrib import admin from dingding import views urlpatterns = [ url(r'^admin/', admin.site.urls), url(r'^robot/', views.robot), ]
#views部分代碼 from django.http import HttpResponse, JsonResponse import json import hmac import hashlib import base64 # 機器人的app_secret app_secret = "************" # Create your views here. def robot(request): if request.method == "POST": HTTP_SIGN = request.META.get("HTTP_SIGN") HTTP_TIMESTAMP = request.META.get("HTTP_TIMESTAMP") res = json.loads(request.body) print(res) # 用戶輸入釘釘的信息 content = res.get("text").get("content") string_to_sign = '{}\n{}'.format(HTTP_TIMESTAMP, app_secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(app_secret.encode("utf-8"), string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode("utf-8") print(sign) print(HTTP_SIGN) #驗證簽名是否為釘釘服務器發來的 if sign == HTTP_SIGN: ''' 可以寫一些執行邏輯,返回用戶想要的信息,比如工資信息,可以去數據庫查工資信息回給釘釘用戶 ''' if "我的工資" in content: return JsonResponse( {"msgtype": "text", "text": { "content": "您上月的工資為*****元" } } ) return JsonResponse( {"msgtype": "text", "text": { "content": "謝謝使用此機器人,{}".format(content) } } ) return JsonResponse({"error":"你沒有權限訪問此接口"}) if request.method == "GET": return HttpResponse("hello")
三、修改發送文件不能打開的問題
import mimetypes # 直接用requests包發送請求,官方包哪里傳參數的時候有問題 def upload_media_v1(self, file_name="", file_path=""): files = {'type': (None, "file"), 'media': (file_name, open(file_path, 'rb'), mimetypes.guess_type(file_name)[0])} params = { "access_token": self.access_token, "type": "file" } re = requests.post("https://oapi.dingtalk.com/media/upload", files=files, params=params) return re.json()
還需要修改一處源碼,不然不能發送中文文件,紅色的注釋,綠色的增加
四、增加一個釘釘webhook代碼
import requests import datetime def send_msg(task_name,start_time,end_time): url = "https://oapi.dingtalk.com/robot/send?access_token=**************" data = { "msgtype": "markdown", "markdown": { "title": "定時任務運行結果!", "text": "### 您的任務已出錯,請及時處理!\n\n" "> **任務名稱:** %s \n\n" "> **開始時間:** %s \n\n" "> **結束時間:** %s"%(task_name,start_time,end_time) }, "at": { "atMobiles": [ "手機號" ], "isAtAll": False } } res=requests.post(url=url,json=data) print(res.json()) task_name="T公司發票上傳" start_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') end_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') send_msg(task_name,start_time,end_time)