傳統業務實現 Websocket 並不難,然而函數計算基本上都是事件驅動,不支持長鏈接操作。如果將函數計算與 API 網關結合,是否可以有 Websocket 的實現方案呢?
API 網關觸發器實現 Websocket
WebSocket 協議是基於 TCP 的一種新的網絡協議。它實現了瀏覽器與服務器全雙工 (full-duplex) 通信,即允許服務器主動發送信息給客戶端。WebSocket 在服務端有數據推送需求時,可以主動發送數據至客戶端。而原有 HTTP 協議的服務端對於需推送的數據,僅能通過輪詢或 long poll 的方式來讓客戶端獲得。
由於雲函數是無狀態且以觸發式運行,即在有事件到來時才會被觸發。因此,為了實現 WebSocket,雲函數 SCF 與 API 網關相結合,通過 API 網關承接及保持與客戶端的連接。您可以認為雲函數與 API 網關一起實現了服務端。當客戶端有消息發出時,會先傳遞給 API 網關,再由 API 網關觸發雲函數執行。當服務端雲函數要向客戶端發送消息時,會先由雲函數將消息 POST 到 API 網關的反向推送鏈接,再由 API 網關向客戶端完成消息的推送。
具體的實現架構如下:
對於 WebSocket 的整個生命周期,主要由以下幾個事件組成:
- 連接建立:客戶端向服務端請求建立連接並完成連接建立;
- 數據上行:客戶端通過已經建立的連接向服務端發送數據;
- 數據下行:服務端通過已經建立的連接向客戶端發送數據;
- 客戶端斷開:客戶端要求斷開已經建立的連接;
- 服務端斷開:服務端要求斷開已經建立的連接。
對於 WebSocket 整個生命周期的事件,雲函數和 API 網關的處理過程如下:
- 連接建立:客戶端與 API 網關建立 WebSocket 連接,API 網關將連接建立事件發送給 SCF;
- 數據上行:客戶端通過 WebSocket 發送數據,API 網關將數據轉發送給 SCF;
- 數據下行:SCF 通過向 API 網關指定的推送地址發送請求,API 網關收到后會將數據通過 WebSocket 發送給客戶端;
- 客戶端斷開:客戶端請求斷開連接,API 網關將連接斷開事件發送給 SCF;
- 服務端斷開:SCF 通過向 API 網關指定的推送地址發送斷開請求,API 網關收到后斷開 WebSocket 連接。
因此,雲函數與 API 網關之間的交互,需要由 3 類雲函數來承載:
- 注冊函數:在客戶端發起和 API 網關之間建立 WebSocket 連接時觸發該函數,通知 SCF WebSocket 連接的 secConnectionID。通常會在該函數記錄 secConnectionID 到持久存儲中,用於后續數據的反向推送;
- 清理函數:在客戶端主動發起 WebSocket 連接中斷請求時觸發該函數,通知 SCF 准備斷開連接的 secConnectionID。通常會在該函數清理持久存儲中記錄的該 secConnectionID;
- 傳輸函數:在客戶端通過 WebSocket 連接發送數據時觸發該函數,告知 SCF 連接的 secConnectionID 以及發送的數據。通常會在該函數處理業務數據。例如,是否將數據推送給持久存儲中的其他 secConnectionID。
Websocket 功能實現
根據騰訊雲官網提供的該功能的整體架構圖:
這里我們可以使用對象存儲 COS 作為持久化的方案,當用戶建立鏈接存儲 ConnectionId
到 COS 中,當用戶斷開連接刪除該鏈接 ID。
其中注冊函數:
# -*- coding: utf8 -*-
import os
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))
def main_handler(event, context):
print("event is %s" % event)
connectionID = event['websocket']['secConnectionID']
retmsg = {}
retmsg['errNo'] = 0
retmsg['errMsg'] = "ok"
retmsg['websocket'] = {
"action": "connecting",
"secConnectionID": connectionID
}
cosClient.put_object(
Bucket=bucket,
Body='websocket'.encode("utf-8"),
Key=str(connectionID),
EnableMD5=False
)
return retmsg
傳輸函數:
# -*- coding: utf8 -*-
import os
import json
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))
sendbackHost = os.environ.get("url")
def Get_ConnectionID_List():
response = cosClient.list_objects(
Bucket=bucket,
)
return [eve['Key'] for eve in response['Contents']]
def send(connectionID, data):
retmsg = {}
retmsg['websocket'] = {}
retmsg['websocket']['action'] = "data send"
retmsg['websocket']['secConnectionID'] = connectionID
retmsg['websocket']['dataType'] = 'text'
retmsg['websocket']['data'] = data
requests.post(sendbackHost, json=retmsg)
def main_handler(event, context):
print("event is %s" % event)
connectionID_List = Get_ConnectionID_List()
connectionID = event['websocket']['secConnectionID']
count = len(connectionID_List)
data = event['websocket']['data'] + "(===Online people:" + str(count) + "===)"
for ID in connectionID_List:
if ID != connectionID:
send(ID, data)
return "send success"
清理函數:
# -*- coding: utf8 -*-
import os
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))
sendbackHost = os.environ.get("url")
def main_handler(event, context):
print("event is %s" % event)
connectionID = event['websocket']['secConnectionID']
retmsg = {}
retmsg['websocket'] = {}
retmsg['websocket']['action'] = "closing"
retmsg['websocket']['secConnectionID'] = connectionID
requests.post(sendbackHost, json=retmsg)
cosClient.delete_object(
Bucket=bucket,
Key=str(connectionID),
)
return event
Yaml 文件如下:
Conf:
component: "serverless-global"
inputs:
region: ap-guangzhou
bucket: chat-cos-1256773370
secret_id:
secret_key:
myBucket:
component: '@serverless/tencent-cos'
inputs:
bucket: ${Conf.bucket}
region: ${Conf.region}
restApi:
component: '@serverless/tencent-apigateway'
inputs:
region: ${Conf.region}
protocols:
- http
- https
serviceName: ChatDemo
environment: release
endpoints:
- path: /
method: GET
protocol: WEBSOCKET
serviceTimeout: 800
function:
transportFunctionName: ChatTrans
registerFunctionName: ChatReg
cleanupFunctionName: ChatClean
ChatReg:
component: "@serverless/tencent-scf"
inputs:
name: ChatReg
codeUri: ./code
handler: reg.main_handler
runtime: Python3.6
region: ${Conf.region}
environment:
variables:
region: ${Conf.region}
bucket: ${Conf.bucket}
secret_id: ${Conf.secret_id}
secret_key: ${Conf.secret_key}
url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw
ChatTrans:
component: "@serverless/tencent-scf"
inputs:
name: ChatTrans
codeUri: ./code
handler: trans.main_handler
runtime: Python3.6
region: ${Conf.region}
environment:
variables:
region: ${Conf.region}
bucket: ${Conf.bucket}
secret_id: ${Conf.secret_id}
secret_key: ${Conf.secret_key}
url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw
ChatClean:
component: "@serverless/tencent-scf"
inputs:
name: ChatClean
codeUri: ./code
handler: clean.main_handler
runtime: Python3.6
region: ${Conf.region}
environment:
variables:
region: ${Conf.region}
bucket: ${Conf.bucket}
secret_id: ${Conf.secret_id}
secret_key: ${Conf.secret_key}
url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw
注意,這里需要先部署 API 網關。當部署完成,獲得回推地址,將回推地址以 url 的形式寫入到對應函數的環境變量中:
理論上應該是可以通過 ${restApi.url[0].internalDomain}
自動獲得到 url 的,但是我並沒有成功獲得到這個 url,只能先部署 API 網關,獲得到這個地址之后,再重新部署。
部署完成之后,我們可以編寫 HTML 代碼,實現可視化的 Websocket Client,其核心的 JavaScript 代碼為:
window.onload = function () {
var conn;
var msg = document.getElementById("msg");
var log = document.getElementById("log");
function appendLog(item) {
var doScroll = log.scrollTop === log.scrollHeight - log.clientHeight;
log.appendChild(item);
if (doScroll) {
log.scrollTop = log.scrollHeight - log.clientHeight;
}
}
document.getElementById("form").onsubmit = function () {
if (!conn) {
return false;
}
if (!msg.value) {
return false;
}
conn.send(msg.value);
//msg.value = "";
var item = document.createElement("div");
item.innerText = "發送↑:";
appendLog(item);
var item = document.createElement("div");
item.innerText = msg.value;
appendLog(item);
return false;
};
if (window["WebSocket"]) {
//替換為websocket連接地址
conn = new WebSocket("ws://service-01era6ni-1256773370.gz.apigw.tencentcs.com/release/");
conn.onclose = function (evt) {
var item = document.createElement("div");
item.innerHTML = "<b>Connection closed.</b>";
appendLog(item);
};
conn.onmessage = function (evt) {
var item = document.createElement("div");
item.innerText = "接收↓:";
appendLog(item);
var messages = evt.data.split('\n');
for (var i = 0; i < messages.length; i++) {
var item = document.createElement("div");
item.innerText = messages[i];
appendLog(item);
}
};
} else {
var item = document.createElement("div");
item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
appendLog(item);
}
};
完成之后,我們打開兩個頁面,進行測試:
總結
通過雲函數 + API 網關進行 Websocket 的實踐,絕對不僅僅是一個聊天工具這么簡單,它可以用在很多方面,例如通過 Websocket 進行實時日志系統的制作等。
單獨的函數計算,僅僅是一個計算平台,只有和周邊的 BaaS 結合,才能展示出 Serverless 架構的價值和真正的能力。這也是為什么很多人說 Serverless=FaaS+BaaS 的一個原因。
期待更多小伙伴,可以通過 Serverless 架構,創造出更多有趣的應用。
Serverless Framework 30 天試用計划
我們誠邀您來體驗最便捷的 Serverless 開發和部署方式。在試用期內,相關聯的產品及服務均提供免費資源和專業的技術支持,幫助您的業務快速、便捷地實現 Serverless!
One More Thing
3 秒你能做什么?喝一口水,看一封郵件,還是 —— 部署一個完整的 Serverless 應用?
復制鏈接至 PC 瀏覽器訪問:https://serverless.cloud.tencent.com/deploy/express
3 秒極速部署,立即體驗史上最快的 Serverless HTTP 實戰開發!
傳送門:
- GitHub: github.com/serverless
- 官網:serverless.com
歡迎訪問:Serverless 中文網,您可以在 最佳實踐 里體驗更多關於 Serverless 應用的開發!