Libevent in a Project (Multithreaded)


The Multithreaded Libevent Version
// Per-thread bookkeeping structure
struct LibeventThread
{
    LibEvtServer* that;                         // back-pointer to the server (passed to callbacks)
    std::shared_ptr<std::thread> spThread;      // the worker thread
    struct event_base* thread_base;             // this thread's own event loop
    struct event notify_event;                  // fires when the listener signals a new connection
    evutil_socket_t notify_recv_fd;             // socketpair receive end (worker thread reads notifications)
    evutil_socket_t notify_send_fd;             // socketpair send end (listener thread writes notifications)
#ifdef BOOST_LOCKFREE
    boost::lockfree::spsc_queue<conn_queue_item, boost::lockfree::capacity<1000> > conn_queue;
#else
    std::mutex conn_mtx;                        // guards the connection queue
    std::queue<conn_queue_item> conn_queue;     // queue of pending conn_queue_item entries
#endif
};
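
The conn_queue_item type is never shown in the original. A minimal sketch of what it presumably looks like, given that listener_cb only fills in fd (and once carried a channel pointer, judging by the commented-out item.ch2 below):

// Hypothetical definition; the original only demonstrates the fd member.
struct conn_queue_item
{
    evutil_socket_t fd;   // accepted connection socket handed to a worker
    //Channel2* ch2;      // apparently unused; see the commented line in listener_cb
};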

bool LibEvtServer::init(I_NetServerEvent* event, int start, int size)
{
    m_ids = new ChannelIDGenerator();
    m_ids->init(start, size);
    m_allChannels.resize(m_ids->getSize());

    m_event = event;

    // Enable libevent's locking support for Windows threads
    int hr = evthread_use_windows_threads();
    if (hr != 0) {
        fprintf(stderr, "Could not enable libevent thread support!\n");
        return false;
    }
    m_base = event_base_new();
    if (!m_base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return false;
    }
#ifdef MUL_LIBEVENT_THREAD
    m_last_thread = -1; // note: must start at -1 so the first worker picked is 0
    // spin up the worker threads
    init_threads(THREAD_NUMB);
#endif
    return true;
}
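
Note that evthread_use_windows_threads() is Windows-only. On POSIX builds the counterpart is evthread_use_pthreads() (assuming libevent was built with pthreads support and the program links against event_pthreads); a guarded sketch, not part of the original code:

// Portability sketch for the thread-support call above.
#ifdef WIN32
    if (evthread_use_windows_threads() != 0)  // enable Win32 locking for libevent
        return false;
#else
    if (evthread_use_pthreads() != 0)         // enable pthreads locking for libevent
        return false;
#endif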

bool LibEvtServer::init_threads(int thread_numb)
{
    m_libevent_threads.resize(thread_numb);

    // Give each thread a bidirectional notification channel (pipe-like)
    for (int i = 0; i < thread_numb; ++i)
    {
        LibeventThread* plt = new LibeventThread();
#ifdef WIN32
        // Create a socketpair: two mutually connected sockets, stored in fds
        evutil_socket_t fds[2];
        if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0)
        {
            std::cout << "Failed to create socketpair\n";
            return false;
        }
        // Make both ends non-blocking
        evutil_make_socket_nonblocking(fds[0]);
        evutil_make_socket_nonblocking(fds[1]);
#else
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
#endif
        plt->notify_recv_fd = fds[0];
        plt->notify_send_fd = fds[1];

        // Set up the libevent thread: create its base and register the
        // notification event (used to learn about new connections)
        setup_libevent_thread(plt);

        // Store the thread record
        m_libevent_threads[i] = plt;
    }

    // Now create and start the threads
    for (int i = 0; i < thread_numb; ++i)
    {
        m_libevent_threads[i]->spThread.reset(new std::thread([](void* arg)
        {
            auto me = (LibeventThread*)arg;
            // Wait for events to become active, and run their callbacks.
            // This is a more flexible version of event_base_dispatch().
            event_base_loop(me->thread_base, 0);
        }, m_libevent_threads[i]));
    }
    return true;
}
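
The original never shows teardown. A minimal shutdown sketch, assuming a hypothetical stop_threads() companion is wanted: event_base_loopexit() wakes each loop (safe to call from another thread because thread support was enabled in init()), after which the workers can be joined:

// Hypothetical companion to init_threads(); not part of the original code.
void LibEvtServer::stop_threads()
{
    for (auto* plt : m_libevent_threads)
    {
        event_base_loopexit(plt->thread_base, NULL); // ask the worker loop to exit
        if (plt->spThread && plt->spThread->joinable())
            plt->spThread->join();                   // wait for the worker to finish
        event_base_free(plt->thread_base);           // release the per-thread base
        delete plt;
    }
    m_libevent_threads.clear();
}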

// Configure a worker thread's event machinery
void LibEvtServer::setup_libevent_thread(LibeventThread* pLibeventThread)
{
    auto plt = pLibeventThread;
    plt->thread_base = event_base_new(); // create the thread's own event_base

    // Give each libevent thread the connection-notification callback.
    plt->that = this;
    // Initialize the notify event: EV_READ means notify_cb fires whenever
    // the socket becomes readable; EV_PERSIST keeps the event armed.
    event_set(&plt->notify_event, plt->notify_recv_fd,
        EV_READ | EV_PERSIST, ::notify_cb, plt);
    // Attach the event to this thread's event_base
    event_base_set(plt->thread_base, &plt->notify_event);
    // Register the event with the loop
    event_add(&plt->notify_event, 0);
}
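
event_set()/event_base_set() are the legacy libevent 1.x interface. On libevent 2.x the same setup is usually collapsed into event_assign(); an equivalent sketch, not the original code:

// libevent 2.x equivalent of the event_set/event_base_set pair above.
event_assign(&plt->notify_event, plt->thread_base, plt->notify_recv_fd,
             EV_READ | EV_PERSIST, ::notify_cb, plt);
event_add(&plt->notify_event, NULL);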


void LibEvtServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
#ifdef MUL_LIBEVENT_THREAD
    int cur_thread = (m_last_thread + 1) % THREAD_NUMB; // round-robin worker selection
    m_last_thread = cur_thread;

    conn_queue_item item;
    item.fd = fd;
    //item.ch2 = NULL;

    auto plt = m_libevent_threads[cur_thread];
    {
        // Push an item onto the worker's queue; each thread owns a queue
        // of accepted socket fds waiting to be serviced
#ifdef BOOST_LOCKFREE
        while (!plt->conn_queue.push(item))
        {
#ifndef _DEBUG
            boost::this_thread::interruptible_wait(1);
#else
            Sleep(1);
#endif
            Plug::PlugMessageBox("Connection queue is full: over 1000 unprocessed items!");
        }
#else
        std::lock_guard<std::mutex> lock(plt->conn_mtx);
        plt->conn_queue.push(item);
#endif
    }
    // Wake the worker: one byte on the socketpair triggers its read event
    send(plt->notify_send_fd, "c", 1, 0);

#else
    auto base = evconnlistener_get_base(listener);

    auto bev = bufferevent_socket_new(base, fd, BEV_OPT_THREADSAFE);//|BEV_OPT_CLOSE_ON_FREE);
    if (!bev)
    {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return;
    }

    auto c2 = CreateChannel(bev);

    bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2);
    bufferevent_enable(bev, EV_READ | EV_WRITE);

#endif
}
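
The callback is registered as the free function ::listener_cb with this as user_data, so a trampoline of roughly this shape is implied, though its body is not shown in the original:

// Hypothetical trampoline; the original registers ::listener_cb but omits it.
static void listener_cb(struct evconnlistener* listener, evutil_socket_t fd,
    struct sockaddr* sa, int socklen, void* user_data)
{
    auto* server = static_cast<LibEvtServer*>(user_data);
    server->listener_cb(listener, fd, sa, socklen, user_data);
}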



// Listen on a port; -1 means ask the system for any available port
bool LibEvtServer::listen(int* port)
{
    struct sockaddr_in sin;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    if (-1 == *port)
        sin.sin_port = htons(0); // port 0: let the OS pick a free port
    else
        sin.sin_port = htons(*port);

    m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,
        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin,
        sizeof(sin));
    if (!m_listener)
    {
        fprintf(stderr, "Could not create a listener!\n");
        return false;
    }

    if (-1 == *port)
    {
        // Recover the port the OS actually assigned
        ev_socklen_t len = sizeof(sin);
        getsockname(evconnlistener_get_fd(m_listener), (struct sockaddr*)&sin, &len);
        *port = ntohs(sin.sin_port);
    }

    // As written, this thread only accepts connections, hands each one to a
    // worker, and then notifies that worker
    m_spListenThread.reset(new std::thread([this]
    {
        //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
        //event_base_loop(m_base, EVLOOP_ONCE);
        event_base_dispatch(m_base);
        if (WSAENOTSOCK == WSAGetLastError())
        {
            Plug::PlugMessageBox(L"Operation on an invalid socket!");
        }

        Plug::PlugMessageBox(L"Libevent dispatch thread exited!");
    }));
    return true;
}
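
Putting it together, a hedged usage sketch (MyServerEvent and the channel-ID range are placeholders, not from the original):

// Hypothetical usage; MyServerEvent implements I_NetServerEvent.
LibEvtServer server;
MyServerEvent handler;
if (server.init(&handler, 1, 4096))   // assumed channel-ID range semantics
{
    int port = -1;                     // -1: let the OS choose a port
    if (server.listen(&port))
        printf("listening on port %d\n", port);
}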

void LibEvtServer::notify_cb(evutil_socket_t fd, short which, LibeventThread *pLibeventThread)
{
    // First drain the one-byte notification from the socketpair. This is
    // required: in level-triggered mode an unread byte keeps re-firing the
    // event until it is consumed.
    char buf[1];
    recv(fd, buf, 1, 0); // read from the other end of the socketpair

    auto plt = pLibeventThread;

    conn_queue_item item;

    // Take one pending connection off this thread's own queue
    {
        // Pop the first element of the queue
#ifdef BOOST_LOCKFREE
        while (!plt->conn_queue.pop(item)) // pop one item
        {
#ifndef _DEBUG
            boost::this_thread::interruptible_wait(1);
#else
            Sleep(1);
#endif
            Plug::PlugMessageBox("Notification queue popped empty!");
        }
#else
        std::lock_guard<std::mutex> lck(plt->conn_mtx);
        item = plt->conn_queue.front();
        plt->conn_queue.pop(); // the original omitted this pop; without it the
                               // same connection would be handed out repeatedly
#endif
    }

    // Create a bufferevent for this socket on the worker's event_base
    auto bev = bufferevent_socket_new(plt->thread_base, item.fd, BEV_OPT_THREADSAFE);

    Channel2* c2 = CreateChannel(bev);

    // Install the read and connection-state callbacks
    bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2);
    bufferevent_enable(bev, EV_READ | EV_WRITE);
}
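
Like the listener callback, ::notify_cb must match libevent's C callback signature; a trampoline of roughly this shape is implied (reaching the server through the that back-pointer stored in setup_libevent_thread), though the original omits it:

// Hypothetical trampoline; the original registers ::notify_cb but omits it.
static void notify_cb(evutil_socket_t fd, short which, void* arg)
{
    auto* plt = static_cast<LibeventThread*>(arg);
    plt->that->notify_cb(fd, which, plt);   // forward to the member function
}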

//To recap the flow: the listener thread accepts a connection and hands the socket
//over to a libevent worker thread, which creates a bufferevent and handles the read
//and state-change events for it. The hand-off runs through each worker's connection
//queue, with a one-byte socketpair write as the wake-up notification, so connections
//end up spread evenly across the threads. The multithreaded version is considerably
//more complex than the single-threaded one, but the bufferevent handling itself is the same.

 

