Please credit the source when reposting~ This article is published on luozhiyun's blog: https://www.luozhiyun.com
This article is based on Istio source code release 1.5.
Introduction
pilot-discovery is the core service of Pilot. Its main job is to fetch and aggregate service information from registries (such as Kubernetes or Consul), fetch traffic rules from the Kubernetes API Server, convert both the service information and the traffic rules into a format the data plane can understand, and push them through the standard data-plane API down to every sidecar in the mesh.
pilot-discovery covers three areas: service discovery, configuration-rule discovery, and xDS configuration distribution. I plan to split the walkthrough into three posts; this one looks at the implementation of the service discovery part. The post touches on the xDS protocol here and there; for background, see “深入解讀Service Mesh背后的技術細節” (an in-depth look at the technical details behind Service Mesh, linked in the References).
Pilot service discovery means watching the underlying platform's service registry to cache the Istio service model, monitoring that model for changes, and triggering the registered event callbacks whenever the model is updated.
How Service Discovery Works
Pilot Initialization
discoveryCmd = &cobra.Command{
    Use:   "discovery",
    Short: "Start Istio proxy discovery service.",
    Args:  cobra.ExactArgs(0),
    RunE: func(c *cobra.Command, args []string) error {
        ...
        // Configure logging
        if err := log.Configure(loggingOptions); err != nil {
            return err
        }
        ...
        stop := make(chan struct{})
        // Create the xDS server
        discoveryServer, err := bootstrap.NewServer(&serverArgs)
        if err != nil {
            return fmt.Errorf("failed to create discovery service: %v", err)
        }
        // Start the server
        if err := discoveryServer.Start(stop); err != nil {
            return fmt.Errorf("failed to start discovery service: %v", err)
        }
        // Wait for the process to exit
        cmd.WaitSignal(stop)
        discoveryServer.WaitUntilCompletion()
        return nil
    },
}
When the Pilot service initializes, it first sets up logging and then creates the xDS server. xDS stands for “x Discovery Service”, where x stands for a family of resources such as Cluster, Endpoint, Listener, and Route.
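Concretely, each of those resources maps to a v2 xDS type URL that Envoy uses when requesting configuration. The constant names below are just illustrative; the values are the standard Envoy v2 type URLs:

// The four core v2 xDS resource types and their type URLs.
const (
    ClusterType  = "type.googleapis.com/envoy.api.v2.Cluster"               // CDS
    EndpointType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" // EDS
    ListenerType = "type.googleapis.com/envoy.api.v2.Listener"              // LDS
    RouteType    = "type.googleapis.com/envoy.api.v2.RouteConfiguration"    // RDS
)

Back to the initialization: the xDS server is created by NewServer.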
func NewServer(args *PilotArgs) (*Server, error) {
    args.Default()
    e := &model.Environment{
        ServiceDiscovery: aggregate.NewController(),
        PushContext:      model.NewPushContext(),
    }
    s := &Server{
        basePort:       args.BasePort,
        clusterID:      getClusterID(args),
        environment:    e,
        EnvoyXdsServer: envoyv2.NewDiscoveryServer(e, args.Plugins),
        forceStop:      args.ForceStop,
        mux:            http.NewServeMux(),
    }
    // Initialize the controller that handles Istio Config
    if err := s.initConfigController(args); err != nil {
        return nil, fmt.Errorf("config controller: %v", err)
    }
    // Initialize the controllers that handle Service Discovery
    if err := s.initServiceControllers(args); err != nil {
        return nil, fmt.Errorf("service controllers: %v", err)
    }
    ...
    // Initialize the xDS server side
    if err := s.initDiscoveryService(args); err != nil {
        return nil, fmt.Errorf("discovery service: %v", err)
    }
    ...
    // Webhook callback server
    if err := s.initHTTPSWebhookServer(args); err != nil {
        return nil, fmt.Errorf("injectionWebhook server: %v", err)
    }
    // Sidecar injection
    if err := s.initSidecarInjector(args); err != nil {
        return nil, fmt.Errorf("sidecar injector: %v", err)
    }
    ...
    return s, nil
}
NewServer initializes quite a few modules; here we only pick out the relevant ones. initConfigController deals with configuration and we will come back to it later; for now the focus is initServiceControllers.
ServiceControllers
The main service discovery logic in Pilot is implemented by ServiceController (the service controller). It watches the underlying platform's service registry to cache the Istio service model, monitors the model for changes, and triggers the registered event callbacks when the model is updated.
Initialization
The initialization flow of the Controller is straightforward: initServiceControllers eventually calls down to NewController, which performs the actual setup.
func NewController(client kubernetes.Interface, options Options) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // The queue requires a time duration for a retry delay after a handler error
    // Initialize the Controller
    c := &Controller{
        domainSuffix: options.DomainSuffix,
        client:       client,
        // The controller's task queue
        queue:                      queue.NewQueue(1 * time.Second),
        clusterID:                  options.ClusterID,
        xdsUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[host.Name]*model.Service),
        externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
        networksWatcher:            options.NetworksWatcher,
        metrics:                    options.Metrics,
    }
    // Get the shared informer factory
    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))

    // Register the informer handlers
    // Services handler
    c.services = sharedInformers.Core().V1().Services().Informer()
    registerHandlers(c.services, c.queue, "Services", c.onServiceEvent)
    // Endpoints handler
    switch options.EndpointMode {
    case EndpointsOnly:
        c.endpoints = newEndpointsController(c, sharedInformers)
    case EndpointSliceOnly:
        c.endpoints = newEndpointSliceController(c, sharedInformers)
    }
    // Nodes handler
    c.nodes = sharedInformers.Core().V1().Nodes().Informer()
    registerHandlers(c.nodes, c.queue, "Nodes", c.onNodeEvent)
    // Pods handler
    podInformer := sharedInformers.Core().V1().Pods().Informer()
    c.pods = newPodCache(podInformer, c)
    registerHandlers(podInformer, c.queue, "Pods", c.pods.onEvent)
    return c
}
NewController first constructs the Controller itself, then obtains the informers and registers the Services, Endpoints, Nodes, and Pods handlers.
Its core function is to watch update events on the relevant Kubernetes resources (Service, Endpoint, Pod, Node) and execute the corresponding event callbacks.
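registerHandlers is a small helper: it hooks a callback into the informer's add/update/delete notifications and defers the real work to the controller's task queue. A simplified sketch (the actual code also records metrics and filters unchanged objects):

func registerHandlers(informer cache.SharedIndexInformer, q queue.Instance, otype string,
    handler func(interface{}, model.Event) error) {
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            // push the real work onto the task queue instead of running inline
            q.Push(func() error { return handler(obj, model.EventAdd) })
        },
        UpdateFunc: func(old, cur interface{}) {
            if !reflect.DeepEqual(old, cur) {
                q.Push(func() error { return handler(cur, model.EventUpdate) })
            }
        },
        DeleteFunc: func(obj interface{}) {
            q.Push(func() error { return handler(obj, model.EventDelete) })
        },
    })
}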
The Controller struct here implements the Controller interface:
type Controller interface {
    // AppendServiceHandler notifies about changes to the service catalog.
    AppendServiceHandler(f func(*Service, Event)) error
    // AppendInstanceHandler notifies about changes to the service instances
    // for a service.
    AppendInstanceHandler(f func(*ServiceInstance, Event)) error
    // Run until a signal is received
    Run(stop <-chan struct{})
}
Once registration is complete, the Run method is invoked asynchronously.
// Invoke Run asynchronously
go serviceControllers.Run(stop)

// Run iterates over the GetRegistries list and invokes each registry's Run asynchronously
func (c *Controller) Run(stop <-chan struct{}) {
    for _, r := range c.GetRegistries() {
        go r.Run(stop)
    }
    <-stop
    log.Info("Registry Aggregator terminated")
}
At this point the ServiceController has created a watcher for each of the four resource types to listen for Kubernetes resource updates, with the corresponding EventHandlers registered.
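Note that none of these handlers run inline in the informer callbacks; as the registerHandlers sketch above shows, they are pushed onto the controller's task queue (created with queue.NewQueue(1 * time.Second)), which executes tasks one by one and re-queues a failed task after the retry delay. A minimal, illustrative sketch of that queue pattern (not the actual istio implementation):

type Task func() error

type taskQueue struct {
    delay time.Duration
    tasks chan Task
}

func newTaskQueue(delay time.Duration) *taskQueue {
    // buffered so that informer callbacks do not block on a busy queue
    return &taskQueue{delay: delay, tasks: make(chan Task, 128)}
}

func (q *taskQueue) Push(t Task) { q.tasks <- t }

func (q *taskQueue) Run(stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case t := <-q.tasks:
            if err := t(); err != nil {
                // a failed handler is retried after the configured delay
                time.AfterFunc(q.delay, func() { q.Push(t) })
            }
        }
    }
}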
Service Handler
func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
    if err := c.checkReadyForEvents(); err != nil {
        return err
    }
    svc, ok := curr.(*v1.Service)
    if !ok {
        tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
        if !ok {
            log.Errorf("Couldn't get object from tombstone %#v", curr)
            return nil
        }
        svc, ok = tombstone.Obj.(*v1.Service)
        if !ok {
            log.Errorf("Tombstone contained object that is not a service %#v", curr)
            return nil
        }
    }
    log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)

    // Convert the k8s Service into an Istio service
    svcConv := kube.ConvertService(*svc, c.domainSuffix, c.clusterID)
    // Handle the event according to its type
    switch event {
    // Delete event
    case model.EventDelete:
        c.Lock()
        delete(c.servicesMap, svcConv.Hostname)
        delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
        c.Unlock()
        // EDS needs to just know when service is deleted.
        // Update the service cache
        c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
    default:
        // instance conversion is only required when service is added/updated.
        instances := kube.ExternalNameServiceInstances(*svc, svcConv)
        c.Lock()
        c.servicesMap[svcConv.Hostname] = svcConv
        if instances == nil {
            delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
        } else {
            c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
        }
        c.Unlock()
        // Update the service cache
        c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
    }
    // Notify service handlers.
    // Trigger the registered service event handlers
    for _, f := range c.serviceHandlers {
        f(svcConv, event)
    }
    return nil
}
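A side note on the conversion: kube.ConvertService turns the Kubernetes Service into Istio's model.Service, and the resulting hostname follows the name.namespace.svc.<domainSuffix> convention. The naming helper (kube.ServiceHostname, which also shows up in the Endpoint handling below) boils down to something like this sketch:

// ServiceHostname builds the Istio host name for a k8s service,
// e.g. "reviews.default.svc.cluster.local".
func ServiceHostname(name, namespace, domainSuffix string) host.Name {
    return host.Name(fmt.Sprintf("%s.%s.svc.%s", name, namespace, domainSuffix))
}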
The Service event handler updates the cache according to the event type and then invokes the serviceHandlers callbacks. Those serviceHandlers are registered when the DiscoveryService is initialized.
serviceHandler := func(svc *model.Service, _ model.Event) {
    pushReq := &model.PushRequest{
        Full:               true,
        NamespacesUpdated:  map[string]struct{}{svc.Attributes.Namespace: {}},
        ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
        Reason:             []model.TriggerReason{model.ServiceUpdate},
    }
    // Trigger a config update
    s.EnvoyXdsServer.ConfigUpdate(pushReq)
}
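That closure is then attached to the service controller, presumably through the AppendServiceHandler method from the Controller interface shown earlier, roughly like this:

// Hook the push-triggering handler into the service controller
// (a sketch; the exact call site in bootstrap may differ).
if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {
    return fmt.Errorf("append service handler failed: %v", err)
}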
Endpoint Handler
The Endpoint handler is registered when newEndpointsController is called to create the endpointsController:
func newEndpointsController(c *Controller, sharedInformers informers.SharedInformerFactory) *endpointsController {
    informer := sharedInformers.Core().V1().Endpoints().Informer()
    out := &endpointsController{
        kubeEndpoints: kubeEndpoints{
            c:        c,
            informer: informer,
        },
    }
    // Register the handler
    out.registerEndpointsHandler()
    return out
}
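registerEndpointsHandler presumably just reuses the generic registerHandlers helper we saw in NewController, along these lines (a sketch; the real body may differ slightly):

func (e *endpointsController) registerEndpointsHandler() {
    // the same queue-backed registration used for Services, Nodes and Pods
    registerHandlers(e.informer, e.c.queue, "Endpoints", e.onEvent)
}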
When the callback fires, it lands in the endpointsController's onEvent method:
func (e *endpointsController) onEvent(curr interface{}, event model.Event) error {
    ...
    return e.handleEvent(ep.Name, ep.Namespace, event, curr, func(obj interface{}, event model.Event) {
        ep := obj.(*v1.Endpoints)
        // Handle the EDS update
        e.c.updateEDS(ep, event)
    })
}
Here updateEDS is called to handle the EDS (Endpoint Discovery Service) update.
func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event) {
    hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)
    endpoints := make([]*model.IstioEndpoint, 0)
    if event != model.EventDelete {
        for _, ss := range ep.Subsets {
            for _, ea := range ss.Addresses {
                // Look up the Pod instance behind this Endpoint address
                pod := c.pods.getPodByIP(ea.IP)
                ...
                // Convert the Endpoint into the Istio model IstioEndpoint
                for _, port := range ss.Ports {
                    endpoints = append(endpoints, &model.IstioEndpoint{
                        Address:         ea.IP,
                        EndpointPort:    uint32(port.Port),
                        ServicePortName: port.Name,
                        Labels:          labelMap,
                        UID:             uid,
                        ServiceAccount:  sa,
                        Network:         c.endpointNetwork(ea.IP),
                        Locality:        locality,
                        Attributes:      model.ServiceAttributes{Name: ep.Name, Namespace: ep.Namespace},
                        TLSMode:         tlsMode,
                    })
                }
            }
        }
    }
    // Push the EDS update through the xdsUpdater
    _ = c.xdsUpdater.EDSUpdate(c.clusterID, string(hostname), ep.Namespace, endpoints)
}
The endpoints are re-packaged as IstioEndpoint objects, and then EDSUpdate is called to apply the update.
func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
    istioEndpoints []*model.IstioEndpoint) error {
    inboundEDSUpdates.Increment()
    s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false)
    return nil
}

func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string,
    istioEndpoints []*model.IstioEndpoint, internal bool) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    requireFull := false
    ...
    // Look up the previously cached shards for this service
    if _, f := s.EndpointShardsByService[serviceName]; !f {
        s.EndpointShardsByService[serviceName] = map[string]*EndpointShards{}
    }
    ep, f := s.EndpointShardsByService[serviceName][namespace]
    // Initialize the shards if they do not exist yet
    if !f {
        ep = &EndpointShards{
            Shards:          map[string][]*model.IstioEndpoint{},
            ServiceAccounts: map[string]bool{},
        }
        s.EndpointShardsByService[serviceName][namespace] = ep
        if !internal {
            adsLog.Infof("Full push, new service %s", serviceName)
            requireFull = true
        }
    }
    ...
    ep.mutex.Lock()
    ep.Shards[clusterID] = istioEndpoints
    ep.ServiceAccounts = serviceAccounts
    ep.mutex.Unlock()

    if !internal {
        var edsUpdates map[string]struct{}
        if !requireFull {
            edsUpdates = map[string]struct{}{serviceName: {}}
        }
        // Trigger a config update
        s.ConfigUpdate(&model.PushRequest{
            Full:               requireFull,
            NamespacesUpdated:  map[string]struct{}{namespace: {}},
            ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
            EdsUpdates:         edsUpdates,
            Reason:             []model.TriggerReason{model.EndpointUpdate},
        })
    }
}
edsUpdate really does just two things: it updates the endpoint cache, and it calls ConfigUpdate to trigger a configuration update.
The ConfigUpdate resource update is essentially event dispatch that drives the xDS push; we will go through those details later.
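For a rough idea of what that dispatch looks like: ConfigUpdate essentially just drops the PushRequest onto the server's push channel, where a debounce loop merges bursts of requests before the actual push to connected proxies. A simplified sketch (assuming the v2 DiscoveryServer's pushChannel mechanism):

func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
    inboundConfigUpdates.Increment()
    // the request is picked up by a debounce loop that merges pending
    // requests before kicking off the xDS push
    s.pushChannel <- req
}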
Summary
In this post we saw that service discovery registers Kubernetes informers to watch update events on Service, Endpoint, Node, and Pod resources, runs the callbacks through an event-driven model, and finally calls the xDS layer's ConfigUpdate to update configuration asynchronously.
Reference
https://www.servicemesher.com/blog/istio-analysis-4/
https://www.cnblogs.com/163yun/p/8962278.html
https://www.servicemesher.com/blog/envoy-proxy-config-deep-dive/