網絡代理之HTTP代理(golang反向代理、負載均衡算法實現)


網絡代理於網絡轉發區別

 

 

 

 

網絡代理:

用戶不直接連接服務器,網絡代理去連接,獲取數據后返回給用戶

網絡轉發:

是路由器對報文的轉發操作,中間可能對數據包修改

 

網絡代理類型:

 

 

 

 

 

正向代理:

 

 實現一個web瀏覽器代理:

 

 代碼實現一個web瀏覽器代理:

 

 代碼實現:

package main

import (
    "fmt"
    "io"
    "net"
    "net/http"
    "strings"
)

type Pxy struct{}

func (p *Pxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    fmt.Printf("Received request %s %s %s\n", req.Method, req.Host, req.RemoteAddr)
    transport := http.DefaultTransport
    // step 1,淺拷貝對象,然后就再新增屬性數據
    outReq := new(http.Request)
    *outReq = *req
    if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
        if prior, ok := outReq.Header["X-Forwarded-For"]; ok {
            clientIP = strings.Join(prior, ", ") + ", " + clientIP
        }
        outReq.Header.Set("X-Forwarded-For", clientIP)
    }
    
    // step 2, 請求下游
    res, err := transport.RoundTrip(outReq)
    if err != nil {
        rw.WriteHeader(http.StatusBadGateway)
        return
    }

    // step 3, 把下游請求內容返回給上游
    for key, value := range res.Header {
        for _, v := range value {
            rw.Header().Add(key, v)
        }
    }
    rw.WriteHeader(res.StatusCode)
    io.Copy(rw, res.Body)
    res.Body.Close()
}

func main() {
    fmt.Println("Serve on :8080")
    http.Handle("/", &Pxy{})
    http.ListenAndServe("0.0.0.0:8080", nil)
}

 

 

反向代理:

 

 

如何實現一個反向代理:

  • 這個功能比較復雜,我們先實現一個簡版的http反向代理。
  • 代理接收客戶端請求,更改請求結構體信息
  • 通過一定的負載均衡算法獲取下游服務地址
  • 把請求發送到下游服務器,並獲取返回的內容
  • 對返回的內容做一些處理,然后返回給客戶端

 

啟動兩個http服務(真是服務地址)

127.0.0.1:2003
127.0.0.1:2004
package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    rs1 := &RealServer{Addr: "127.0.0.1:2003"}
    rs1.Run()
    rs2 := &RealServer{Addr: "127.0.0.1:2004"}
    rs2.Run()

    //監聽關閉信號
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
}

type RealServer struct {
    Addr string
}

func (r *RealServer) Run() {
    log.Println("Starting httpserver at " + r.Addr)
    mux := http.NewServeMux()
    mux.HandleFunc("/", r.HelloHandler)
    mux.HandleFunc("/base/error", r.ErrorHandler)
    server := &http.Server{
        Addr:         r.Addr,
        WriteTimeout: time.Second * 3,
        Handler:      mux,
    }
    go func() {
        log.Fatal(server.ListenAndServe())
    }()
}

func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
    //127.0.0.1:8008/abc?sdsdsa=11
    //r.Addr=127.0.0.1:8008
    //req.URL.Path=/abc
    fmt.Println(req.Host)
    upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
    realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))

    io.WriteString(w, upath)
    io.WriteString(w, realIP)
}

func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
    upath := "error handler"
    w.WriteHeader(500)
    io.WriteString(w, upath)
}
real_server

 

 

啟動一個代理服務

代理服務 127.0.0.1:2002(此處代碼並沒有使用負載均衡算法,只是簡單地固定代理到其中一個服務器)

package main

import (
    "bufio"
    "log"
    "net/http"
    "net/url"
)

var (
    proxy_addr = "http://127.0.0.1:2003"
    port       = "2002"
)

func handler(w http.ResponseWriter, r *http.Request) {
    //step 1 解析代理地址,並更改請求體的協議和主機
    proxy, err := url.Parse(proxy_addr)
    r.URL.Scheme = proxy.Scheme
    r.URL.Host = proxy.Host

    //step 2 請求下游
    transport := http.DefaultTransport
    resp, err := transport.RoundTrip(r)
    if err != nil {
        log.Print(err)
        return
    }

    //step 3 把下游請求內容返回給上游
    for k, vv := range resp.Header {
        for _, v := range vv {
            w.Header().Add(k, v)
        }
    }
    defer resp.Body.Close()
    bufio.NewReader(resp.Body).WriteTo(w)
}

func main() {
    http.HandleFunc("/", handler)
    log.Println("Start serving on port " + port)
    err := http.ListenAndServe(":"+port, nil)
    if err != nil {
        log.Fatal(err)
    }
}
reverse_proxy

 

 

用戶訪問127.0.0.1:2002   反向代理到  127.0.0.1:2003

 

http代理

上面演示的是一個簡版的http代理,不具備一下功能:

  • 錯誤回調及錯誤日志處理
  • 更改代理返回內容
  • 負載均衡
  • url重寫
  • 限流、熔斷、降級

 

 

用golang官方提供的ReverseProxy實現一個http代理

  • ReverseProxy功能點
  • ReverseProxy實例
  • ReverseProxy源碼實現

 

 

拓展ReverseProxy功能

  • 4中負載輪訓類型實現以及接口封裝
  • 拓展中間件支持:限流、熔斷實現、權限、數據統計

 

用ReverseProxy實現一個http代理:

 

 

package main

import (
    "log"
    "net/http"
    "net/http/httputil"
    "net/url"
)

var addr = "127.0.0.1:2002"

func main() {
    //127.0.0.1:2002/xxx  => 127.0.0.1:2003/base/xxx
    //127.0.0.1:2003/base/xxx
    rs1 := "http://127.0.0.1:2003/base"
    url1, err1 := url.Parse(rs1)
    if err1 != nil {
        log.Println(err1)
    }
    proxy := httputil.NewSingleHostReverseProxy(url1)
    log.Println("Starting httpserver at " + addr)
    log.Fatal(http.ListenAndServe(addr, proxy))
}

 

ReverseProxy修改返回的內容

重寫 

httputil.NewSingleHostReverseProxy(url1)
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"regexp"
	"strings"
)

var addr = "127.0.0.1:2002"

func main() {
	//127.0.0.1:2002/xxx
	//127.0.0.1:2003/base/xxx
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		log.Println(err1)
	}
	proxy := NewSingleHostReverseProxy(url1)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy {
	//http://127.0.0.1:2002/dir?name=123
	//RayQuery: name=123
	//Scheme: http
	//Host: 127.0.0.1:2002
	targetQuery := target.RawQuery
	director := func(req *http.Request) {
		//url_rewrite
		//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
		//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
		//127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
		re, _ := regexp.Compile("^/dir(.*)");
		req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")

		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host

		//target.Path : /base
		//req.URL.Path : /dir
		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
		if targetQuery == "" || req.URL.RawQuery == "" {
			req.URL.RawQuery = targetQuery + req.URL.RawQuery
		} else {
			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
		}
		if _, ok := req.Header["User-Agent"]; !ok {
			req.Header.Set("User-Agent", "")
		}
	}
	modifyFunc := func(res *http.Response) error {
		if res.StatusCode != 200 {
			return errors.New("error statusCode")
			oldPayload, err := ioutil.ReadAll(res.Body)
			if err != nil {
				return err
			}
			newPayLoad := []byte("hello " + string(oldPayload))
			res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))
			res.ContentLength = int64(len(newPayLoad))
			res.Header.Set("Content-Length", fmt.Sprint(len(newPayLoad)))
		}
		return nil
	}
	errorHandler := func(res http.ResponseWriter, req *http.Request, err error) {
		res.Write([]byte(err.Error()))
	}
	return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc, ErrorHandler: errorHandler}
}

func singleJoiningSlash(a, b string) string {
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		return a + b[1:]
	case !aslash && !bslash:
		return a + "/" + b
	}
	return a + b
}

  

ReverseProxy補充知識:

特殊Header頭:X-Forward-For、X-Real-Ip、Connection、TE、Trailer

第一代理取出標准的逐段傳輸頭(HOP-BY-HOP)

X-Forward-For

  • 記錄最后直連實際服務器之前,整個代理過程
  • 可能會被偽造

 

 

X-Real-Ip

  • 請求實際服務器的IP
  • 每過一層代理都會被覆蓋掉,只需要第一代里設置轉發
  • 不會被偽造

 代碼實現:

package main

import (
    "bytes"
    "io/ioutil"
    "log"
    "math/rand"
    "net"
    "net/http"
    "net/http/httputil"
    "net/url"
    "regexp"
    "strconv"
    "strings"
    "time"
)

var addr = "127.0.0.1:2001"

func main() {
    rs1 := "http://127.0.0.1:2002"
    url1, err1 := url.Parse(rs1)
    if err1 != nil {
        log.Println(err1)
    }
    urls := []*url.URL{url1}
    proxy := NewMultipleHostsReverseProxy(urls)
    log.Println("Starting httpserver at " + addr)
    log.Fatal(http.ListenAndServe(addr, proxy))
}

var transport = &http.Transport{
    DialContext: (&net.Dialer{
        Timeout:   30 * time.Second, //連接超時
        KeepAlive: 30 * time.Second, //長連接超時時間
    }).DialContext,
    MaxIdleConns:          100,              //最大空閑連接
    IdleConnTimeout:       90 * time.Second, //空閑超時時間
    TLSHandshakeTimeout:   10 * time.Second, //tls握手超時時間
    ExpectContinueTimeout: 1 * time.Second,  //100-continue 超時時間
}

func NewMultipleHostsReverseProxy(targets []*url.URL) *httputil.ReverseProxy {
    //請求協調者
    director := func(req *http.Request) {
        //url_rewrite
        //127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
        //127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
        //127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
        re, _ := regexp.Compile("^/dir(.*)");
        req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")

        //隨機負載均衡
        targetIndex := rand.Intn(len(targets))
        target := targets[targetIndex]
        targetQuery := target.RawQuery
        req.URL.Scheme = target.Scheme
        req.URL.Host = target.Host

        // url地址重寫:重寫前:/aa 重寫后:/base/aa
        req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
        if targetQuery == "" || req.URL.RawQuery == "" {
            req.URL.RawQuery = targetQuery + req.URL.RawQuery
        } else {
            req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
        }
        if _, ok := req.Header["User-Agent"]; !ok {
            req.Header.Set("User-Agent", "user-agent")
        }
        //只在第一代理中設置此header頭
        req.Header.Set("X-Real-Ip", req.RemoteAddr)
    }
    //更改內容
    modifyFunc := func(resp *http.Response) error {
        //請求以下命令:curl 'http://127.0.0.1:2002/error'
        if resp.StatusCode != 200 {
            //獲取內容
            oldPayload, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                return err
            }
            //追加內容
            newPayload := []byte("StatusCode error:" + string(oldPayload))
            resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
            resp.ContentLength = int64(len(newPayload))
            resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
        }
        return nil
    }
    //錯誤回調 :關閉real_server時測試,錯誤回調
    errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
        http.Error(w, "ErrorHandler error:"+err.Error(), 500)
    }
    return &httputil.ReverseProxy{
        Director:       director,
        Transport:      transport,
        ModifyResponse: modifyFunc,
        ErrorHandler:   errFunc}
}

func singleJoiningSlash(a, b string) string {
    aslash := strings.HasSuffix(a, "/")
    bslash := strings.HasPrefix(b, "/")
    switch {
    case aslash && bslash:
        return a + b[1:]
    case !aslash && !bslash:
        return a + "/" + b
    }
    return a + b
}
第一層代理

 

 

第二層代理

package main

import (
    "bytes"
    "compress/gzip"
    "io/ioutil"
    "log"
    "math/rand"
    "net"
    "net/http"
    "net/http/httputil"
    "net/url"
    "regexp"
    "strconv"
    "strings"
    "time"
)

var addr = "127.0.0.1:2002"

func main() {
    //rs1 := "http://www.baidu.com"
    rs1 := "http://127.0.0.1:2003"
    url1, err1 := url.Parse(rs1)
    if err1 != nil {
        log.Println(err1)
    }

    //rs2 := "http://www.baidu.com"
    rs2 := "http://127.0.0.1:2004"
    url2, err2 := url.Parse(rs2)
    if err2 != nil {
        log.Println(err2)
    }
    urls := []*url.URL{url1, url2}
    proxy := NewMultipleHostsReverseProxy(urls)
    log.Println("Starting httpserver at " + addr)
    log.Fatal(http.ListenAndServe(addr, proxy))
}

var transport = &http.Transport{
    DialContext: (&net.Dialer{
        Timeout:   30 * time.Second, //連接超時
        KeepAlive: 30 * time.Second, //長連接超時時間
    }).DialContext,
    MaxIdleConns:          100,              //最大空閑連接
    IdleConnTimeout:       90 * time.Second, //空閑超時時間
    TLSHandshakeTimeout:   10 * time.Second, //tls握手超時時間
    ExpectContinueTimeout: 1 * time.Second,  //100-continue 超時時間
}

func NewMultipleHostsReverseProxy(targets []*url.URL) *httputil.ReverseProxy {
    //請求協調者
    director := func(req *http.Request) {
        //url_rewrite
        //127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
        //127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
        //127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
        re, _ := regexp.Compile("^/dir(.*)");
        req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")

        //隨機負載均衡
        targetIndex := rand.Intn(len(targets))
        target := targets[targetIndex]
        targetQuery := target.RawQuery
        req.URL.Scheme = target.Scheme
        req.URL.Host = target.Host

        //todo 部分章節補充1
        //todo 當對域名(非內網)反向代理時需要設置此項。當作后端反向代理時不需要
        req.Host = target.Host

        // url地址重寫:重寫前:/aa 重寫后:/base/aa
        req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
        if targetQuery == "" || req.URL.RawQuery == "" {
            req.URL.RawQuery = targetQuery + req.URL.RawQuery
        } else {
            req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
        }
        if _, ok := req.Header["User-Agent"]; !ok {
            req.Header.Set("User-Agent", "user-agent")
        }
        //只在第一代理中設置此header頭
        //req.Header.Set("X-Real-Ip", req.RemoteAddr)
    }
    //更改內容
    modifyFunc := func(resp *http.Response) error {
        //請求以下命令:curl 'http://127.0.0.1:2002/error'
        //todo 部分章節功能補充2
        //todo 兼容websocket
        if strings.Contains(resp.Header.Get("Connection"), "Upgrade") {
            return nil
        }
        var payload []byte
        var readErr error

        //todo 部分章節功能補充3
        //todo 兼容gzip壓縮
        if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") {
            gr, err := gzip.NewReader(resp.Body)
            if err != nil {
                return err
            }
            payload, readErr = ioutil.ReadAll(gr)
            resp.Header.Del("Content-Encoding")
        } else {
            payload, readErr = ioutil.ReadAll(resp.Body)
        }
        if readErr != nil {
            return readErr
        }

        //異常請求時設置StatusCode
        if resp.StatusCode != 200 {
            payload = []byte("StatusCode error:" + string(payload))
        }

        //todo 部分章節功能補充4
        //todo 因為預讀了數據所以內容重新回寫
        resp.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
        resp.ContentLength = int64(len(payload))
        resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(payload)), 10))
        return nil
    }
    //錯誤回調 :關閉real_server時測試,錯誤回調
    errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
        http.Error(w, "ErrorHandler error:"+err.Error(), 500)
    }
    return &httputil.ReverseProxy{
        Director:       director,
        Transport:      transport,
        ModifyResponse: modifyFunc,
        ErrorHandler:   errFunc}
}

func singleJoiningSlash(a, b string) string {
    aslash := strings.HasSuffix(a, "/")
    bslash := strings.HasPrefix(b, "/")
    switch {
    case aslash && bslash:
        return a + b[1:]
    case !aslash && !bslash:
        return a + "/" + b
    }
    return a + b
}
View Code

 

 

實際服務器:

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    rs1 := &RealServer{Addr: "127.0.0.1:2003"}
    rs1.Run()
    rs2 := &RealServer{Addr: "127.0.0.1:2004"}
    rs2.Run()

    //監聽關閉信號
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
}

type RealServer struct {
    Addr string
}

func (r *RealServer) Run() {
    log.Println("Starting httpserver at " + r.Addr)
    mux := http.NewServeMux()
    mux.HandleFunc("/", r.HelloHandler)
    mux.HandleFunc("/base/error", r.ErrorHandler)
    server := &http.Server{
        Addr:         r.Addr,
        WriteTimeout: time.Second * 3,
        Handler:      mux,
    }
    go func() {
        log.Fatal(server.ListenAndServe())
    }()
}

func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
    //127.0.0.1:8008/abc?sdsdsa=11
    //r.Addr=127.0.0.1:8008
    //req.URL.Path=/abc
    fmt.Println(req.Host)
    upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
    realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))

    io.WriteString(w, upath)
    io.WriteString(w, realIP)
}

func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
    upath := "error handler"
    w.WriteHeader(500)
    io.WriteString(w, upath)
}
View Code

 

負載均衡策略:

  • 隨機負載
  •   隨機挑選目標服務器ip
  • 輪詢負載
  •   ABC三台服務器,ABCABC一次輪詢
  • 加權負載
  •   給目標設置訪問權重,按照權重輪詢
  • 一致性hash負載
  •   請求固定的url訪問固定的ip

 

隨機負載:

package load_balance

import (
    "errors"
    "fmt"
    "math/rand"
    "strings"
)

type RandomBalance struct {
    curIndex int
    rss      []string
    //觀察主體
    conf LoadBalanceConf
}

func (r *RandomBalance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    addr := params[0]
    r.rss = append(r.rss, addr)
    return nil
}

func (r *RandomBalance) Next() string {
    if len(r.rss) == 0 {
        return ""
    }
    r.curIndex = rand.Intn(len(r.rss))
    return r.rss[r.curIndex]
}

func (r *RandomBalance) Get(key string) (string, error) {
    return r.Next(), nil
}

func (r *RandomBalance) SetConf(conf LoadBalanceConf) {
    r.conf = conf
}

func (r *RandomBalance) Update() {
    if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
        fmt.Println("Update get conf:", conf.GetConf())
        r.rss = []string{}
        for _, ip := range conf.GetConf() {
            r.Add(strings.Split(ip, ",")...)
        }
    }
    if conf, ok := r.conf.(*LoadBalanceCheckConf); ok {
        fmt.Println("Update get conf:", conf.GetConf())
        r.rss = nil
        for _, ip := range conf.GetConf() {
            r.Add(strings.Split(ip, ",")...)
        }
    }
}
random.go

 

package load_balance

import (
    "fmt"
    "testing"
)

func TestRandomBalance(t *testing.T) {
    rb := &RandomBalance{}
    rb.Add("127.0.0.1:2003") //0
    rb.Add("127.0.0.1:2004") //1
    rb.Add("127.0.0.1:2005") //2
    rb.Add("127.0.0.1:2006") //3
    rb.Add("127.0.0.1:2007") //4

    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
}
random_test

 

=== RUN   TestRandomBalance
127.0.0.1:2004
127.0.0.1:2005
127.0.0.1:2005
127.0.0.1:2007
127.0.0.1:2004
127.0.0.1:2006
127.0.0.1:2003
127.0.0.1:2003
127.0.0.1:2004
--- PASS: TestRandomBalance (0.00s)
PASS

 

輪詢負載:

package load_balance

import (
    "errors"
    "fmt"
    "strings"
)

type RoundRobinBalance struct {
    curIndex int
    rss      []string
    //觀察主體
    conf LoadBalanceConf
}

func (r *RoundRobinBalance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    addr := params[0]
    r.rss = append(r.rss, addr)
    return nil
}

func (r *RoundRobinBalance) Next() string {
    if len(r.rss) == 0 {
        return ""
    }
    lens := len(r.rss) //5
    if r.curIndex >= lens {
        r.curIndex = 0
    }
    curAddr := r.rss[r.curIndex]
    r.curIndex = (r.curIndex + 1) % lens
    return curAddr
}

func (r *RoundRobinBalance) Get(key string) (string, error) {
    return r.Next(), nil
}

func (r *RoundRobinBalance) SetConf(conf LoadBalanceConf) {
    r.conf = conf
}

func (r *RoundRobinBalance) Update() {
    if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
        fmt.Println("Update get conf:", conf.GetConf())
        r.rss = []string{}
        for _, ip := range conf.GetConf() {
            r.Add(strings.Split(ip, ",")...)
        }
    }
    if conf, ok := r.conf.(*LoadBalanceCheckConf); ok {
        fmt.Println("Update get conf:", conf.GetConf())
        r.rss = nil
        for _, ip := range conf.GetConf() {
            r.Add(strings.Split(ip, ",")...)
        }
    }
}
round_tobin

 

package load_balance

import (
    "fmt"
    "testing"
)

func Test_main(t *testing.T) {
    rb := &RoundRobinBalance{}
    rb.Add("127.0.0.1:2003") //0
    rb.Add("127.0.0.1:2004") //1
    rb.Add("127.0.0.1:2005") //2
    rb.Add("127.0.0.1:2006") //3
    rb.Add("127.0.0.1:2007") //4

    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
}
round_robin_test

 

=== RUN   Test_main
127.0.0.1:2003
127.0.0.1:2004
127.0.0.1:2005
127.0.0.1:2006
127.0.0.1:2007
127.0.0.1:2003
127.0.0.1:2004
127.0.0.1:2005
127.0.0.1:2006
--- PASS: Test_main (0.00s)
PASS

 

加權負載均衡:

  • Weight
  • 初始化時對節點約定的權重
  • currentWeight
  • 節點臨時權重,每輪都會變化
  • effectiveWeight
  • 節點有效權重,默認與Weight相同
  • totalWeight
  • 所有節點有效權重之和:sum(effectiveWeight)

 

type WeightNode struct {
    addr            string
    weight          int //權重值
    currentWeight   int //節點當前權重
    effectiveWeight int //有效權重
}

 

  • 1,currentWeight = currentWeight + effectiveWeight
  • 2,選中最大的currentWeight節點為選中的節點
  • 3,currentWeight = currentWeight-totalWeight(4+3+2=9)

 

 計算方法如下:

第一次:

  •   currentWeight = currentWeight + effectiveWeight
  •     currentWeight     {A=4+4,B=3+3,C=2+2}   ==  {A=8,B=6,C=4}
  •   選中最大的currentWeight節點為選中的節點
  •     A最大 此時作為節點
  •   currentWeight = currentWeight-totalWeight(4+3+2=9)  【選中的節點currentWeight = currentWeight-totalWeight】
  •     currentWeight  {A=8-9,B=6,C=4}  == {A=-1,B=6,C=4}

第二次:{A=-1,B=6,C=4} 開始

  •   currentWeight = currentWeight + effectiveWeight
  •     currentWeight     {A=-1+4,B=6+3,C=4+2}   ==  {A=3,B=9,C=6}
  •   選中最大的currentWeight節點為選中的節點
  •     B最大 此時作為節點
  •   選中的節點currentWeight = currentWeight-totalWeight(4+3+2=9)
  •     currentWeight  {A=3,B=9-9,C=6}  == {A=3,B=0,C=6}

。。。。。。。以此類推。。。。。。。。。

 

 

package load_balance

import (
    "errors"
    "fmt"
    "strconv"
    "strings"
)

type WeightRoundRobinBalance struct {
    curIndex int
    rss      []*WeightNode
    rsw      []int
    //觀察主體
    conf LoadBalanceConf
}

type WeightNode struct {
    addr            string
    weight          int //權重值
    currentWeight   int //節點當前權重
    effectiveWeight int //有效權重
}

func (r *WeightRoundRobinBalance) Add(params ...string) error {
    if len(params) != 2 {
        return errors.New("param len need 2")
    }
    parInt, err := strconv.ParseInt(params[1], 10, 64)
    if err != nil {
        return err
    }
    node := &WeightNode{addr: params[0], weight: int(parInt)}
    node.effectiveWeight = node.weight
    r.rss = append(r.rss, node)
    return nil
}

func (r *WeightRoundRobinBalance) Next() string {
    total := 0
    var best *WeightNode
    for i := 0; i < len(r.rss); i++ {
        w := r.rss[i]
        //step 1 統計所有有效權重之和
        total += w.effectiveWeight

        //step 2 變更節點臨時權重為的節點臨時權重+節點有效權重
        w.currentWeight += w.effectiveWeight

        //step 3 有效權重默認與權重相同,通訊異常時-1, 通訊成功+1,直到恢復到weight大小
        if w.effectiveWeight < w.weight {
            w.effectiveWeight++
        }
        //step 4 選擇最大臨時權重點節點
        if best == nil || w.currentWeight > best.currentWeight {
            best = w
        }
    }
    if best == nil {
        return ""
    }
    //step 5 變更臨時權重為 臨時權重-有效權重之和
    best.currentWeight -= total
    return best.addr
}

func (r *WeightRoundRobinBalance) Get(key string) (string, error) {
    return r.Next(), nil
}

func (r *WeightRoundRobinBalance) SetConf(conf LoadBalanceConf) {
    r.conf = conf
}

func (r *WeightRoundRobinBalance) Update() {
    if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
        fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf())
        r.rss = nil
        for _, ip := range conf.GetConf() {
            r.Add(strings.Split(ip, ",")...)
        }
    }
    if conf, ok := r.conf.(*LoadBalanceCheckConf); ok {
        fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf())
        r.rss = nil
        for _, ip := range conf.GetConf() {
            r.Add(strings.Split(ip, ",")...)
        }
    }
}
weight_tound_robin.go
package load_balance

import (
    "fmt"
    "testing"
)

func TestLB(t *testing.T) {
    rb := &WeightRoundRobinBalance{}
    rb.Add("127.0.0.1:2003", "4") //0
    rb.Add("127.0.0.1:2004", "3") //1
    rb.Add("127.0.0.1:2005", "2") //2

    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
    fmt.Println(rb.Next())
}
test

 

一致性hash(ip_hash、url_hash)

為了解決平衡性:引入了虛擬節點概念(把個節點 均勻的覆蓋到環上)

package load_balance

import (
    "errors"
    "fmt"
    "hash/crc32"
    "sort"
    "strconv"
    "strings"
    "sync"
)

type Hash func(data []byte) uint32

type UInt32Slice []uint32

func (s UInt32Slice) Len() int {
    return len(s)
}

func (s UInt32Slice) Less(i, j int) bool {
    return s[i] < s[j]
}

func (s UInt32Slice) Swap(i, j int) {
    s[i], s[j] = s[j], s[i]
}

type ConsistentHashBanlance struct {
    mux      sync.RWMutex
    hash     Hash
    replicas int               //復制因子 虛擬節點數
    keys     UInt32Slice       //已排序的節點hash切片 映射在環上的虛擬節點
    hashMap  map[uint32]string //節點哈希和Key的map,鍵是hash值,值是節點key

    //觀察主體
    conf LoadBalanceConf
}

func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
    m := &ConsistentHashBanlance{
        replicas: replicas,//復制因子 虛擬節點數
        hash:     fn,
        hashMap:  make(map[uint32]string),
    }
    if m.hash == nil {
        //最多32位,保證是一個2^32-1環
        m.hash = crc32.ChecksumIEEE
    }
    return m
}

// 驗證是否為空
func (c *ConsistentHashBanlance) IsEmpty() bool {
    return len(c.keys) == 0
}

// Add 方法用來添加緩存節點,參數為節點key,比如使用IP
func (c *ConsistentHashBanlance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    addr := params[0]
    c.mux.Lock()
    defer c.mux.Unlock()
    // 結合復制因子計算所有虛擬節點的hash值,並存入m.keys中,同時在m.hashMap中保存哈希值和key的映射
    for i := 0; i < c.replicas; i++ {
        hash := c.hash([]byte(strconv.Itoa(i) + addr))
        c.keys = append(c.keys, hash)
        c.hashMap[hash] = addr
    }
    // 對所有虛擬節點的哈希值進行排序,方便之后進行二分查找
    sort.Sort(c.keys)
    return nil
}

// Get 方法根據給定的對象獲取最靠近它的那個節點
func (c *ConsistentHashBanlance) Get(key string) (string, error) {
    if c.IsEmpty() {
        return "", errors.New("node is empty")
    }
    hash := c.hash([]byte(key))

    // 通過二分查找獲取最優節點,第一個"服務器hash"值大於"數據hash"值的就是最優"服務器節點"
    idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })

    // 如果查找結果 大於 服務器節點哈希數組的最大索引,表示此時該對象哈希值位於最后一個節點之后,那么放入第一個節點中
    if idx == len(c.keys) {
        idx = 0
    }
    c.mux.RLock()
    defer c.mux.RUnlock()
    return c.hashMap[c.keys[idx]], nil
}

func (c *ConsistentHashBanlance) SetConf(conf LoadBalanceConf) {
    c.conf = conf
}

func (c *ConsistentHashBanlance) Update() {
    if conf, ok := c.conf.(*LoadBalanceZkConf); ok {
        fmt.Println("Update get conf:", conf.GetConf())
        c.mux.Lock()
        defer c.mux.Unlock()
        c.keys = nil
        c.hashMap = nil
        for _, ip := range conf.GetConf() {
            c.Add(strings.Split(ip, ",")...)
        }
    }
    if conf, ok := c.conf.(*LoadBalanceCheckConf); ok {
        fmt.Println("Update get conf:", conf.GetConf())
        c.mux.Lock()
        defer c.mux.Unlock()
        c.keys = nil
        c.hashMap = nil
        for _, ip := range conf.GetConf() {
            c.Add(strings.Split(ip, ",")...)
        }
    }
}
consistent_hash.go

 

package load_balance

import (
    "fmt"
    "testing"
)

func TestNewConsistentHashBanlance(t *testing.T) {
    rb := NewConsistentHashBanlance(10, nil)
    rb.Add("127.0.0.1:2003") //0
    rb.Add("127.0.0.1:2004") //1
    rb.Add("127.0.0.1:2005") //2
    rb.Add("127.0.0.1:2006") //3
    rb.Add("127.0.0.1:2007") //4

    //url hash
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/error"))
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/changepwd"))

    //ip hash
    fmt.Println(rb.Get("127.0.0.1"))
    fmt.Println(rb.Get("192.168.0.1"))
    fmt.Println(rb.Get("127.0.0.1"))
}
test.go

 

工廠方法簡單封裝上述幾種拒載均衡調用:

interface.go

package load_balance

type LoadBalance interface {
    Add(...string) error
    Get(string) (string, error)

    //后期服務發現補充
    Update()
}

 

 

factory.go

package load_balance

type LbType int

const (
    LbRandom LbType = iota
    LbRoundRobin
    LbWeightRoundRobin
    LbConsistentHash
)

func LoadBanlanceFactory(lbType LbType) LoadBalance {
    switch lbType {
    case LbRandom:
        return &RandomBalance{}
    case LbConsistentHash:
        return NewConsistentHashBanlance(10, nil)
    case LbRoundRobin:
        return &RoundRobinBalance{}
    case LbWeightRoundRobin:
        return &WeightRoundRobinBalance{}
    default:
        return &RandomBalance{}
    }
}

 

 

調用:

func main() {
    rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
    if err := rb.Add("http://127.0.0.1:2003/base", "10"); err != nil {
        log.Println(err)
    }
    if err := rb.Add("http://127.0.0.1:2004/base", "20"); err != nil {
        log.Println(err)
    }
   // 。。。。。。。。。。。。。。。
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM