Go WebSocket 實現


WebSocket是HTML5下的產物,能更好的節省服務器資源和帶寬。常見場景:html5多人游戲、聊天室、協同編輯、基於實時位置的應用、股票實時報價、彈幕、視頻會議、QQ,微信、等等... ...

websocket VS http

相似

都是應用層協議,都基於tcp傳輸協議
跟http有良好的兼容性,ws和http的默認端口都是80,wss和https的默認端口都是443
websocket在握手階段采用http發送數據

差異

http是半雙工,而websocket通過多路復用實現了全雙工
http只能由client主動發起數據請求,而websocket還可以由server主動向client推送數據。在需要及時刷新的場景中,http只能靠client高頻地輪詢,浪費嚴重
http是短連接(也可以實現長連接, HTTP1.1 的連接默認使用長連接),每次數據請求都得經過三次握手重新建立連接,而websocket是長連接
http長連接中每次請求都要帶上header,而websocket在傳輸數據階段不需要帶header

websocket握手協議

Request Header

Sec-Websocket-Version:13
Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Key:duR0pUQxNgBJsRQKj2Jxsw==

Response Header

Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Accept:a1y2oy1zvgHsVyHMx+hZ1AYrEHI=

Upgrade:websocket和Connection:Upgrade指明使用WebSocket協議
Sec-WebSocket-Version 指定Websocket協議版本
Sec-WebSocket-Key是一個Base64 encode的值,是瀏覽器隨機生成的
服務端收到Sec-WebSocket-Key后拼接上一個固定的GUID,進行一次SHA-1摘要,再轉成Base64編碼,得到Sec-WebSocket-Accept返回給客戶端。客戶端對本地的Sec-WebSocket-Key執行同樣的操作跟服務端返回的結果進行對比,如果不一致會返回錯誤關閉連接。如此操作是為了把websocket header跟http header區分開

websocket發送的消息類型有5種:TextMessag、BinaryMessage、CloseMessag、PingMessage、PongMessage
TextMessag和BinaryMessage分別表示發送文本消息和二進制消息
CloseMessage關閉幀,接收方收到這個消息就關閉連接
PingMessage和PongMessage是保持心跳的幀,發送方接收方是PingMessage,接收方發送方是PongMessage,目前瀏覽器沒有相關api發送ping給服務器,只能由服務器發ping給瀏覽器,瀏覽器返回pong消息

gorilla/websocket 概述

Upgrader用於升級 http 請求,把 http 請求升級為長連接的 WebSocket。結構如下:

type Upgrader struct {
    // 升級 websocket 握手完成的超時時間
    HandshakeTimeout time.Duration

    // io 操作的緩存大小,如果不指定就會自動分配。
    ReadBufferSize, WriteBufferSize int

    // 寫數據操作的緩存池,如果沒有設置值,write buffers 將會分配到鏈接生命周期里。
    WriteBufferPool BufferPool

    //按順序指定服務支持的協議,如值存在,則服務會從第一個開始匹配客戶端的協議。
    Subprotocols []string

    // http 的錯誤響應函數,如果沒有設置 Error 則,會生成 http.Error 的錯誤響應。
    Error func(w http.ResponseWriter, r *http.Request, status int, reason error)

    // 如果請求Origin標頭可以接受,CheckOrigin將返回true。 如果CheckOrigin為nil,則使用安全默認值:如果Origin請求頭存在且原始主機不等於請求主機頭,則返回false。
    // 請求檢查函數,用於統一的鏈接檢查,以防止跨站點請求偽造。如果不檢查,就設置一個返回值為true的函數
    CheckOrigin func(r *http.Request) bool

    // EnableCompression 指定服務器是否應嘗試協商每個郵件壓縮(RFC 7692)。 將此值設置為true並不能保證將支持壓縮。 目前僅支持“無上下文接管”模式
    EnableCompression bool
}

func (*Upgrader) Upgrade 函數將 http 升級到 WebSocket 協議。

// responseHeader包含在對客戶端升級請求的響應中。 
// 使用responseHeader指定cookie(Set-Cookie)和應用程序協商的子協議(Sec-WebSocket-Protocol)。
// 如果升級失敗,則升級將使用HTTP錯誤響應回復客戶端
// 返回一個 Conn 指針,使用 Conn 讀寫數據與客戶端通信。
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error)

WebSocket實例

Server.go

package main

import (
	"fmt"
	"github.com/gorilla/websocket"
	"net"
	"net/http"
	"os"
	"strconv"
	"time"
)

type (
	Request struct {
		A int
		B int
	}
	Response struct {
		Sum int
	}
	WsServer struct {
		listener net.Listener
		addr     string
		upgrade  *websocket.Upgrader
	}
)

func CheckError(err error) {
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
 
func NewWsServer(port int) *WsServer {
	ws := new(WsServer)
	ws.addr = "0.0.0.0:" + strconv.Itoa(port)
	ws.upgrade = &websocket.Upgrader{
		HandshakeTimeout: 2 * time.Second,
		ReadBufferSize:   1024,
		WriteBufferSize:  1024,
		Error:            func(w http.ResponseWriter, r *http.Request, status int, reason error) {},
		CheckOrigin:      func(r *http.Request) bool { return true },
	}
	return ws
}

func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/add" {
		httpCode := http.StatusInternalServerError
		phrase := http.StatusText(httpCode)
		http.Error(w, phrase, httpCode)
	}
	for key, values := range r.Header {
		fmt.Printf("%s:%v\n", key, values)
	}
	conn, err := ws.upgrade.Upgrade(w, r, nil)
	if err != nil {
		fmt.Printf("upgrade from http to websocket failed : %v\n", err)
	}
	defer conn.Close()
	_ = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	for {
		var request Request
		err = conn.ReadJSON(&request)
		if err != nil {
			fmt.Printf("Mage read error: %v\n", err)
			break
		}
		fmt.Printf("receive request a=%d b=%d\n", request.A, request.B)
		sum := request.A + request.B
		response := Response{
			Sum: sum,
		}
		err = conn.WriteJSON(&response)
		CheckError(err)
	}
}

func main() {
	ws := NewWsServer(3434)
	listener, err := net.Listen("tcp", ws.addr)
	CheckError(err)
	ws.listener = listener
	err = http.Serve(listener, ws)
	CheckError(err)
}

Client.go

package main

import (
	"fmt"
	"github.com/gorilla/websocket"
	"net/http"
	"os"
	"time"
)
type (
	Request struct {
		A int
		B int
	}
	Response struct {
		Sum int
	}
)

func CheckError(err error)  {
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}


func main()  {
	dialer := &websocket.Dialer{}
	header := http.Header{
		"name":[]string{"Tome","Jim"},
	}
	conn, resp, err := dialer.Dial("ws://127.0.0.1:3434/add",header)
	CheckError(err)
	for key,values := range resp.Header {
		fmt.Printf("%s:%v\n",key,values)
	}
	defer  conn.Close()

	for {
		request := Request{A: 3,B: 9}
		err = conn.WriteJSON(request)
		CheckError(err)

		var response Response
		err = conn.ReadJSON(&response)
		fmt.Printf("response sum=%d\n",response.Sum)
		time.Sleep(time.Second)
	}

}

多人聊天室案例

Hub:持有每一個client的指針,broadcast管道里有數據時,把它寫入每一個client的send管道中,注銷client時關閉client的send管道。

client:前端(Browser)請求建立websocket連接時,為這條websocket連接專門啟用一個協程,創建一個client,把前端請求發來的數據寫入到hub中的broadcast管道中,把自身管道里的數據發送寫入給前端,跟前端的連接斷開時,請求從hub中注銷自己。

前端(Browser):當打開瀏覽器界面時,前端會請求建立websocket連接,關閉瀏覽器界面時會主動關閉websocket連接。

存活監測:當hub發現client的send管道寫不進數據時,把client注銷掉,client給websocket連接設置一個讀超時,並周期性地給前端發ping消息,如果沒有收到pong消息,則下一次的conn.read()會報超時錯誤,此時client關閉websocket連接。

hub.go

package main

type Hub struct {
	clients    map[*Client]bool //維護所有的client
	broadcast  chan []byte      //廣播消息
	register   chan *Client     //注冊
	unregister chan *Client     //注銷

}

func NewHub() *Hub {
	return &Hub{
		clients:    make(map[*Client]bool),
		broadcast:  make(chan []byte), //同步管道,確保hub消息不堆積,同時多個client給hub發數據會阻塞
		register:   make(chan *Client),
		unregister: make(chan *Client),
	}
}

func (hub *Hub) Run() {
	for {
		select {
		case client := <-hub.register:
			//client上線,注冊
			hub.clients[client] = true
		case client := <-hub.unregister:
			//查詢當前client是否存在
			if _, exists := hub.clients[client]; exists {
				//注銷client 通道
				close(client.send)
				//刪除注銷的client
				delete(hub.clients, client)
			}
		case msg := <-hub.broadcast:
			//將message廣播給每一位client
			for client := range hub.clients {
				select {
				case client.send <- msg:
				//異常client處理
				default:
					close(client.send)
					//刪除異常的client
					delete(hub.clients, client)
				}
			}
		}
	}
}

client.go

package main

import (
	"bytes"
	"fmt"
	"github.com/gorilla/websocket"
	"net/http"
	"time"
)

var (
	pongWait         = 60 * time.Second  //等待時間
	pingPeriod       = 9 * pongWait / 10 //周期54s
	maxMsgSize int64 = 512               //消息最大長度
	writeWait        = 10 * time.Second  //
)
var (
	newLine = []byte{'\n'}
	space   = []byte{' '}
)
var upgrader = websocket.Upgrader{
	HandshakeTimeout: 2 * time.Second, //握手超時時間
	ReadBufferSize:   1024,            //讀緩沖大小
	WriteBufferSize:  1024,            //寫緩沖大小
	CheckOrigin:      func(r *http.Request) bool { return true },
	Error:            func(w http.ResponseWriter, r *http.Request, status int, reason error) {},
}

type Client struct {
	send      chan []byte
	hub       *Hub
	conn      *websocket.Conn
	frontName []byte //前端的名字,用於展示在消息前面
}

func (client *Client) read() {
	defer func() {
		//hub中注銷client
		client.hub.unregister <- client
		fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
		//關閉websocket管道
		client.conn.Close()
	}()
	//一次從管管中讀取的最大長度
	client.conn.SetReadLimit(maxMsgSize)
	//連接中,每隔54秒向客戶端發一次ping,客戶端返回pong,所以把SetReadDeadline設為60秒,超過60秒后不允許讀
	_ = client.conn.SetReadDeadline(time.Now().Add(pongWait))
	//心跳
	client.conn.SetPongHandler(func(appData string) error {
		//每次收到pong都把deadline往后推遲60秒
		_ = client.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		//如果前端主動斷開連接,運行會報錯,for循環會退出。注冊client時,hub中會關閉client.send管道
		_, msg, err := client.conn.ReadMessage()
		if err != nil {
			//如果以意料之外的關閉狀態關閉,就打印日志
			if websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway) {
				fmt.Printf("read from websocket err: %v\n", err)
			}
			//ReadMessage失敗,關閉websocket管道、注銷client,退出
			break
		} else {
			//換行符替換成空格,去除首尾空格
			message := bytes.TrimSpace(bytes.Replace(msg, newLine, space, -1))
			if len(client.frontName) == 0 {
				//賦給frontName,不進行廣播
				client.frontName = message
				fmt.Printf("%s online\n", string(client.frontName))
			} else {
				//要廣播的內容前面加上front的名字,從websocket連接里讀出數據,發給hub的broadcast
				client.hub.broadcast <- bytes.Join([][]byte{client.frontName, message}, []byte(": "))
			}
		}
	}
}

//從hub的broadcast那兒讀限數據,寫到websocket連接里面去
func (client *Client) write() {
	//給前端發心跳,看前端是否還存活
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		//ticker不用就stop,防止協程泄漏
		ticker.Stop()
		fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
		//給前端寫數據失敗,關閉連接
		client.conn.Close()
	}()

	for {
		select {
		//正常情況是hub發來了數據。如果前端斷開了連接,read()會觸發client.send管道的關閉,該case會立即執行。從而執行!ok里的return,從而執行defer
		case msg, ok := <-client.send:
			//client.send該管道被hub關閉
			if !ok {
				//寫一條關閉信息就可以結束一切
				_ = client.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			//10秒內必須把信息寫給前端(寫到websocket連接里去),否則就關閉連接
			_ = client.conn.SetWriteDeadline(time.Now().Add(writeWait))
                        //通過NextWriter創建一個新的writer,主要是為了確保上一個writer已經被關閉,即它想寫的內容已經flush到conn里去
			if writer, err := client.conn.NextWriter(websocket.TextMessage); err != nil {
				return
			} else {
				_, _ = writer.Write(msg)
				_, _ = writer.Write(newLine) //每發一條消息,都加一個換行符
				//為了提升性能,如果client.send里還有消息,則趁這一次都寫給前端
				n := len(client.send)
				for i := 0; i < n; i++ {
					_, _ = writer.Write(<-client.send)
					_, _ = writer.Write(newLine)
				}
				if err := writer.Close(); err != nil {
					return //結束一切
				}
			}
		case <-ticker.C:
			_ = client.conn.SetWriteDeadline(time.Now().Add(writeWait))
			//心跳保持,給瀏覽器發一個PingMessage,等待瀏覽器返回PongMessage
			if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return //寫websocket連接失敗,說明連接出問題了,該client可以over了
			}
		}
	}
}

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil) //http升級為websocket協議
	if err != nil {
		fmt.Printf("upgrade error: %v\n", err)
		return
	}
	fmt.Printf("connect to client %s\n", conn.RemoteAddr().String())
	//每來一個前端請求,就會創建一個client
	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
	//向hub注冊client
	client.hub.register <- client

	//啟動子協程,運行ServeWs的協程退出后子協程也不會能出
	//websocket是全雙工模式,可以同時read和write
	go client.read()
	go client.write()
}

main.go

package main

import (
	"flag"
	"fmt"
	"net/http"
)

func serveHome(w http.ResponseWriter, r *http.Request) {
	//只允許訪問根路徑
	if r.URL.Path != "/" {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}
	//只允許GET請求
	if r.Method != "GET" {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	http.ServeFile(w, r, "home.html")
}

func main() {
	//如果命令行不指定port參數,則默認為3434
	port := flag.String("port", "3434", "http service port")
	//解析命令行輸入的port參數
	flag.Parse()
	hub := NewHub()
	go hub.Run()
	//注冊每種請求對應的處理函數
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(rw http.ResponseWriter, r *http.Request) {
		ServeWs(hub, rw, r)
	})
	//如果啟動成功,該行會一直阻塞,hub.run()會一直運行
	if err := http.ListenAndServe(":"+*port, nil); err != nil {
		fmt.Printf("start http service error: %s\n", err)
	}
}

//go run main.go --port 3434

home.html

<!DOCTYPE html>
<html lang="en">

<head>
    <title>聊天室</title>
    <script type="text/javascript">
        window.onload = function () {//頁面打開時執行以下初始化內容
            var conn;
            var msg = document.getElementById("msg");
            var log = document.getElementById("log");

            function appendLog(item) {
                var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
                log.appendChild(item);
                if (doScroll) {
                    log.scrollTop = log.scrollHeight - log.clientHeight;
                }
            }

            document.getElementById("form").onsubmit = function () {
                if (!conn) {
                    return false;
                }
                if (!msg.value) {
                    return false;
                }
                conn.send(msg.value);
                msg.value = "";
                return false;
            };

            if (window["WebSocket"]) {//如果支持websockte就嘗試連接
                //從瀏覽器的開發者工具里看一下ws的請求頭
                conn = new WebSocket("ws://127.0.0.1:3434/ws");//請求跟websocket服務端建立連接(注意端口要一致)。關閉瀏覽器頁面時會自動斷開連接
                conn.onclose = function (evt) {
                    var item = document.createElement("div")
                    item.innerHTML = "<b>Connection closed.</b>";//連接關閉時打印一條信息
                    appendLog(item);
                };
                conn.onmessage = function (evt) {//如果conn里有消息
                    var messages = evt.data.split('\n');//用換行符分隔每條消息
                    for (var i = 0; i < messages.length; i++) {
                        var item = document.createElement("div");
                        item.innerText = messages[i];//把消息逐條顯示在屏幕上
                        appendLog(item);
                    }
                };
            } else {
                var item = document.createElement("div");
                item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
                appendLog(item);
            }
        };
    </script>
    <style type="text/css">
        html {
            overflow: hidden;
        }

        body {
            overflow: hidden;
            padding: 0;
            margin: 0;
            width: 100%;
            height: 100%;
            background: gray;
        }

        #log {
            background: white;
            margin: 0;
            padding: 0.5em 0.5em 0.5em 0.5em;
            position: absolute;
            top: 0.5em;
            left: 0.5em;
            right: 0.5em;
            bottom: 3em;
            overflow: auto;
        }

        #form {
            padding: 0 0.5em 0 0.5em;
            margin: 0;
            position: absolute;
            bottom: 1em;
            left: 0px;
            width: 100%;
            overflow: hidden;
        }
    </style>
</head>

<body>
    <div id="log"></div>
    <form id="form">
        <input type="submit" value="發送" />
        <input type="text" id="msg" size="100" autofocus />
    </form>
</body>

</html>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM