Envoy 源碼分析--event
申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。
Envoy 的事件是復用了 libevent 的 event_base
。其在代碼中的表現就是類 Dispatcher
,一個 Dispatcher
其實就是一個 event_loop
,主要的核心功能有:網絡事件處理,定時器,信號處理,任務隊列,代碼對象的析構等。下面是相關的類圖。
ImplBase
包含了 libevent 的事件類型,對象在析構時會自動調用 event_del
。 ImplBase
派生出 FileEventImpl
、 SignalEventImpl
和 TimerImpl
三種類型的事件。 RealTimeSystem
在創建調度后,會創建一個線程局部存儲(TLS)的時間隊列。DispatchedThreadImpl
包含了 DispatcherImpl
在啟動時會創建一條線程,然后啟動一個 event_loop
,同時在 event_loop
外層包了個 guard_dog
防止死鎖。
libevent
Envoy 是 C++ 的,而 libevent 是個 C 庫,這就需要自動管理 C 結構的內存。 Envoy 通過繼承智能指針 unique_ptr
來重新封裝了 libevent 的結構體。
template <class T, void (*deleter)(T*)> class CSmartPtr : public std::unique_ptr<T, void (*)(T*)> {
public:
CSmartPtr() : std::unique_ptr<T, void (*)(T*)>(nullptr, deleter) {}
CSmartPtr(T* object) : std::unique_ptr<T, void (*)(T*)>(object, deleter) {}
};
然后使用 CSmartPtr
就可以自動管理 libevent 的結構體。使用方式如下:
struct event_base;
extern "C" {
void event_base_free(event_base*);
}
struct evbuffer;
extern "C" {
void evbuffer_free(evbuffer*);
}
struct bufferevent;
extern "C" {
void bufferevent_free(bufferevent*);
}
struct evconnlistener;
extern "C" {
void evconnlistener_free(evconnlistener*);
}
typedef CSmartPtr<event_base, event_base_free> BasePtr;
typedef CSmartPtr<evbuffer, evbuffer_free> BufferPtr;
typedef CSmartPtr<bufferevent, bufferevent_free> BufferEventPtr;
typedef CSmartPtr<evconnlistener, evconnlistener_free> ListenerPtr;
這樣 libevent 的結構體就變成了 C++ 的智能指針。
Envoy 有三種事件都是 event
類型,我們需要對事件類型進行抽象,自動管理事件的釋放。Envoy 將 event
作為 ImplBase
的成員,在類析構進自動釋放,所有事件只要繼承 ImplBase
就完成了事件的自動管理。
class ImplBase {
protected:
~ImplBase();
event raw_event_;
};
ImplBase::~ImplBase() {
event_del(&raw_event_);
}
Timer
Timer 只有兩接口一個用於啟動,另一個用於關閉。
class Timer {
public:
virtual ~Timer() {}
virtual void disableTimer() PURE;
virtual void enableTimer(const std::chrono::milliseconds& d) PURE;
};
創建 Timer 時,會在構造函數內進行初始化。enableTimer
時調用 event_add
加入事件。 disableTimer
時調用 event_del
刪除事件。
TimerImpl::TimerImpl(Libevent::BasePtr& libevent, TimerCb cb) : cb_(cb) {
ASSERT(cb_);
evtimer_assign(
&raw_event_, libevent.get(),
[](evutil_socket_t, short, void* arg) -> void { static_cast<TimerImpl*>(arg)->cb_(); }, this);
}
void TimerImpl::disableTimer() { event_del(&raw_event_); }
void TimerImpl::enableTimer(const std::chrono::milliseconds& d) {
if (d.count() == 0) {
event_active(&raw_event_, EV_TIMEOUT, 0);
} else {
// TODO(#4332): use duration_cast more nicely to clean up this code.
std::chrono::microseconds us = std::chrono::duration_cast<std::chrono::microseconds>(d);
timeval tv;
tv.tv_sec = us.count() / 1000000;
tv.tv_usec = us.count() % 1000000;
event_add(&raw_event_, &tv);
}
}
SignalEvent
SignalEvent 比較簡單在構造函數時,直接初始化並加入事件。
SignalEventImpl::SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb)
: cb_(cb) {
evsignal_assign(
&raw_event_, &dispatcher.base(), signal_num,
[](evutil_socket_t, short, void* arg) -> void { static_cast<SignalEventImpl*>(arg)->cb_(); },
this);
evsignal_add(&raw_event_, nullptr);
}
FileEvent
文件相關的事件封裝為 FileEvent。我們知道 linux 中 socket 也是一個文件,因此 socket 套接字相關的事件也屬於 FileEvent。FileEvent 使用持久性事件假定用戶一直讀或寫,直到收到 EAGAIN。
FileEvent 提供兩個接口。activate
無論事件是否准備就緒,此方法都會主動觸發事件,典型場景:socket 讀寫事件, EventLoop 喚醒等。setEnabled
用於設置事件。
class FileEvent {
public:
virtual ~FileEvent() {}
virtual void activate(uint32_t events) PURE;
virtual void setEnabled(uint32_t events) PURE;
};
RealTimeSystem
RealTimeSystem 暴露三個接口。
class RealTimeSystem : public TimeSystem {
public:
SchedulerPtr createScheduler(Libevent::BasePtr&) override;
SystemTime systemTime() override { return time_source_.systemTime(); }
MonotonicTime monotonicTime() override { return time_source_.monotonicTime(); }
private:
RealTimeSource time_source_;
}
systemTime
返回系統時間。調用的是 std::chrono 的 system_clock。monotonicTime
返回的是系統的啟動時間。即 linux 命令uptime
上的啟動時間。用於時間間隔,不會受系統修改時間的影響。調用的是 std::chrono 的 steady_clock。createScheduler
創建一個計時器工廠(factory模式)。間接啟用線程本地計時器隊列管理,因此每個線程具有單獨的計時器。RealScheduler
類放在源文件中,外部無法調用。
//創建計時器工廠
SchedulerPtr RealTimeSystem::createScheduler(Libevent::BasePtr& libevent) {
return std::make_unique<RealScheduler>(libevent);
}
class RealScheduler : public Scheduler {
public:
RealScheduler(Libevent::BasePtr& libevent) : libevent_(libevent) {}
//創建一個本地計時器
TimerPtr createTimer(const TimerCb& cb) override {
return std::make_unique<TimerImpl>(libevent_, cb);
};
private:
Libevent::BasePtr& libevent_;
};
任務隊列
Dispatcher
內部創建了一個任務隊列,將所有的 callback
加入隊列。同時創建一個 Timer
調用一個函數,函數內循環處理。
post
方法將傳進來的 callback
加入到任務任務。如果加入前的隊列為空就需要觸發定時器。post_timer_
在構造函數內已設置好其對應的函數,調用 runPostCallbacks
。
void DispatcherImpl::post(std::function<void()> callback) {
bool do_post;
{
Thread::LockGuard lock(post_lock_);
do_post = post_callbacks_.empty();
post_callbacks_.push_back(callback);
}
if (do_post) {
post_timer_->enableTimer(std::chrono::milliseconds(0));
}
}
DispatcherImpl::DispatcherImpl(TimeSystem& time_system, Buffer::WatermarkFactoryPtr&& factory,
Api::Api& api)
: ...
post_timer_(createTimer([this]() -> void { runPostCallbacks(); })),
current_to_delete_(&to_delete_1_) {
RELEASE_ASSERT(Libevent::Global::initialized(), "");
}
runPostCallbacks
是一個死循環,每次取一個 callback
進行處理。直到隊列為空跳出循環。從這可以看出 post
進來的任務,如果在加入前隊列為空的話,runPostCallbacks
已退出,因此需要重新觸發 post_timer_
。
void DispatcherImpl::runPostCallbacks() {
while (true) {
std::function<void()> callback;
{
Thread::LockGuard lock(post_lock_);
if (post_callbacks_.empty()) {
return;
}
callback = post_callbacks_.front();
post_callbacks_.pop_front();
}
callback();
}
}
延遲析構
延遲析構指的是將 unique_ptr
的對象的析構的動作交由 Dispatcher
來完成。 DeferredDeletable
是個空接口,所有析構的對象都要繼承 DeferredDeletable
。
class DeferredDeletable {
public:
virtual ~DeferredDeletable() {}
};
typedef std::unique_ptr<DeferredDeletable> DeferredDeletablePtr;
Dispatcher
對象保存了所有要延遲析構的對象
std::vector<DeferredDeletablePtr> to_delete_1_;
std::vector<DeferredDeletablePtr> to_delete_2_;
std::vector<DeferredDeletablePtr>* current_to_delete_;
to_delete_1_
和 to_delete_2
保存了析構的對象,current_to_delete_
指針當前要析構的對象。加入延遲析構對象時,如果當前的析構對象長度為 1,deferred_delete_timer_
就會被觸發。
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
ASSERT(isThreadSafe());
current_to_delete_->emplace_back(std::move(to_delete));
ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
if (1 == current_to_delete_->size()) {
deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0));
}
}
deferred_delete_timer_
是在構造函數內已構造好回調函數 clearDeferredDeleteList
。clearDeferredDeleteList
中 current_to_delete_
始終指向當前正要析構的對象列表,每次執行完析構后就指向另外一個對象列表,來回交替。
void DispatcherImpl::clearDeferredDeleteList() {
ASSERT(isThreadSafe());
std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
size_t num_to_delete = to_delete->size();
if (deferred_deleting_ || !num_to_delete) {
return;
}
ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
if (current_to_delete_ == &to_delete_1_) {
current_to_delete_ = &to_delete_2_;
} else {
current_to_delete_ = &to_delete_1_;
}
deferred_deleting_ = true;
for (size_t i = 0; i < num_to_delete; i++) {
(*to_delete)[i].reset();
}
to_delete->clear();
deferred_deleting_ = false;
}
可以看出延遲析構的原理和任務隊列原理差不多。
為何要延遲析構以及析構時為何需要兩個隊列,可參考:https://yq.aliyun.com/articles/659277
dispacth_thread
dispacth_thread
只是一個簡單的 event_loop
線程,不支持像接收新連接那樣的工作線程。 接口很簡單,在啟動時,啟動一個新線程,在新線程中調用 dispatch
run 執行 event_loop
。同時會新建一個 GuardDog
監控線程是否死鎖。
void DispatchedThreadImpl::start(Server::GuardDog& guard_dog) {
thread_ =
api_.threadFactory().createThread([this, &guard_dog]() -> void { threadRoutine(guard_dog); });
}
void DispatchedThreadImpl::threadRoutine(Server::GuardDog& guard_dog) {
ENVOY_LOG(debug, "dispatched thread entering dispatch loop");
auto watchdog = guard_dog.createWatchDog(api_.threadFactory().currentThreadId());
watchdog->startWatchdog(*dispatcher_);
dispatcher_->run(Dispatcher::RunType::Block);
ENVOY_LOG(debug, "dispatched thread exited dispatch loop");
guard_dog.stopWatching(watchdog);
watchdog.reset();
dispatcher_.reset();
}