實現UDP高效接收/響應


環境Linux g++6.3.0

問題一:一個ip地址如何接收高並發請求

問題二:如何高並發響應消息

發送請求端只能通過ip地址+端口號向服務器發送請求碼,所以服務器只能用一個UDP去綁定此ip以及端口號。而如何完成高並發發送響應消息,

誰去發送這個響應消息,接收請求信息的UDP?這就造成其中一個任務必須等待另一個任務執行完畢,sendto是非阻塞,而recvfrom是阻塞,若

執行recvfrom碰巧沒有下個請求信息或者網絡阻塞造成UDP丟失,那么sendto豈是不能執行(一直等待recvfrom結束),這時就會想用超時解決,或

者也可以開啟線程來執行這個sendto函數。對於線程,先不考慮消息傳遞共享問題(IPC),假若有多個響應需要同一時間一起發送,那么此特定

UDP需要與哪個匹配,它可以同時發送這些任務?不可能吧,所以需要多個UDP來發送這些響應。現在可以接收以及並發響應。

問題三:線程只能執行函數一次,則結束。所以每次都得重新構建新線程,如何避免

問題四:當線程構建速率大於線程接收速率時,如何處理

把線程扔進隊列中,進行統一管理,然后如何取任務?這個問題的解決辦法與問題四解決辦法相同,也就是在線程池內創建一個隊列用來存放

任務,而后線程從任務隊列中取出任務執行。問題四的解決是可以限制任務隊列的容量,當滿的時候進行等待。

問題五:線程池對外開放接口,參數應該是哪種類型

問題六:接收請求信息UDP和發送響應信息UDP有何共同點,如何創建這兩個類

希望被執行的對象是函數類/函數指針,所以線程池對外的接口參數也就是函數類/函數指針及其參數,而決定這些特性取決發送響應信息UDP類,

而把發送消息信息UDP類添加進線程池中的類是接收請求信息UDP類(響應依賴請求),接收請求信息UDP類與發送響應信息UDP類共同點是對

socket構建,所以先創建一個基類UDP類

問題七:發送響應信息的載體UDP socket如何分配,發送響應信息類所需要的信息從何處獲取

首先意識到決定這一切,也是對線程池添加任務速率的瓶頸,影響並發響應速率的源頭:接收請求信息UDP。所以在此類中添加一個UDP socket

隊列,而發送響應信息類所需信息有sockaddr_in(ipv4)、響應內容以及一個發送載體UDP socket,因此在UDP中添加這些屬性

問題八:如何添加任務進線程池

怎么辦?C++11 std::function<void(P)> 然后呢?參數有多個,所以

template <typename...Args>
class ThreadPool
{
       .
       .
       .
       void AddTask(std::function<void(Args...)>);   
}

先不說如何實現在線程池中存放可變參數,這樣后此線程池需要被特例化且智能被一種參數所用。可能有點表達不明白,既然是存放,那么我們

的目的是為了取出,所以如何取?必須提前知道所取的類型using Task= std::function<void(Args...)>; 而這必須在構建時就確定好。

如果能把參數類型擦除(去)掉多好,那么就成了有多個無類型的參數,可在存放參數數量又是一個麻煩,要是只有一個那么就好了。所以我們要

把多個屬性包裝在一個類中再把類型擦除掉,perfect!

實現過程:

//基礎udp類,創建了初始化sock,關閉sock,獲取sock方法
//其一,AdvanceUdp類繼承此類
//其二,在並發發送請求消息時需要多個udp發送消息,需要
//一個循環隊列,而循環隊列的成員就是此類
class CommonUdp
{
    protected:
    int sock;    //sock
    public:
    //初始化,AF_INET ipv4族,SOCK_DGRAM 數據流
    //IPPROTO_UDP UDP協議
    void Init()    
    {
        sock= socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
        if (sock< 0)
        {
            throw std::out_of_range("socket() failed\n");
        }
    }
    //關閉sock
    void Close() const 
    {
        close(sock);
    }
    //獲取sock
    int GetSock() const noexcept{return sock;}
    //析構 調用Close
    virtual ~CommonUdp()
    {
#if defined(Csz_test)
        std::cout<<"CommonUdp::destructor\n";
#endif
        Close();
    }
};

UDP載體隊列,我采用環形鏈表,添加任務時從表中取出sock,接着鏈表指向下一個,不斷循環使用。沒使用Container其一是覺得沒有必要,

其二若要進行循環使用則還需要一個標記位來記錄上一次的位置。而自構環形鏈表類可以方便使用,省空間。

 1 //Upd環形列表
 2 //禁止了拷貝構造/移動構造/拷貝賦值/移動賦值為了避免內存泄露
 3 class UdpAnnulus
 4 {
 5     public:
 6         CommonUdp udp;
 7         UdpAnnulus *next;
 8         UdpAnnulus() noexcept
 9         {
10 #if defined(Csz_test)
11             std::cout<<"UdpAnnulus constructor\n";
12 #endif
13         }
14         ~UdpAnnulus() noexcept
15         {
16 #if defined(Csz_test)
17             std::cout<<"UdpAnnulus destructor\n";
18 #endif
19         }
20         UdpAnnulus(const UdpAnnulus &)=delete;
21         UdpAnnulus(UdpAnnulus &&)=delete;
22         UdpAnnulus& operator=(const UdpAnnulus &)=delete;
23         UdpAnnulus& operator=(UdpAnnulus &&)=delete;
24 };

接着是發送響應消息的方法類所需要的參數類型 :

data是消息存儲內存起始地址,data[3] 數組類型,后面會看見其實請求消息只有一字節,也就是八位。前三位用來表示8個請求類型,后五位用來表

示請求哪個位置信息。sock 發送響應消息載體。client_addr存放ipv4地址信息,包括ip地址、端口號、協議類型以及套接字類型等。sentry顧名思義

是一個哨兵,用來映射請求消息后五位所在具體位置。

現在來詳細分析為什么請求碼明明只有一個字節卻需要用到三個char。理由其一,若是惡意信息則只需檢查0== data[1] 是否為false,true則是正常用

戶請求,false則忽略且記錄mac地址,當被記錄次數達到一定數量可以delete。理由其二,recvfrom參數中表示接收容量的參數必須比實際少一位,

用來填'\0',所以如果用2個char則每次0== data[1]都為true,也就起不到篩選效果。

 1 //廣播方法類的參數,保存在Any類中
 2 //data 接收請求碼
 3 //sock 發送UDP sock
 4 //sentry 響應信息在文件中的位置標記(ifstream.seekg(sentry))
 5 //client_addr 接收響應消息的對象
 6 class Message
 7 {
 8     public:
 9         char data[3];
10         int sock;
11         uint32_t sentry;
12         sockaddr_in client_addr;
13 #if defined(Csz_test)
14         ~Message(){std::cout<<"Message::destructor\n";}
15 #endif
16 };

下面是接收請求消息的UDP類,刪除了部分功能實現代碼

因為繼承了CommonUdp所以只需進行bind()即可使用。設置了兩個bool來記錄結束以及保證Free()函數只執行一次,當!end_flag為true則ServiceUdpGo

繼續循環執行,而once_flag在執行了Free()函數后被置為true,調用析構函數時檢查once_flag是否為true,若不是則調用Free(),Free包含了關閉接收請

求消息的載體socket以及銷毀釋放環形UDP鏈表。guard是環形Udp當前位置,每當獲取到當前sock后,guard自動指向自己的下一個。Mess是發送響應

消息Udp方法類的參數,在ServiceUdpGo函數中配置好信息,接着加入線程池任務隊列中,等待被執行。RecordPtr以key-value形式映射請求消息后五

位與響應消息所在具體位置的關系,0~2^5對應每條Json頭位置。

InitJsonFileRecord()用來初始化RecordPtr對相應文件進行檢索找到對應位置。

CreateUdpAnnulus()創建環形UDP鏈表。有兩種辦法,其一不斷把新的節點插在頭節點的下一個,而原本頭節點的下一個變成新節點的下一個,也就是

說頭節點以及最后一個節點保持着不變,在最后把最后一個節點的下一個節點指向頭節點。其二每次把新節點放到尾,最后用尾的下一個指向頭節點。

注意的一點是,節點里面的數據也就是UDP socket 在構建好節點后需要初始化,而初始化可能會扔出異常,這時候就需要一個異常處理機制,避免程序

崩掉而環形鏈表所申請的空間未釋放,造成內存泄露。

DestroyUdpAnnulus()銷毀環形UDP鏈表,我的做法是先把環形拆掉,變成單向鏈表,然后進行內存資源回收。

ExceptionUdpAnnulus()創建環形UDP鏈表時,處理因初始化UDP socket失敗扔出的異常。

GetBroadCastSock() 獲取環形UDP鏈表當前socket,並將哨兵移動到下一個。

Check()檢查data[1]是否等於0,也就是說請求消息是否合法

RetNum() 通過映射將data[0]后五位轉換成響應消息真實所在文件位置。

AdvanceUdp()構造,進行綁定ip地址、端口號,創建環形鏈表,初始化RecordPtr。

~AdvanceUdp()析構,若在實例變量釋放時還未調用Free()進行資源回收,則析構會自主調用,若已調用則不做任何something

Free() 關閉socket,銷毀環形UDP鏈表

Bind() 將socket與ip及端口進行綁定

End() 結束ServiceUdpGo()函數循環

ServiceUdpGo() 這里有一個值得注意的地方,當調用End() 而recvfrom處於阻塞狀態,那么就不能立即結束函數,必須等待recvfrom錯誤或者下一個請求

消息到來。 select多路復用,掛起等待I/O發送信號量喚醒,若有消息到來則會將到來描述符置為,也就是說每次都要重新設置所要監視的描述符。超時

時間也需要每次重置,否則下一次就是0,0也就是說輪詢效果。

//繼承了普通Udp類,有個sock,以及初始化/關閉/獲取sock方法
//通過AdvanceUdp 實質是一個udp反復接收其他udp發送過來的請求信息
//然后進行分類處理並派送給其他udp去進行發送
//end_flag 結束標記,線程不斷跑接收請求函數,是否結束則跟着次標記
//once_flag 保證關閉socket和釋放環形鏈表只執行一次
//mess 將任務扔進線程池需要方法類以及其參數,而mess就是其參數
//RecordPtr 記錄了json文件每條記錄在文件中的位置,因為UDP是無狀態
//所以需要認為記錄'狀態'
//調用End() 去設置end_flag為true,而ServiceUdpGo() 是while (!end_flag)
//調用 recvfrom 阻塞/等待,也就是說在沒接收到任何信息時,是不會結束此線程
//循環。解決辦法 1.設置超時 2.設置鎖 condition_variable
class AdvanceUdp : public CommonUdp
{
    private:
        using RecordPtr= std::shared_ptr<std::vector<uint32_t>>;
    private:
        bool end_flag; //結束標記
        bool once_flag; //運行一次標記
        UdpAnnulus *guard; //udp環形鏈表 哨兵
        Message mess; //response消息
        RecordPtr mv_record; //智能指針shared_ptr
        RecordPtr os_record;
        RecordPtr sw_record;
        RecordPtr tv_record;private:
        //初始化 RecordPtr
        void InitJsonFileRecord();//創建環形鏈表
        //有兩種方法 一種是不斷的把新指針插在頭的下一個(注釋),另一種是按順下申請下去
        //每申請一個變量就為成員變量udp調用初始化函數
        void CreateUdpAnnulus(uint8_t &&T_num)
        {
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::CreateUdpAnnulus()\n";
#endif
            guard= new UdpAnnulus();
            guard->next= nullptr;
            //異常
            try
            {
                guard->udp.Init();
            }
            catch (const std::out_of_range &T_mess)
            {
                std::cerr<<T_mess.what();
                delete guard;
                exit(1);
            }
            //UdpAnnulus *flag;
            UdpAnnulus *temp= guard;
            while (T_num> 1)
            {
                //UdpAnnulus *temp= new UdpAnnulus();
                //temp->next= guard->next;
                //guard->next= temp;
                temp->next= new UdpAnnulus();
                temp= temp->next;
                try
                {
                    //guard->next->udp.Init();
                    temp->udp.Init();
                }
                catch (const std::out_of_range &T_mess)
                {
                    temp->next= nullptr;
                    std::cerr<<T_mess.what();
                    ExceptionUdpAnnulus();
                    exit(1);
                }
                //if (nullptr== temp->next)
                //    flag= temp;
                --T_num;
            }
            //flag->next= guard;
            temp->next= guard; //尾的下一個是頭
        }
        //銷毀環形鏈表
        //先把頭的下一個賦值給臨時變量,再把頭下一個置空
        //這時候就是個單向鏈表了,從臨時變量開始進行刪除
        void DestroyUdpAnnulus()
        {
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::DestroyUdpAnnulus()\n";
#endif
            if (nullptr== guard)
                return ;
            UdpAnnulus *temp= guard->next;
            guard->next= nullptr;
            while (nullptr!= temp)
            {
                UdpAnnulus *dele= temp;
                temp= temp->next;
                delete dele;
            }
            guard= nullptr;
        }
        //環形鏈表初始化udp時拋出異常處理機制
        //在初始化環形鏈表時頭是不變的
        void ExceptionUdpAnnulus()
        {
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::ExceptionUdpAnnulus()\n";
#endif
            while (nullptr!=guard)
            {
                UdpAnnulus *dele= guard;
                guard= guard->next;
                delete dele;
            }
        }
        //獲取環形鏈表中udp socket
        //然后guard移動到下一個
        int GetBroadCastSock()
        {
            int value= guard->udp.GetSock();
            guard= guard->next;
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::GetBroadCastSock() sock:"<<value<<"\n";
#endif
            return value;
        }
        //判斷是否是合法用戶
        //第二個字符是否為空'\0'
        bool Check()
        {if (mess.data[1]== 0)
                return true;
            else
                return false;
        }
        //返回所請求頁數所在的位置(文件中的位置)
        //×××1 1111 后五位表示頁數 2^5=64頁
        //111× ×××× 前三位為類型 2^3=8種
        //000× :0  001× :32  010× :64  011× 96
        //100× :128  101× :160  110× :192  111× :224
        //32:MV  64:TV  96:SW  128:OS
        uint32_t RetNum(char &T_seek)
        {
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::RetNum()\n";
#endif
            switch (SelectType()(T_seek))
            {
                case 32:
                    if ((T_seek& 0x1f)>= mv_record->size())
                        return -1;
                    return (*mv_record)[T_seek& 0x1f];
                case 64:
                    if ((T_seek& 0x1f)>= tv_record->size())
                        return -1;
                    return (*tv_record)[T_seek& 0x1f];
                case 96:
                    if ((T_seek& 0x1f)>= sw_record->size())
                        return -1;
                    return (*sw_record)[T_seek& 0x1f];
                case 128:
                    if ((T_seek& 0x1f)>= os_record->size())
                        return -1;
                    return (*os_record)[T_seek& 0x1f];
                default:
                    //0,224,160,192 不合法請求碼
                    return -2;
            }
        }
    public:
        //構造函數,參數為udp環形鏈表的個數
        //Bind方法 扔出異常std::out_of_range
        //對T_num進行檢測,不能是負數
        AdvanceUdp(uint16_t &&T_port,uint8_t &&T_num= UdpNum) : end_flag(false),once_flag(false),guard(nullptr)
        {
            try
            {
                Bind(std::forward<uint16_t>(T_port));
            }
            catch (const std::out_of_range &T_mess)
            {
                std::cerr<<T_mess.what();
                exit(1);
            }
            //T_num= T_num> 0? :T_num : 1;
            CreateUdpAnnulus(std::forward<uint8_t>(T_num));
            InitJsonFileRecord();
        }
        //析構函數,若已經手動釋放則不做任何操作
        ~AdvanceUdp()
        {
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::destructor\n";
#endif
            if (!once_flag)
            {
                Free();
            }
        }
        AdvanceUdp(const AdvanceUdp &)=delete;
        AdvanceUdp(AdvanceUdp &&)=delete;
        AdvanceUdp& operator=(const AdvanceUdp &)=delete;
        AdvanceUdp& operator=(AdvanceUdp &&)=delete;
        //關閉udp socket(用於接收消息),釋放環形鏈表
        //設置操作標記符為true
        void Free();//將sock與地址/端口進行綁定
        //調用基類進行sock初始化
        //struct sockaddr_in ipv4地址結構體
        void Bind(uint16_t &&T_port);//停止ServiceUdpGo
        void End();//線程調用函數
        //參數為線程池,用來存放請求任務
        //recvfrom 接收請求消息,消息大小需要比實際大小少1,因為要用來存放最后一個null
        //因為udp是有界,所以在接收消息時,當接收消息大小< 緩沖大小時,多余的數據將被
        //悄悄的丟棄,利用這個我們調用Check()檢查mess.data[1]是否為空,若不時則是非法用戶
        void ServiceUdpGo(std::shared_ptr<Csz_ThreadPool::ThreadPool> T_pool)
        {
            //多路復用
            fd_set sock_set; //創建描述符
            int max_desc= sock+ 1; //設置最大描述符
            int code; //記錄select返回值 -1為錯誤,0為超時
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::ServiceUdpGo() start\n";
#endif
            while (!end_flag)
            {
                FD_ZERO(&sock_set); //清零
                FD_SET(sock,&sock_set); //設置描述符
                //時間結構必須每次都重置,否則下次清零
                struct timeval time_out;
                time_out.tv_sec= TIMEOUT; //
                time_out.tv_usec= 0; //微秒
                memset(&mess,0,sizeof(mess));
                socklen_t addr_len= sizeof(mess.client_addr);
                if ((code= select(max_desc,&sock_set,NULL,NULL,&time_out))<= 0)
                {
                    if (-1== code)
                    {
                        std::cerr<<"AdvanceUdp::ServiceUdpGo() select return -1\n";
                        exit(0);
                    }
#if defined(Csz_test)
                    //std::cout<<"AdvanceUdp::ServiceUdpGo() time out\n";
#endif
                    //continue;
                    //超時后若直接進行下一次操作,recvfrom則不會運行,
                    //下次select也是等待超時,而沒收到上次信號量
                }
                if (FD_ISSET(sock,&sock_set))
                {
                    if (recvfrom(sock,mess.data,sizeof(mess.data)-1,0,(struct sockaddr*)&mess.client_addr,&addr_len)< 0)
                    {
#if !defined(Csz_O1)
                        std::cerr<<"recvfrom< 0\n";
#endif
                        continue;
                    }
                }
                else
                {
                    continue;
                }
                if (Check())
                {
                    mess.sentry= RetNum(mess.data[0]);
                    mess.sock= GetBroadCastSock();
                    T_pool->AddTask(UdpBroadCast(),std::move(mess));
                }
                else
                {
                    //AOP
                    //記錄錯誤請求ip
                    //std::cerr<<"error request from ip"
#if !defined(Csz_O1)
                    char client_addr[16]={0};
                    inet_ntop(AF_INET,&mess.client_addr.sin_addr.s_addr,client_addr,sizeof(client_addr));
                    std::cerr<<"error request from ip "<<client_addr<<" port "<<ntohs(mess.client_addr.sin_port)<<"\n";
#endif
                }
            }
#if defined(Csz_test)
            std::cout<<"AdvanceUdp::ServiceUdpGo() stop\n";
#endif
        }
};

c++17添加了一個類Any 

An object of class any stores an instance of any type that satisfies the constructor requirements or is empty, and this is referred to as the state of the

class any object. The stored instance is called the contained object. Two states are equivalent if they are either both empty or if both are not empty and

if the contained objects are equivalent.The class any describes a type-safe container for single values of any type.

而這里要用到的是個類似Any類的類,它能擦除掉任何類型並把值保存在內部,在需要的時候可以取出來。其實就是把任何想保存的類型變成一個統一

的類型,用模板來提取所要保存類型的類型信息(std::type_index),而Value(值)則用內部類(其實跟定義變量一樣,只不過這個變量是我們自己構建)的

派生類來記錄下,為什么不能直接寫在Any類中呢?在特例化的時候有不同屬性的Any的std::type_index是不等價,而如果直接用內部類來記錄值,則

內部類特例化的時候有不同屬性的內部類的 std::type_index是不等價,從而導致兩個Any的std::type_index不等價。說到底就是std::type_index要等價,

除了類名相同,屬性名也得相同。所以在Any類中定義一個指向內部類的unique_ptr指針,而指針實際是指向內部類的派生類。這有點類似橋連接,只

知道Any類有個智能指針,而不在意這個指針真實是指向哪里。

//擦除掉類型,利用在構造函數時,傳進所要擦除的類型以及值,用指針
//進行記錄值,用type_index記錄類型,而在需要返回類型的值時,用模板
//繼續傳如類型,與type_index比較若相等則進行強制類型轉換,dynamic_cast

struct Any 
{
    private:
        struct Base; //接口
        using BasePtr= std::unique_ptr<Base>; //智能指針保證資源釋放
        std::type_index index;  //記錄當前保存的類型
        BasePtr ptr;  //記錄當前類型的智能指針,get()返回指向的類型
    private:
        struct Base
        {
            //用虛函數保證繼承關系的派生類能夠調用析構函數
            //在實現多態時,當用基類操作派生類,在析構時防止只析構基類而不析構派生類
            virtual ~Base(){} //必須virtual
            virtual BasePtr Clone()const =0; //必須virtual
        };
        //保存值的類
        //模板特例化傳進的是保持值的類型,而構造函數模板則是保證值傳遞進來是原始類型值
        //這樣做的好處是可能是右值,也能剩下構造值的時間
        template <typename T>
        class Derived : public Base
        {
            public:
                T value;
            public:
                template <typename U>
                Derived(U &&T_value) : value(std::forward<U>(T_value)) 
                {
#if defined(Csz_test)
                    std::cout<<"Any::Derived::constructor\n";
#endif
                }
#if defined(Csz_test)
                ~Derived(){std::cout<<"Any::Derived::destructor\n";}
#endif
                BasePtr Clone()const
                {
#if defined(Csz_test)
                    std::cout<<"Any::Derived::Clone()\n";
#endif
                    return BasePtr(new Derived<T>(value));
                }
        };
        //輔助方法,調用Derived::Clone()變簡單
        BasePtr Clone() const
        {
#if defined(Csz_test)
            std::cout<<"Any::Close()\n";
#endif
            if (nullptr!= ptr)
                return ptr->Clone();
            return nullptr;
        }
    public:
        //默認構造
        Any() : index(typeid(void)),ptr(nullptr){}
        //右值構造
        Any(Any &&T_other) : index(std::move(T_other.index)),ptr(std::move(T_other.ptr)){}
        //左值構造
        Any(const Any &T_other) : index(T_other.index),ptr(T_other.Clone()){}
        //值構造
        //用模板,限定模板類型為非Any類
        //std::decay 先移除T引用得到U,U為 remove_reference<T>::type
        //若是is_array<U>::value 為true,則修改類型type為 remove_extent<U>::type* 移除數組的一個維度,若是一維數組則是變量指針
        //若是is_function<U>::value為true,則修改類型type為 add_pointer<U>::type 也就是函數指針
        //剩下的修改type類型為 remove_cv<U>::type
        template <typename T,typename= typename std::enable_if<!std::is_same<typename std::decay<T>::type,Any>::value,T>::type>
        Any(T &&T_value) : index(typeid(typename std::decay<T>::type)),ptr(new Derived<typename std::decay<T>::type>(std::forward<T>(T_value))){}
#if defined(Csz_test)
        ~Any(){std::cout<<"Any::destructor\n";}
#endif
        bool IsNull() const noexcept
        {
#if defined(Csz_test)
            std::cout<<"Any::IsNull()\n";
#endif
            return nullptr== ptr;
        }
        //查詢當前保存的類型是否為函數特例化的類型
        template <typename T>
        bool Is() const noexcept
        {
#if defined(Csz_test)
            std::cout<<"Any::Is()\n";
#endif
            return std::type_index(typeid(T))== index;
        }
        template <typename T>
        T AnyCast()
        {
#if defined(Csz_test)
            std::cout<<"Any::AnyCast()\n";
#endif
            if (!Is<T>())
            {
#if !defined(Csz_O1)
                std::cerr<<"can not cast "<<index.name()<<" to "<<typeid(T).name()<<"\n";
#endif
                throw std::bad_cast();
            }
            //std::unique_ptr<T>.get() 返回智能指針指向T的指針,若沒有則返回nullptr
            //std::unique<Base> ptr
            auto derived= dynamic_cast<Derived<T>*>(ptr.get());
            //若是值為類則必須在類里面寫好移動構造函數,避免了資源釋放兩次
            //若值是標准類型,其實也是無所謂但養成好習慣,標准類型的move and copy效果是一樣的
            return std::move(derived->value);
        }
        //copy賦值操作符
        Any& operator=(const Any &T_other)
        {
#if defined(Csz_test)
            std::cout<<"Any::operator= copy()\n";
#endif
            if (T_other.ptr== ptr)
                return *this;
            ptr= T_other.Clone();
            index= T_other.index;
            return *this;
        }
        //move賦值操作符
        Any& operator=(Any &&T_other)
        {
#if defined(Csz_test)
            std::cout<<"Any::operator= move\n";
#endif
            if (T_other.ptr== ptr)
                return *this;
            ptr=std::move(T_other.ptr);
            index= std::move(T_other.index);
            return *this;
        }
};

 Any類是在哪里使用呢?除了是作為傳入線程池的參數還有就是作為另一個關鍵方法類的參數,那就是發送響應消息UDP類。

在方法類中指定從Any類中取出所需要的類型,若Any中並沒有保存這個類型則會扔出異常。當類型取出后,對消息字節的前三位做 相與 得出type。

為什么不是函數,而是方法類呢?在調用的時候函數一定是通過指針去調用,而方法類可能是內聯代碼。

//廣播方法類
//參數為Any,在操作時進行轉換
//對消息進行類型分類
struct UdpBroadCast : public std::unary_function<Any,void>
{
    void operator()(Any &&T_any)const noexcept
    {
        //broadcast
        //Any::AnyCast throw std::bad_cast
        Message value;
        try
        {
            value= T_any.AnyCast<Message>();
        }
        catch (std::bad_cast &T_mess)
        {
#if !defined(Csz_O1)
            std::cerr<<"UdpBroadCast::operator(): "<<T_mess.what()<<"\n";
#endif
            return ;
        }
#if defined(Csz_test)
        std::cout<<"UdpBroadCast::operator() request:"<<short(value.data[0])<<"\n";
#endif
        switch (SelectType()(value.data[0]))
        {
            case 32:
                SendTo(JSON_MV,value.sentry,value.sock,value.client_addr);
                break;
            case 64:
                SendTo(JSON_TV,value.sentry,value.sock,value.client_addr);
                break;
            case 96:
                SendTo(JSON_SW,value.sentry,value.sock,value.client_addr);
                break;
            case 128:
                SendTo(JSON_OS,value.sentry,value.sock,value.client_addr);
                break;
            default:
                //0,224,160,192
                return ;
        }
    }
    //實際廣播方法
    //若T_sentry== -1 uint32_t -1對應最大值(請求頁數大於最大值)
    //若T_sentry== -2 不合法請求碼
    //則不做任何處理,請求頁數超過了數據庫所擁有的頁數
    //文件打不開可能是文件名不對,或者句柄數量已到上限
    //std::string 也是不斷動態申請空間,當當前空間不足時
    //將再申請一塊更大的空間(通常是以前空間×2),再釋放
    //之前的空間,所以為了避免返回整合空間一開始就將空間
    //設置位較大,使用temp.reserve();
    //若后面JSON是動態改變則可以在過后使用temp.shrink_to_fit()釋放多余空間
    void SendTo(const char * T_file_name,const uint32_t &T_sentry,const int &T_sock,const sockaddr_in &T_client_addr) const noexcept
    {
        if (uint32_t(-2)== T_sentry)
        {
#if !defined(Csz_O1)
            char client_addr[16]={0};
            inet_ntop(AF_INET,&T_client_addr.sin_addr.s_addr,client_addr,sizeof(client_addr));
            std::cerr<<"ip "<<client_addr<<" port "<<ntohs(T_client_addr.sin_port)<<" RequestCode out of range\n";
#endif
            return ;
        }
        std::string temp;
        temp.reserve(32);
        std::ifstream handle(T_file_name);
        if (!handle.is_open())
        {
#if !defined(Csz_O1)
            std::cerr<<"file_name: "<<T_file_name<<" open failed\n";
#endif
            temp.assign("服務器數據錯誤");
        }
        else if (uint32_t(-1)== T_sentry)
        {
#if !defined(Csz_O1)
            char client_addr[16]={0};
            inet_ntop(AF_INET,&T_client_addr.sin_addr.s_addr,client_addr,sizeof(client_addr));
            std::cerr<<"ip "<<client_addr<<" port "<<ntohs(T_client_addr.sin_port)<<" request page> Max\n";
#endif
            temp.assign("請求頁碼大於最大值");
        }
        else
        {
        temp.reserve(1152);
        handle.seekg(T_sentry);
        std::getline(handle,temp);
        }
        //sendto 失敗返回-1
        if (sendto(T_sock,temp.c_str(),temp.size(),0,(struct sockaddr*)&T_client_addr,sizeof(T_client_addr))< 0)
        {
#if !defined(Csz_O1)
            char client_addr[16]={0};
            inet_ntop(AF_INET,&T_client_addr.sin_addr.s_addr,client_addr,sizeof(client_addr));
            std::cerr<<T_sock<<": sendto "<<client_addr<<" port "<<ntohs(T_client_addr.sin_port)<<" failed\n";
#endif
            //記錄
        }
    }
};

現在只差線程池了,而線程池中關鍵的還是任務的添與取。任務隊列采用雙向鏈表而線程隊列采用單向鏈表,因為任務隊列有分先后,而線程隊列不關心

先后問題且單向鏈表比雙向鏈表在空間利用上更為有效。

stop_flag 停止任務添加與取出標記。size_max用來線程鏈表最大容量,避免內存爆炸。is_full與is_empty是多線程之間的通信變量,可以使一些線程等待

其他線程的通知,一般總是綁定一個std::unique_lock。task_list 任務隊列,存放一個std::pair<std::function,Any> ,std::function用來存放方法類或着函數,

而Any則是這些方法類或者函數的參數,std::function用於擦除方法類、函數以及lambda的區別,而Any用於擦除參數類型的不同,使整個線程池耦合性降低。

Add()是個模板函數,為std::forward的使用提供環境(引用未定義),使std::forward正確推出左值和右值,當是右值時可以節省內存的開銷。作用是將任務

添加經鏈表中。這里對停止做了一個處理,是為了不造成任務數據丟失,為線程池暫停執行與重新開始提供了條件。

Put()重載了兩個,左值參數與右值參數,共同點都是調用了Add()。使用std::forward保持參數類型與傳入參數類型是一致的,比如右值參數,傳進Add()也

一樣是一個右值參數,若沒使用則是依左值參數傳入。

Take() 取出任務,這里並不是將取到的變量返回,而是通過參數傳入進行取值,很明顯的一個優勢就是不必構建一個變量進行取值返回。

Stop() 停止任務添加與取出,這里需要注意的是在獲取鎖以及改變stop_flag標記位是在局部范圍內完成,若沒有的話,在喚醒其他線程時,由於鎖還未

釋放,所以其他線程則需等待獲取鎖。

IsFull()檢查是否為滿,為條件鎖提供判斷。

IsEmpty()檢查是否為空,為條件鎖提供判斷。這兩個函數都不可以添加鎖,因為在Add()或者Take()都已經獲取了mutex,而現在若是要再獲取mutex則發生

死鎖。

Size() 獲得任務鏈表當前大小。

Clear() 清除任務鏈表。

#define SIZE 30   //默認30個容量任務

//#define Csz_test

    //Func 方法類
    template<typename Func>
    class TaskList
    {
        private:
            bool stop_flag;  //標記是否停止
            uint8_t size_max; //最大任務數
            std::mutex task_mutex; //
            std::condition_variable is_full;  //為滿時等待
            std::condition_variable is_empty;  //為空時等待
            std::list<std::pair<Func,Any>> task_list; //以Any類擦除了函數參數的不同
        private:
            //這里用到引用未定義,若傳進來是右值則為右值引用,其他則為左值
            //const Task && T_task 加了const 則不是引用未定義,只是個右值引用參數
            template <typename Task,typename Parameter>
            void Add(Task && T_task,Parameter &&T_any)
            {
                std::unique_lock<std::mutex> guard(task_mutex); //上鎖
                //若不能執行則釋放鎖進入等待,加了第二參數相當
                //while (!predicate())
                //        is_full.wait();
                is_full.wait(guard,[this]{return stop_flag || !IsFull();}); 
                if (true== stop_flag)
                {
                    //任務是被停止了而不是結束,有可能繼續,為了保證任務不丟失
                    //再次檢查是否為滿,不為滿則加入任務
                    if (!IsFull())
                        task_list.emplace_back(std::forward<Task>(T_task),std::forward<Parameter>(T_any));
                    return ;
                }
#if defined(Csz_test)
                std::cout<<"TaskList::Add::emplace_back\n";
#endif
                //就地構造,避免了一次沒必要的移動拷貝或者賦值拷貝
                task_list.emplace_back(std::forward<Task>(T_task),std::forward<Parameter>(T_any));
                is_empty.notify_one(); //此時不為空,所以喚起因為空而等待的線程
            }
        public:
            TaskList(const uint8_t &T_size= SIZE) noexcept : stop_flag(false),size_max(T_size){}
            ~TaskList()noexcept 
            {
#if defined(Csz_test)
                std::cout<<"TaskList::destructor\n";
#endif
                task_list.clear();
            }
            TaskList(const TaskList &T_other)=delete;
            TaskList(TaskList &&T_other)=delete;
            TaskList& operator=(const TaskList &T_other)=delete;
            TaskList& operator=(TaskList &&T_other)=delete;
            //任務添加為左值
            void Put(const Func &T_task,const Any &T_any)
            {
#if defined(Csz_test)
                std::cout<<"TaskList::Put()copy\n";
#endif
                Add(T_task,T_any);
            }
            //任務添加為右值
            //利用完美轉發保持參數以右值形式傳進
            void Put(Func &&T_task,Any &&T_any)
            {
#if defined(Csz_test)
                std::cout<<"TaskList::Put()move\n";
#endif
                Add(std::forward<Func>(T_task),std::forward<Any>(T_any)); //forward<int>(value) 為右值
            }
            //不返回值,利用參數加引用,假設在沒有返回值優化的情況下,那么將比有返回值
            //的少了一個constructor以及一個copy
            void Take(Func &T_task,Any &T_any)
            {
                std::unique_lock<std::mutex> guard(task_mutex);
                //若不能執行則釋放鎖進入等待,加了第二個參數相當於
                //while(!predicate())
                //        is_empty.wait();
                is_empty.wait(guard,[this]{return stop_flag || !IsEmpty();});
                if (true== stop_flag)
                {
                    //T_task=[]{}; //先判斷后運行
                    return ;
                }
#if defined(Csz_test)
                std::cout<<"TaskList::Take()\n";
#endif
                auto task= std::move(task_list.front());
                task_list.pop_front();
                T_task= std::move(task.first);
                T_any= std::move(task.second);
                is_full.notify_one();  //此時不為滿,所以喚醒因為滿而進行等待的線程
            }
            void Stop() noexcept
            {
#if defined(Csz_test)
                std::cout<<"TaskList::Stop()\n";
#endif
                {
                    //設為局部鎖,若在加鎖的情況下喚醒其他等待線程,
                    //則其他線程必須等待mutex的釋放
                    std::unique_lock<std::mutex> guard(task_mutex);
                    stop_flag= true; 
                }
                is_full.notify_all();
                is_empty.notify_all();
            }private:
            //在Add函數中調用到,而Add函數本身是在有鎖的情況下進行
            //若在此函數再進行加鎖着就會出現餓死的情況也就是死鎖
            bool IsFull() const noexcept
            {
#if defined(Csz_test)
                std::cout<<"TaskList::IsFull()\n";
#endif
                return task_list.size()== size_max;
            }
            //同上 在take
            bool IsEmpty() const noexcept
            {
#if defined(Csz_test)
                std::cout<<"TaskList::IsEmpty()\n";
#endif
                return task_list.empty();
            }
        public:
            //若是在外調用,保證返回值是當前的准確值
            //加鎖的情況下
            size_t Size() const noexcept
            {
#if defined(Csz_test)
                std::cout<<"TaskList::Size()\n";
#endif
                std::lock_guard<std::mutex> guard(task_mutex);
                return task_list.size();
            }
            void Clear()
            {
#if defined(Csz_test)
                std::cout<<"TaskList::Clear()\n";
#endif
                std::lock_guard<std::mutex> guard(task_mutex);
                task_list.clear();
            }
    };

線程池,可以被多個線程操作(添加任務,結束線程),暫停與重新開始(不建議多線程調用,因為可能會發生死鎖或者拋出異常)。

stop_flag 原子操作變量,用於標記線程池暫停狀態。

stop_mutex 協助線程池暫停線程運行,調用Stop()時對stop_mutex進行鎖定,而線程在檢查到stop_flag為true時對stop_mutex鎖進行獲取,着就造成線程

停止等待。其實這個暫停與重新開始功能很雞肋,因為是直接對mutex進行操作,而不是用std::unique_lock或者std::lock_guard進行對鎖的管理(也不能

使用這兩個),這就出現了問題,如果多個線程同時對線程池進行暫停或者重新開始,先說暫停把,那么就會造成調用者不斷等待獲取鎖,從而餓死。而對

一個沒有鎖的鎖進行unlock則會拋出異常(行為未定義)。所以若要使用這兩個函數則必須只能是單線程操作,在析構時檢查stop_flag是否為true,若是則

調用Restart()對 stop_mutex 進行解鎖,避免線程池中的線程還處於獲取stop_mutex鎖的狀態從而發生了死鎖。

promise_one c++11新特性,保證收回資源函數只被運行一次,為多線程同時操作單個線程池提供了安全保障。

end_flag 原子操作變量,用於標記線程池是否結束。

thread_list 是單向鏈表std::forward_ist,用於存放線程。相比雙向鏈表空間利用率更好。

task_list 任務鏈表類,用於存放任務以及其參數(Any),內部是用雙向鏈表std::list對數據進行保存。因為任務有請求的先后順序,而單向鏈表對這個支持

不太友好。

Start() 初始化線程鏈表,鏈表內存放着是std::unique_ptr<std::thread> 用智能指針管理內存資源。這里需要注意的一點是需要傳入this指針,每個類方法

都有一個隱藏的this指針,而這個參數在參數列表的第一個。

ThreadRun() 線程池中線程跑的函數,用end_flag判斷是否結束,stop_flag判斷是否獲取stop_mutex鎖。獲取stop_mutex鎖用std::lock_guard進行管理

出來局部范圍自動釋放。

ThreadPoolEnd() 結束線程池,先停止任務鏈表的添加與取出,再等待線程結束,釋放內存。

ThreadPool()構造,對線程池中線程數量、任務鏈表的容量、stop_flag及end_flag賦值,對線程鏈表和任務鏈表進行初始化。

~ThreadPool析構,若沒釋放stop_mutex鎖則對stop_mutex 進行解鎖。若沒運行End()函數對線程鏈表及任務鏈表資源回收則調用End(),這里std::call_once

對此提供保障。

AddTask() 提供左值參數與右值參數兩個版本,當是右值參數時將節省內存的開銷。

End()結束線程池,調用了std::call_once(std::once_flag, Callable&& f, Args&&... args)。

Stop() 暫停線程池中線程,對stop_mutex鎖進行鎖定。

Restart() 重新恢復線程池中線程的運行,對stop_mutex鎖進行釋放。

namespace Csz_ThreadPool
{
    class ThreadPool 
    {
        private:
            //stop_flag停止線程運行,Stop method and Restart method
            //停止后還是能繼續往task_list中加入任務,而線程都處於waiting狀態
            //因為Stop()鎖住了stop_mutex,而線程們正不斷'努力'獲取這個鎖調用
            //std::lock_guard<std::mutex> guard(stop_mutex) RAII機制,
            //而在Restart()釋放了stop_mutex,線程依次獲取此鎖后,出了局部范圍
            //又釋放了此鎖,以便其他線程獲取此鎖。
            //暫停與重新開始(不建議多線程調用,因為可能會發生死鎖或者拋出異常)
            std::atomic_bool stop_flag;
            std::mutex stop_mutex;
            //std::once_flag 調用法std::call_once(promise_one,[]{})若是在調用
            //函數里發生了異常或者扔出了異常則下次調用是會再次運行
            //保證線程池的釋放只進行一次(在多線程同時操作線程池ing)
            //std::atomic_bool 提供了原子操作
            std::once_flag promise_one;
            std::atomic_bool end_flag;
            //若直接把線程放進container里則進行拷貝,若放進線程指針則需要
            //注意在join時候必須釋放內存,若使用智能指針則可以同時避免着兩種情況
            //使用forward_list原因:只需要把線程指針從一端放進去而不需要從另一端取出來
            //而forward_list比list在空間利用率上更高
            using ThreadPtr= std::unique_ptr<std::thread>;
            std::forward_list<ThreadPtr> thread_list; //線程隊列
            using Task= std::function<void(Any)>;
            TaskList<Task> task_list; //任務隊列
        private:
            //初始化,創建線程並把線程move進去線程隊列,方便結束線程池時
            //join線程
            void Start(const int & T_thread_num) noexcept
            {
#if defined(Csz_test)
                std::cout<<"Thread::Start()\n";
#endif
                for (int i= 0; i< T_thread_num; i++)
                {
                    thread_list.push_front(std::move(ThreadPtr(new std::thread(&ThreadPool::ThreadRun,this))));
                    //thread_list.emplace_front(&ThreadPool::ThreadRun,this); //重載make_unique
                }
            }
            //線程執行的函數
            void ThreadRun() noexcept
            {
                Task task;
                Any parameter;
                while (!end_flag)
                {
                    task_list.Take(task,parameter);
                    if (end_flag)
                        return ;
#if defined(Csz_test)
                    std::cout<<"ThreadPool::ThreadRun task_begin\n";
#endif
                    task(std::move(parameter));
#if defined(Csz_test)
                    std::cout<<"ThreadPool::ThreadRun task_end\n";
                    //std::this_thread::sleep_for(std::chrono::seconds(1));
#endif
                    if (stop_flag)
                    {
                        std::lock_guard<std::mutex> guard(stop_mutex);
                    }
                }
            }
            //結束線程,等待線程執行完當前task,這里有個debug,也就是說
            //你的結束並不是立即結束而是必須等待線程把所領取的task執行完
            //最后清空線程隊列以及任務隊列
            void ThreadPoolEnd() noexcept
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::ThreadPoolEnd()\n";
#endif
                end_flag=true;
                task_list.Stop();
                for (auto &thread : thread_list)
                    if (thread)
                        thread->join();
                thread_list.clear();
                task_list.Clear();
            }
        public:
            //構造函數,在賦值參數的時候要嚴格按照申明順序!!!
            //parameter1 線程隊列最大容量,默認為系統核數
            //parameter2 任務隊列最大容量,默認為 #define SIZE 30
            ThreadPool(int T_thread_num= std::thread::hardware_concurrency(),uint8_t T_size= SIZE)noexcept : stop_flag(false),end_flag(false),task_list(T_size)
            {
                Start(T_thread_num);
            }
            //當忘了結束線程池則有析構函數來執行
            ~ThreadPool()
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::destructor\n";
#endif
                if (true== stop_flag)
                    Restart();
                End();
            }
            //添加任務,右值
            //使用完美轉發保證了參數類型原樣
            void AddTask(Task &&T_task,Any &&T_parameter) noexcept
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::AddTask()move\n";
#endif
                task_list.Put(std::forward<Task>(T_task),std::forward<Any>(T_parameter));
            }
            //添加任務,除了右值
            void AddTask(const Task &T_task,const Any &&T_parameter) noexcept
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::AddTask()copy\n";
#endif
                task_list.Put(T_task,T_parameter);
            }
            //結束線程池
            //call_once保證只正常執行一次
            void End() noexcept
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::End()\n";
#endif
                std::call_once(promise_one,[this]{ThreadPoolEnd();});
            }
            //停止線程,但不停止任務添加
            void Stop() noexcept
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::Stop()\n";
#endif
                //多線程控制線程池
                if (true== stop_flag)
                    return ;
                stop_mutex.lock();
                stop_flag= true;
            }
            //恢復線程,線程繼續從任務隊列中取任務執行
            void Restart() noexcept
            {
#if defined(Csz_test)
                std::cout<<"ThreadPool::Restart()\n";
#endif
                //Start(T_thread_num);
                //多線程控制線程池
                if (false== stop_flag)
                    return ;
                stop_flag= false;
                stop_mutex.unlock();
            }
    };
}

 期待下一段時間,嫌棄自己寫的代碼,批判自己的疏忽。

參考文獻 深入應用C++11 代碼優化與工程級應用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM