通過定時觸發器,可以簡單快速地定制一個企業微信機器人。我們可以用它來實現喝水、吃飯提醒等小功能,還能實現定時推送新聞、天氣,甚至是監控告警的小功能。
使用企業微信機器人
在企業微信中,選擇添加機器人:
之后,我們可以根據文檔進行企業微信機器人的基礎功能定制:
以下是用 curl 工具往群組推送文本消息的示例(注意要將 url 替換成機器人的 webhook 地址,content 必須是 utf8 編碼):
curl '企業微信機器人地址' \
-H 'Content-Type: application/json' \
-d '
{
"msgtype": "text",
"text": {
"content": "hello world"
}
}'
通過 Python 語言實現:
url = ""
data = {
"msgtype": "markdown",
"markdown": {
"content": "hello world",
}
}
data = json.dumps(data).encode("utf-8")
req_attr = urllib.request.Request(url, data)
resp_attr = urllib.request.urlopen(req_attr)
return_msg = resp_attr.read().decode("utf-8")
此時,我們可以通過 Serverless Framework 部署一個機器人的基本功能,並且設置好 API 網關觸發器:
index.py
文件如下:
import os
import json
import urllib.request
def main_handler(event, context):
url = os.environ.get("url")
data = {
"msgtype": "markdown",
"markdown": {
"content": "hello world",
}
}
data = json.dumps(data).encode("utf-8")
req_attr = urllib.request.Request(url, data)
resp_attr = urllib.request.urlopen(req_attr)
return resp_attr.read().decode("utf-8")
serverless.yaml
文件如下:
MyRobot_Base:
component: '@serverless/tencent-scf'
inputs:
name: MyRobot_Base
runtime: Python3.6
timeout: 3
codeUri: ./base_robot
description: 機器人推送接口
region: ap-guangzhou
environment:
variables:
url: webhook地址
handler: index.main_handler
memorySize: 64
tags:
app: myrobot
events:
- apigw:
name: MyRobot
parameters:
protocols:
- http
- https
description: 機器人推送接口
environment: release
endpoints:
- path: /push
method: ANY
部署成功之后,可以看到命令行中輸出的地址:
在瀏覽器中打開,可以看到企業微信機器人已經被觸發了:
以上就是一個簡單的 hello world
功能。接下來,好戲開始!
我們對這個基礎函數進行進一步的改造:
import os
import json
import urllib.request
def main_handler(event, context):
url = os.environ.get("url")
data = {
"msgtype": "markdown",
"markdown": {
"content": event['body'],
}
}
data = json.dumps(data).encode("utf-8")
req_attr = urllib.request.Request(url, data)
resp_attr = urllib.request.urlopen(req_attr)
return resp_attr.read().decode("utf-8")
通過將 data
中的 content
字段更改為 event['body']
可以讓其他模塊請求該接口,實現機器人推送功能,當然這個基礎函數我們還可以進行完善,不僅僅是 markdown
格式,封裝更多支持的格式:
機器人功能拓展
提醒喝水/吃飯功能
通過定時觸發器,訪問雲函數,可以實現該功能。
例如 index.py
代碼:
import os
import json
import urllib.request
def main_handler(event, context):
url = os.environ.get("url")
data = "每天都要多喝水哦,不要忘記補充水分".encode("utf-8")
req_attr = urllib.request.Request(url, data)
resp_attr = urllib.request.urlopen(req_attr)
return resp_attr.read().decode("utf-8")
serverless.yaml
文件:
MyRobot_Water:
component: '@serverless/tencent-scf'
inputs:
name: MyRobot_Water
runtime: Python3.6
timeout: 3
codeUri: ./water
description: 提醒喝水的機器人
region: ap-guangzhou
environment:
variables:
url: https://service-lf3ug84s-1256773370.gz.apigw.tencentcs.com/release/push
handler: index.main_handler
memorySize: 64
tags:
app: myrobot
events:
- timer:
name: timer
parameters:
cronExpression: '0 */30 9-17 * * * *'
enable: true
這個函數就是每天上午 9 點到下午 5 點,每 30 分鍾提醒喝一次水。
天氣預報/當地新聞功能
想要實現天氣預報/新聞播報的功能,我們可以通過已有的新聞接口來實現,以騰訊雲的雲市場為例,尋找一個新聞類 API 接口:
根據 API 文檔,可以看到請求地址是:https://service-aqvnjmiq-1257101137.gz.apigw.tencentcs.com/release/news/search
Get 方法可以攜帶一個參數:keyword
,作為目標的關鍵詞,編寫代碼:
import ssl, hmac, base64, hashlib, os, json
from datetime import datetime as pydatetime
from urllib.parse import urlencode
from urllib.request import Request, urlopen
def main_handler(event, context):
source = "market"
datetime = pydatetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
signStr = "x-date: %s\nx-source: %s" % (datetime, source)
sign = base64.b64encode(hmac.new(os.environ.get('secretKey').encode('utf-8'), signStr.encode('utf-8'), hashlib.sha1).digest())
auth = 'hmac id="%s", algorithm="hmac-sha1", headers="x-date x-source", signature="%s"' % (os.environ.get("secretId"), sign.decode('utf-8'))
headers = {
'X-Source': source,
'X-Date': datetime,
'Authorization': auth,
}
queryParams = {'keyword': '科技新聞'}
url = 'https://service-aqvnjmiq-1257101137.gz.apigw.tencentcs.com/release/news/search'
if len(queryParams.keys()) > 0:
url = url + '?' + urlencode(queryParams)
content = ""
for eve in json.loads(urlopen(Request(url, headers=headers)).read().decode("utf-8"))["result"]["list"][0:5]:
content = content + "* [%s](%s) \n"%(eve['title'], eve['url'])
if content:
urlopen(Request(os.environ.get('url'), content.encode("utf-8")))
serverless.yaml
文件:
MyRobot_News:
component: '@serverless/tencent-scf'
inputs:
name: MyRobot_News
runtime: Python3.6
timeout: 3
codeUri: ./news
description: 新聞推送
region: ap-guangzhou
environment:
variables:
url: https://service-lf3ug84s-1256773370.gz.apigw.tencentcs.com/release/push
secretId: 雲市場密鑰信息
secretKey: 雲市場密鑰信息
handler: index.main_handler
memorySize: 64
tags:
app: myrobot
events:
- timer:
name: timer
parameters:
cronExpression: '0 0 */8 * * * *'
enable: true
運行效果如下,每天早晨 8 點為我們推送當日科技新聞:
監控告警功能
我們還可以賦予企業微信機器人監控告警的能力:
index.py
文件:
import os
import urllib.request
def getStatusCode(url):
return urllib.request.urlopen(url).getcode()
def main_handler(event, context):
url = "http://www.anycodes.cn"
if getStatusCode(url) == 200:
print("您的網站%s可以訪問!" % (url))
else:
urllib.request.urlopen(urllib.request.Request(os.environ.get('url'), ("您的網站%s 不可以訪問!" % (url)).encode("utf-8")))
return None
serverless.yaml
文件:
MyRobot_Monitor:
component: '@serverless/tencent-scf'
inputs:
name: MyRobot_Monitor
runtime: Python3.6
timeout: 3
codeUri: ./monitor
description: 網站監控
region: ap-guangzhou
environment:
variables:
url: https://service-lf3ug84s-1256773370.gz.apigw.tencentcs.com/release/push
handler: index.main_handler
memorySize: 64
tags:
app: myrobot
events:
- timer:
name: timer
parameters:
cronExpression: '0 */30 * * * * *'
enable: true
部署完成后,網站的監控腳本就已經啟動,每 30 分鍾檢查一次網站是否可用。如果不可用,則會發送告警:
思路發散
企業微信機器人可以通過 Serverless 架構被賦予更多更有趣的功能,那么還有哪些產品可以和 Serverless 架構相結合,變得更有趣呢?
隨着網絡技術的不斷發展,IoT 技術也逐漸走進了千家萬戶,無論是掃地機器人、智能窗簾等智能家居,還是智能音箱等娛樂設施,IoT 技術都變得可見可及。
小愛同學,也能通過 Serverless 架構,快速開發出專屬新功能。
首先我們去「小愛同學」的開放平台注冊賬號,並且提交認證:
接下來對小愛同學的定制化功能進行研究。如圖所示,在開發文檔中,我們可以看到小愛同學開發者平台為我們提供的能力信息,同樣我們也可以查看到 request 以及 response 的詳細信息:
繼續進行項目設計。本文的目標是通過對小愛同學說出「進入雲+社區」等關鍵詞,為用戶返回騰訊雲+社區的最新熱門文章的題目和簡介。
整個流程如圖所示:
函數代碼編寫:
# -*- coding: utf8 -*-
import json
import logging
import urllib.request
import urllib.parse
logging.basicConfig(level=logging.NOTSET)
def main_handler(event, context):
host = "https://cloud.tencent.com/"
path = "developer/services/ajax/column/article?action=FetchColumnHomeArticleList"
json_data = {
"action": "FetchColumnHomeArticleList",
"payload": {
"pageNumber": 1,
"pageSize": 20,
"version": 1
}
}
data = json.dumps(json_data).encode("utf-8")
request_attr = urllib.request.Request(url=host + path, data=data)
response_attr = urllib.request.urlopen(request_attr).read().decode("utf-8")
json_resp = json.loads(response_attr)
logging.debug(json_resp)
temp_str = "文章題目為%s,主要內容是%s"
list_data = json_resp["data"]["list"][0:5]
art_list = [temp_str % (eve["title"], eve["abstract"]) for eve in list_data]
news_str = '''今日騰訊雲加社區熱門文章如下:%s''' % ("、".join(art_list))
logging.debug(news_str)
xiaoai_response = {"version": "1.0",
"response": {
"open_mic": False,
"to_speak": {
"type": 0,
"text": news_str
}
},
"is_session_end": False
}
return xiaoai_response
完成之后,使用 Serverless Framework 進行部署,綁定 API 網關觸發器,通過請求地址可以看到測試結果:
可以看到,我們已經獲得到目標數據。此時,我們在小愛同學官網,創建技能開發,在填寫好和保存好基本信息之后,選擇配置服務,填寫 HTTPS 中的測試化地址:
配置完成之后,開始測試,如下圖所示,可以看到,當我們輸入預定的命令「打開雲加社區」,系統會正確回取到結果信息,並且給我們返回:
至此,我們通過 Serverless 架構,成功地為「小愛同學」開發了一項新功能,我們還可以將這個新功能就拿去發布和上線!
總結
本文僅僅是一次簡單的示范,通過企業微信機器人與 Serverless 架構的結合,用若干代碼實現提醒功能、新聞推送功能以及業務監控告警功能。同時我們還發散思維,讓小愛同學也擁有了新的能力。
不難看出,通過 Serverless 架構,我們可以快速為產品增加一些新的功能,賦予新的生機!
Serverless Framework 30 天試用計划
我們誠邀您來體驗最便捷的 Serverless 開發和部署方式。在試用期內,相關聯的產品及服務均提供免費資源和專業的技術支持,幫助您的業務快速、便捷地實現 Serverless!
One More Thing
3 秒你能做什么?喝一口水,看一封郵件,還是 —— 部署一個完整的 Serverless 應用?
復制鏈接至 PC 瀏覽器訪問:https://serverless.cloud.tencent.com/deploy/express
3 秒極速部署,立即體驗史上最快的 Serverless HTTP 實戰開發!
傳送門:
- GitHub: github.com/serverless
- 官網:serverless.com
歡迎訪問:Serverless 中文網,您可以在 最佳實踐 里體驗更多關於 Serverless 應用的開發!