golang+sse+angular的心跳機制、angullar的輪詢機制、time.Duration和time.NewTicker的學習


長連接斷開的原因

  • 連接超時,瀏覽器自動斷開連接
  • 進程被殺死
  • 不可抗拒因素

根據不同情況,高效保活的方式

  • 連接超時:心跳機制
  • 進程保活
  • 斷線重連

重點心跳機制

  • 產物
    • 心跳包
    • 心跳應答

輪詢與心跳區別

  • 輪詢一次相當於:建立一次TCP連接+斷開連接
  • 心跳:在已有的連接上進行保活

心跳設計要點

  • 心跳包的規格(內容&大小)
  • 心跳發送間隔時間(按照項目的特性進行判斷)
  • 斷線重連機制(核心= 如何判斷長連接的有效性)

心跳具體實現(基於sse的長連接)

  • 客戶端做心跳機制:客戶端長時間沒有反應,使用心跳機制,證明客戶端的存在

  • 服務端做心跳機制:服務端長時間沒有反應,使用心跳機制,證明服務端還存在

  • 服務端做心跳機制

思考點:

  • 如何判斷連接中斷信號(單獨的思考,在本次的代碼中,沒有用於跟心跳機制有關,以后有想法,會補上)
notify := w.(http.CloseNotifier).CloseNotify()
	// log.Println("notify:",<- notify) 會直接堵住的,因為notify它接收連接中斷信號
go func(){
	// 太迷了,正確想法就是:只能接收異常的信號,就是網絡中斷的信號
	fmt.Println("接收連接中斷信號")
	<-notify
	userData[r.RemoteAddr] = r.RemoteAddr
	offUser <- r.RemoteAddr
	log.Println(r.RemoteAddr,"just close")
}()
  • 如何將一一對應的客戶端和服務端保存
// 接收發送給客戶端數據
type RW struct{
	Rw http.ResponseWriter
	T time.Time
}
var rw = make(map[int64]*RW)

// 考慮使用map。記得當正確的數據發送給客戶端之后要將對應的map鍵值刪除
delete(rw,a)   // 當發送完之后,就要將這個客戶端刪除了。a時鍵值
  • 利用golang中的time.Ticker機制,監聽是否有服務端等待,然后進行輪詢保活。心跳機制重點(利用協程進行監聽)
// 保活,心跳
	go func(){
		defer func(){
			if err := recover();err!=nil{
				fmt.Println(err)
			}
		}()
		fmt.Println("開啟保活")
		keepAliveInterval := time.Duration(6000)
		fmt.Println(keepAliveInterval)
		ticker := time.NewTicker(3*time.Second)
		for {
			select{
			case <-ticker.C:
				fmt.Println("保活,心跳機制")
				t1 := time.Now()
				for _,value:= range rw{
					fmt.Println(value)
					if t1.Sub(value.T)>keepAliveInterval{
						fmt.Println("進入保活")
						f,ok:=value.Rw.(http.Flusher)
						if !ok{
							fmt.Fprintf(value.Rw,"不能用來做sse")
							return
						}
						fmt.Fprintf(value.Rw,"data:請耐心等待,我正在努力的加載數據\n\n")
						f.Flush()
					}
				}
			}
		}
	}()

樣例代碼

server.go

package main

import(
	"fmt"
	"log"
	"time"
	"sync"
	"net/http"
)

// 接收發送給客戶端數據
type RW struct{
	Rw http.ResponseWriter
	T time.Time
}

var offUser = make(chan string,0)
var userData = make(map[string]string)
var rw = make(map[int64]*RW)
var i int64 = 0
var lock sync.Mutex

func init(){
	log.SetFlags(log.Ltime|log.Lshortfile)
}

func sseService(w http.ResponseWriter,r *http.Request){
	var a int64  // 用來接收key值
	defer func(){
		if err := recover();err!=nil{
			fmt.Println(err)
		}
	}()
	
	lock.Lock()
	i++
	a=i
	lock.Unlock()
	// 提取get請求參數
	fmt.Println("a =",a)
	f,ok := w.(http.Flusher)
	if !ok{
		http.Error(w,"cannot support sse",http.StatusInternalServerError)
		return
	}

	// 用於監聽客戶端時候已經斷開了連接
	notify := w.(http.CloseNotifier).CloseNotify()
	// log.Println("notify:",<- notify) 會直接堵住的,因為notify它接收網絡中斷信號
	go func(){
		fmt.Println("接收關閉信號")
		<-notify
		offUser <- r.RemoteAddr
		log.Println(r.RemoteAddr,"just close")
	}()

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Transfer-Encoding", "chunked")
	w.Header().Set("Access-Control-Allow-Origin","*")

	fmt.Fprintf(w,"data:welcome\n\n")
	f.Flush()

	// 將當前的w保存
	fmt.Println("心跳")
	t := time.Now()
	rr := &RW{Rw:w,T:t}
	fmt.Println("rr =",rr)
	rw[a] = rr 

	// 模擬服務端接收發送數據阻塞
	fmt.Println("模擬服務端發送數據阻塞")
	time.Sleep(time.Second*30)
	fmt.Fprintf(w,"data:12345加油\n\n")
	f.Flush()
	delete(rw,a)   // 當發送完之后,就要將這個客戶端刪除了
}

func testClose(w http.ResponseWriter,r *http.Request){
	fmt.Println("remoteAddr:",r.RemoteAddr) 
	fmt.Println("userData:",userData)	
	// 用於監聽客戶端時候已經斷開了連接
	notify := w.(http.CloseNotifier).CloseNotify()
	go func(){
		fmt.Println("接收連接中斷信號")
		<-notify
		userData[r.RemoteAddr] = r.RemoteAddr
		offUser <- r.RemoteAddr
		log.Println(r.RemoteAddr,"just close")
	}()
	time.Sleep(time.Second*1)
	fmt.Fprintln(w,"這里任意數字")
}


func main(){
	fmt.Println("sse1")

	// 獲取中斷的客戶端
	go func(){
		fmt.Println("監聽關閉的客戶端")
		for{
			select{
			case user:=<-offUser:
				log.Println("userOff:",user)
			}
		}
	}()

	// 保活,心跳
	go func(){
		defer func(){
			if err := recover();err!=nil{
				fmt.Println(err)
			}
		}()
		fmt.Println("開啟保活")
		keepAliveInterval := time.Duration(6000)
		fmt.Println(keepAliveInterval)
		ticker := time.NewTicker(3*time.Second)
		for {
			select{
			case <-ticker.C:
				fmt.Println("保活,心跳機制")
				t1 := time.Now()
				for _,value:= range rw{
					fmt.Println(value)
					if t1.Sub(value.T)>keepAliveInterval{
						fmt.Println("進入保活")
						f,ok:=value.Rw.(http.Flusher)
						if !ok{
							fmt.Fprintf(value.Rw,"不能用來做sse")
							return
						}
						fmt.Fprintf(value.Rw,"data:請耐心等待,我正在努力的加載數據\n\n")
						f.Flush()
					}
				}
			}
		}
	}()

	http.HandleFunc("/sse",sseService)
	http.HandleFunc("/testClose",testClose)
	http.ListenAndServe(":8080",nil)
}

client(angular)

  sse(){
    let that = this
    if ("EventSource" in window){
      console.log("可以使用EventSource")
    }else{
      return
    }
    var url = "http://localhost:8080/sse?pid="+12345
    var es = new EventSource(url)
    // 監聽事件
    // 連接事件
    es.onopen = function(e:any){
      console.log("我進來啦")
      console.log(e)
    }

    // message事件
    es.onmessage = function(e){
      that.Data = e.data
      if (e.data=="12345加油"){  // 后端通知前端結束發送信息
        console.log("12345加油,這是服務端正確想發送的數據")
        es.close()
      }else{
        console.log(e.data)
      }
    }
    es.addEventListener("error",(e:any)=>{
      // 這里的e要聲明變量,否則回報沒有readyState屬性
      console.log("e.target",e.target)
      console.log("SSEERROR:",e.target.readyState)
      if(e.target.readyState == 0){
        // 重連
        console.log("Reconnecting...")
        es.close()  // 不開啟服務端,直接關閉
      }
      if(e.target.readyState==2){
        // 放棄
        console.log("give up.")
      }
    },false);
  }

學習心跳機制附帶的知識點

angular設置輪詢

  • setInterval()方法重復調用一個函數或執行一個代碼段,在每次調用之間具有固定的時間延遲
  • clearInterval()刪除重復調用
 myTest = setInterval(()=>{
      var i:number = 1
      console.log("輪詢還是心跳")
      if(i===4){
        return
      }
      i++
    },1500)  // 一旦實例化,就會直接運行

  test(){
   clearInterval(this.myTest)   // 清除重復運行函數
  }

time.Duration

  • Duration的基本單位是納秒
  • 作用:打印時間時,根據最合適的時間單位打印;用於時間比較
keepAliveInterval := time.Duration(3) 

// 打印數據值
3ns

time.NewTicker

  • 創建一個輪詢機制,規定隔一段時間處理一次函數
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)

go func(){
	for{
		select{
		case <-done:
			return
		case t := <-ticker.C: // 500微秒輪詢一次
			fmt.Println("Tick at",t)
		}
	}
}()

time.Sleep(10*time.Second)
ticker.Stop()
done<-true
fmt.Println("ticker stopper")

總結

  • 學到一招:對於有是接口的方法:直接去看相對應實現的源代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM