網絡代理與網絡轉發的區別
網絡代理:
用戶不直接連接服務器,網絡代理去連接,獲取數據后返回給用戶
網絡轉發:
是路由器對報文的轉發操作,中間可能對數據包修改
網絡代理類型:
正向代理:
實現一個web瀏覽器代理:
代碼實現一個web瀏覽器代理:
代碼實現:
package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"strings"
)

// Pxy is a minimal forward HTTP proxy: it replays each incoming request
// upstream and copies the upstream response back to the client.
type Pxy struct {
	// Transport performs the outbound request. When nil,
	// http.DefaultTransport is used, so the zero value &Pxy{} keeps working.
	Transport http.RoundTripper
}

// ServeHTTP forwards req upstream and relays the upstream status, headers
// and body to rw. If the upstream round trip fails, the client receives
// 502 Bad Gateway.
func (p *Pxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	fmt.Printf("Received request %s %s %s\n", req.Method, req.Host, req.RemoteAddr)

	transport := p.Transport
	if transport == nil {
		transport = http.DefaultTransport
	}

	// Step 1: build the outbound request. Clone deep-copies the header map;
	// a plain struct copy would share it, so appending X-Forwarded-For would
	// also rewrite the headers of the incoming request.
	outReq := req.Clone(req.Context())
	if outReq.Header == nil {
		outReq.Header = make(http.Header)
	}
	if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
		// Preserve any chain recorded by earlier proxies and append ourselves.
		if prior, ok := outReq.Header["X-Forwarded-For"]; ok {
			clientIP = strings.Join(prior, ", ") + ", " + clientIP
		}
		outReq.Header.Set("X-Forwarded-For", clientIP)
	}

	// Step 2: send the request upstream.
	res, err := transport.RoundTrip(outReq)
	if err != nil {
		log.Printf("proxy: round trip for %s failed: %v", req.Host, err)
		rw.WriteHeader(http.StatusBadGateway)
		return
	}
	defer res.Body.Close()

	// Step 3: relay the upstream response to the client.
	for key, values := range res.Header {
		for _, v := range values {
			rw.Header().Add(key, v)
		}
	}
	rw.WriteHeader(res.StatusCode)
	if _, err := io.Copy(rw, res.Body); err != nil {
		// The status line is already sent; all we can do is record it.
		log.Printf("proxy: copying response body for %s failed: %v", req.Host, err)
	}
}

func main() {
	fmt.Println("Serve on :8080")
	http.Handle("/", &Pxy{})
	if err := http.ListenAndServe("0.0.0.0:8080", nil); err != nil {
		log.Fatal(err)
	}
}
反向代理:
如何實現一個反向代理:
- 這個功能比較復雜,我們先實現一個簡版的http反向代理。
- 代理接收客戶端請求,更改請求結構體信息
- 通過一定的負載均衡算法獲取下游服務地址
- 把請求發送到下游服務器,並獲取返回的內容
- 對返回的內容做一些處理,然后返回給客戶端
啟動兩個http服務(真實服務地址)
127.0.0.1:2003
127.0.0.1:2004

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	rs1 := &RealServer{Addr: "127.0.0.1:2003"}
	rs1.Run()
	rs2 := &RealServer{Addr: "127.0.0.1:2004"}
	rs2.Run()

	// Block until a shutdown signal arrives. signal.Notify never blocks when
	// delivering, so the channel needs a buffer or the signal can be dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}

// RealServer is a demo backend ("real server") listening on Addr.
type RealServer struct {
	Addr string
}

// Run registers the demo handlers and starts serving on r.Addr in a
// background goroutine. A listen failure terminates the process.
func (r *RealServer) Run() {
	log.Println("Starting httpserver at " + r.Addr)
	mux := http.NewServeMux()
	mux.HandleFunc("/", r.HelloHandler)
	mux.HandleFunc("/base/error", r.ErrorHandler)
	server := &http.Server{
		Addr:         r.Addr,
		WriteTimeout: time.Second * 3,
		Handler:      mux,
	}
	go func() {
		log.Fatal(server.ListenAndServe())
	}()
}

// HelloHandler echoes which backend served the request and the client
// address information it received (RemoteAddr, X-Forwarded-For, X-Real-Ip).
func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
	// Example: for a request to 127.0.0.1:8008/abc?sdsdsa=11,
	// r.Addr is 127.0.0.1:8008 and req.URL.Path is /abc.
	fmt.Println(req.Host)
	upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
	realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n",
		req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))
	io.WriteString(w, upath)
	io.WriteString(w, realIP)
}

// ErrorHandler always answers 500 with a fixed body; it exists so the
// proxies can be exercised against a failing backend.
func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
	upath := "error handler"
	w.WriteHeader(500)
	io.WriteString(w, upath)
}
啟動一個代理服務
代理服務 127.0.0.1:2002(此處代碼並沒有使用負載均衡算法,只是簡單地固定代理到其中一個服務器)

package main

import (
	"io"
	"log"
	"net/http"
	"net/url"
)

var (
	proxy_addr = "http://127.0.0.1:2003"
	port       = "2002"
)

// handler is a bare-bones reverse proxy: every request is re-targeted at
// proxy_addr, sent through http.DefaultTransport, and the upstream status,
// headers and body are relayed to the client.
func handler(w http.ResponseWriter, r *http.Request) {
	// Step 1: parse the backend address and point the request at it.
	proxy, err := url.Parse(proxy_addr)
	if err != nil {
		// Without this check a bad address leaves proxy nil and panics below.
		log.Print(err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}
	r.URL.Scheme = proxy.Scheme
	r.URL.Host = proxy.Host

	// Step 2: send the request to the backend.
	resp, err := http.DefaultTransport.RoundTrip(r)
	if err != nil {
		log.Print(err)
		// Tell the client the backend is unreachable instead of an empty 200.
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()

	// Step 3: relay the backend response. The status code must be written
	// after the headers and before the body, otherwise every response
	// (including backend errors) is reported as 200.
	for k, vv := range resp.Header {
		for _, v := range vv {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(resp.StatusCode)
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.Print(err)
	}
}

func main() {
	http.HandleFunc("/", handler)
	log.Println("Start serving on port " + port)
	err := http.ListenAndServe(":"+port, nil)
	if err != nil {
		log.Fatal(err)
	}
}
用戶訪問127.0.0.1:2002 反向代理到 127.0.0.1:2003
http代理
上面演示的是一個簡版的http代理,不具備以下功能:
- 錯誤回調及錯誤日志處理
- 更改代理返回內容
- 負載均衡
- url重寫
- 限流、熔斷、降級
用golang官方提供的ReverseProxy實現一個http代理
- ReverseProxy功能點
- ReverseProxy實例
- ReverseProxy源碼實現
拓展ReverseProxy功能
- 4種負載輪詢類型實現以及接口封裝
- 拓展中間件支持:限流、熔斷實現、權限、數據統計
用ReverseProxy實現一個http代理:
package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
)

var addr = "127.0.0.1:2002"

// main starts a reverse proxy on addr that forwards
// 127.0.0.1:2002/xxx to 127.0.0.1:2003/base/xxx.
func main() {
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		// A failed parse yields a nil URL; continuing would panic inside
		// NewSingleHostReverseProxy, so stop here.
		log.Fatal(err1)
	}
	proxy := httputil.NewSingleHostReverseProxy(url1)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}
ReverseProxy修改返回的內容
重寫
httputil.NewSingleHostReverseProxy(url1)
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"regexp"
	"strings"
)

var addr = "127.0.0.1:2002"

// dirPrefix matches a leading "/dir" in the request path, which the proxy
// strips before forwarding. It is compiled once rather than per request.
// NOTE(review): the pattern has no segment boundary, so "/directory" is
// rewritten to "ectory" as well — confirm whether that is intended.
var dirPrefix = regexp.MustCompile("^/dir(.*)")

// main starts a reverse proxy on addr that forwards
// 127.0.0.1:2002/xxx to 127.0.0.1:2003/base/xxx.
func main() {
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		// A nil URL would panic in NewSingleHostReverseProxy.
		log.Fatal(err1)
	}
	proxy := NewSingleHostReverseProxy(url1)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

// NewSingleHostReverseProxy returns a ReverseProxy that routes requests to
// target. Compared with httputil.NewSingleHostReverseProxy it additionally
//   - strips a leading "/dir" from the request path (URL rewrite),
//   - prefixes 200 response bodies with "hello ",
//   - turns any non-200 upstream status into an error handled by the
//     proxy's ErrorHandler.
func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy {
	// For http://127.0.0.1:2002/dir?name=123: RawQuery is name=123,
	// Scheme is http, Host is 127.0.0.1:2002.
	targetQuery := target.RawQuery

	director := func(req *http.Request) {
		// URL rewrite: 127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc.
		req.URL.Path = dirPrefix.ReplaceAllString(req.URL.Path, "$1")

		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host
		// target.Path is e.g. /base; join it in front of the request path.
		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
		if targetQuery == "" || req.URL.RawQuery == "" {
			req.URL.RawQuery = targetQuery + req.URL.RawQuery
		} else {
			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
		}
		if _, ok := req.Header["User-Agent"]; !ok {
			// Explicitly empty so the transport does not add its default.
			req.Header.Set("User-Agent", "")
		}
	}

	modifyFunc := func(res *http.Response) error {
		if res.StatusCode != 200 {
			return errors.New("error statusCode")
		}
		// The body is replaced below, so the original must be closed here;
		// nobody else holds a reference to it afterwards.
		body := res.Body
		oldPayload, err := ioutil.ReadAll(body)
		body.Close()
		if err != nil {
			return err
		}
		newPayLoad := []byte("hello " + string(oldPayload))
		res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))
		// Keep both length fields in sync with the rewritten payload.
		res.ContentLength = int64(len(newPayLoad))
		res.Header.Set("Content-Length", fmt.Sprint(len(newPayLoad)))
		return nil
	}

	errorHandler := func(res http.ResponseWriter, req *http.Request, err error) {
		// Report the failure as a gateway error instead of an implicit 200.
		res.WriteHeader(http.StatusBadGateway)
		res.Write([]byte(err.Error()))
	}

	return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc, ErrorHandler: errorHandler}
}

// singleJoiningSlash joins a and b with exactly one "/" between them.
func singleJoiningSlash(a, b string) string {
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		return a + b[1:]
	case !aslash && !bslash:
		return a + "/" + b
	}
	return a + b
}
ReverseProxy補充知識:
特殊Header頭:X-Forwarded-For、X-Real-Ip、Connection、TE、Trailer
第一代理去除標准的逐段傳輸頭(HOP-BY-HOP)
X-Forwarded-For
- 記錄最后直連實際服務器之前,整個代理過程
- 可能會被偽造
X-Real-Ip
- 請求實際服務器的IP
- 每過一層代理都會被覆蓋掉,只需要第一代理設置轉發
- 不會被偽造
代碼實現:

package main

import (
	"bytes"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"regexp"
	"strconv"
	"strings"
	"time"
)

var addr = "127.0.0.1:2001"

// dirPrefix matches a leading "/dir" in the request path, which the proxy
// strips before forwarding. It is compiled once rather than per request.
var dirPrefix = regexp.MustCompile("^/dir(.*)")

// main starts the first-level proxy on addr, forwarding to the
// second-level proxy at 127.0.0.1:2002.
func main() {
	rs1 := "http://127.0.0.1:2002"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		// A nil URL in the target list would panic in the director.
		log.Fatal(err1)
	}
	urls := []*url.URL{url1}
	proxy := NewMultipleHostsReverseProxy(urls)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

var transport = &http.Transport{
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second, // connect timeout
		KeepAlive: 30 * time.Second, // TCP keep-alive interval
	}).DialContext,
	MaxIdleConns:          100,              // max idle connections
	IdleConnTimeout:       90 * time.Second, // idle connection timeout
	TLSHandshakeTimeout:   10 * time.Second, // TLS handshake timeout
	ExpectContinueTimeout: 1 * time.Second,  // 100-continue timeout
}

// NewMultipleHostsReverseProxy returns a ReverseProxy that sends each
// request to a randomly chosen entry of targets. As the first proxy in the
// chain it also records the client address in X-Real-Ip. Non-200 upstream
// bodies are prefixed with "StatusCode error:".
func NewMultipleHostsReverseProxy(targets []*url.URL) *httputil.ReverseProxy {
	// director rewrites the outbound request.
	director := func(req *http.Request) {
		if len(targets) == 0 {
			// rand.Intn(0) panics. Leaving the request untouched makes the
			// transport fail, which is reported through ErrorHandler.
			return
		}
		// URL rewrite: strip a leading /dir, e.g. /dir/abc ==> /abc.
		req.URL.Path = dirPrefix.ReplaceAllString(req.URL.Path, "$1")

		// Random load balancing.
		target := targets[rand.Intn(len(targets))]
		targetQuery := target.RawQuery
		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host

		// Prepend the target's base path, e.g. /aa ==> /base/aa.
		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
		if targetQuery == "" || req.URL.RawQuery == "" {
			req.URL.RawQuery = targetQuery + req.URL.RawQuery
		} else {
			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
		}
		if _, ok := req.Header["User-Agent"]; !ok {
			req.Header.Set("User-Agent", "user-agent")
		}

		// Only the first proxy sets X-Real-Ip. RemoteAddr is "ip:port";
		// the header should carry just the IP.
		clientIP := req.RemoteAddr
		if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
			clientIP = host
		}
		req.Header.Set("X-Real-Ip", clientIP)
	}

	// modifyFunc rewrites upstream responses.
	// Try it with: curl 'http://127.0.0.1:2002/error'
	modifyFunc := func(resp *http.Response) error {
		if resp.StatusCode != 200 {
			// The body is replaced below, so close the original here.
			body := resp.Body
			oldPayload, err := ioutil.ReadAll(body)
			body.Close()
			if err != nil {
				return err
			}
			newPayload := []byte("StatusCode error:" + string(oldPayload))
			resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
			resp.ContentLength = int64(len(newPayload))
			resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
		}
		return nil
	}

	// errFunc is invoked when the backend cannot be reached (test it by
	// stopping the real server).
	errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
		http.Error(w, "ErrorHandler error:"+err.Error(), 500)
	}

	return &httputil.ReverseProxy{
		Director:       director,
		Transport:      transport,
		ModifyResponse: modifyFunc,
		ErrorHandler:   errFunc}
}

// singleJoiningSlash joins a and b with exactly one "/" between them.
func singleJoiningSlash(a, b string) string {
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		return a + b[1:]
	case !aslash && !bslash:
		return a + "/" + b
	}
	return a + b
}
第二層代理

package main

import (
	"bytes"
	"compress/gzip"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"regexp"
	"strconv"
	"strings"
	"time"
)

var addr = "127.0.0.1:2002"

// dirPrefix matches a leading "/dir" in the request path, which the proxy
// strips before forwarding. It is compiled once rather than per request.
var dirPrefix = regexp.MustCompile("^/dir(.*)")

// main starts the second-level proxy on addr, balancing between the two
// real servers.
func main() {
	//rs1 := "http://www.baidu.com"
	rs1 := "http://127.0.0.1:2003"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		// A nil URL in the target list would panic in the director.
		log.Fatal(err1)
	}

	//rs2 := "http://www.baidu.com"
	rs2 := "http://127.0.0.1:2004"
	url2, err2 := url.Parse(rs2)
	if err2 != nil {
		log.Fatal(err2)
	}
	urls := []*url.URL{url1, url2}
	proxy := NewMultipleHostsReverseProxy(urls)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

var transport = &http.Transport{
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second, // connect timeout
		KeepAlive: 30 * time.Second, // TCP keep-alive interval
	}).DialContext,
	MaxIdleConns:          100,              // max idle connections
	IdleConnTimeout:       90 * time.Second, // idle connection timeout
	TLSHandshakeTimeout:   10 * time.Second, // TLS handshake timeout
	ExpectContinueTimeout: 1 * time.Second,  // 100-continue timeout
}

// NewMultipleHostsReverseProxy returns a ReverseProxy that sends each
// request to a randomly chosen entry of targets. Responses are buffered so
// they can be rewritten: gzip bodies are decompressed, non-200 bodies are
// prefixed with "StatusCode error:", and protocol upgrades (websocket) are
// passed through untouched.
func NewMultipleHostsReverseProxy(targets []*url.URL) *httputil.ReverseProxy {
	// director rewrites the outbound request.
	director := func(req *http.Request) {
		if len(targets) == 0 {
			// rand.Intn(0) panics. Leaving the request untouched makes the
			// transport fail, which is reported through ErrorHandler.
			return
		}
		// URL rewrite: strip a leading /dir, e.g. /dir/abc ==> /abc.
		req.URL.Path = dirPrefix.ReplaceAllString(req.URL.Path, "$1")

		// Random load balancing.
		target := targets[rand.Intn(len(targets))]
		targetQuery := target.RawQuery
		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host

		// Supplement 1: overriding Host is required when proxying to a
		// public domain name; it is not needed for internal backends.
		req.Host = target.Host

		// Prepend the target's base path, e.g. /aa ==> /base/aa.
		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
		if targetQuery == "" || req.URL.RawQuery == "" {
			req.URL.RawQuery = targetQuery + req.URL.RawQuery
		} else {
			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
		}
		if _, ok := req.Header["User-Agent"]; !ok {
			req.Header.Set("User-Agent", "user-agent")
		}
		// X-Real-Ip is set by the first proxy only, so not here:
		//req.Header.Set("X-Real-Ip", req.RemoteAddr)
	}

	// modifyFunc rewrites upstream responses.
	// Try it with: curl 'http://127.0.0.1:2002/error'
	modifyFunc := func(resp *http.Response) error {
		// Supplement 2: websocket compatibility. An upgraded connection
		// has no body to buffer, so leave the response alone.
		if strings.Contains(resp.Header.Get("Connection"), "Upgrade") {
			return nil
		}

		// The body is replaced below, so the original must be closed here.
		body := resp.Body
		defer body.Close()

		var payload []byte
		var readErr error

		// Supplement 3: gzip compatibility. Decompress so the payload can
		// be edited, and drop the header that no longer applies.
		if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") {
			gr, err := gzip.NewReader(body)
			if err != nil {
				return err
			}
			payload, readErr = ioutil.ReadAll(gr)
			gr.Close()
			resp.Header.Del("Content-Encoding")
		} else {
			payload, readErr = ioutil.ReadAll(body)
		}
		if readErr != nil {
			return readErr
		}

		// Mark abnormal responses.
		if resp.StatusCode != 200 {
			payload = []byte("StatusCode error:" + string(payload))
		}

		// Supplement 4: the body was consumed above, so write it back and
		// keep both length fields in sync.
		resp.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
		resp.ContentLength = int64(len(payload))
		resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(payload)), 10))
		return nil
	}

	// errFunc is invoked when the backend cannot be reached (test it by
	// stopping the real server).
	errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
		http.Error(w, "ErrorHandler error:"+err.Error(), 500)
	}

	return &httputil.ReverseProxy{
		Director:       director,
		Transport:      transport,
		ModifyResponse: modifyFunc,
		ErrorHandler:   errFunc}
}

// singleJoiningSlash joins a and b with exactly one "/" between them.
func singleJoiningSlash(a, b string) string {
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		return a + b[1:]
	case !aslash && !bslash:
		return a + "/" + b
	}
	return a + b
}
實際服務器:

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	rs1 := &RealServer{Addr: "127.0.0.1:2003"}
	rs1.Run()
	rs2 := &RealServer{Addr: "127.0.0.1:2004"}
	rs2.Run()

	// Block until a shutdown signal arrives. signal.Notify never blocks when
	// delivering, so the channel needs a buffer or the signal can be dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}

// RealServer is a demo backend ("real server") listening on Addr.
type RealServer struct {
	Addr string
}

// Run registers the demo handlers and starts serving on r.Addr in a
// background goroutine. A listen failure terminates the process.
func (r *RealServer) Run() {
	log.Println("Starting httpserver at " + r.Addr)
	mux := http.NewServeMux()
	mux.HandleFunc("/", r.HelloHandler)
	mux.HandleFunc("/base/error", r.ErrorHandler)
	server := &http.Server{
		Addr:         r.Addr,
		WriteTimeout: time.Second * 3,
		Handler:      mux,
	}
	go func() {
		log.Fatal(server.ListenAndServe())
	}()
}

// HelloHandler echoes which backend served the request and the client
// address information it received (RemoteAddr, X-Forwarded-For, X-Real-Ip).
func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
	// Example: for a request to 127.0.0.1:8008/abc?sdsdsa=11,
	// r.Addr is 127.0.0.1:8008 and req.URL.Path is /abc.
	fmt.Println(req.Host)
	upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
	realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n",
		req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))
	io.WriteString(w, upath)
	io.WriteString(w, realIP)
}

// ErrorHandler always answers 500 with a fixed body; it exists so the
// proxies can be exercised against a failing backend.
func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
	upath := "error handler"
	w.WriteHeader(500)
	io.WriteString(w, upath)
}
負載均衡策略:
- 隨機負載
- 隨機挑選目標服務器ip
- 輪詢負載
- ABC三台服務器,ABCABC依次輪詢
- 加權負載
- 給目標設置訪問權重,按照權重輪詢
- 一致性hash負載
- 請求固定的url訪問固定的ip
隨機負載:

package load_balance

import (
	"errors"
	"fmt"
	"math/rand"
	"strings"
)

// RandomBalance selects a backend address at random on every call.
type RandomBalance struct {
	curIndex int
	rss      []string

	// conf is the observed configuration subject.
	conf LoadBalanceConf
}

// Add registers a backend. Only the first parameter (the address) is used;
// an error is returned when no parameter is given.
func (r *RandomBalance) Add(params ...string) error {
	if len(params) < 1 {
		return errors.New("param len 1 at least")
	}
	r.rss = append(r.rss, params[0])
	return nil
}

// Next returns a randomly chosen backend, or "" when none is registered.
func (r *RandomBalance) Next() string {
	n := len(r.rss)
	if n == 0 {
		return ""
	}
	r.curIndex = rand.Intn(n)
	return r.rss[r.curIndex]
}

// Get ignores key and returns the result of Next.
func (r *RandomBalance) Get(key string) (string, error) {
	picked := r.Next()
	return picked, nil
}

// SetConf stores the configuration source consulted by Update.
func (r *RandomBalance) SetConf(conf LoadBalanceConf) {
	r.conf = conf
}

// Update rebuilds the backend list from the stored configuration when it
// is one of the two recognised conf types; otherwise it does nothing.
func (r *RandomBalance) Update() {
	switch conf := r.conf.(type) {
	case *LoadBalanceZkConf:
		fmt.Println("Update get conf:", conf.GetConf())
		r.rss = []string{}
		for _, ip := range conf.GetConf() {
			r.Add(strings.Split(ip, ",")...)
		}
	case *LoadBalanceCheckConf:
		fmt.Println("Update get conf:", conf.GetConf())
		r.rss = nil
		for _, ip := range conf.GetConf() {
			r.Add(strings.Split(ip, ",")...)
		}
	}
}

package load_balance

import (
	"fmt"
	"testing"
)

// TestRandomBalance registers five backends and checks that every pick is
// one of them. The picks are random, so only membership is asserted.
func TestRandomBalance(t *testing.T) {
	rb := &RandomBalance{}
	addrs := []string{
		"127.0.0.1:2003", //0
		"127.0.0.1:2004", //1
		"127.0.0.1:2005", //2
		"127.0.0.1:2006", //3
		"127.0.0.1:2007", //4
	}
	known := make(map[string]bool, len(addrs))
	for _, a := range addrs {
		if err := rb.Add(a); err != nil {
			t.Fatalf("Add(%q): %v", a, err)
		}
		known[a] = true
	}

	for i := 0; i < 9; i++ {
		got := rb.Next()
		fmt.Println(got)
		if !known[got] {
			t.Fatalf("Next() = %q, not a registered backend", got)
		}
	}
}
=== RUN TestRandomBalance 127.0.0.1:2004 127.0.0.1:2005 127.0.0.1:2005 127.0.0.1:2007 127.0.0.1:2004 127.0.0.1:2006 127.0.0.1:2003 127.0.0.1:2003 127.0.0.1:2004 --- PASS: TestRandomBalance (0.00s) PASS
輪詢負載:

package load_balance

import (
	"errors"
	"fmt"
	"strings"
)

// RoundRobinBalance hands out backends in registration order, wrapping
// around after the last one.
type RoundRobinBalance struct {
	curIndex int
	rss      []string

	// conf is the observed configuration subject.
	conf LoadBalanceConf
}

// Add registers a backend. Only the first parameter (the address) is used;
// an error is returned when no parameter is given.
func (r *RoundRobinBalance) Add(params ...string) error {
	if len(params) < 1 {
		return errors.New("param len 1 at least")
	}
	r.rss = append(r.rss, params[0])
	return nil
}

// Next returns the backend at the cursor and advances the cursor, or
// returns "" when no backend is registered.
func (r *RoundRobinBalance) Next() string {
	n := len(r.rss)
	if n == 0 {
		return ""
	}
	// The list may have shrunk since the cursor was last advanced.
	if r.curIndex >= n {
		r.curIndex = 0
	}
	idx := r.curIndex
	r.curIndex = (idx + 1) % n
	return r.rss[idx]
}

// Get ignores key and returns the result of Next.
func (r *RoundRobinBalance) Get(key string) (string, error) {
	picked := r.Next()
	return picked, nil
}

// SetConf stores the configuration source consulted by Update.
func (r *RoundRobinBalance) SetConf(conf LoadBalanceConf) {
	r.conf = conf
}

// Update rebuilds the backend list from the stored configuration when it
// is one of the two recognised conf types; otherwise it does nothing.
func (r *RoundRobinBalance) Update() {
	switch conf := r.conf.(type) {
	case *LoadBalanceZkConf:
		fmt.Println("Update get conf:", conf.GetConf())
		r.rss = []string{}
		for _, ip := range conf.GetConf() {
			r.Add(strings.Split(ip, ",")...)
		}
	case *LoadBalanceCheckConf:
		fmt.Println("Update get conf:", conf.GetConf())
		r.rss = nil
		for _, ip := range conf.GetConf() {
			r.Add(strings.Split(ip, ",")...)
		}
	}
}

package load_balance

import (
	"fmt"
	"testing"
)

// Test_main registers five backends and checks that nine consecutive picks
// follow registration order and wrap around after the fifth.
func Test_main(t *testing.T) {
	rb := &RoundRobinBalance{}
	addrs := []string{
		"127.0.0.1:2003", //0
		"127.0.0.1:2004", //1
		"127.0.0.1:2005", //2
		"127.0.0.1:2006", //3
		"127.0.0.1:2007", //4
	}
	for _, a := range addrs {
		if err := rb.Add(a); err != nil {
			t.Fatalf("Add(%q): %v", a, err)
		}
	}

	for i := 0; i < 9; i++ {
		got := rb.Next()
		fmt.Println(got)
		if want := addrs[i%len(addrs)]; got != want {
			t.Fatalf("pick %d = %q, want %q", i, got, want)
		}
	}
}
=== RUN Test_main 127.0.0.1:2003 127.0.0.1:2004 127.0.0.1:2005 127.0.0.1:2006 127.0.0.1:2007 127.0.0.1:2003 127.0.0.1:2004 127.0.0.1:2005 127.0.0.1:2006 --- PASS: Test_main (0.00s) PASS
加權負載均衡:
- Weight
- 初始化時對節點約定的權重
- currentWeight
- 節點臨時權重,每輪都會變化
- effectiveWeight
- 節點有效權重,默認與Weight相同
- totalWeight
- 所有節點有效權重之和:sum(effectiveWeight)
// WeightNode holds the per-backend state used by weighted round robin.
type WeightNode struct {
	addr            string
	weight          int // configured weight
	currentWeight   int // current weight of the node
	effectiveWeight int // effective weight
}
- 1,currentWeight = currentWeight + effectiveWeight
- 2,選中最大的currentWeight節點為選中的節點
- 3,currentWeight = currentWeight-totalWeight(4+3+2=9)
計算方法如下:
第一次:
- currentWeight = currentWeight + effectiveWeight
- currentWeight {A=4+4,B=3+3,C=2+2} == {A=8,B=6,C=4}
- 選中最大的currentWeight節點為選中的節點
- A最大 此時作為節點
- currentWeight = currentWeight-totalWeight(4+3+2=9) 【選中的節點currentWeight = currentWeight-totalWeight】
- currentWeight {A=8-9,B=6,C=4} == {A=-1,B=6,C=4}
第二次:{A=-1,B=6,C=4} 開始
- currentWeight = currentWeight + effectiveWeight
- currentWeight {A=-1+4,B=6+3,C=4+2} == {A=3,B=9,C=6}
- 選中最大的currentWeight節點為選中的節點
- B最大 此時作為節點
- 選中的節點currentWeight = currentWeight-totalWeight(4+3+2=9)
- currentWeight {A=3,B=9-9,C=6} == {A=3,B=0,C=6}
。。。。。。。以此類推。。。。。。。。。

package load_balance

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// WeightRoundRobinBalance implements smooth weighted round robin: nodes are
// picked in proportion to their weight while spreading picks of the same
// node apart.
type WeightRoundRobinBalance struct {
	curIndex int
	rss      []*WeightNode
	rsw      []int

	// conf is the observed configuration subject.
	conf LoadBalanceConf
}

// WeightNode holds the per-backend state used by weighted round robin.
type WeightNode struct {
	addr            string
	weight          int // configured weight
	currentWeight   int // current weight of the node, changes every round
	effectiveWeight int // effective weight, starts equal to weight
}

// Add registers a backend. It takes exactly two parameters: the address
// and its weight as a base-10 integer. A malformed or negative weight is
// rejected with an error.
func (r *WeightRoundRobinBalance) Add(params ...string) error {
	if len(params) != 2 {
		return errors.New("param len need 2")
	}
	parInt, err := strconv.ParseInt(params[1], 10, 64)
	if err != nil {
		return err
	}
	if parInt < 0 {
		// A negative weight would make the node's current weight drift
		// downwards forever and skew the selection.
		return errors.New("weight must not be negative")
	}
	node := &WeightNode{addr: params[0], weight: int(parInt)}
	node.effectiveWeight = node.weight
	r.rss = append(r.rss, node)
	return nil
}

// Next returns the address of the selected backend, or "" when no backend
// is registered.
func (r *WeightRoundRobinBalance) Next() string {
	total := 0
	var best *WeightNode
	for i := 0; i < len(r.rss); i++ {
		w := r.rss[i]
		// Step 1: accumulate the sum of all effective weights.
		total += w.effectiveWeight

		// Step 2: raise the node's current weight by its effective weight.
		w.currentWeight += w.effectiveWeight

		// Step 3: the effective weight recovers by one per round until it
		// is back at the configured weight.
		if w.effectiveWeight < w.weight {
			w.effectiveWeight++
		}
		// Step 4: keep the node with the largest current weight; on a tie
		// the earlier node wins.
		if best == nil || w.currentWeight > best.currentWeight {
			best = w
		}
	}
	if best == nil {
		return ""
	}
	// Step 5: lower the winner's current weight by the total so the other
	// nodes get their turn.
	best.currentWeight -= total
	return best.addr
}

// Get ignores key and returns the result of Next.
func (r *WeightRoundRobinBalance) Get(key string) (string, error) {
	return r.Next(), nil
}

// SetConf stores the configuration source consulted by Update.
func (r *WeightRoundRobinBalance) SetConf(conf LoadBalanceConf) {
	r.conf = conf
}

// Update rebuilds the node list from the stored configuration when it is
// one of the two recognised conf types. Each entry is "addr,weight";
// entries that Add rejects are reported and skipped.
func (r *WeightRoundRobinBalance) Update() {
	switch conf := r.conf.(type) {
	case *LoadBalanceZkConf:
		fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf())
		r.rss = nil
		for _, ip := range conf.GetConf() {
			r.addFromConf(ip)
		}
	case *LoadBalanceCheckConf:
		fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf())
		r.rss = nil
		for _, ip := range conf.GetConf() {
			r.addFromConf(ip)
		}
	}
}

// addFromConf adds one "addr,weight" configuration entry and reports the
// entry when Add rejects it, instead of dropping it silently.
func (r *WeightRoundRobinBalance) addFromConf(entry string) {
	if err := r.Add(strings.Split(entry, ",")...); err != nil {
		fmt.Println("WeightRoundRobinBalance skip conf entry:", entry, err)
	}
}

package load_balance

import (
	"fmt"
	"testing"
)

// TestLB registers three backends with weights 4, 3 and 2 and checks the
// pick order. With all current weights starting at zero the sequence
// repeats every 9 picks (the sum of the weights), and within one cycle
// each backend is chosen exactly as often as its weight.
func TestLB(t *testing.T) {
	rb := &WeightRoundRobinBalance{}
	const (
		a = "127.0.0.1:2003"
		b = "127.0.0.1:2004"
		c = "127.0.0.1:2005"
	)
	if err := rb.Add(a, "4"); err != nil { //0
		t.Fatal(err)
	}
	if err := rb.Add(b, "3"); err != nil { //1
		t.Fatal(err)
	}
	if err := rb.Add(c, "2"); err != nil { //2
		t.Fatal(err)
	}

	cycle := []string{a, b, c, a, b, a, c, b, a}
	for i := 0; i < 14; i++ {
		got := rb.Next()
		fmt.Println(got)
		if want := cycle[i%len(cycle)]; got != want {
			t.Fatalf("pick %d = %q, want %q", i, got, want)
		}
	}
}
一致性hash(ip_hash、url_hash)
為了解決平衡性:引入了虛擬節點概念(把各節點均勻地覆蓋到環上)

package load_balance

import (
	"errors"
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
	"strings"
	"sync"
)

// Hash maps a byte slice to a position on the 32-bit hash ring.
type Hash func(data []byte) uint32

// UInt32Slice is a sortable slice of ring positions.
type UInt32Slice []uint32

func (s UInt32Slice) Len() int {
	return len(s)
}

func (s UInt32Slice) Less(i, j int) bool {
	return s[i] < s[j]
}

func (s UInt32Slice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// ConsistentHashBanlance is a consistent-hash load balancer. Every backend
// is placed on the ring `replicas` times (virtual nodes); a key is served
// by the first virtual node at or after the key's hash.
type ConsistentHashBanlance struct {
	mux      sync.RWMutex
	hash     Hash
	replicas int               // replication factor: virtual nodes per backend
	keys     UInt32Slice       // sorted hashes of all virtual nodes on the ring
	hashMap  map[uint32]string // virtual node hash -> backend key

	// conf is the observed configuration subject.
	conf LoadBalanceConf
}

// NewConsistentHashBanlance creates a balancer with the given number of
// virtual nodes per backend. When fn is nil, crc32.ChecksumIEEE is used.
func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
	m := &ConsistentHashBanlance{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[uint32]string),
	}
	if m.hash == nil {
		// 32-bit hash, i.e. a ring of 2^32 positions.
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

// IsEmpty reports whether no backend has been added.
func (c *ConsistentHashBanlance) IsEmpty() bool {
	c.mux.RLock()
	defer c.mux.RUnlock()
	return len(c.keys) == 0
}

// Add places a backend on the ring. Only the first parameter (the backend
// key, e.g. its address) is used; an error is returned when none is given.
func (c *ConsistentHashBanlance) Add(params ...string) error {
	if len(params) == 0 {
		return errors.New("param len 1 at least")
	}
	addr := params[0]
	c.mux.Lock()
	defer c.mux.Unlock()
	c.insertLocked(addr)
	// Keep the ring sorted so Get can binary-search it.
	sort.Sort(c.keys)
	return nil
}

// insertLocked appends the virtual nodes of addr to the ring without
// sorting. The caller must hold the write lock and sort c.keys afterwards.
func (c *ConsistentHashBanlance) insertLocked(addr string) {
	if c.hashMap == nil {
		c.hashMap = make(map[uint32]string)
	}
	for i := 0; i < c.replicas; i++ {
		hash := c.hash([]byte(strconv.Itoa(i) + addr))
		c.keys = append(c.keys, hash)
		c.hashMap[hash] = addr
	}
}

// Get returns the backend responsible for key, or an error when the ring
// is empty.
func (c *ConsistentHashBanlance) Get(key string) (string, error) {
	// The lock must cover every read of c.keys, including the emptiness
	// check and the search, not just the final map lookup.
	c.mux.RLock()
	defer c.mux.RUnlock()
	if len(c.keys) == 0 {
		return "", errors.New("node is empty")
	}
	hash := c.hash([]byte(key))

	// Binary search for the first virtual node whose hash is >= the key's.
	idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })

	// Past the last virtual node: wrap around to the first one.
	if idx == len(c.keys) {
		idx = 0
	}
	return c.hashMap[c.keys[idx]], nil
}

// SetConf stores the configuration source consulted by Update.
func (c *ConsistentHashBanlance) SetConf(conf LoadBalanceConf) {
	c.conf = conf
}

// Update rebuilds the ring from the stored configuration when it is one of
// the two recognised conf types. Only the part of each entry before the
// first "," is used as the backend key.
//
// The ring is rebuilt through insertLocked rather than Add: Add takes the
// write lock itself, and sync.RWMutex is not reentrant, so calling it while
// holding the lock would deadlock. The map is re-created rather than set
// to nil, because writing to a nil map panics.
func (c *ConsistentHashBanlance) Update() {
	switch conf := c.conf.(type) {
	case *LoadBalanceZkConf:
		fmt.Println("Update get conf:", conf.GetConf())
		c.mux.Lock()
		defer c.mux.Unlock()
		c.keys = nil
		c.hashMap = make(map[uint32]string)
		for _, ip := range conf.GetConf() {
			c.insertLocked(strings.Split(ip, ",")[0])
		}
		sort.Sort(c.keys)
	case *LoadBalanceCheckConf:
		fmt.Println("Update get conf:", conf.GetConf())
		c.mux.Lock()
		defer c.mux.Unlock()
		c.keys = nil
		c.hashMap = make(map[uint32]string)
		for _, ip := range conf.GetConf() {
			c.insertLocked(strings.Split(ip, ",")[0])
		}
		sort.Sort(c.keys)
	}
}

package load_balance

import (
	"fmt"
	"testing"
)

// TestNewConsistentHashBanlance checks the two properties the balancer is
// used for: every key maps to a registered backend, and the same key maps
// to the same backend on every lookup.
func TestNewConsistentHashBanlance(t *testing.T) {
	rb := NewConsistentHashBanlance(10, nil)
	addrs := []string{
		"127.0.0.1:2003", //0
		"127.0.0.1:2004", //1
		"127.0.0.1:2005", //2
		"127.0.0.1:2006", //3
		"127.0.0.1:2007", //4
	}
	known := make(map[string]bool, len(addrs))
	for _, a := range addrs {
		if err := rb.Add(a); err != nil {
			t.Fatalf("Add(%q): %v", a, err)
		}
		known[a] = true
	}

	keys := []string{
		// url hash
		"http://127.0.0.1:2002/base/getinfo",
		"http://127.0.0.1:2002/base/error",
		"http://127.0.0.1:2002/base/getinfo",
		"http://127.0.0.1:2002/base/changepwd",
		// ip hash
		"127.0.0.1",
		"192.168.0.1",
		"127.0.0.1",
	}
	seen := make(map[string]string)
	for _, k := range keys {
		got, err := rb.Get(k)
		fmt.Println(got, err)
		if err != nil {
			t.Fatalf("Get(%q): %v", k, err)
		}
		if !known[got] {
			t.Fatalf("Get(%q) = %q, not a registered backend", k, got)
		}
		if prev, ok := seen[k]; ok && prev != got {
			t.Fatalf("Get(%q) = %q, previously %q", k, got, prev)
		}
		seen[k] = got
	}
}
工廠方法簡單封裝上述幾種負載均衡調用:
interface.go
package load_balance

// LoadBalance is the common interface of the load-balancing strategies in
// this package.
type LoadBalance interface {
	// Add registers a backend from the given string parameters.
	Add(...string) error
	// Get returns a backend for the given key, or an error.
	Get(string) (string, error)

	// Update was added for later service-discovery support.
	Update()
}
factory.go
package load_balance type LbType int const ( LbRandom LbType = iota LbRoundRobin LbWeightRoundRobin LbConsistentHash ) func LoadBanlanceFactory(lbType LbType) LoadBalance { switch lbType { case LbRandom: return &RandomBalance{} case LbConsistentHash: return NewConsistentHashBanlance(10, nil) case LbRoundRobin: return &RoundRobinBalance{} case LbWeightRoundRobin: return &WeightRoundRobinBalance{} default: return &RandomBalance{} } }
調用:
func main() { rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin) if err := rb.Add("http://127.0.0.1:2003/base", "10"); err != nil { log.Println(err) } if err := rb.Add("http://127.0.0.1:2004/base", "20"); err != nil { log.Println(err) } // 。。。。。。。。。。。。。。。 }