eventfd實現線程事件通知機制


 通過eventfd實現的事件通知機制


eventfd的使用

eventfd系統函數

eventfd - 事件通知文件描述符

#include <sys/eventfd.h>
int eventfd(unsigned int initval ,int flags );

創建一個能被用戶應用程序用於時間等待喚醒機制的eventfd對象.
initval :
eventfd()創建一個可用作事件的“eventfd對象”用戶空間應用程序和內核等待/通知機制通知用戶空間應用程序的事件。該對象包含一個由內核維護的無符號64位整型(uint64_t)計數器。此計數器的初始值通過initval指定。一般設0.

flags :
以下標志中按位OR運算以更改eventfd()的行為,(文件中常用的這兩個flags肯定都懂意思吧,就不翻譯了,第三個信號量的不管它.):

   EFD_CLOEXEC (since Linux 2.6.27)
          Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. See the description of the O_CLOEXEC flag in open(2) for reasons why this may be useful. EFD_NONBLOCK (since Linux 2.6.27) Set the O_NONBLOCK file status flag on the new open file description. Using this flag saves extra calls to fcntl(2) to achieve the same result. EFD_SEMAPHORE (since Linux 2.6.30) Provide semaphore-like semantics for reads from the new file descriptor. See below.

read(2)

成功讀取返回一個8byte的整數。read(2)如果提供的緩沖區的大小小於8個字節返回錯誤EINVAL

write (2)

將緩沖區寫入的8字節整形值加到內核計數器上。可以寫入的最大值
是計數器中是最大的無符號64位值減1(即0xfffffffffffffffe)。

返回值:

On success, eventfd() returns a new eventfd file descriptor. On error, -1 is returned and errno is set to indicate the error.

使用示例

#include <iostream> #include <assert.h> #include <poll.h> #include <signal.h> #include <sys/eventfd.h> #include <unistd.h> #include <string.h> #include <thread> static int s_efd = 0; int createEventfd() { int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); std::cout << "createEventfd() fd : " << evtfd << std::endl; if (evtfd < 0) { std::cout << "Failed in eventfd\n"; abort(); } return evtfd; } void testThread() { int timeout = 0; while(timeout < 3) { sleep(1); timeout++; } uint64_t one = 1; ssize_t n = write(s_efd, &one, sizeof one); if(n != sizeof one) { std::cout << " writes " << n << " bytes instead of 8\n"; } } int main() { s_efd = createEventfd(); fd_set rdset; FD_ZERO(&rdset); FD_SET(s_efd, &rdset); struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; std::thread t(testThread); while(1) { if(select(s_efd + 1, &rdset, NULL, NULL, &timeout) == 0) { std::cout << "timeout\n"; timeout.tv_sec = 1; timeout.tv_usec = 0; FD_SET(s_efd, &rdset); continue; } uint64_t one = 0; ssize_t n = read(s_efd, &one, sizeof one); if(n != sizeof one) { std::cout << " read " << n << " bytes instead of 8\n"; } std::cout << " wakeup !\n"; break; } t.join(); close(s_efd); return 0; }
./test.out createEventfd() fd : 3 timeout timeout timeout wakeup !

eventfd 單純的使用文件描述符實現的線程間的通知機制,可以很好的融入select、poll、epoll的I/O復用機制中.

EventLoop對eventfd的封裝

所增加的接口及成員:

    typedef std::function<void()> Functor; void runInLoop(const Functor& cb); void wakeup(); //是寫m_wakeupFd 通知poll 處理讀事件. void queueInLoop(const Functor& cb); private: //used to waked up void handleRead(); void doPendingFunctors(); int m_wakeupFd; std::unique_ptr<Channel> p_wakeupChannel; mutable MutexLock m_mutex; bool m_callingPendingFunctors; /* atomic */ std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_

工作時序

(runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()

runInLoop()

如果用戶在當前IO線程調用這個函數, 回調會同步進行; 如果用戶在其他線程調用runInLoop(),cb會被加入隊列, IO線程會被喚醒來調用這個Functor.

void EventLoop::runInLoop(const Functor& cb) { if(isInloopThread()) cb(); else queueInLoop(cb); }

queueInLoop()

會將回調添加到容器,同時通過wakeup()喚醒poll()調用容器內的回調.

void EventLoop::queueInLoop(const Functor& cb) { LOG_TRACE << "EventLoop::queueInLoop()"; { MutexLockGuard lock(m_mutex); m_pendingFunctors.push_back(std::move(cb)); } if(!isInloopThread()) { wakeup(); } }

內部實現,

wakeup()

寫已注冊到poll的eventfd 通知poll 處理讀事件.

// m_wakeupFd(createEventfd()), // p_wakeupChannel(new Channel(this, m_wakeupFd)), void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(m_wakeupFd, &one, sizeof one); if(n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }

handleRead()

poll回調讀事件,處理eventfd.

void EventLoop::handleRead() //handle wakeup Fd { LOG_TRACE << "EventLoop::handleRead() handle wakeup Fd"; uint64_t one = 1; ssize_t n = sockets::read(m_wakeupFd, &one, sizeof one); if(n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << "bytes instead of 8"; } doPendingFunctors(); }

doPendingFunctors()

處理掛起的事件.

void EventLoop::doPendingFunctors() { LOG_TRACE << "EventLoop::doPendingFunctors()"; std::vector<Functor> functors; m_callingPendingFunctors = true; { MutexLockGuard lock(m_mutex); functors.swap(m_pendingFunctors); } for(size_t i = 0; i < functors.size(); ++i) { functors[i](); } m_callingPendingFunctors = false; }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM