深入解讀Service Mesh的數據面Envoy


在前面的一篇文章中,詳細解讀了Service Mesh中的技術細節,深入解讀Service Mesh背后的技術細節。

但是對於數據面的關鍵組件Envoy沒有詳細解讀,這篇文章補上。

 

一、Envoy的工作模式

 

 

 Envoy的工作模式如圖所示,橫向是管理平面。

 

Envoy會暴露admin的API,可以通過API查看Envoy中的路由或者集群的配置。

 

例如通過curl http://127.0.0.1:15000/routes可以查看路由的配置,結果如下圖,請記住路由的配置層級,后面在代碼中會看到熟悉的數據結構。

 

 

routes下面有virtual_hosts,里面有帶prefix的route,里面是route entry,里面是weight cluster,對於不同的cluster不同的路由權重,再里面是cluster的列表,有cluster的名稱和權重。

 

再如通過curl http://127.0.0.1:15000/clusters可以得到集群也即cluster的配置,這里面是真正cluster的信息。

 

在另外一面,Envoy會調用envoy API去pilot里面獲取路由和集群的配置,envoy API的詳情可以查看https://www.envoyproxy.io/docs/envoy/v1.8.0/api-v2/http_routes/http_routes中的API文檔,可以看到route的配置詳情,也是按照上面的層級組織的。

 

當Envoy從pilot獲取到路由和集群信息之后,會保存在內存中,當有個客戶端要連接后端的時候,客戶端處於Downstream,后端處於Upstream,當數據從Downstream流向Upstream的時候,會通過Filter,根據路由和集群的配置,選擇后端的應用建立連接,將請求轉發出去。

 

接下來我們來看Envoy是如何實現這些的。

 

二、Envoy的關鍵數據結構的創建

 

 

Envoy的啟動會在main函數中創建Envoy::MainCommon,在它的構造函數中,會調用父類MainCommonBase的構造函數,創建Server::InstanceImpl,接下來調用InstanceImpl::initialize(...)

 

接下來就進入關鍵的初始化階段。

 

加載初始化配置,里面配置了Listener Discover Service, Router Discover Service, Cluster Discover Service等。

 

InstanceUtil::loadBootstrapConfig(bootstrap_, options);

 

 創建AdminImpl,從而可以接受請求接口

 

admin_.reset(new AdminImpl(initial_config.admin().accessLogPath(), initial_config.admin().profilePath(), *this));

 

 創建ListenerManagerImpl,用於管理監聽,因為Downstream要訪問Upstream的時候,envoy會進行監聽,Downstream會連接監聽的端口。

 

listener_manager_.reset(new ListenerManagerImpl(*this, listener_component_factory_, worker_factory_, time_system_));

 

 在ListenerManagerImpl的構造函數中,創建了很多的worker,Envoy采用libevent監聽socket的事件,當有一個新的連接來的時候,會將任務分配給某個worker進行處理,從而實現異步的處理。

 

ListenerManagerImpl::ListenerManagerImpl(...) {
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    workers_.emplace_back(worker_factory.createWorker());
  }
}

 

 createWorker會創建WorkerImpl,初始化WorkerImpl需要兩個重要的參數。

一個是allocateDispatcher創建出來的DispatcherImpl,用來封裝libevent的事件分發的。

一個是ConnectionHandlerImpl,用來管理一個連接的。

 

我們接着看初始化過程,創建ProdClusterManagerFactory,用於創建Cluster Manager,管理上游的集群。

 

cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(...);

 

 在ProdClusterManagerFactory會創建ClusterManagerImpl,在ClusterManagerImpl的構造函數中,如果配置了CDS,就需要訂閱Cluster Discover Service。

 

if (bootstrap.dynamic_resources().has_cds_config()) {
    cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this);
    init_helper_.setCds(cds_api_.get());
} 

 

 會調用ProdClusterManagerFactory::createCds,里面會創建CdsApiImpl。在CdsApiImpl的構造函數中,會創建CdsSubscription,訂閱CDS,當集群的配置發生變化的時候,會調用CdsApiImpl::onConfigUpdate(...)

 

接下來在InstanceImpl::initialize的初始化函數中,如果配置了LDS,就需要訂閱Listener Discover Service。

 

if (bootstrap_.dynamic_resources().has_lds_config()) {
  listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}

 

ListenerManagerImpl的createLdsApi會調用ProdListenerComponentFactory的createLdsApi函數,會創建LdsApiImpl。在LdsApiImpl的構造函數中,會創建LdsSubscription,訂閱LDS,當Listener的配置改變的時候,會調用LdsApiImpl::onConfigUpdate(...)

 

三、Envoy的啟動

 

MainCommonBase::run() 會調用InstanceImpl::run(),會調用InstanceImpl::startWorkers(),會調用ListenerManagerImpl::startWorkers。

 

startWorkers會將Listener添加到所有的worker中,然后啟動worker的線程。

 

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
    workers_started_ = true;
    for (const auto& worker : workers_) {
        for (const auto& listener : active_listeners_) {
            addListenerToWorker(*worker, *listener);
        }
        worker->start(guard_dog);
    }
}

 

對於每一個WorkerImpl,會調用ConnectionHandlerImpl的addListener函數,會創建ActiveListener對象,ActiveListener很重要,他的函數會在libevent收到事件的時候被調用。

 

在ActiveListener的構造函數中,會調用相同Worker的DispatcherImpl的createListener,創建Network::ListenerImpl,注意這個類的namespace,因為對於Envoy來講,LDS里面有個Listener的概念,但是Socket也有Listener的概念,在Network這個namespace下面的,是對socket和libevent的封裝。

 

在Network::ListenerImpl的構造函數中,當收到事件的時候,會調用注冊的listenCallback函數。

 

if (bind_to_port) {
    listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
}

 

在listenCallback函數中,會調用onAccept函數,這個函數是ConnectionHandlerImpl::ActiveListener::onAccept函數。

 

listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address), listener->hand_off_restored_destination_connections_);

 

四、Envoy從Pilot中獲取Listener

 

當Listener的配置有變化的時候,會調用LdsApiImpl::onConfigUpdate(...)。

 

會調用ListenerManagerImpl的addOrUpdateListener(...)函數,在這里面,會創建一個ListenerImpl,

 

這里的listener是LDS定義的Listener,而非網絡的Listener了。

 

在ListenerImpl的構造函數中,首先會創建ListenerFilter,可以對監聽的socket進行定制化的配置,例如為了實現use_original_dst,就需要加入一個ListenerFilter。

 

為了創建ListenerFilter,先要調用ProdListenerComponentFactory::createListenerFilterFactoryList_來創建工廠。

 

創建完了ListenerFilter之后,為了對於進來的網絡包進行處理,會創建NetworkFilter,正是這些NetworkFilter實現了對網絡包的解析,並根據網絡包的內容,實現路由和負載均衡策略。

 

為了創建NetworkFilter,先要調用ProdListenerComponentFactory::createNetworkFilterFactoryList_來創建工廠,在這個函數里面,會遍歷所有的NamedNetworkFilterConfigFactory,也即起了名字的filter都過一遍,都有哪些呢?class NetworkFilterNameValues里面有定義,這里面最重要的是:

 

// HTTP connection manager filter
const std::string HttpConnectionManager = "envoy.http_connection_manager";

 

我們這里重點看HTTP協議的轉發,我們重點看這個名字對應的HttpConnectionManagerFilterConfigFactory,並調用他的createFilterFactory函數,返回一個Network::FilterFactoryCb類型的callback函數。

 

五、Envoy訂閱RDS

 

HttpConnectionManagerFilterConfigFactory的createFilterFactory函數最終調用createFilterFactoryFromProtoTyped函數中,會創建

Router::RouteConfigProviderManagerImpl。

 

std::shared_ptr<Router::RouteConfigProviderManager>
    route_config_provider_manager
        = context.singletonManager().getTyped<Router::RouteConfigProviderManager>(
            SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] {
            return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin());
});

 

創建完畢Router::RouteConfigProviderManagerImpl之后,會創建HttpConnectionManagerConfig,將Router::RouteConfigProviderManagerImpl作為成員變量傳入。

 

std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig(proto_config, context, *date_provider, *route_config_provider_manager));

 

在HttpConnectionManagerConfig的構造函數中,調用Router::RouteConfigProviderUtil::create。

 

route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,route_config_provider_manager_);

 

Router::RouteConfigProviderUtil::create會調用RouteConfigProviderManagerImpl::createRdsRouteConfigProvider,在這個函數里面,會創建RdsRouteConfigSubscription訂閱RDS,然后創建RdsRouteConfigProviderImpl。

 

當Router的配置發生變化的時候,會調用RdsRouteConfigProviderImpl::onConfigUpdate(),在這個函數里面,會從pilot獲取Route的配置,生成ConfigImpl對象。

 

當我們仔細分析ConfigImpl的時候,我們發現,這個數據結構和第一節Envoy中的路由配置是一樣的。

 

在ConfigImpl里面有RouteMatcher用於匹配路由,在RouteMatcher里面有VirtualHostImpl,在VirtualHostImpl里面有RouteEntryImplBase,其中一種RouteEntry是WeightedClusterEntry,在WeightedClusterEntry里面有cluster_name_和cluster_weight_。

 

到此RouteConfigProviderManagerImpl如何訂閱RDS告一段落,我們回到HttpConnectionManagerFilterConfigFactory的createFilterFactoryFromProtoTyped中,在這個函數的最后一部分,將返回Network::FilterFactoryCb,是一個callback函數,作為createFilterFactoryFromProtoTyped的返回值。

 

在這個callback函數中,會創建Http::ConnectionManagerImpl,將HttpConnectionManagerConfig作為成員變量傳入,Http::ConnectionManagerImpl是一個Network::ReadFilter。在Envoy里面,有Network::WriteFilter和Network::ReadFilter,其中從Downstream發送到Upstream的使用Network::ReadFilter對網絡包進行處理,反方向的是使用Network::WriteFilter。

 

callback函數創建ConnectionManagerImpl之后調用filter_manager.addReadFilter,將ConnectionManagerImpl放到ReadFilter列表中。當然這個callback函數現在是不調用的。

 

createFilterFactoryFromProtoTyped的返回值callback函數會返回到ProdListenerComponentFactory::createNetworkFilterFactoryList_函數,這個函數返回一個callback函數列表,其中一個就是上面生成的這個函數。

 

再返回就回到了ListenerImpl的構造函數,他會調用addFilterChain,將ProdListenerComponentFactory::createNetworkFilterFactoryList_返回的callback函數列表加入到鏈里面。

 

六、當一個新的連接建立的時候

 

通過前面的分析,我們制定當一個新的連接建立的時候,會觸發libevent的事件,最終調用ConnectionHandlerImpl::ActiveListener::onAccept函數。

 

在這個函數中,會調用使用上面createListenerFilterFactoryList_創建的ListenerFilter的工廠創建ListenerFilter,然后使用這些Filter進行處理。

 

// Create and run the filters
config_.filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);
 

ConnectionHandlerImpl::ActiveSocket::continueFilterChain函數調用ConnectionHandlerImpl::ActiveListener::newConnection,創建一個新的連接。

 

在ConnectionHandlerImpl::ActiveListener::newConnection函數中,先是會調用DispatcherImpl::createServerConnection,創建Network::ConnectionImpl,在Network::ConnectionImpl的構造函數里面:

 

file_event_ = dispatcher_.createFileEvent(fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
 

對於一個新的socket連接,對於操作系統來講是一個文件,也即有個文件描述符,當一個socket可讀的時候,會觸發一個事件,當一個socket可寫的時候,可以觸發另一個事件,發生事件后,調用onFileEvent。

 

在ConnectionHandlerImpl::ActiveListener::newConnection函數中,接下來就應該為這個連接創建NetworkFilter了。

 

config_.filterChainFactory().createNetworkFilterChain(*new_connection, filter_chain->networkFilterFactories());
 

上面這段代碼,會調用ListenerImpl::createNetworkFilterChain。

 

里面調用Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);

 

bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories)
{
    for (const Network::FilterFactoryCb& factory : factories) {
        factory(filter_manager);
    }
    return filter_manager.initializeReadFilters();
}
 

可以看出這里會調用上面生成的callback函數,將ConnectionManagerImpl放到ReadFilter列表中。

 

 

七、當有新的數據到來的時候

 

當Downstream發送數據到Envoy的時候,socket就處於可讀的狀態,因而ConnectionImpl::onFileEvent函數會被調用,當事件是Event::FileReadyType::Read的時候,調用ConnectionImpl::onReadReady()。

 

在ConnectionImpl::onReadReady()函數里面,socket將數據讀入緩存。

 

IoResult result = transport_socket_->doRead(read_buffer_);

 

然后調用ConnectionImpl::onRead,里面調用filter_manager_.onRead()使用NetworkFilter對於數據進行處理。

 

FilterManagerImpl::onRead() 會調用FilterManagerImpl::onContinueReading有下面的邏輯。

 

for (; entry != upstream_filters_.end(); entry++) {
    BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
        FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
        if (status == FilterStatus::StopIteration) {
            return;
        }
    }
}

 

對於每一個Filter,都調用onData函數,咱們上面解析過,其中HTTP對應的ReadFilter是ConnectionManagerImpl,因而調用ConnectionManagerImpl::onData函數。

 

八、對數據進行解析

 

 

ConnectionManagerImpl::onData函數中,首先創建數據解析器。

 

codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);

 

調用HttpConnectionManagerConfig::createCodec,對於HTTP1,創建Http::Http1::ServerConnectionImpl。

 

然后對數據進行解析。

 

codec_->dispatch(data);

 

 調用ConnectionImpl::dispatch,里面調用ConnectionImpl::dispatchSlice,在這里面對HTTP進行解析。

 

http_parser_execute(&parser_, &settings_, slice, len);

 

 

解析的參數是settings_,可以看出這里面解析url,然后解析header,結束后調用onHeadersCompleteBase()函數,然后解析正文。

 

由於路由是根據HTTP頭里面的信息來的,因而我們重點看ConnectionImpl::onHeadersCompleteBase(),里面會調用ServerConnectionImpl::onHeadersComplete函數。

 

active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

 

 ServerConnectionImpl::onHeadersComplete里面調用ConnectionManagerImpl::ActiveStream::decodeHeaders這到了路由策略的重點。

 

 

這里面調用了兩個函數,一個是refreshCachedRoute()刷新路由配置,並查找到匹配的路由項Route Entry。

 

另一個是ConnectionManagerImpl::ActiveStream::decodeHeaders的另一個實現。

 

decodeHeaders(nullptr, *request_headers_, end_stream);

 

在這里會通過Route Entry找到后端的集群,並建立連接。

 

九、路由匹配

 

我們先來解析refreshCachedRoute()

 

void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
    Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_);
    request_info_.route_entry_ = route ? route->routeEntry() : nullptr;
    cached_route_ = std::move(route);
}

 

snapped_route_config_是什么呢?snapped_route_config_的初始化如下。

 

snapped_route_config_(connection_manager.config_.routeConfigProvider().config())

 

 routeConfigProvider()返回的就是RdsRouteConfigProviderImpl,其config函數返回的就是ConfigImpl,也就是上面我們描述過的層級的數據結構。

 

ConfigImpl的route函數如下

 

RouteConstSharedPtr route(const Http::HeaderMap& headers, uint64_t random_value) const override {
    return route_matcher_->route(headers, random_value);
}

 

RouteMatcher的route函數,根據headers.Host()查找virtualhost,然后調用 VirtualHostImpl::getRouteFromEntries。

 

RouteConstSharedPtr RouteMatcher::route(const Http::HeaderMap& headers, uint64_t random_value) const {
    const VirtualHostImpl* virtual_host = findVirtualHost(headers);
    if (virtual_host) {
        return virtual_host->getRouteFromEntries(headers, random_value);
    } else {
        return nullptr;
    }
}

 

VirtualHostImpl::getRouteFromEntries函數里面有下面的循環

 

for (const RouteEntryImplBaseConstSharedPtr& route : routes_) {
    RouteConstSharedPtr route_entry = route->matches(headers, random_value);
    if (nullptr != route_entry) {
        return route_entry;
    }
}
 

對於每一個RouteEntry,看是否匹配。例如常用的有PrefixRouteEntryImpl::matches,看前綴是否一致。

 

RouteConstSharedPtr PrefixRouteEntryImpl::matches(const Http::HeaderMap& headers,uint64_t random_value) const {
    if (RouteEntryImplBase::matchRoute(headers, random_value) && StringUtil::startsWith(headers.Path()->value().c_str(), prefix_, case_sensitive_)) {
        return clusterEntry(headers, random_value);
    }
    return nullptr;
}
 

得到匹配的Route Entry后,調用RouteEntryImplBase::clusterEntry獲取一個Cluster的名字。對於WeightedCluster,會調用下面的函數。

 

WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,true);

 

 在這個函數里面,會根據權重選擇一個WeightedClusterEntry返回。

 

這個時候,我們得到了后面Cluster,也即集群的名稱,接下來需要得到集群的具體的IP地址並建立連接。

 

十、查找集群並建立連接

 

ConnectionManagerImpl::ActiveStream::decodeHeaders的另一個實現會調用Route.cc里面的Filter::decodeHeaders。

 

 

在Filter::decodeHeaders函數中有以下的實現。

// A route entry matches for the request.
route_entry_ = route_->routeEntry();
Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());
 

通過上一節的查找,WeightedClusterEntry里面有cluster的名稱,接下來就是從cluster manager里面根據cluster名稱查找到cluster的信息。

 

cluster manager就是我們最初創建的ClusterManagerImpl

 

ThreadLocalCluster* ClusterManagerImpl::get(const std::string& cluster) {
    ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();
    auto entry = cluster_manager.thread_local_clusters_.find(cluster);
    if (entry != cluster_manager.thread_local_clusters_.end()) {
        return entry->second.get();
    } else {
    return nullptr;
    }
}
 

返回ClusterInfoImpl,是cluster的信息。

 

cluster_ = cluster->info();

 

找到Cluster后,開始建立連接。

 

// Fetch a connection pool for the upstream cluster.
Http::ConnectionPool::Instance* conn_pool = getConnPool();
 

route.cc的Filter::getConnPool()里面調用以下函數。

 

config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this);

 

ClusterManagerImpl::httpConnPoolForCluster調用ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool函數。

 

在這個函數里面,HostConstSharedPtr host = lb_->chooseHost(context);通過負載均衡,在一個cluster里面選擇一個后端的機器建立連接。

 

最后。

 

歡迎關注微信公眾號

 

 

小弟參加GIAC年度新人評選,馬了這么多字,能幫忙投個票嗎?請點擊連接進行投票

 

劉超 網易雲技術架構部總監

 

長期致力於雲計算開源技術的分享,布道和落地,將網易內部最佳實踐服務客戶與行業。

 

技術分享:出版《Lucene應用開發解密》,極客時間專欄《趣談網絡協議》,個人公眾號《劉超的通俗雲計算》文章Kubernetes及微服務系列18篇,Mesos系列30篇,KVM系列25篇,Openvswitch系列31篇,OpenStack系列24篇,Hadoop系列10篇。公眾號文章《終於有人把雲計算,大數據,人工智能講明白了》累積10萬+

 

大會布道:InfoQ架構師峰會明星講師,作為邀請講師在QCon,LC3,SACC,GIAC,CEUC,SoftCon,NJSD等超過10場大型技術峰會分享網易的最佳實踐

 

行業落地:將網易的容器和微服務產品在銀行,證券,物流,視頻監控,智能制造等多個行業落地。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM