新手學分布式 - Envoy Proxy XDS Server動態配置的一點使用心得


Envoy Proxy 動態API的使用總結

Envoy Proxy和其它L4/L7反向搭理工具最大的區別就是原生支持動態配置。 首先來看一下Envoy的大致架構

從上圖可以簡單理解:Listener負責接受外部的請求,然后經過Filter/Router處理之后,在轉發到具體的Cluster。 其中Listener,Router,Cluster和Host地址都是可以動態配置的,配置這些數據的服務就稱之為X Discovery Services,簡稱XDS。

本文主要描述如何編寫XDS Server更新邏輯。

Envoy Porxy XDS Service通過GRPC服務進行數據更新,所有Proto文件可以參考 https://github.com/envoyproxy/envoy/tree/master/api/envoy/api/v2 。 用戶可以根據proto文件自行生成相對應語言的GRPC代碼文件。如果使用golang來實現的話,Envoy已經提供了一份編譯好的GRPC代碼,地址在這里: https://github.com/envoyproxy/go-control-plane/tree/master/envoy/api/v2

每個XDS Service都有兩種GRPC服務, StreamDeltaStream用來更新全量數據,Delta用來更新增量數據。下面以RDS Service為例來看看如何實現一個 XDS Service。

RDS Service可以提供所有的Route信息,一個簡化后的典型Route配置如下:

# 完整的Route API定義參考 https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/rds.proto#envoy-api-msg-routeconfiguration

                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: local_service
                      domains: ["*"]
                      require_tls: NONE
                      routes:
                        - match:
                            prefix: "/MyService"
                          route: { cluster: my-grpc-svc_cluster }

上面的配置語義為: 當收到一個Path前綴為/MyService的請求后,將此請求轉發到my-grpc-svc_cluster. (my-grpc-svc_cluster表示的是后端Upstream信息,可以是STATIC類型也可以由CDS Service動態提供)

RDS Service的作用就是動態生成類似上面的語義配置。 先來看相對簡單的StreamRoutes如何實現。

GRPC描述文件中,對此函數的定義如下:

service RouteDiscoveryService {
  rpc StreamRoutes(stream DiscoveryRequest) returns (stream DiscoveryResponse) {
  }

  rpc DeltaRoutes(stream DeltaDiscoveryRequest) returns (stream DeltaDiscoveryResponse) {
  }

  rpc FetchRoutes(DiscoveryRequest) returns (DiscoveryResponse) {
    option (google.api.http) = {
      post: "/v2/discovery:routes"
      body: "*"
    };
  }
}

從返回值可以看出StreamRoutes是一個流函數,RDS會通過這個流實時將數據推送給Envoy。 所以大致的實現模型就是如下的樣子:

func (r rds) StreamRoutes(ls envoy_api_v2.RouteDiscoveryService_StreamRoutesServer) error {
    for{
        select{
            case x <- c>:
                ls.Send(xxx)
        }

    }
}

Send函數接受的是DiscoveryResponse指針,而這個DiscoveryResponse從定義來看是自解釋動態結構體。 具體數據類型由typeUrl屬性來決定。 具體到Route來說,typeURL是"type.googleapis.com/envoy.api.v2.RouteConfiguration". (類型說明參見 https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol.html?highlight=type url#resource-types

數據則由Resource來保存。

Resource[]*any.Any類型,說白了就是萬能的Interface{}。所以創建any.Any時需要指定具體的數據類型("type.googleapis.com/envoy.api.v2.RouteConfiguration"). data則是經過ProtoMessage編碼后的二進制數據。 所以創建any.Any應該是下面的樣子:


            data, err := proto.Marshal(xxxx)
			if err != nil {
				logrus.Errorf("Marshal Error. %s", err)
				continue
			}

			any :=  &any.Any{
				TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
				Value:   data,
			})

xxxx是RDS需要返回給Envoy的路由數據,也就是RouteConfiguration。所以下面來看如何構建RouteConfiguration。 通過API定義可知,有一些數據是必輸項(通過proto校驗描述文件也可以獲取必輸項,但不如看API文檔來的直接)。 假設我們要實現開篇簡單的Route配置,那么 RouteConfiguration 應該這樣定義:

      r:=&envoy_api_v2.RouteConfiguration{
			Name: "local_route",
			VirtualHosts: []*route.VirtualHost{
				&route.VirtualHost{
					Name: "local_service",
					Domains: []string{
						"*",
					},
					Routes: []*route.Route{
						&route.Route{
							Match: &route.RouteMatch{
								PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/MyService"},
							},
							Action: &route.Route_Route{Route: &route.RouteAction{
								ClusterSpecifier: &route.RouteAction_Cluster{Cluster: "my-grpc-svc_cluster"},
							}},
						},
					},
				},
			},
		},

需要注意兩個地方:

  1. Name: "local_route"。 這里的Name一定要和Listener中定義的RouteConfig Name保持一致。 如果不一致,Listener不會加載這段Route配置(換言之,這個Name就是雙方的關聯主鍵)
  2. Cluster 名稱也要保持一致。 同理,如果不一致,后續請求轉發時就會找不到UPstream

經過這些步驟,一個近似完整的Route DiscoveryResponse就定義完成了。 而后就可以通過調用Send來發送給Envoy。

然而此時事情並沒有結束, 開篇說過Stream同步全量,Delta同步增量。 再詳細一點,在StreamRoutes中每次都需要傳輸當前所有的Route配置,而不僅僅是發生過變更的數據 . 個人感覺這種處理方式,對於數據組織來說很麻煩,但對於Envoy數據更新來說確很方便(每次都是全量數據,不用做merge了)。 merge總是一件耗時費力的事情,就看事情誰來做,這次envoy決定讓用戶來做了。

所以我們需要調整一下StreamRoutes實現模型:

func (r rds) StreamRoutes(ls envoy_api_v2.RouteDiscoveryService_StreamRoutesServer) error {
    for{
        select{
            case x <- c>:
                // x表示變動的數據
                n := merge(x) //對x進行merge操作,返回當前最新全量數據n

                var srvRoute []*route.Route
                for _, d := range n{
                    srvRoute = append(srvRoute, &route.Route{
                                        Match: &route.RouteMatch{
                                            PathSpecifier: &route.RouteMatch_Prefix{Prefix: xxxx},
                                        },
                                        Action: &route.Route_Route{Route: &route.RouteAction{
                                            ClusterSpecifier: &route.RouteAction_Cluster{Cluster: xxxx},
                                        }},
                                    })
                }

                rc := []*envoy_api_v2.RouteConfiguration{
                    &envoy_api_v2.RouteConfiguration{
                        Name: "local_route",
                        VirtualHosts: []*route.VirtualHost{
                            &route.VirtualHost{
                                Name: "local_service",
                                Domains: []string{
                                    "*",
                                },
                                Routes: srvRoute,
                            },
                        },
                    },
                }

                var resource []*any.Any

                for _, rca := range rc {
                    data, err := proto.Marshal(rca)
                    if err != nil {
                        return err
                    }

                    resource = append(resource, &any.Any{
                        TypeUrl: "type.googleapis.com/envoy.api.v2.RouteConfiguration",
                        Value:   data,
                    })
                }


                ls.Send(&envoy_api_v2.DiscoveryResponse{
                        VersionInfo: xxx,
                        Resources:   resource,
                        Canary:      false,
                        TypeUrl:     "type.googleapis.com/envoy.api.v2.RouteConfiguration",
                        Nonce:       time.Now().String(),
                    })
        }

    }
}

調整之后,每次就會返回Envoy最新的Route數據。 上面的模型僅考慮了單Envoy實例的情況,並未考慮多實例。 當多實例鏈接RDS Service時, 從c獲取數據,就會變成非冪等事件,從而無法保證所有Envoy實例數據保持一致。

實現StreamRoutes之后,在來看如何實現DeltaRoutes

Delta是用來同步增量數據的,從函數原型來看,入參也是一個Stream,所以函數原型應該和StreamRoutes差不多。 如果你也這樣想,就錯了

Delta的stream只是用來傳輸數據的(猜測是為了提高數據傳輸效率,而並不是為了保持長連接)。 每次傳輸完成之后,Envoy都會主動斷開這個鏈接。 也就是說,Envoy是定時調用DeltaRoutes來獲取增量更新數據的。如果按照stream的實現模型來編寫邏輯,將會發現經過一段時間后,這個stream會莫名的變成closed狀態。 原因就是envoy接收到此次事件后,主動關閉了stream。

所以如果要使用Delta模式,那么會無法保證Envoy無法實時響應數據變化(因為這個定時調用的存在)。 而如果使用Stream模式,那么用戶需要自行維護數據正確性(如果merge很復雜,正確性就會下降)。

所以選擇Stream還是Delta對於用戶來說是個問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM