Envoy 源碼分析--程序啟動過程
申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。
前面幾章分析了 event事件 和 底層網絡, 但對創建服務的過程並沒有串起來,只是分析了底層的網絡公共庫。這次我們分析下整個服務的創建過程。
初始化
main 入口
服務啟動的總入口 main
函數,會先創建 MainCommon
。
int main(int argc, char** argv) {
... ...
std::unique_ptr<Envoy::MainCommon> main_common;
try {
main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
... ...
}
MainCommon 初始化
MainCommon
在構造函數時,會先解析程序的參數,然后再調用 MainCommonBase
。
MainCommon::MainCommon(int argc, const char* const* argv)
: options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
base_(options_, real_time_system_, default_test_hooks_,prod_component_factory_,
std::make_unique<Runtime::RandomGeneratorImpl>(),platform_impl_.threadFactory(),
platform_impl_.fileSystem()) {}
OptionsImpl
使用開源的 tclap 解析庫。OptionsImpl
支持很多參數配置,具體的參數配置參考 operation/cli。
MainCommonBase
會初始化全局的參數,接着調用服務進行初始化。
MainCommonBase::MainCommonBase(... ...)
: options_(options), component_factory_(component_factory), thread_factory_(thread_factory),file_system_(file_system) {
//全局的或第三方庫預先初始化
Thread::ThreadFactorySingleton::set(&thread_factory_);
ares_library_init(ARES_LIB_INIT_ALL);
Event::Libevent::Global::initialize();
RELEASE_ASSERT(Envoy::Server::validateProtoDescriptors(), "");
Http::Http2::initializeNghttp2Logging();
... ...
//初始化服務
server_ = std::make_unique<Server::InstanceImpl>(
options_, time_system, local_address, test_hooks, *restarter_, *stats_store_,
access_log_lock, component_factory, std::move(random_generator), *tls_, thread_factory_,
file_system_);
服務 InstanceImpl 初始化
InstanceImpl
主要是初始化 admin
管理服務,各個靜態 XDS 的加載以及初始化服務。
InstanceImpl
在核心函數中先加載配置文件,通過參數 -c
配置。
void InstanceImpl::initialize(const Options& options,
Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory, TestHooks& hooks) {
... ...
//加載Bootstrap
InstanceUtil::loadBootstrapConfig(bootstrap_, options, *api_);
bootstrap_config_update_time_ = time_source_.systemTime();
... ...
}
配置文件是一個 json 格式,包括以下幾項:
- node:節點標識,會在管理服務器中呈現,用於標識目的(例如,頭域中生成相應的字段)。
- static_resources:指定靜態資源配置。
- dynamic_resources:動態發現服務源配置。
- cluster_manager:該服務所有的上游群集的群集管理配置。
- hds_config:服務健康檢查配置。
- flags_path:用於啟動文件標志的路徑。
- stats_sinks:統計匯總設置
- stats_config:配置內部處理統計信息。
- stats_flush_interval:刷新到統計信息服務的周期時間。出於性能方面的考慮,Envoy鎖定計數器,並且只是周期性地刷新計數器和計量器。如果未指定,則默認值為5000毫秒(5秒)。
- watchdog:看門狗配置。
- tracing:配置外置的追蹤服務程序。如果沒有指定,則不會執行追蹤。
- runtime:配置運行時配置分發服務程序。
- admin: 配置本地管理的HTTP服務。
- overload_manager:過載管理配置(資源限制)。
各個項的具體作用和配置參考Bootstrap
加載完配置項,接着會啟動 admin
本地的HTTP服務。
初始化 admin 服務
admin
先創建一個 AdminImpl,在構造函數里初始化 URI。
AdminImpl::AdminImpl(const std::string& profile_path, Server::Instance& server)
: server_(server), profile_path_(profile_path),
... ...
handlers_{
{"/", "Admin home page", MAKE_ADMIN_HANDLER(handlerAdminHome), false, false},
{"/certs", "print certs on machine", MAKE_ADMIN_HANDLER(handlerCerts), false, false},
// 導出 cluster 統計信息
{"/clusters", "upstream cluster status", MAKE_ADMIN_HANDLER(handlerClusters), false,
false},
// 導出配置文件
{"/config_dump", "dump current Envoy configs (experimental)",
MAKE_ADMIN_HANDLER(handlerConfigDump), false, false},
// 導出連接統計信息
{"/contention", "dump current Envoy mutex contention stats (if enabled)",
MAKE_ADMIN_HANDLER(handlerContention), false, false},
... ...
admin_filter_chain_(std::make_shared<AdminFilterChain>()) {}
接着啟動一個服務。
admin_->startHttpListener(initial_config.admin().accessLogPath(), options.adminAddressPath(),
initial_config.admin().address(),
stats_store_.createScope("listener.admin."));
在啟動服務函數內部會創建 TcpListenSocket 和 AdminListener。
void AdminImpl::startHttpListener(const std::string& access_log_path,
const std::string& address_out_path,
Network::Address::InstanceConstSharedPtr address,
Stats::ScopePtr&& listener_scope) {
... ...
socket_ = std::make_unique<Network::TcpListenSocket>(address, nullptr, true);
listener_ = std::make_unique<AdminListener>(*this, std::move(listener_scope));
... ...
}
初始化 TcpListenSocket 時會在內部創建一個 socket 后再進行 bind。
using TcpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Stream>>;
template <typename T> class NetworkListenSocket : public ListenSocketImpl {
public:
NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port)
// socket 創建
: ListenSocketImpl(address->socket(T::type), address) {
RELEASE_ASSERT(io_handle_->fd() != -1, "");
setPrebindSocketOptions();
setupSocket(options, bind_to_port);
}
void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) {
setListenSocketOptions(options);
//准備進行綁定
if (bind_to_port) {
doBind();
}
}
void ListenSocketImpl::doBind() {
//綁定
const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
... ...
}
AdminListener 構造函數內只是參數的初始化。
AdminListener(AdminImpl& parent, Stats::ScopePtr&& listener_scope)
: parent_(parent), name_("admin"), scope_(std::move(listener_scope)),
stats_(Http::ConnectionManagerImpl::generateListenerStats("http.admin.", *scope_)) {}
做完 socket ,bind 后面就是進行 listen 處理。將 AdminListener 通過 handler 加入監聽隊列。handler 是在 InstanceImpl
的構造函數內初始化的。
InstanceImpl::InstanceImpl(... ...)
: handler_(new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher_)),
... ...{
}
void InstanceImpl::initialize(... ...)
{
... ...
//將 AdminListener 加入 ConnectionHandler
if (initial_config.admin().address()) {
admin_->addListenerToHandler(handler_.get());
}
... ...
}
void AdminImpl::addListenerToHandler(Network::ConnectionHandler* handler) {
// 這里的 listener_ 就是上面生成的 AdminListener
if (listener_) {
handler->addListener(*listener_);
}
}
在 addListener 內會新建一個 ActiveListener 內部類,先置為 disable 狀態。
void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
ActiveListenerPtr l(new ActiveListener(*this, config));
if (disable_listeners_) {
l->listener_->disable();
}
listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}
在 ActiveListener 構造函數內創建 listen,里面dispatcher 會創建回調。等有新連接到來時,會回調 onAccept.
ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,Network::ListenerConfig& config)
: ActiveListener(
parent,
// 創建listen
parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
config.handOffRestoredDestinationConnections()),
config) {}
Network::ListenerPtr
DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) {
ASSERT(isThreadSafe());
return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port,
hand_off_restored_destination_connections)};
}
void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
listener_.reset(
//創建 evconnlistener_new ,有連接回調listenCallback
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
... ...
}
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,int remote_addr_len, void* arg) {
... ...
//回調ActiveListener的onAccept
listener->cb_.onAccept(
std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address),
listener->hand_off_restored_destination_connections_);
}
onAccept 對 Listern 過濾后,創建新連接。
void ConnectionHandlerImpl::ActiveListener::onAccept()
{
... ...
active_socket->continueFilterChain(true);
... ...
}
void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
... ...
listener_.newConnection(std::move(socket_));
... ...
}
void ConnectionHandlerImpl::ActiveListener::onNewConnection() {
... ...
if (new_connection->state() != Network::Connection::State::Closed) {
ActiveConnectionPtr active_connection(
new ActiveConnection(*this, std::move(new_connection), parent_.dispatcher_.timeSource()));
active_connection->moveIntoList(std::move(active_connection), connections_);
parent_.num_connections_++;
}
... ...
}
這樣,新連接就建立起來。
配置文件 XDS 初始化
初始化 Bootstrap 的 XDS 時,先初始化 static sercret,先初始化 cluster,接着初始化 listeners。
void MainImpl::initialize(... ...) {
//初始化secrets
const auto& secrets = bootstrap.static_resources().secrets();
for (ssize_t i = 0; i < secrets.size(); i++) {
ENVOY_LOG(debug, "static secret #{}: {}", i, secrets[i].name());
server.secretManager().addStaticSecret(secrets[i]);
}
//初始化 cluster
bootstrap.static_resources().clusters().size());
cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);
// 初始化listeners
const auto& listeners = bootstrap.static_resources().listeners();
for (ssize_t i = 0; i < listeners.size(); i++) {
ENVOY_LOG(debug, "listener #{}:", i);
server.listenerManager().addOrUpdateListener(listeners[i], "", false);
}
初始化 cluster 會分兩階段初始化。先初始化非 EDS 部分,再初始化 EDS 部分。分兩個階段的初始化是因為在 v2 配置中每個 EDS 集群單獨設置訂閱。此訂閱是 API 源時 群集將依賴於非 EDS 群集,因此必須首先初始化非 EDS 群集。cluster 的類型有 5個類型:
enum DiscoveryType {
// Refer to the :ref:`static discovery
STATIC = 0;
// Refer to the :ref:`strict DNS discovery
STRICT_DNS = 1;
// Refer to the :ref:`logical DNS discovery
LOGICAL_DNS = 2;
// Refer to the :ref:`service discovery
EDS = 3;
// Refer to the :ref:`original destination discovery
ORIGINAL_DST = 4;
}
cluster 初始化順序:
非 EDS 部分 -> ADS -> EDS -> CDS
ClusterManagerImpl::ClusterManagerImpl(... ...) {
... ...
for (const auto& cluster : bootstrap.static_resources().clusters()) {
// 第一次初始化非 EDS 部分
if (cluster.type() != envoy::api::v2::Cluster::EDS) {
loadCluster(cluster, "", false, active_clusters_);
}
}
// 初始化 ADS
if (bootstrap.dynamic_resources().has_ads_config()) {
ads_mux_ = std::make_unique<Config::GrpcMuxImpl>(
local_info,
Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, bootstrap.dynamic_resources().ads_config(), stats)
->create(),
main_thread_dispatcher,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_,
Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()));
} else {
ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
}
for (const auto& cluster : bootstrap.static_resources().clusters()) {
// 初始化 EDS
if (cluster.type() == envoy::api::v2::Cluster::EDS) {
loadCluster(cluster, "", false, active_clusters_);
}
}
... ...
//初始化 CDS
if (bootstrap.dynamic_resources().has_cds_config()) {
cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), *this);
init_helper_.setCds(cds_api_.get());
} else {
init_helper_.setCds(nullptr);
}
}
上面都初始化完成后,再初始化 lds,最后再初始化 hds。
// 初始化lds
if (bootstrap_.dynamic_resources().has_lds_config()) {
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}
//初始化hds
if (bootstrap_.has_hds_config()) {
const auto& hds_config = bootstrap_.hds_config();
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
*config_.clusterManager(), thread_local_, time_source_, *api_);
... ...
}
初始化 ListenerManager
ListenerManager 的初始化只是事先創建 worker。
ListenerManagerImpl::ListenerManagerImpl(... ...) {
... ...
// 創建worker子線程
for (uint32_t i = 0; i < server.options().concurrency(); i++) {
workers_.emplace_back(worker_factory.createWorker(server.overloadManager()));
}
}
WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) {
// 新建子線程,每個線種一個dispatchr
Event::DispatcherPtr dispatcher(api_.allocateDispatcher());
return WorkerPtr{new WorkerImpl(
tls_, hooks_, std::move(dispatcher),
Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)},
overload_manager, api_)};
}
WorkerImpl::WorkerImpl(... ...)
: tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)), api_(api) {
tls_.registerThread(*dispatcher_, false);
overload_manager.registerForAction(
OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
[this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
}
啟動
main 啟動入口
main 函數調用 main_common
int main(int argc, char** argv) {
... ...
return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}
main_common 進一步調用 InstanceImpl
bool MainCommonBase::run() {
switch (options_.mode()) {
case Server::Mode::Serve:
server_->run();
return true;
... ...
}
InstanceImpl 啟用 loop 循環。
void InstanceImpl::run() {
auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(), access_log_manager_,
init_manager_, overloadManager(), [this] { startWorkers(); });
auto watchdog = guard_dog_->createWatchDog(api_->threadFactory().currentThreadId());
watchdog->startWatchdog(*dispatcher_);
dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
服務啟動流程
本地 HTTP 管理服務的啟動流程上面已經分析過,現在討論本地服務的啟動流程(XDS 下發的暫不討論)。
在 cluster 初始化的時候,加入 listener。
void MainImpl::initialize(... ...) {
... ...
// 初始化listeners
const auto& listeners = bootstrap.static_resources().listeners();
for (ssize_t i = 0; i < listeners.size(); i++) {
ENVOY_LOG(debug, "listener #{}:", i);
server.listenerManager().addOrUpdateListener(listeners[i], "", false);
}
}
addOrUpdateListener 創建 ListenerImpl,ListenerImpl 做 bind 操作。
// 創建ListenerImpl
ListenerImplPtr new_listener(
new ListenerImpl(config, version_info, *this, name, modifiable, workers_started_, hash));
ListenerImpl& new_listener_ref = *new_listener;
... ...
//bind 地址將 socket 關聯ListenerImpl
new_listener->setSocket(draining_listener_socket
? draining_listener_socket
: factory_.createListenSocket(new_listener->address(),
new_listener->socketType(),
new_listener->listenSocketOptions(),
new_listener->bindToPort()));
Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket(... ...) {
... ...
// 調用 UdsListenSocket 做 bind() 操作。
if (io_handle->isOpen()) {
return std::make_shared<Network::UdsListenSocket>(std::move(io_handle), address);
}
return std::make_shared<Network::UdsListenSocket>(address);
}
// 最終調用系統bind()操作
void ListenSocketImpl::doBind() {
const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
... ...
}
在 InstanceImpl 啟動時,調用 RunHelper。RunHelper 則啟動 startWorkers。startWorker 將初始化得到的 listeners 加入到 work 中。
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
workers_started_ = true;
for (const auto& worker : workers_) {
ASSERT(warming_listeners_.empty());
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, *listener);
}
worker->start(guard_dog);
}
}
work 將 linsteners 關聯到 connectioHandler。
void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
worker.addListener(listener, [this, &listener](bool success) -> void {
... ...
}
void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
dispatcher_->post([this, &listener, completion]() -> void {
try {
// 關聯到connectioHandler。
handler_->addListener(listener);
hooks_.onWorkerListenerAdded();
completion(true);
} catch (const Network::CreateListenerException& e) {
completion(false);
}
});
}
connectioHandler 在 work 初始化時創建。
ListenerManagerImpl::ListenerManagerImpl(Instance& server,
ListenerComponentFactory& listener_factory,
WorkerFactory& worker_factory)
: server_(server), factory_(listener_factory), stats_(generateStats(server.stats())),
config_tracker_entry_(server.admin().getConfigTracker().add(
"listeners", [this] { return dumpListenerConfigs(); })) {
for (uint32_t i = 0; i < server.options().concurrency(); i++) {
// 初始化worker
workers_.emplace_back(worker_factory.createWorker(server.overloadManager()));
}
}
WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) {
Event::DispatcherPtr dispatcher(api_.allocateDispatcher());
return WorkerPtr{new WorkerImpl(
tls_, hooks_, std::move(dispatcher),
//創建connectioHandler
Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)},
overload_manager, api_)};
}
將 linsteners 關聯到 connectioHandler 后,后面的 listen(),accept() 和創建連接過程和 admin
的 HTTP 啟動流程是一樣的。
LDS 服務啟動流程
整個服務的啟動流程基本就完成了,后面有新加服務的啟動流程和上面的服務啟動流程一樣,調用 addOrUpdateListener。在 addOrUpdateListener 內判斷服務是否已啟動,如果已啟動調用 ManagerImpl 等待初始化。
void ListenerImpl::initialize() {
last_updated_ = timeSource().systemTime();
if (workers_started_) {
//ManagerImpl
dynamic_init_manager_.initialize(*init_watcher_);
}
}
void ManagerImpl::initialize(const Watcher& watcher) {
... ...
for (const auto& target_handle : target_handles_) {
// 等待 target_handle 初始化完成。
if (!target_handle->initialize(watcher_)) {
onTargetReady();
}
}
}
初始化完成后,調用函數指針。函數指針在初始化WatcherImpl傳入。
void ManagerImpl::onTargetReady() {
if (--count_ == 0) {
// 初始化完成
ready();
}
}
void ManagerImpl::ready() {
state_ = State::Initialized;
watcher_handle_->ready();
}
bool WatcherHandleImpl::ready() const {
//調用函數指針
(*locked_fn)();
}
ListenerImpl::ListenerImpl(... ...)
: ... ...
// 初始化watch
init_watcher_(std::make_unique<Init::WatcherImpl>(
"ListenerImpl", [this] { parent_.onListenerWarmed(*this); })){}
在 onListenerWarmed 內將 listener 加入 work。后面流程和 服務啟動流程一樣,不再分析。
void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
for (const auto& worker : workers_) {
addListenerToWorker(*worker, listener);
}
最后
整個服務的初始化和啟動流程就完成了。服務的啟動有3個類型 : 本地 HTTP 服務管理服務、本地配置文件的服務和xDS下發的服務。本章節只分析了服務的啟動流程,連接成功的后繼處理,以后分析。