Go語言實現建立websocket連接並定時發送心跳


在工作中需要建立大量websocket連接來模擬並發用戶,剛開始是使用jmeter第三方websocket包來實現,但在壓測過程中發現jmeter的多線程太消耗系統資源,大約建立8000左右的連接時負載機資源就已被占用的差不多,改用go來實現。

一下為部分實現代碼:

package main

import (
   "encoding/json"
   "flag"
   "fmt"
   "github.com/gorilla/websocket"
   "github.com/satori/go.uuid"
   "log"
   "net/url"
   "regexp"
   "sync"
   "time"
)

type Connetion struct {
   con *websocket.Conn
   mutex sync.Mutex
}

 //定義命令行參數 var addr = flag.String("a", "ip:port", "http service address") var clientUuid = flag.String("u", "", "uuid") var c = flag.Int("c", 5, "number of connections") func webSocketConn(wg sync.WaitGroup, msg []byte) { u := url.URL{Scheme: "ws", Host: *addr} var dialer *websocket.Dialer conn, _, err := dialer.Dial(u.String(), nil) if err != nil { fmt.Println(err) return } werr := conn.WriteMessage(websocket.TextMessage, msg) //fmt.Printf("發送信息:%s\n",string(msg)) //2. 創建一個正則表達式對象 regx, _:= regexp.Compile("\\w{8}(-\\w{4}){3}-\\w{12}") //3. 利用正則表達式對象, 匹配指定的字符串 res := regx.FindString(string(msg)) //fmt.Printf("匹配的clientId:%s\n",res) msg1 := make(map[string]interface{}) msg2 := make(map[string]interface{}) msg2["success"] = true msg1["clientId"] = res msg1["messageType"] = "ACK" msg1["messageId"] = "5e7d6e31e4b079c2b22876d8" msg1["data"] = msg2 aMsg, _ := json.Marshal(msg1) if werr != nil { fmt.Println(werr) } //申明定時器10s,設置心跳時間為10s ticker := time.NewTicker(time.Second * 10) connect := &Connetion{ con: conn, } //開啟多線程 go connect.timeWriter(ticker, conn) for { _, message, err := conn.ReadMessage() if err != nil { fmt.Println("read:", err) return }
//互斥鎖 connect.mutex.Lock() werr2 :
= connect.con.WriteMessage(websocket.TextMessage, aMsg) connect.mutex.Unlock() if werr2 != nil { fmt.Println(werr2) } fmt.Printf("received: %s\n", message) } wg.Done() // 每次把計數器-1 } func (con *Connetion)timeWriter(ticker *time.Ticker, c *websocket.Conn) { for { <-ticker.C err := c.SetWriteDeadline(time.Now().Add(10 * time.Second)) //fmt.Println(time.Now().Format(time.UnixDate)) if err != nil { log.Printf("ping error: %s\n", err.Error()) } con.mutex.Lock() if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { log.Printf("ping error: %s\n", err.Error()) } con.mutex.Unlock() } } func NewConnMsg() []byte { msg := make(map[string]interface{}) uuid1,_ := uuid.NewV4() //fmt.Printf("uuid值:%s\n",uuid1) id := uuid1.String() if *clientUuid == "" { msg["clientId"] = id } else { msg["clientId"] = *clientUuid } msg["messageId"] = "5e7d6e31e4b079c2b22876d8" msg["messageType"] = "LOGIN" msg["targetType"] = "PASSENGER" bMsg, _ := json.Marshal(msg) //log.Printf("%s\n", bMsg) return bMsg } func run() { flag.Parse() //命令行參數 var wg sync.WaitGroup //申明計數器 for i := 0; i < *c; i++ { wg.Add(1) //設置計數器初始值 go webSocketConn(wg, NewConnMsg()) if (*c % 200) == 0 { time.Sleep(time.Millisecond * 50) //fmt.Println(time.Now().Format(time.UnixDate)) } } log.Printf("creaate websocket connections: %v\n", *c) wg.Wait() //阻塞代碼的運行,直到計數器地值減為0 } func main() { //NewConnMsg() run() }

由於websocket不支持並發寫入,所以需要在寫消息的地方都需加上互斥鎖,不要則會報錯:concurrent write to websocket connection go


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM