使用 Go 語言徒手擼一個負載均衡器


負載均衡器在 Web 架構中扮演着非常重要的角色,被用於為多個后端分發流量負載,提升服務的伸縮性。負載均衡器后面配置了多個服務,在某個服務發生故障時,負載均衡器可以很快地選擇另一個可用的服務,所以整體的服務可用性得到了提升。

自研負載均衡器的工作原理  

負載均衡器在向后端服務分發流量負載時可以使用幾種策略。

  • 輪詢(Round Robin)——均勻地分發流量負載,假設所有后端服務都具有同樣的處理能力;

  • 加權輪詢(Weighted Round Robin)——根據后端服務的處理能力加權;

  • 最少連接(Least Connections)——優先把流量負載分發給連接最少的后端。

我打算實現最簡單的策略,即輪詢。

 

 

 一、輪詢選擇  

輪詢的原理非常簡單,后端服務有平等的機會處理任務。

 

 

 

 

如上圖所示,輪詢過程是循環不斷的,但我們不能直接使用這種方式。

如果其中的一個后端發生故障該怎么辦?我們當然不希望把流量定向給它。我們只能把流量路由給正常運行的服務。

 二、定義結構體  

 

我們需要知道所有后端服務器的狀態,比如一個服務是死了還是活着,還要跟蹤它們的 url。

我們可以定義一個結構體來保存后端的信息。

type Backend struct {
  URL          *url.URL
  Alive        bool
  mux          sync.RWMutex
  ReverseProxy *httputil.ReverseProxy
}

  我們還需要一種方式來跟蹤所有后端,以及一個計算器變量。

type ServerPool struct {
  backends []*Backend
  current  uint64
}

  三、使用 ReverseProxy  

之前說過,負載均衡器的作用是將流量負載分發到后端的服務器上,並將結果返回給客戶端。

根據 Go 語言文檔的描述:

這剛好是我們想要的,所以我們沒有必要重復發明輪子。我們可以直接使用 ReverseProxy 來中繼初始請求。

 

 

 

u, _ := url.Parse("http://localhost:8080")
rp := httputil.NewSingleHostReverseProxy(u)

// 初始化服務器,並添加處理器
http.HandlerFunc(rp.ServeHTTP)

  

我們使用 httputil.NewSingleHostReverseProxy(url) 初始化一個反向代理,這個反向代理可以將請求中繼到指定的 url。在上面的例子中,所有的請求都會被中繼到 localhost:8080,結果被發送給初始客戶端。

如果看一下 ServeHTTP 方法的簽名,我們會發現它返回的是一個 HTTP 處理器,所以我們可以將它傳給 http 的 HandlerFunc。

在我們的例子中,可以使用 Backend 里的 URL 來初始化 ReverseProxy,這樣反向代理就會把請求路由給指定的 URL。

四、選擇的過程  

在選擇下一個服務器時,我們需要跳過已經死掉的服務器,但不管怎樣,我們都需要一個計數器。

因為有很多客戶端連接到負載均衡器,所以發生竟態條件是不可避免的。為了防止這種情況,我們需要使用 mutex 給 ServerPool 加鎖。但這樣做對性能會有影響,更何況我們並不是真想要給 ServerPool 加鎖,我們只是想要更新計數器。

最理想的解決方案是使用原子操作,Go 語言的 atomic 包為此提供了很好的支持。

func (s *ServerPool) NextIndex() int {
  return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
}

  我們通過原子操作遞增 current 的值,並通過對 slice 的長度取模來獲得當前索引值。所以,返回值總是介於 0 和 slice 的長度之間,畢竟我們想要的是索引值,而不是總的計數值。

五、選擇可用的后端  

我們需要循環將請求路由到后端的每一台服務器上,但要跳過已經死掉的服務。

GetNext() 方法總是返回一個介於 0 和 slice 長度之間的值,如果這個值對應的服務器不可用,我們需要遍歷一遍 slice。

 

 

 

遍歷一遍 slice

 

如上圖所示,我們將從 next 位置開始遍歷整個列表,但在選擇索引時,需要保證它處在 slice 的長度之內,這個可以通過取模運算來保證。

在找到可用的服務器后,我們將它標記為當前可用服務器。

上述操作對應的代碼如下。

// GetNextPeer 返回下一個可用的服務器
func (s *ServerPool) GetNextPeer() *Backend {
  // 遍歷后端列表,找到可用的服務器
  next := s.NextIndex()
  l := len(s.backends) + next // 從 next 開始遍歷
  for i := next; i < l; i++ {
    idx := i % len(s.backends) // 通過取模計算獲得索引
    // 如果找到一個可用的服務器,將它作為當前服務器。如果不是初始的那個,就把它保存下來
    if s.backends[idx].IsAlive() {
      if i != next {
        atomic.StoreUint64(&s.current, uint64(idx)) // 標記當前可用服務器
      }
      return s.backends[idx]
    }
  }
  return nil
}

  

六、避免竟態條件  

我們還需要考慮到一些情況,比如不同的 goroutine 會同時訪問 Backend 結構體里的一個變量。

我們知道,讀取這個變量的 goroutine 比修改這個變量的要多,所以我們使用 RWMutex 來串行化對 Alive 的訪問操作。

// SetAlive
func (b *Backend) SetAlive(alive bool) {
  b.mux.Lock()
  b.Alive = alive
  b.mux.Unlock()
}

// 如果后端還活着,IsAlive 返回 true
func (b *Backend) IsAlive() (alive bool) {
  b.mux.RLock()
  alive = b.Alive
  b.mux.RUnlock()
  return
}

  七、對請求進行負載均衡  

 

在有了上述的這些東西之后,接下來就可以用下面這個簡單的辦法來對請求進行負載均衡。只有當所有的后端服務都死掉它才會退出。

// lb 對入向請求進行負載均衡
func lb(w http.ResponseWriter, r *http.Request) {
  peer := serverPool.GetNextPeer()
  if peer != nil {
    peer.ReverseProxy.ServeHTTP(w, r)
    return
  }
  http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

  

這個方法可以作為 HandlerFunc 傳給 http 服務器。

server := http.Server{
  Addr:    fmt.Sprintf(":%d", port),
  Handler: http.HandlerFunc(lb),
}   

  

八、只將流量路由給活躍的服務器  

 

現在的 lb 方法存在一個嚴重的問題,我們並不知道后端服務是否處於正常的運行狀態。為此,我們需要嘗試發送請求,檢查一下它是否正常。

我們可以通過兩種方法來達到目的:

  • 主動(Active):在處理當前請求時,如果發現當前的后端沒有響應,就把它標記為已宕機。

  • 被動(Passive):在固定的時間間隔內對后端服務器執行 ping 操作,以此來檢查服務器的狀態。

  •  

九、主動模式  

 

在發生錯誤時,ReverseProxy 會觸發 ErrorHandler 回調函數,我們可以利用它來檢查故障。

1

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
  log.Printf("[%s] %s\n", serverUrl.Host, e.Error())
  retries := GetRetryFromContext(request)
  if retries < 3 {
    select {
      case <-time.After(10 * time.Millisecond):
        ctx := context.WithValue(request.Context(), Retry, retries+1)
        proxy.ServeHTTP(writer, request.WithContext(ctx))
      }
      return
    }

  // 在三次重試之后,把這個后端標記為宕機
  serverPool.MarkBackendStatus(serverUrl, false)

  // 同一個請求在嘗試了幾個不同的后端之后,增加計數
  attempts := GetAttemptsFromContext(request)
  log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)
  ctx := context.WithValue(request.Context(), Attempts, attempts+1)
  lb(writer, request.WithContext(ctx))
}

  

我們使用強大的閉包來實現錯誤處理器,它可以捕獲外部變量錯誤。它會檢查重試次數,如果小於 3,就把同一個請求發送給同一個后端服務器。之所以要進行重試,是因為服務器可能會發生臨時錯誤,在經過短暫的延遲(比如服務器沒有足夠的 socket 來接收請求)之后,服務器又可以繼續處理請求。我們使用了一個計時器,把重試時間間隔設定在 10 毫秒左右。

在重試失敗之后,我們就把這個后端標記為宕機。

接下來,我們要找出新的可用后端。我們使用 context 來維護重試次數。在增加重試次數后,我們把它傳回 lb,選擇一個新的后端來處理請求。

但我們不能不加以限制,所以我們會在進一步處理請求之前檢查是否達到了最大的重試上限。

我們從請求里拿到重試次數,如果已經達到最大上限,就終結這個請求。

// lb 對傳入的請求進行負載均衡
func lb(w http.ResponseWriter, r *http.Request) {
  attempts := GetAttemptsFromContext(r)
  if attempts > 3 {
    log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
    http.Error(w, "Service not available", http.StatusServiceUnavailable)
    return
  }

  peer := serverPool.GetNextPeer()
  if peer != nil {
    peer.ReverseProxy.ServeHTTP(w, r)
    return
  }
  http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

  十、context 的使用  

 

我們可以利用 context 在 http 請求中保存有用的信息,用它來跟蹤重試次數。

首先,我們需要為 context 指定鍵。我們建議使用不沖突的整數值作為鍵,而不是字符串。Go 語言提供了 iota 關鍵字,可以用來實現遞增的常量,每一個常量都包含了唯一值。這是一種完美的整型鍵解決方案。

const (
  Attempts int = iota
  Retry
)

  

然后我們就可以像操作 HashMap 那樣獲取這個值。默認返回值要視情況而定。

// GetAttemptsFromContext 返回嘗試次數
func GetRetryFromContext(r *http.Request) int {
  if retry, ok := r.Context().Value(Retry).(int); ok {
    return retry
  }
  return 0
}

  十一、被動模式  

 

被動模式就是定時對后端執行 ping 操作,以此來檢查它們的狀態。

我們通過建立 TCP 連接來執行 ping 操作。如果后端及時響應,我們就認為它還活着。當然,如果你喜歡,也可以改成直接調用某個端點,比如 /status。切記,在執行完操作后要關閉連接,避免給服務器造成額外的負擔,否則服務器會一直維護連接,最后把資源耗盡。

// isAlive 通過建立 TCP 連接檢查后端是否還活着
func isBackendAlive(u *url.URL) bool {
  timeout := 2 * time.Second
  conn, err := net.DialTimeout("tcp", u.Host, timeout)
  if err != nil {
    log.Println("Site unreachable, error: ", err)
    return false
  }
  _ = conn.Close() // 不需要維護連接,把它關閉
  return true
}

  

現在我們可以遍歷服務器,並標記它們的狀態。

// HealthCheck 對后端執行 ping 操作,並更新狀態
func (s *ServerPool) HealthCheck() {
  for _, b := range s.backends {
    status := "up"
    alive := isBackendAlive(b.URL)
    b.SetAlive(alive)
    if !alive {
      status = "down"
    }
    log.Printf("%s [%s]\n", b.URL, status)
  }
}

我們可以啟動定時器來定時發起 ping 操作。

// healthCheck 返回一個 routine,每 2 分鍾檢查一次后端的狀態
func healthCheck() {
  t := time.NewTicker(time.Second * 20)
  for {
    select {
    case <-t.C:
      log.Println("Starting health check...")
      serverPool.HealthCheck()
      log.Println("Health check completed")
    }
  }
}

在上面的例子中,<-t.C 每 20 秒返回一個值,select 會檢測到這個事件。在沒有 default case 的情況下,select 會一直等待,直到有滿足條件的 case 被執行。

最后,使用單獨的 goroutine 來執行。

go healthCheck()

十二、測試

負載均衡代碼

  1 package main
  2 
  3 import (
  4     "context"
  5     "flag"
  6     "fmt"
  7     "log"
  8     "net"
  9     "net/http"
 10     "net/http/httputil"
 11     "net/url"
 12     "strings"
 13     "sync"
 14     "sync/atomic"
 15     "time"
 16 )
 17 
 18 const (
 19     Attempts int = iota
 20     Retry
 21 )
 22 
 23 //定義結構體
 24 //后端保存關於服務器的數據
 25 type Backend struct {
 26     URL          *url.URL
 27     Alive        bool
 28     mux          sync.RWMutex
 29     ReverseProxy *httputil.ReverseProxy
 30 }
 31 
 32 //跟蹤所有后端,以及一個計算器變量
 33 type ServerPool struct {
 34     backends []*Backend
 35     current  uint64
 36 }
 37 
 38 // SetAlive
 39 func (b *Backend) SetAlive(alive bool) {
 40     b.mux.Lock()
 41     b.Alive = alive
 42     b.mux.Unlock()
 43 }
 44 
 45 // 如果后端還活着,IsAlive 返回 true
 46 func (b *Backend) IsAlive() (alive bool) {
 47     b.mux.RLock()
 48     alive = b.Alive
 49     b.mux.RUnlock()
 50     return
 51 }
 52 
 53 // lb 對入向請求進行負載均衡
 54 func lb(w http.ResponseWriter, r *http.Request) {
 55     //重試次數,如果已經達到最大上限,就終結這個請求
 56     attempts := GetAttemptsFromContext(r)
 57     if attempts > 3 {
 58         log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
 59         http.Error(w, "Service not available", http.StatusServiceUnavailable)
 60         return
 61     }
 62 
 63     peer := serverPool.GetNextPeer()
 64     if peer != nil {
 65         peer.ReverseProxy.ServeHTTP(w, r)
 66         return
 67     }
 68     http.Error(w, "Service not available", http.StatusServiceUnavailable)
 69 }
 70 
 71 // 自動增加計數器並返回一個索引,使用atomic 保證原子性
 72 //通過原子操作遞增 current 的值,並通過對 slice 的長度取模來獲得當前索引值。所以,返回值總是介於 0 和 slice 的長度之間,畢竟我們想要的是索引值,而不是總的計數值。
 73 func (s *ServerPool) NextIndex() int {
 74     return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
 75 }
 76 
 77 // GetNextPeer返回下一個活動的對等點以獲取連接
 78 //找到可用的服務器后,我們將它標記為當前可用服務器。
 79 func (s *ServerPool) GetNextPeer() *Backend {
 80     // 循環整個后端,找出一個活動后端
 81     next := s.NextIndex()
 82     l := len(s.backends) + next // 從next開始移動一個完整的周期
 83     for i := next; i < l; i++ {
 84         idx := i % len(s.backends)     // take an index by modding
 85         if s.backends[idx].IsAlive() { // if we have an alive backend, use it and store if its not the original one
 86             if i != next {
 87                 atomic.StoreUint64(&s.current, uint64(idx))
 88             }
 89             return s.backends[idx]
 90         }
 91     }
 92     return nil
 93 }
 94 
 95 // GetAttemptsFromContext 返回嘗試次數
 96 func GetRetryFromContext(r *http.Request) int {
 97     if retry, ok := r.Context().Value(Retry).(int); ok {
 98         return retry
 99     }
100     return 0
101 }
102 
103 // healthCheck runs a routine for check status of the backends every 2 mins
104 // healthCheck 返回一個 routine,每 2 分鍾檢查一次后端的狀態
105 func healthCheck() {
106     t := time.NewTicker(time.Second * 20)
107     for {
108         select {
109         case <-t.C:
110             log.Println("Starting health check...")
111             serverPool.HealthCheck()
112             log.Println("Health check completed")
113         }
114     }
115 }
116 
117 // HealthCheck ping后端並更新狀態
118 func (s *ServerPool) HealthCheck() {
119     for _, b := range s.backends {
120         status := "up"
121         alive := isBackendAlive(b.URL)
122         b.SetAlive(alive)
123         if !alive {
124             status = "down"
125         }
126         log.Printf("%s [%s]\n", b.URL, status)
127     }
128 }
129 
130 // isAlive checks whether a backend is Alive by establishing a TCP connection
131 // isAlive 通過建立 TCP 連接檢查后端是否還活着
132 func isBackendAlive(u *url.URL) bool {
133     timeout := 2 * time.Second
134     conn, err := net.DialTimeout("tcp", u.Host, timeout)
135     if err != nil {
136         log.Println("Site unreachable, error: ", err)
137         return false
138     }
139     _ = conn.Close() // 不需要維護連接,把它關閉
140     return true
141 }
142 
143 // GetAttemptsFromContext returns the attempts for request
144 func GetAttemptsFromContext(r *http.Request) int {
145     if attempts, ok := r.Context().Value(Attempts).(int); ok {
146         return attempts
147     }
148     return 1
149 }
150 
151 // AddBackend to the server pool
152 func (s *ServerPool) AddBackend(backend *Backend) {
153     s.backends = append(s.backends, backend)
154 }
155 
156 // MarkBackendStatus changes a status of a backend
157 func (s *ServerPool) MarkBackendStatus(backendURL *url.URL, alive bool) {
158     for _, b := range s.backends {
159         if b.URL.String() == backendURL.String() {
160             b.SetAlive(alive)
161             break
162         }
163     }
164 }
165 
166 var serverPool ServerPool
167 
168 func main() {
169     var serverList string
170     var port int
171     flag.StringVar(&serverList, "backends", "http://localhost:3302,http://localhost:3303,http://localhost:3304", "Load balanced backends, use commas to separate")
172     flag.IntVar(&port, "port", 3031, "Port to serve")
173     flag.Parse()
174 
175     if len(serverList) == 0 {
176         log.Fatal("Please provide one or more backends to load balance")
177     }
178 
179     // 解析服務器
180     tokens := strings.Split(serverList, ",")
181     //range類似迭代器,可以遍歷
182     for _, tok := range tokens {
183         serverURL, err := url.Parse(tok)
184         if err != nil {
185             log.Fatal(err)
186         }
187 
188         //使用 httputil.NewSingleHostReverseProxy(url) 初始化一個反向代理
189         proxy := httputil.NewSingleHostReverseProxy(serverURL)
190 
191         //在發生錯誤時,ReverseProxy 會觸發 ErrorHandler 回調函數,我們可以利用它來檢查故障。
192         proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
193             log.Printf("[%s] %s\n", serverURL.Host, e.Error())
194             retries := GetRetryFromContext(request)
195             if retries < 3 {
196                 select {
197                 case <-time.After(10 * time.Millisecond):
198                     ctx := context.WithValue(request.Context(), Retry, retries+1)
199                     proxy.ServeHTTP(writer, request.WithContext(ctx))
200                 }
201                 return
202             }
203 
204             // 在三次重試之后,把這個后端標記為宕機
205             serverPool.MarkBackendStatus(serverURL, false)
206 
207             // 同一個請求在嘗試了幾個不同的后端之后,增加計數
208             attempts := GetAttemptsFromContext(request)
209             log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)
210             ctx := context.WithValue(request.Context(), Attempts, attempts+1)
211             lb(writer, request.WithContext(ctx))
212         }
213 
214         serverPool.AddBackend(&Backend{
215             URL:          serverURL,
216             Alive:        true,
217             ReverseProxy: proxy,
218         })
219         log.Printf("Configured server: %s\n", serverURL)
220 
221     }
222     // 初始化服務器,並添加處理器
223     // create http server
224     server := http.Server{
225         Addr:    fmt.Sprintf(":%d", port),
226         Handler: http.HandlerFunc(lb),
227     }
228 
229     // start health checking
230     go healthCheck()
231 
232     log.Printf("Load Balancer started at :%d\n", port)
233     if err := server.ListenAndServe(); err != nil {
234         log.Fatal(err)
235     }
236 }
View Code

直接運行就好了

web服務器代碼

package main

import (
    "flag"
    "fmt"
    "log"
    "net/http"
    "strconv"
)

func sayhelloName(w http.ResponseWriter, r *http.Request) {
    r.ParseForm()                                //解析參數,默認是不會解析的
    fmt.Fprintln(w, "Hello moon!")               //這個寫入到w的是輸出到客戶端的
    fmt.Fprintln(w, "count:"+strconv.Itoa(port)) //這個寫入到w的是輸出到客戶端的
    count++
    fmt.Fprintln(w, "count:"+strconv.Itoa(count)) //這個寫入到w的是輸出到客戶端的

}

var port int
var count int

func main() {
    flag.IntVar(&port, "port", 3302, "duan端口號,默認3302")

    // 【必須調用】從 arguments 中解析注冊的 flag
    flag.Parse()
    fmt.Printf("port=%v \n", port)
    http.HandleFunc("/", sayhelloName)                      //設置訪問的路由
    err := http.ListenAndServe(":"+strconv.Itoa(port), nil) //設置監聽的端口
    if err != nil {
        log.Fatal("ListenAndServe: ", err)
    }
}
View Code

使用方法

go run web.go -port=3302
go run web.go -port=3303
go run web.go -port=3304

這里web.go是代碼文件名

測試

訪問http://localhost:3031/並刷新

 

十三、結論   

這篇文章提到了很多東西:

  • 輪詢;

  • Go 語言標准庫里的 ReverseProxy;

  • mutex;

  • 原子操作;

  • 閉包;

  • 回調;

  • select。

這個簡單的負載均衡器還有很多可以改進的地方:

  • 使用堆來維護后端的狀態,以此來降低搜索成本;

  • 收集統計信息;

  • 實現加權輪詢或最少連接策略;

  • 支持文件配置。

代碼地址:

https://github.com/kasvith/simplelb/

原文連接:

https://kasvith.github.io/posts/lets-create-a-simple-lb-go/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM