一、摘要
現實交易中為了能及時了解發明者量化機器人交易狀態,有時候我們需要將機器人所執行的交易結果發送到微信、郵箱、短信等等。但每天上百條各種各樣的信息,使得對這些信息已經不敏感,導致重要的信息不能及時查收,所以本篇通過調用釘釘群接口實現機器人推送消息。
二、釘釘群機器人
釘釘群機器人是一個高級擴展功能,只要有一個釘釘賬號,就可以使用它。它可以將第三方信息聚合到釘釘群中,實現信息自動同步。支持Webhook協議的自定義接入,通過發明者量化機器人,將提醒、報警等信息聚合到釘釘群中。支持文本(text)、鏈接(link)、markdown三種消息格式,五種消息類型。同一條信息還可以同時發送至多個釘釘群。
參考官方鏈接:https://ding-doc.dingtalk.com/doc#/serverapi2/ye8tup
三、創建機器人
第1步:創建釘釘群
釘釘群每創建一個自定義機器人都會產生唯一的Hook地址,我們稱為WebHook地址,通過向該WebHook地址推送消息,釘釘群就會收到消息。我們以PC端釘釘為例,首先點擊左上方“+”號發起群聊,如果只想自己接受消息,可以隨便拉兩個人再踢出去,填寫群名稱:“FMZ機器人”,群類型選擇普通群即可。
第2步:添加釘釘群機器人
點擊頭像,選擇機器人管理,然后選擇自定義,點擊添加。自定義機器人名字:“FMZ”,添加到剛剛創建的釘釘群。機器人支持三種安全設置:
- 自定義關鍵字:只有信息包含這個關鍵詞,信息才會被同步。
- 加簽:相當於設置密碼。
- IP地址:固定第三方信息的IP地址段。
如果只用於提醒或報警,選擇自定義關鍵詞就可以了。在這里我們定義的關鍵詞是“:”,也就是說當發明者量化機器人推送的信息中包含“:”時,這條信息才會推送到釘釘群中。然后點擊同意協議完成。最后復制Webhook地址備用。
四、代碼實現
在獲取到Webhook地址后,我們可以在發明者量化策略中向這個地址發起HTTP POST請求,就可以給這個釘釘群發送信息。需要注意的是,在發起POST請求時,必須將字符集編碼設置成UTF-8。
import requests import json from datetime import datetime, timedelta, timezone # 向釘釘群輸出信息 def msg(text): token ="0303627a118e739e628bcde104e19cf5463f61a4a127e4f2376e6a8aa1156ef1" headers = {'Content-Type': 'application/json;charset=utf-8'} # 請求頭 api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}" json_text = { "msgtype": "text", # 信息格式 "text": { "content": text } } # 發送並打印信息 Log(requests.post(api_url, json.dumps(json_text), headers=headers).content) # 測試函數 def onTick(): arr = ['BTC', 'ETH', 'XRP', 'BCH', 'LTC'] # 主流數字貨幣 # 獲取東八區時間 bj_dt = str(datetime.now().astimezone(timezone(timedelta(hours=8)))) bj_dt = bj_dt.split('.')[0] # 處理時間 text = f'{bj_dt}\n' # 定義信息內容 for i in arr: # 循環主流數字貨幣數組 exchange.IO("currency", f"{i}_USDT") # 切換交易對 ticker = exchange.GetTicker().Last # 獲取最新價格 if i == 'LTC': full = ' :' else: full = ':' text = text + f"{i}/USDT{full}${ticker}\n" # 處理信息內容 msg(text) # 調用msg函數,輸出信息 # 策略入口 def main(): while True: # 進入無線循環 onTick() # 執行onTick函數 Sleep(1000 * 60) # 休眠一分鍾
自定義機器人在同步信息時,可以通過設置手機號碼@多個群內成員。被@群成員在收到該信息時,會有@消息提醒,即使設置了免打擾會話仍然會通知提醒。
# 向釘釘群輸出信息 def msg(text): token = "0303627a118e739e628bcde104e19cf5463f61a4a127e4f2376e6a8aa1156ef1" headers = {'Content-Type': 'application/json;charset=utf-8'} # 請求頭 api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}" json_text = { "msgtype": "text", # 信息格式 "text": { "content": text }, "at": { "atMobiles": [ "16666666666", # 被@的手機號碼 "18888888888" # 被@的手機號碼 ], "isAtAll": False # 不@所有人 } } # 發送並打印信息 Log(requests.post(api_url, json.dumps(json_text), headers=headers).content)
五、測試機器人
以上代碼我們寫了一個案例,每隔一分鍾獲取主流數字貨幣的價格,並且把這些信息推送到釘釘群中:
六、完整策略
完整策略已經公開到發明者量化官網,點擊下方鏈接復制策略即可。
https://www.fmz.com/strategy/216952