長連接斷開的原因
- 連接超時,瀏覽器自動斷開連接
- 進程被殺死
- 不可抗拒因素
根據不同情況,高效保活的方式
- 連接超時:心跳機制
- 進程保活
- 斷線重連
重點心跳機制
- 產物
- 心跳包
- 心跳應答
輪詢與心跳區別
- 輪詢一次相當於:建立一次TCP連接+斷開連接
- 心跳:在已有的連接上進行保活
心跳設計要點
- 心跳包的規格(內容&大小)
- 心跳發送間隔時間(按照項目的特性進行判斷)
- 斷線重連機制(核心= 如何判斷長連接的有效性)
心跳具體實現(基於sse的長連接)
-
客戶端做心跳機制:客戶端長時間沒有反應,使用心跳機制,證明客戶端的存在
-
服務端做心跳機制:服務端長時間沒有反應,使用心跳機制,證明服務端還存在
-
服務端做心跳機制
思考點:
- 如何判斷連接中斷信號(單獨的思考,在本次的代碼中,沒有用於跟心跳機制有關,以后有想法,會補上)
notify := w.(http.CloseNotifier).CloseNotify()
// log.Println("notify:",<- notify) 會直接堵住的,因為notify它接收連接中斷信號
go func(){
// 太迷了,正確想法就是:只能接收異常的信號,就是網絡中斷的信號
fmt.Println("接收連接中斷信號")
<-notify
userData[r.RemoteAddr] = r.RemoteAddr
offUser <- r.RemoteAddr
log.Println(r.RemoteAddr,"just close")
}()
- 如何將一一對應的客戶端和服務端保存
// 接收發送給客戶端數據
type RW struct{
Rw http.ResponseWriter
T time.Time
}
var rw = make(map[int64]*RW)
// 考慮使用map。記得當正確的數據發送給客戶端之后要將對應的map鍵值刪除
delete(rw,a) // 當發送完之后,就要將這個客戶端刪除了。a時鍵值
- 利用golang中的time.Ticker機制,監聽是否有服務端等待,然后進行輪詢保活。心跳機制重點(利用協程進行監聽)
// 保活,心跳
go func(){
defer func(){
if err := recover();err!=nil{
fmt.Println(err)
}
}()
fmt.Println("開啟保活")
keepAliveInterval := time.Duration(6000)
fmt.Println(keepAliveInterval)
ticker := time.NewTicker(3*time.Second)
for {
select{
case <-ticker.C:
fmt.Println("保活,心跳機制")
t1 := time.Now()
for _,value:= range rw{
fmt.Println(value)
if t1.Sub(value.T)>keepAliveInterval{
fmt.Println("進入保活")
f,ok:=value.Rw.(http.Flusher)
if !ok{
fmt.Fprintf(value.Rw,"不能用來做sse")
return
}
fmt.Fprintf(value.Rw,"data:請耐心等待,我正在努力的加載數據\n\n")
f.Flush()
}
}
}
}
}()
樣例代碼
server.go
package main
import(
"fmt"
"log"
"time"
"sync"
"net/http"
)
// 接收發送給客戶端數據
type RW struct{
Rw http.ResponseWriter
T time.Time
}
var offUser = make(chan string,0)
var userData = make(map[string]string)
var rw = make(map[int64]*RW)
var i int64 = 0
var lock sync.Mutex
func init(){
log.SetFlags(log.Ltime|log.Lshortfile)
}
func sseService(w http.ResponseWriter,r *http.Request){
var a int64 // 用來接收key值
defer func(){
if err := recover();err!=nil{
fmt.Println(err)
}
}()
lock.Lock()
i++
a=i
lock.Unlock()
// 提取get請求參數
fmt.Println("a =",a)
f,ok := w.(http.Flusher)
if !ok{
http.Error(w,"cannot support sse",http.StatusInternalServerError)
return
}
// 用於監聽客戶端時候已經斷開了連接
notify := w.(http.CloseNotifier).CloseNotify()
// log.Println("notify:",<- notify) 會直接堵住的,因為notify它接收網絡中斷信號
go func(){
fmt.Println("接收關閉信號")
<-notify
offUser <- r.RemoteAddr
log.Println(r.RemoteAddr,"just close")
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Access-Control-Allow-Origin","*")
fmt.Fprintf(w,"data:welcome\n\n")
f.Flush()
// 將當前的w保存
fmt.Println("心跳")
t := time.Now()
rr := &RW{Rw:w,T:t}
fmt.Println("rr =",rr)
rw[a] = rr
// 模擬服務端接收發送數據阻塞
fmt.Println("模擬服務端發送數據阻塞")
time.Sleep(time.Second*30)
fmt.Fprintf(w,"data:12345加油\n\n")
f.Flush()
delete(rw,a) // 當發送完之后,就要將這個客戶端刪除了
}
func testClose(w http.ResponseWriter,r *http.Request){
fmt.Println("remoteAddr:",r.RemoteAddr)
fmt.Println("userData:",userData)
// 用於監聽客戶端時候已經斷開了連接
notify := w.(http.CloseNotifier).CloseNotify()
go func(){
fmt.Println("接收連接中斷信號")
<-notify
userData[r.RemoteAddr] = r.RemoteAddr
offUser <- r.RemoteAddr
log.Println(r.RemoteAddr,"just close")
}()
time.Sleep(time.Second*1)
fmt.Fprintln(w,"這里任意數字")
}
func main(){
fmt.Println("sse1")
// 獲取中斷的客戶端
go func(){
fmt.Println("監聽關閉的客戶端")
for{
select{
case user:=<-offUser:
log.Println("userOff:",user)
}
}
}()
// 保活,心跳
go func(){
defer func(){
if err := recover();err!=nil{
fmt.Println(err)
}
}()
fmt.Println("開啟保活")
keepAliveInterval := time.Duration(6000)
fmt.Println(keepAliveInterval)
ticker := time.NewTicker(3*time.Second)
for {
select{
case <-ticker.C:
fmt.Println("保活,心跳機制")
t1 := time.Now()
for _,value:= range rw{
fmt.Println(value)
if t1.Sub(value.T)>keepAliveInterval{
fmt.Println("進入保活")
f,ok:=value.Rw.(http.Flusher)
if !ok{
fmt.Fprintf(value.Rw,"不能用來做sse")
return
}
fmt.Fprintf(value.Rw,"data:請耐心等待,我正在努力的加載數據\n\n")
f.Flush()
}
}
}
}
}()
http.HandleFunc("/sse",sseService)
http.HandleFunc("/testClose",testClose)
http.ListenAndServe(":8080",nil)
}
client(angular)
sse(){
let that = this
if ("EventSource" in window){
console.log("可以使用EventSource")
}else{
return
}
var url = "http://localhost:8080/sse?pid="+12345
var es = new EventSource(url)
// 監聽事件
// 連接事件
es.onopen = function(e:any){
console.log("我進來啦")
console.log(e)
}
// message事件
es.onmessage = function(e){
that.Data = e.data
if (e.data=="12345加油"){ // 后端通知前端結束發送信息
console.log("12345加油,這是服務端正確想發送的數據")
es.close()
}else{
console.log(e.data)
}
}
es.addEventListener("error",(e:any)=>{
// 這里的e要聲明變量,否則回報沒有readyState屬性
console.log("e.target",e.target)
console.log("SSEERROR:",e.target.readyState)
if(e.target.readyState == 0){
// 重連
console.log("Reconnecting...")
es.close() // 不開啟服務端,直接關閉
}
if(e.target.readyState==2){
// 放棄
console.log("give up.")
}
},false);
}
學習心跳機制附帶的知識點
angular設置輪詢
- setInterval()方法重復調用一個函數或執行一個代碼段,在每次調用之間具有固定的時間延遲
- clearInterval()刪除重復調用
myTest = setInterval(()=>{
var i:number = 1
console.log("輪詢還是心跳")
if(i===4){
return
}
i++
},1500) // 一旦實例化,就會直接運行
test(){
clearInterval(this.myTest) // 清除重復運行函數
}
time.Duration
- Duration的基本單位是納秒
- 作用:打印時間時,根據最合適的時間單位打印;用於時間比較
keepAliveInterval := time.Duration(3)
// 打印數據值
3ns
time.NewTicker
- 創建一個輪詢機制,規定隔一段時間處理一次函數
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func(){
for{
select{
case <-done:
return
case t := <-ticker.C: // 500微秒輪詢一次
fmt.Println("Tick at",t)
}
}
}()
time.Sleep(10*time.Second)
ticker.Stop()
done<-true
fmt.Println("ticker stopper")
總結
- 學到一招:對於有是接口的方法:直接去看相對應實現的源代碼