上文《Go websocket 聊天室 demo 以及 k8s 部署》留下了一個問題:如果兩個客戶端分別連接到不同的服務實例,消息該如何發布呢?
如圖:
Client A -> Server A ----推送消息到 Kafka----> Kafka 把消息推送到服務 A 和服務 B ----> 服務 A、B 各自查找自己的 client 集合 ----> 發送消息給具體的客戶端【有可能是廣播,也可能是指定具體的用戶】
代碼:
package main import ( "context" "encoding/json" "fmt" "log" "net" "net/http" "time" "github.com/Shopify/sarama" "github.com/gorilla/websocket" uuid "github.com/satori/go.uuid" ) //客戶端管理 type ClientManager struct { //客戶端 map 儲存並管理所有的長連接client,在線的為true,不在的為false clients map[string]*Client //web端發送來的的message我們用broadcast來接收,並最后分發給所有的client broadcast chan []byte //新創建的長連接client register chan *Client //新注銷的長連接client unregister chan *Client } //客戶端 Client type Client struct { //用戶id id string //連接的socket socket *websocket.Conn //發送的消息 send chan []byte //服務器IP ip string } //會把Message格式化成json type Message struct { //消息struct Sender string `json:"sender,omitempty"` //發送者 Recipient string `json:"recipient,omitempty"` //接收者 Content string `json:"content,omitempty"` //內容 } //創建客戶端管理者 var manager = ClientManager{ broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), clients: make(map[string]*Client), } func (manager *ClientManager) start() { for { select { case conn := <-manager.register: manager.clients[conn.id] = conn //把返回連接成功的消息json格式化 jsonMessage, _ := json.Marshal(&Message{Content: "/A new socket has connected. " + conn.ip, Sender: conn.id}) //manager.send(jsonMessage) syncProducer(jsonMessage) //如果連接斷開了 case conn := <-manager.unregister: //判斷連接的狀態,如果是true,就關閉send,刪除連接client的值 if _, ok := manager.clients[conn.id]; ok { close(conn.send) delete(manager.clients, conn.id) jsonMessage, _ := json.Marshal(&Message{Content: "/A socket has disconnected. 
" + conn.ip, Sender: conn.id}) //manager.send(jsonMessage) syncProducer(jsonMessage) } //廣播 case message := <-manager.broadcast: manager.send(message) } } } //定義客戶端管理的send方法 func (manager *ClientManager) send(message []byte) { obj := &Message{} _ = json.Unmarshal(message, obj) for id, conn := range manager.clients { if obj.Sender == id { //continue } if obj.Recipient == conn.id || len(obj.Recipient) < 1 { conn.send <- message } } } //定義客戶端結構體的read方法 func (c *Client) read() { defer func() { manager.unregister <- c _ = c.socket.Close() }() for { //讀取消息 _, str, err := c.socket.ReadMessage() //如果有錯誤信息,就注銷這個連接然后關閉 if err != nil { manager.unregister <- c _ = c.socket.Close() break } //如果沒有錯誤信息就把信息放入broadcast message := &Message{} _ = json.Unmarshal(str, message) message.Sender = c.id jsonMessage, _ := json.Marshal(&message) fmt.Println(fmt.Sprintf("read Id:%s, msg:%s", c.id, string(jsonMessage))) //manager.broadcast <- jsonMessage syncProducer(jsonMessage) } } func (c *Client) write() { defer func() { _ = c.socket.Close() }() for { select { //從send里讀消息 case message, ok := <-c.send: //如果沒有消息 if !ok { _ = c.socket.WriteMessage(websocket.CloseMessage, []byte{}) return } //有消息就寫入,發送給web端 _ = c.socket.WriteMessage(websocket.TextMessage, message) fmt.Println(fmt.Sprintf("write Id:%s, msg:%s", c.id, string(message))) } } } func main() { fmt.Println("Starting application...") //開一個goroutine執行開始程序 go manager.start() initial() //注冊默認路由為 /ws ,並使用wsHandler這個方法 http.HandleFunc("/ws", wsHandler) http.HandleFunc("/health", healthHandler) //監聽本地的8011端口 fmt.Println("chat server start.....") _ = http.ListenAndServe(":8080", nil) } func wsHandler(res http.ResponseWriter, req *http.Request) { //將http協議升級成websocket協議 conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(res, req, nil) if err != nil { http.NotFound(res, req) return } //每一次連接都會新開一個client,client.id通過uuid生成保證每次都是不同的 client := &Client{id: uuid.Must(uuid.NewV4(),nil).String(), socket: 
conn, send: make(chan []byte), ip: LocalIp()} //注冊一個新的鏈接 manager.register <- client //啟動協程收web端傳過來的消息 go client.read() //啟動協程把消息返回給web端 go client.write() } func healthHandler(res http.ResponseWriter, _ *http.Request) { _, _ = res.Write([]byte("ok")) } func LocalIp() string { address, _ := net.InterfaceAddrs() var ip = "localhost" for _, address := range address { if ipAddress, ok := address.(*net.IPNet); ok && !ipAddress.IP.IsLoopback() { if ipAddress.IP.To4() != nil { ip = ipAddress.IP.String() } } } return ip } /////kafka var topic = "chat" var producer sarama.SyncProducer func initial() { config := sarama.NewConfig() // Version 必須大於等於 V0_10_2_0 config.Version = sarama.V0_10_2_1 config.Consumer.Return.Errors = true fmt.Println("start connect kafka") // 開始連接kafka服務器 address := []string{"192.168.100.30:9092"} client, err := sarama.NewClient(address, config) if err != nil { fmt.Println("connect kafka failed; err", err) return } groupId := LocalIp() group, err := sarama.NewConsumerGroupFromClient(groupId, client) if err != nil { fmt.Println("connect kafka failed; err", err) return } go ConsumerGroup(group, []string{topic}) config = sarama.NewConfig() // 等待服務器所有副本都保存成功后的響應 config.Producer.RequiredAcks = sarama.WaitForAll // 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區 config.Producer.Partitioner = sarama.NewRandomPartitioner // 是否等待成功和失敗后的響應 config.Producer.Return.Successes = true config.Producer.Timeout = 5 * time.Second producer, err = sarama.NewSyncProducer(address, config) if err != nil { log.Printf("sarama.NewSyncProducer err, message=%s \n", err) } } //同生產步消息模式 func syncProducer(data []byte) { msg := &sarama.ProducerMessage{ Topic: topic, Partition: 0, Value: sarama.ByteEncoder(data), } partition, offset, err := producer.SendMessage(msg) if err != nil { fmt.Println(fmt.Sprintf("Send message Fail %v", err)) } fmt.Printf("send message success topic=%s Partition = %d, offset=%d content:=%s \n", topic, partition, offset ,string(data)) } func ConsumerGroup(group 
sarama.ConsumerGroup, topics []string) { // 檢查錯誤 go func() { for err := range group.Errors() { fmt.Println("group errors : ", err) } }() ctx := context.Background() fmt.Println("start get msg") // for 是應對 consumer rebalance for { // 需要監聽的主題 handler := ConsumerGroupHandler{} // 啟動kafka消費組模式,消費的邏輯在上面的 ConsumeClaim 這個方法里 err := group.Consume(ctx, topics, handler) if err != nil { fmt.Println("consume failed; err : ", err) return } } } type ConsumerGroupHandler struct{} func (ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error { //sess.MarkOffset(topic, 0, 0, "") return nil } func (ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error { return nil } // 這個方法用來消費消息的 func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // 獲取消息 for msg := range claim.Messages() { fmt.Printf("kafka receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) //發送消息 manager.send(msg.Value) // 將消息標記為已使用 sess.MarkMessage(msg, "") sess.Commit() } return nil }
客戶端:
<html>
<head>
  <title>Golang Chat</title>
  <script type="application/javascript" src="jquery-1.12.4.js"></script>
  <script type="text/javascript">
    $(function() {
      var conn;
      var msg = $("#msg");
      var log = $("#log");
      var recipient = $("#recipient");

      // Append an element to the log, keeping the view pinned to the bottom
      // only if it was already scrolled to the bottom.
      function appendLog(msg) {
        var d = log[0];
        var doScroll = d.scrollTop == d.scrollHeight - d.clientHeight;
        msg.appendTo(log);
        if (doScroll) {
          d.scrollTop = d.scrollHeight - d.clientHeight;
        }
      }

      // Send the typed message as JSON; an empty recipient means broadcast.
      $("#form").submit(function() {
        if (!conn) {
          return false;
        }
        if (!msg.val()) {
          return false;
        }
        var content = JSON.stringify({"content": msg.val(), "recipient": recipient.val()});
        conn.send(content);
        msg.val("");
        return false;
      });

      if (window["WebSocket"]) {
        conn = new WebSocket("ws://chatserver.go.com/ws");
        conn.onclose = function(evt) {
          appendLog($("<div><b>Connection Closed.</b></div>"));
        };
        conn.onmessage = function(evt) {
          // .text() escapes the payload, so message content cannot inject HTML.
          appendLog($("<div/>").text(evt.data));
        };
      } else {
        appendLog($("<div><b>WebSockets Not Support.</b></div>"));
      }
    });
  </script>
  <style type="text/css">
    html {
      overflow: hidden;
    }
    body {
      overflow: hidden;
      padding: 0;
      margin: 0;
      width: 100%;
      height: 100%;
      background: gray;
    }
    #log {
      background: white;
      margin: 0;
      padding: 0.5em 0.5em 0.5em 0.5em;
      position: absolute;
      top: 0.5em;
      left: 0.5em;
      right: 0.5em;
      bottom: 3em;
      overflow: auto;
    }
    #form {
      padding: 0 0.5em 0 0.5em;
      margin: 0;
      position: absolute;
      bottom: 1em;
      left: 0px;
      width: 100%;
      overflow: hidden;
    }
  </style>
</head>
<body>
  <div id="log"></div>
  <form id="form">
    <label>接受者</label><input type="text" id="recipient"/>
    <input type="submit" value="發送" />
    <input type="text" id="msg" size="64"/>
  </form>
</body>
</html>
運行效果: