多線程版Libevent //保存線程的結構體 struct LibeventThread { LibEvtServer* that; //用作傳參 std::shared_ptr<std::thread> spThread; // 線程 struct event_base * thread_base; // 事件根基 struct event notify_event; evutil_socket_t notfiy_recv_fd; // socketpair 接收端fd(工作線程接收通知) evutil_socket_t notfiy_send_fd; // socketpair 發送端fd(監聽線程發送通知) #ifdef BOOST_LOCKFREE boost::lockfree::spsc_queue<conn_queue_item, boost::lockfree::capacity<1000> > conn_queue; #else std::mutex conn_mtx; //維護連接隊列的鎖 std::queue<conn_queue_item> conn_queue; //conn_queue 是一個管理conn_queue_item的隊列 #endif }; bool LibEvtServer::init(I_NetServerEvent* event, int start, int size) { m_ids = new ChannelIDGenerator(); m_ids->init(start, size); m_allChannels.resize(m_ids->getSize()); m_event = event; //event支持windows下線程的函數 int hr = evthread_use_windows_threads(); m_base = event_base_new(); if (!m_base) { fprintf(stderr, "Could not initialize libevent!\n"); return false; } #ifdef MUL_LIBEVENT_THREAD m_last_thread = -1; //注意初始化為-1 //初始化線程 init_threads(THREAD_NUMB); #endif return true; } bool LibEvtServer::init_threads(int thread_numb) { m_libevent_threads.resize(thread_numb); //為每個線程指定雙向通道(類似於管道) for(int i = 0; i < thread_numb; ++i) { LibeventThread* plt = new LibeventThread(); #ifdef WIN32 //創建一個socketpair即可與互相通信的兩個socket,保存在fds里面 evutil_socket_t fds[2]; if(evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) { std::cout << "創建socketpair失敗\n"; return false; } //設置成無阻賽的socket evutil_make_socket_nonblocking(fds[0]); evutil_make_socket_nonblocking(fds[1]); #else int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } #endif plt->notfiy_recv_fd = fds[0]; plt->notfiy_send_fd = fds[1]; //安裝libevent線程[創建base,注冊通道事件(用於監聽新鏈接)] setup_libevent_thread(plt); //線程放入容器中 m_libevent_threads[i] = plt; } //開始創建並啟動線程 for(int i = 0; i < thread_numb; ++i) { m_libevent_threads[i]->spThread.reset(new std::thread([] (void* arg) { auto me = (LibeventThread*) arg; // Wait for events to become active, and run their callbacks. //This is a more flexible version of event_base_dispatch(). event_base_loop(me->thread_base, 0); }, m_libevent_threads[i])); } return true; } //設置線程信息 void LibEvtServer::setup_libevent_thread(LibeventThread * pLibeventThread) { auto plt = pLibeventThread; plt->thread_base = event_base_new(); // 創建線程的event_base //給每個libevent線程設置連接通知回調函數。 plt->that = this; //設置線程事件notify_event event_set(&plt->notify_event, plt->notfiy_recv_fd,//EV_READ表示只要這個socket可讀就調用notify_cb函數 EV_READ | EV_PERSIST, ::notify_cb, plt); //設置事件和event_base的關系 event_base_set(plt->thread_base, &plt->notify_event); // 設置事件的從屬關系(相當於指明事件屬於哪個event_base) //添加事件 event_add(&plt->notify_event, 0); // 正式添加事件 } void LibEvtServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { #ifdef MUL_LIBEVENT_THREAD int cur_thread = (m_last_thread + 1) % THREAD_NUMB; // 輪循選擇工作線程 m_last_thread = cur_thread; conn_queue_item item; item.fd = fd; //item.ch2 = NULL; auto plt = m_libevent_threads[cur_thread]; { //向線程的隊列中放入一個item,每個線程有個隊列,保存連接的socketfd #ifdef BOOST_LOCKFREE while(!plt->conn_queue.push(item)) { #ifndef _DEBUG boost::this_thread::interruptible_wait(1); #else Sleep(1); #endif Plug::PlugMessageBox("連接隊列居然滿了,超過1000的未處理數!"); } #else std::lock_guard<std::mutex> lock(plt->conn_mtx); plt->conn_queue.push(item); #endif } //激活讀線程的讀事件 send(plt->notfiy_send_fd, "c", 1, 0); #else auto base = evconnlistener_get_base(listener); auto bev = bufferevent_socket_new(base, fd, BEV_OPT_THREADSAFE);//|BEV_OPT_CLOSE_ON_FREE); if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(base); return ; } auto c2 = CreateChannel(bev); bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2); bufferevent_enable(bev, EV_READ | EV_WRITE ); #endif } //偵聽端口,-1表示向系統申請一個任意可用端口 bool LibEvtServer::listen(int* port) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; if(-1 == *port) sin.sin_port = htons(10000); else sin.sin_port = htons(*port); m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!m_listener) { return false; } if( -1 == *port) *port = ntohs(sin.sin_port); if (!m_listener) { fprintf(stderr, "Could not create a listener!\n"); return false; } m_spListenThread.reset(new std::thread([this]//現在看這個線程只是收到連接,然后交給線程,然后通知線程 { //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); //event_base_loop(m_base, EVLOOP_ONCE); event_base_dispatch(m_base); if(WSAENOTSOCK == WSAGetLastError()) { Plug::PlugMessageBox(L"操作無效套接字啊!"); } Plug::PlugMessageBox(L"Libevent派發線程退出!"); })); return true; } void LibEvtServer::notify_cb(evutil_socket_t fd, short which, LibeventThread *pLibeventThread) { //首先將socketpair的1個字節通知信號讀出(這是必須的,在水平觸發模式下如果不處理該事件,則會循環通知,直到事件被處理) char buf[1]; recv(fd, buf, 1, 0);//從sockpair的另一端讀數據 auto plt = pLibeventThread; conn_queue_item item; //從自己的連接隊列中取出連接數 { //取出隊列中的第一個元素 #ifdef BOOST_LOCKFREE while(!plt->conn_queue.pop(item))//pop一個出來 { #ifndef _DEBUG boost::this_thread::interruptible_wait(1); #else Sleep(1); #endif Plug::PlugMessageBox("通知隊列居然彈空了啊!"); } #else std::lock_guard<std::mutex> lck(plt->conn_mtx); item = plt->conn_queue.front(); #endif } //創建每個socket的bufferevent auto bev = bufferevent_socket_new(plt->thread_base, item.fd, BEV_OPT_THREADSAFE); Channel2* c2 = CreateChannel(bev); //設置接收、狀態改變 回調函數 bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2); bufferevent_enable(bev, EV_READ | EV_WRITE ); } //看了這個過程就是這個樣子的,監聽線程接收到連接之后把這個socket丟給Libevent線程,libevent創建bufferevent //處理相關讀和寫事件,這個工程通過每個線程的連接隊列,然后一個socketpair通知的。這樣每個線程就很平均的處理所有的連接事件 //多線程比單線程的是復雜很多,只是這種模式不知道,但bufferevent還是一樣的