負載均衡器在 Web 架構中扮演着非常重要的角色,被用於為多個后端分發流量負載,提升服務的伸縮性。負載均衡器后面配置了多個服務,在某個服務發生故障時,負載均衡器可以很快地選擇另一個可用的服務,所以整體的服務可用性得到了提升。
自研負載均衡器的工作原理
負載均衡器在向后端服務分發流量負載時可以使用幾種策略。
-
輪詢(Round Robin)——均勻地分發流量負載,假設所有后端服務都具有同樣的處理能力;
-
加權輪詢(Weighted Round Robin)——根據后端服務的處理能力加權;
-
最少連接(Least Connections)——優先把流量負載分發給連接最少的后端。
我打算實現最簡單的策略,即輪詢。
一、輪詢選擇
輪詢的原理非常簡單,后端服務有平等的機會處理任務。
如上圖所示,輪詢過程是循環不斷的,但我們不能直接使用這種方式。
如果其中的一個后端發生故障該怎么辦?我們當然不希望把流量定向給它。我們只能把流量路由給正常運行的服務。
二、定義結構體
我們需要知道所有后端服務器的狀態,比如一個服務是死了還是活着,還要跟蹤它們的 url。
我們可以定義一個結構體來保存后端的信息。
type Backend struct { URL *url.URL Alive bool mux sync.RWMutex ReverseProxy *httputil.ReverseProxy }
我們還需要一種方式來跟蹤所有后端,以及一個計算器變量。
type ServerPool struct { backends []*Backend current uint64 }
三、使用 ReverseProxy
之前說過,負載均衡器的作用是將流量負載分發到后端的服務器上,並將結果返回給客戶端。
根據 Go 語言文檔的描述:
這剛好是我們想要的,所以我們沒有必要重復發明輪子。我們可以直接使用 ReverseProxy 來中繼初始請求。
u, _ := url.Parse("http://localhost:8080") rp := httputil.NewSingleHostReverseProxy(u) // 初始化服務器,並添加處理器 http.HandlerFunc(rp.ServeHTTP)
我們使用 httputil.NewSingleHostReverseProxy(url) 初始化一個反向代理,這個反向代理可以將請求中繼到指定的 url。在上面的例子中,所有的請求都會被中繼到 localhost:8080,結果被發送給初始客戶端。
如果看一下 ServeHTTP 方法的簽名,我們會發現它返回的是一個 HTTP 處理器,所以我們可以將它傳給 http 的 HandlerFunc。
在我們的例子中,可以使用 Backend 里的 URL 來初始化 ReverseProxy,這樣反向代理就會把請求路由給指定的 URL。
四、選擇的過程
在選擇下一個服務器時,我們需要跳過已經死掉的服務器,但不管怎樣,我們都需要一個計數器。
因為有很多客戶端連接到負載均衡器,所以發生竟態條件是不可避免的。為了防止這種情況,我們需要使用 mutex 給 ServerPool 加鎖。但這樣做對性能會有影響,更何況我們並不是真想要給 ServerPool 加鎖,我們只是想要更新計數器。
最理想的解決方案是使用原子操作,Go 語言的 atomic 包為此提供了很好的支持。
func (s *ServerPool) NextIndex() int { return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends))) }
我們通過原子操作遞增 current 的值,並通過對 slice 的長度取模來獲得當前索引值。所以,返回值總是介於 0 和 slice 的長度之間,畢竟我們想要的是索引值,而不是總的計數值。
五、選擇可用的后端
我們需要循環將請求路由到后端的每一台服務器上,但要跳過已經死掉的服務。
GetNext() 方法總是返回一個介於 0 和 slice 長度之間的值,如果這個值對應的服務器不可用,我們需要遍歷一遍 slice。
遍歷一遍 slice
如上圖所示,我們將從 next 位置開始遍歷整個列表,但在選擇索引時,需要保證它處在 slice 的長度之內,這個可以通過取模運算來保證。
在找到可用的服務器后,我們將它標記為當前可用服務器。
上述操作對應的代碼如下。
// GetNextPeer 返回下一個可用的服務器 func (s *ServerPool) GetNextPeer() *Backend { // 遍歷后端列表,找到可用的服務器 next := s.NextIndex() l := len(s.backends) + next // 從 next 開始遍歷 for i := next; i < l; i++ { idx := i % len(s.backends) // 通過取模計算獲得索引 // 如果找到一個可用的服務器,將它作為當前服務器。如果不是初始的那個,就把它保存下來 if s.backends[idx].IsAlive() { if i != next { atomic.StoreUint64(&s.current, uint64(idx)) // 標記當前可用服務器 } return s.backends[idx] } } return nil }
六、避免竟態條件
我們還需要考慮到一些情況,比如不同的 goroutine 會同時訪問 Backend 結構體里的一個變量。
我們知道,讀取這個變量的 goroutine 比修改這個變量的要多,所以我們使用 RWMutex 來串行化對 Alive 的訪問操作。
// SetAlive func (b *Backend) SetAlive(alive bool) { b.mux.Lock() b.Alive = alive b.mux.Unlock() } // 如果后端還活着,IsAlive 返回 true func (b *Backend) IsAlive() (alive bool) { b.mux.RLock() alive = b.Alive b.mux.RUnlock() return }
七、對請求進行負載均衡
在有了上述的這些東西之后,接下來就可以用下面這個簡單的辦法來對請求進行負載均衡。只有當所有的后端服務都死掉它才會退出。
// lb 對入向請求進行負載均衡 func lb(w http.ResponseWriter, r *http.Request) { peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable) }
這個方法可以作為 HandlerFunc 傳給 http 服務器。
server := http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: http.HandlerFunc(lb), }
八、只將流量路由給活躍的服務器
現在的 lb 方法存在一個嚴重的問題,我們並不知道后端服務是否處於正常的運行狀態。為此,我們需要嘗試發送請求,檢查一下它是否正常。
我們可以通過兩種方法來達到目的:
-
主動(Active):在處理當前請求時,如果發現當前的后端沒有響應,就把它標記為已宕機。
-
被動(Passive):在固定的時間間隔內對后端服務器執行 ping 操作,以此來檢查服務器的狀態。
-
九、主動模式
在發生錯誤時,ReverseProxy 會觸發 ErrorHandler 回調函數,我們可以利用它來檢查故障。
1
proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { log.Printf("[%s] %s\n", serverUrl.Host, e.Error()) retries := GetRetryFromContext(request) if retries < 3 { select { case <-time.After(10 * time.Millisecond): ctx := context.WithValue(request.Context(), Retry, retries+1) proxy.ServeHTTP(writer, request.WithContext(ctx)) } return } // 在三次重試之后,把這個后端標記為宕機 serverPool.MarkBackendStatus(serverUrl, false) // 同一個請求在嘗試了幾個不同的后端之后,增加計數 attempts := GetAttemptsFromContext(request) log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts) ctx := context.WithValue(request.Context(), Attempts, attempts+1) lb(writer, request.WithContext(ctx)) }
我們使用強大的閉包來實現錯誤處理器,它可以捕獲外部變量錯誤。它會檢查重試次數,如果小於 3,就把同一個請求發送給同一個后端服務器。之所以要進行重試,是因為服務器可能會發生臨時錯誤,在經過短暫的延遲(比如服務器沒有足夠的 socket 來接收請求)之后,服務器又可以繼續處理請求。我們使用了一個計時器,把重試時間間隔設定在 10 毫秒左右。
在重試失敗之后,我們就把這個后端標記為宕機。
接下來,我們要找出新的可用后端。我們使用 context 來維護重試次數。在增加重試次數后,我們把它傳回 lb,選擇一個新的后端來處理請求。
但我們不能不加以限制,所以我們會在進一步處理請求之前檢查是否達到了最大的重試上限。
我們從請求里拿到重試次數,如果已經達到最大上限,就終結這個請求。
// lb 對傳入的請求進行負載均衡 func lb(w http.ResponseWriter, r *http.Request) { attempts := GetAttemptsFromContext(r) if attempts > 3 { log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) http.Error(w, "Service not available", http.StatusServiceUnavailable) return } peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable) }
十、context 的使用
我們可以利用 context 在 http 請求中保存有用的信息,用它來跟蹤重試次數。
首先,我們需要為 context 指定鍵。我們建議使用不沖突的整數值作為鍵,而不是字符串。Go 語言提供了 iota 關鍵字,可以用來實現遞增的常量,每一個常量都包含了唯一值。這是一種完美的整型鍵解決方案。
const ( Attempts int = iota Retry )
然后我們就可以像操作 HashMap 那樣獲取這個值。默認返回值要視情況而定。
// GetAttemptsFromContext 返回嘗試次數 func GetRetryFromContext(r *http.Request) int { if retry, ok := r.Context().Value(Retry).(int); ok { return retry } return 0 }
十一、被動模式
被動模式就是定時對后端執行 ping 操作,以此來檢查它們的狀態。
我們通過建立 TCP 連接來執行 ping 操作。如果后端及時響應,我們就認為它還活着。當然,如果你喜歡,也可以改成直接調用某個端點,比如 /status。切記,在執行完操作后要關閉連接,避免給服務器造成額外的負擔,否則服務器會一直維護連接,最后把資源耗盡。
// isAlive 通過建立 TCP 連接檢查后端是否還活着 func isBackendAlive(u *url.URL) bool { timeout := 2 * time.Second conn, err := net.DialTimeout("tcp", u.Host, timeout) if err != nil { log.Println("Site unreachable, error: ", err) return false } _ = conn.Close() // 不需要維護連接,把它關閉 return true }
現在我們可以遍歷服務器,並標記它們的狀態。
// HealthCheck 對后端執行 ping 操作,並更新狀態 func (s *ServerPool) HealthCheck() { for _, b := range s.backends { status := "up" alive := isBackendAlive(b.URL) b.SetAlive(alive) if !alive { status = "down" } log.Printf("%s [%s]\n", b.URL, status) } }
我們可以啟動定時器來定時發起 ping 操作。
// healthCheck 返回一個 routine,每 2 分鍾檢查一次后端的狀態 func healthCheck() { t := time.NewTicker(time.Second * 20) for { select { case <-t.C: log.Println("Starting health check...") serverPool.HealthCheck() log.Println("Health check completed") } } }
在上面的例子中,<-t.C 每 20 秒返回一個值,select 會檢測到這個事件。在沒有 default case 的情況下,select 會一直等待,直到有滿足條件的 case 被執行。
最后,使用單獨的 goroutine 來執行。
go healthCheck()
十二、測試
負載均衡代碼

1 package main 2 3 import ( 4 "context" 5 "flag" 6 "fmt" 7 "log" 8 "net" 9 "net/http" 10 "net/http/httputil" 11 "net/url" 12 "strings" 13 "sync" 14 "sync/atomic" 15 "time" 16 ) 17 18 const ( 19 Attempts int = iota 20 Retry 21 ) 22 23 //定義結構體 24 //后端保存關於服務器的數據 25 type Backend struct { 26 URL *url.URL 27 Alive bool 28 mux sync.RWMutex 29 ReverseProxy *httputil.ReverseProxy 30 } 31 32 //跟蹤所有后端,以及一個計算器變量 33 type ServerPool struct { 34 backends []*Backend 35 current uint64 36 } 37 38 // SetAlive 39 func (b *Backend) SetAlive(alive bool) { 40 b.mux.Lock() 41 b.Alive = alive 42 b.mux.Unlock() 43 } 44 45 // 如果后端還活着,IsAlive 返回 true 46 func (b *Backend) IsAlive() (alive bool) { 47 b.mux.RLock() 48 alive = b.Alive 49 b.mux.RUnlock() 50 return 51 } 52 53 // lb 對入向請求進行負載均衡 54 func lb(w http.ResponseWriter, r *http.Request) { 55 //重試次數,如果已經達到最大上限,就終結這個請求 56 attempts := GetAttemptsFromContext(r) 57 if attempts > 3 { 58 log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) 59 http.Error(w, "Service not available", http.StatusServiceUnavailable) 60 return 61 } 62 63 peer := serverPool.GetNextPeer() 64 if peer != nil { 65 peer.ReverseProxy.ServeHTTP(w, r) 66 return 67 } 68 http.Error(w, "Service not available", http.StatusServiceUnavailable) 69 } 70 71 // 自動增加計數器並返回一個索引,使用atomic 保證原子性 72 //通過原子操作遞增 current 的值,並通過對 slice 的長度取模來獲得當前索引值。所以,返回值總是介於 0 和 slice 的長度之間,畢竟我們想要的是索引值,而不是總的計數值。 73 func (s *ServerPool) NextIndex() int { 74 return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends))) 75 } 76 77 // GetNextPeer返回下一個活動的對等點以獲取連接 78 //找到可用的服務器后,我們將它標記為當前可用服務器。 79 func (s *ServerPool) GetNextPeer() *Backend { 80 // 循環整個后端,找出一個活動后端 81 next := s.NextIndex() 82 l := len(s.backends) + next // 從next開始移動一個完整的周期 83 for i := next; i < l; i++ { 84 idx := i % len(s.backends) // take an index by modding 85 if s.backends[idx].IsAlive() { // if we have an alive backend, use it and store if its not the original one 86 if i != next { 87 atomic.StoreUint64(&s.current, uint64(idx)) 88 } 89 return s.backends[idx] 90 } 91 } 92 return nil 93 } 94 95 // GetAttemptsFromContext 返回嘗試次數 96 func GetRetryFromContext(r *http.Request) int { 97 if retry, ok := r.Context().Value(Retry).(int); ok { 98 return retry 99 } 100 return 0 101 } 102 103 // healthCheck runs a routine for check status of the backends every 2 mins 104 // healthCheck 返回一個 routine,每 2 分鍾檢查一次后端的狀態 105 func healthCheck() { 106 t := time.NewTicker(time.Second * 20) 107 for { 108 select { 109 case <-t.C: 110 log.Println("Starting health check...") 111 serverPool.HealthCheck() 112 log.Println("Health check completed") 113 } 114 } 115 } 116 117 // HealthCheck ping后端並更新狀態 118 func (s *ServerPool) HealthCheck() { 119 for _, b := range s.backends { 120 status := "up" 121 alive := isBackendAlive(b.URL) 122 b.SetAlive(alive) 123 if !alive { 124 status = "down" 125 } 126 log.Printf("%s [%s]\n", b.URL, status) 127 } 128 } 129 130 // isAlive checks whether a backend is Alive by establishing a TCP connection 131 // isAlive 通過建立 TCP 連接檢查后端是否還活着 132 func isBackendAlive(u *url.URL) bool { 133 timeout := 2 * time.Second 134 conn, err := net.DialTimeout("tcp", u.Host, timeout) 135 if err != nil { 136 log.Println("Site unreachable, error: ", err) 137 return false 138 } 139 _ = conn.Close() // 不需要維護連接,把它關閉 140 return true 141 } 142 143 // GetAttemptsFromContext returns the attempts for request 144 func GetAttemptsFromContext(r *http.Request) int { 145 if attempts, ok := r.Context().Value(Attempts).(int); ok { 146 return attempts 147 } 148 return 1 149 } 150 151 // AddBackend to the server pool 152 func (s *ServerPool) AddBackend(backend *Backend) { 153 s.backends = append(s.backends, backend) 154 } 155 156 // MarkBackendStatus changes a status of a backend 157 func (s *ServerPool) MarkBackendStatus(backendURL *url.URL, alive bool) { 158 for _, b := range s.backends { 159 if b.URL.String() == backendURL.String() { 160 b.SetAlive(alive) 161 break 162 } 163 } 164 } 165 166 var serverPool ServerPool 167 168 func main() { 169 var serverList string 170 var port int 171 flag.StringVar(&serverList, "backends", "http://localhost:3302,http://localhost:3303,http://localhost:3304", "Load balanced backends, use commas to separate") 172 flag.IntVar(&port, "port", 3031, "Port to serve") 173 flag.Parse() 174 175 if len(serverList) == 0 { 176 log.Fatal("Please provide one or more backends to load balance") 177 } 178 179 // 解析服務器 180 tokens := strings.Split(serverList, ",") 181 //range類似迭代器,可以遍歷 182 for _, tok := range tokens { 183 serverURL, err := url.Parse(tok) 184 if err != nil { 185 log.Fatal(err) 186 } 187 188 //使用 httputil.NewSingleHostReverseProxy(url) 初始化一個反向代理 189 proxy := httputil.NewSingleHostReverseProxy(serverURL) 190 191 //在發生錯誤時,ReverseProxy 會觸發 ErrorHandler 回調函數,我們可以利用它來檢查故障。 192 proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { 193 log.Printf("[%s] %s\n", serverURL.Host, e.Error()) 194 retries := GetRetryFromContext(request) 195 if retries < 3 { 196 select { 197 case <-time.After(10 * time.Millisecond): 198 ctx := context.WithValue(request.Context(), Retry, retries+1) 199 proxy.ServeHTTP(writer, request.WithContext(ctx)) 200 } 201 return 202 } 203 204 // 在三次重試之后,把這個后端標記為宕機 205 serverPool.MarkBackendStatus(serverURL, false) 206 207 // 同一個請求在嘗試了幾個不同的后端之后,增加計數 208 attempts := GetAttemptsFromContext(request) 209 log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts) 210 ctx := context.WithValue(request.Context(), Attempts, attempts+1) 211 lb(writer, request.WithContext(ctx)) 212 } 213 214 serverPool.AddBackend(&Backend{ 215 URL: serverURL, 216 Alive: true, 217 ReverseProxy: proxy, 218 }) 219 log.Printf("Configured server: %s\n", serverURL) 220 221 } 222 // 初始化服務器,並添加處理器 223 // create http server 224 server := http.Server{ 225 Addr: fmt.Sprintf(":%d", port), 226 Handler: http.HandlerFunc(lb), 227 } 228 229 // start health checking 230 go healthCheck() 231 232 log.Printf("Load Balancer started at :%d\n", port) 233 if err := server.ListenAndServe(); err != nil { 234 log.Fatal(err) 235 } 236 }
直接運行就好了
web服務器代碼

package main import ( "flag" "fmt" "log" "net/http" "strconv" ) func sayhelloName(w http.ResponseWriter, r *http.Request) { r.ParseForm() //解析參數,默認是不會解析的 fmt.Fprintln(w, "Hello moon!") //這個寫入到w的是輸出到客戶端的 fmt.Fprintln(w, "count:"+strconv.Itoa(port)) //這個寫入到w的是輸出到客戶端的 count++ fmt.Fprintln(w, "count:"+strconv.Itoa(count)) //這個寫入到w的是輸出到客戶端的 } var port int var count int func main() { flag.IntVar(&port, "port", 3302, "duan端口號,默認3302") // 【必須調用】從 arguments 中解析注冊的 flag flag.Parse() fmt.Printf("port=%v \n", port) http.HandleFunc("/", sayhelloName) //設置訪問的路由 err := http.ListenAndServe(":"+strconv.Itoa(port), nil) //設置監聽的端口 if err != nil { log.Fatal("ListenAndServe: ", err) } }
使用方法
go run web.go -port=3302 go run web.go -port=3303 go run web.go -port=3304
這里web.go是代碼文件名
測試
訪問http://localhost:3031/並刷新
十三、結論
這篇文章提到了很多東西:
-
輪詢;
-
Go 語言標准庫里的 ReverseProxy;
-
mutex;
-
原子操作;
-
閉包;
-
回調;
-
select。
這個簡單的負載均衡器還有很多可以改進的地方:
-
使用堆來維護后端的狀態,以此來降低搜索成本;
-
收集統計信息;
-
實現加權輪詢或最少連接策略;
-
支持文件配置。
代碼地址:
https://github.com/kasvith/simplelb/
原文連接:
https://kasvith.github.io/posts/lets-create-a-simple-lb-go/