更好的閱讀體驗建議點擊下方原文鏈接。
原文地址:http://maoqide.live/post/golang/golang-websocket-message-pushing/
使用 golang 的 websocket 框架 melody,實現通用的消息分組推送服務。針對同一推送對象,只起一個后端協程進行廣播推送,減少資源消耗,並提供監控接口查詢當前的協程和websocket連接。
github:ws-notifier
melody
melody 是一個 golang 的 websocket 框架,通過對 websocket 包裝,實現方便的廣播或推送消息給多個指定的 session。
ws-notifier
ws-notifier 在 melody 的基礎上,通過給 Session 添加特定的key,實現對特定的 group 的消息推送,並實現負責后端推送的 worker 和 group 的關聯,對同一group只需起一個 worker 的 goroutine 進行推送,減少后端推送的資源消耗。
example
handler
https://github.com/maoqide/ws-notifier/tree/master/example
使用 golang 的 ticker 對不同組的客戶端推送消息,同一組的客戶端推送消息相同。
n := notifier.Default()
使用 default 配置獲取默認配置的 notifier 實例。
//...
group := strings.Trim(r.RequestURI, "/")
// should be random generated
sessionID := "123456"
// ...
使用請求 URL 的 path 作為 group 的標識,隨機生成的唯一 ID 作為 session 的標識,通過 group 對同一組的客戶端廣播,通過 sessionID 可對某一客戶端單獨推送。
// n.Notify 啟動后端推送 worker,如果已經啟動則直接返回
n.Notify(groupID, tickerWorker, time.Hour*24)
n.HandleRequestWithKeys(w, r, map[string]interface{}{"group": groupID, "id": groupID + "_" + sessionID})
HandleRequestWithKeys
傳入對應 groupID 和 sessionID,將 session 加入 melody 的 hub 中管理,並使 group 和 sessionID 生效。
worker
func tickerWorker(groupID string, sigChan chan int8, n *notifier.Notifier) error {
worker := fmt.Sprintf("ticker_worker_%s_%d", groupID, time.Now().Unix())
fmt.Printf("worker: %s\n", worker)
defer func() {
select {
case sigChan <- 0:
log.Printf("ticker worker: %s exit", worker)
case <-time.After(time.Second * 3):
log.Printf("ticker worker: %s exit after 3s delaying", worker)
}
}()
// ...
}
worker 退出前,要向外部管理 goroutine 發送退出信號,以便 notifier 回收內部 worker map的 worker,待下一個 session 連接時啟動新的 worker。
for {
// ...
select {
case signal := <-sigChan:
log.Printf("receice stop signal %d for ticker worker: %s", signal, worker)
return nil
case <-ticker.C:
// ...
}
// ...
}
推送過程中,接收信號 channel,當收到退出信號后退出循環。