boost.ASIO-可能是下一代C++標准的網絡庫


曾幾何時,Boost中有一個Socket庫,但后來沒有了下文,C++社區一直在翹首盼望一個標准網絡庫的出現,網絡上開源的網絡庫也有不少,例如Apache Portable Runtime就是比較著名的一個,也有像ACE這樣重量級的網絡框架。
去年,Boost將ASIO納入了自己的體系,由於Boost的影響力,ASIO有機會成為標准網絡庫。作者Chris Kohlhoff以ASIO為樣本向C++標准委員會提交了一個網絡庫建議書,里面提到:
ASIO的覆蓋范圍:
 Networking using TCP and UDP, including support for multicast.
 Client and server applications.
 Scalability to handle many concurrent connections.
 Protocol independence between IPv4 and IPv6.
 Name resolution (i.e. DNS).
 Timers.
不在ASIO考慮范圍之內的:
 Protocol implementations such as HTTP, SMTP or FTP.
 Encryption (e.g. SSL, TLS).
 Operating system specific demultiplexing APIs.
 Support for realtime environments.
 QoS-enabled sockets.
 Other TCP/IP protocols such as ICMP.
 Functions and classes for enumerating network interfaces.
Boost.Asio支持以下平台:
 Win32 using Visual C++ 7.1 and Visual C++ 8.0.
 Win32 using Borland C++Builder 6 patch 4.
 Win32 using MinGW.
 Win32 using Cygwin.
 Linux (2.4 or 2.6 kernels) using g++ 3.3 or later.
 Solaris using g++ 3.3 or later.
 Mac OS X 10.4 using g++ 3.3 or later.
 QNX Neutrino 6.3 using g++ 3.3 or later.
 FreeBSD using g++ 3.3 or later.
參考ACE的Proactor模式,ASIO采用異步通訊機制,同時參考了Symbian C++ sockets API、Microsoft .NET socket classes和Open Group的Extended Sockets API。

 

usidc5 2011-01-18 23:01

Asio 是一個跨平台的C++開發包用來處理網絡和低級I/O編程,通過先進的C++方法為開發人員提供連續異步模型。
示例代碼:
  void handle_read(const asio::error_code& error,
      size_t bytes_transferred)
  {
    if (!error)
    {
      asio::async_write(socket_,
          asio::buffer(data_, bytes_transferred),
          make_custom_alloc_handler(allocator_,
            boost::bind(&session::handle_write,
              shared_from_this(),
              asio::placeholders::error)));
    }
  }

  void handle_write(const asio::error_code& error)
  {
    if (!error)
    {
      socket_.async_read_some(asio::buffer(data_),
          make_custom_alloc_handler(allocator_,
            boost::bind(&session::handle_read,
              shared_from_this(),
              asio::placeholders::error,
              asio::placeholders::bytes_transferred)));
    }
  }

 

usidc5 2011-06-25 15:36
boost真是個好東西,每次去逛總會有驚喜。這次的驚喜是發現了asio,一個跨平台的支持異步I/O的網絡通訊socket庫。

異步I/O是一種高效的I/O模型,在Windows平台下這種機制的代表就是IOCP完成端口模型。事實上,asio在Windows平台上的實現就是對IOCP的封裝。

其實在網絡通訊這一塊,已經有許多成熟的框架了,最典型的就是ACE,一個網絡通訊設計模式的集大成者。但ACE對我來說太重型了,而且其起源於90年代,與標准庫的集成不是太好,比如ACE就有自己的容器類。。。總而言之,ACE是一個龐然大物,威力無窮,但也顯得比較笨重。

C++發展到現在,庫的設計風格也越來越趨向於泛型,boost就是一個典型,而且boost社區跟C++標准委員會的密切關系,使得進入boost的程序庫有更大的機會加入下一代的C++標准庫。

因此不管從設計風格(我不否認我也喜歡追時髦;)),還是從功利的角度看,學習asio都是一筆不錯的投資。

學習她,首先要安裝她。asio要求首先安裝boost,下面我把自己的安裝過程描述一遍,這確實還是頗費心思的。

首先要先安裝boost,這可是一個漫長而又炎熱的夏天。。。萬幸的是我以前已經裝過了,嘿嘿。我裝的是boost_1_33_0,為了完整說明,我這里也簡單列了下boost的安裝步驟,這也是從網上找來的。

step1.從www.boost.org下載boost庫

step2 在 tools\build\jam_src目錄下 運行build.bat來生成jam

step3 設置環境變量(后面的%PATH%要加)

PATH=%boost的絕對路徑%\tools\build\jam_src\bin.ntx86;%PATH% 
PATH=%boost的絕對路徑%;%PATH%

For Visial Studio 6.0 
SET MSVC_ROOT="VC6的安裝路徑" 
SET VISUALC="VC6的安裝路徑" 
Example: 
SET MSVC_ROOT="c:\Program Files\Microsoft Visual Studio\VC98"

For Visual Studio.net 
SET VC7_ROOT="vs.NET安裝路徑" 
Example: 
SET VC7_ROOT="C:\Program Files\Microsoft Visual Studio .NET\VC7"

For Visual Studio.net 2003 
SET VC71_ROOT="vs.NET2003安裝路徑" 
Example: 
set VC71_ROOT="C:\Program Files\Microsoft Visual Studio .NET 2003\Vc7"

step 4 編譯boost庫 
bjam "-sTOOLS=%編譯器%" install
Visual Studio 6.0 %編譯器%=msvc 
Visual Studio .NET %編譯器%=vc7 
Visual Studio .NET 2003 %編譯器%=vc-7_1

我用的是VC7.1,照着這個指示,當時編譯了好久才完成。不過我在最后一步時,忘了加上install。這也沒什么,你可以在boost下新建一個lib文件夾,然后把bin目錄下所有的庫文件都拷貝進來,然后在你的編譯器里進行適當的路徑設置(頭文件路徑,庫文件路徑),就可以使用boost了。

安裝好boost后(我裝在了E:\boost_1_33_0),就可以安裝asio了,先去http://asio.sourceforge.net下載,現在是最新版本0.3.8。
注意下載帶有boost前綴(boost_asio_0_3_8rc2.zip)的zip文件,解開后可以看到兩個目錄:boost和libs。把boost里面的所有文件拷貝到E:\boost_1_33_0\boost下面,注意里面有個detail目錄不能直接覆蓋,而是要把其中的文件(identifier.hpp)拷貝到E:\boost_1_33_0\boost\detail中去;同樣把libs里面的文件夾都拷貝到E:\boost_1_33_0\libs下,就可以了。

好了,接下來就到了激動人心的時刻,讓我們來開始編譯asio示例,我找了個asio自帶的最簡單的例子,關於同步定時器的,5秒后超時打印Hello, world!

#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

int main()
{
       boost::asio::io_service io;

       boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
       t.wait();

       std::cout << "Hello, world!\n";

       return 0;
}

先別管具體的代碼,開始編譯。。。果然不出所料,哪有那么順暢的事情;)

正在編譯...
main.cpp
e:\boost_1_33_0\boost\asio\detail\push_options.hpp(103) : fatal error C1189: #error :       Multithreaded RTL must be selected.

哦,原來需要多線程庫,這好辦,在項目屬性里:配置屬性 -> C/C++ -> 代碼生成 -> 運行時庫 ->多線程調試(/MTd),再試一下。

正在編譯...
main.cpp
Please define _WIN32_WINNT or _WIN32_WINDOWS appropriately
Assuming _WIN32_WINNT=0x0500 (i.e. Windows 2000 target)
正在鏈接...
LINK : fatal error LNK1104: 無法打開文件“libboost_system-vc71-mt-sgd-1_33.lib”

好家伙,一下出來倆。第一個是windows版本支持問題,我在項目屬性里把宏_WIN32_WINNT加到預處理器中,現在編譯通過了,但鏈接還是不行:

正在鏈接...
LINK : fatal error LNK1104: 無法打開文件“libboost_system-vc71-mt-sgd-1_33.lib”

找不到system庫。我納悶了一會,因為asio主頁上明明寫着大部分功能只需要boost頭文件即可。因此我又照着asio主頁上的說明,把_BOOST_ALL_NO_LIB也加到預處理器中去,但還是不行。

后來我又到下載的文件中去找,發現system是asio自帶的一個庫,要想使用asio,就必須先編譯這個庫,OMG~

我還沒單獨編譯過boost的一個庫,因此又去網上找了找,終於找到了,原來也不是很難。基本步驟還是跟編譯整個boost一樣,只不過在最后一步時,要換成這樣:

bjam "-sTOOLS=%編譯器%" --with-system install

就可以編譯system庫了。最后檢查下編譯器的頭文件和庫文件路徑是否正確,再重新試一遍,終於大功告成!

我懷着欣喜的心情開始測試asio自帶的tutorial程序,前面幾個關於定時器的運行的很正常,但到了后來測試daytime1的時候,鏈接又有問題了。

正在編譯...
main.cpp
f:\My-SmartWin-Demo\asio_demo\main.cpp(56) : warning C4267: “參數” : 從“size_t”轉換到“std::streamsize”,可能丟失數據
正在鏈接...
main.obj : error LNK2019: 無法解析的外部符號 "public: static class boost::system::error_category __cdecl boost::system::error_code::new_category(int (__cdecl*)(class boost::system::error_code const &),class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > (__cdecl*)(class boost::system::error_code const &),class std::basic_string<unsigned short,struct std::char_traits<unsigned short>,class std::allocator<unsigned short> > (__cdecl*)(class boost::system::error_code const &))" (?new_category@error_code@system@boost@@SA?AVerror_category@23@P6AHABV123@@ZP6A?AV?$basic_string@DU?$char_traits@D@std@@V?$allocator@D@2@@std@@0@ZP6A?AV?$basic_string@GU?$char_traits@G@std@@V?$allocator@G@2@@6@0@Z@Z) ,該符號在函數 _$E47 中被引用
Debug/asio_demo.exe : fatal error LNK1120: 1 個無法解析的外部命令

前面那個警告不管,后面鏈接時說一個static函數new_category(...)只有聲明沒有實現,我找了下,這個函數在system庫里error_code.hpp中有聲明,error_code.cpp也有實現,而且明明system庫我也編譯成功,並加入相關路徑中,怎么還是會出錯?

郁悶了半天,后來干脆把error_code.cpp加入到daytime1工程中一起編譯,終於徹底搞定了。

真是TMD不容易啊。

 

usidc5 2011-06-25 15:39

對於一個網絡程序的服務器端我們需要提供的是服務器的address,和服務開放的端口號port。
在asio庫中首先我們必須使用一個io_service類來支持所有的IO功能。需要注意到是我們必須調用io_service_my.run()函數來開啟IO服務的事件循環以使功能都能被正常使用。
boost::asio::io_service io_service_my;
現在我們可以基於這個io_service_my來關聯構建一下幾個類:
1. boost::asio::ip::tcp::acceptor acceptor_my(io_service_my); 
因為LPD的實現是基於TCP傳輸協議,所以也使用了TCP的acceptor來接收client發來的連接。
2.  boost::asio::ip::tcp::resolver resolver_my(io_service_my);
boost::asio::ip::tcp::resolver::query query_my(address,port);
boost::asio::ip::tcp::endpoint endpoint_my = *resolver.resolve(query_my);
這幾個類主要是用來實現對地址的解析和綁定終端節點到相應的開放端口號上。首先構造一個關聯到io_service_my的解析器resolver_my。然后讓解析器resolver_my執行resolve
()函數來解析query_my指定的address和port到一個終端節點endpoint_my上。我們會看到這個endpoint_my終端節點會被綁定到這個acceptor_my接收器上。
3. boost::asio::ip::tcp::socket socket_my(io_service_my);
定義一個基於TCP協議的socket關聯到io_service_my對象上。
在這些准備工作做完后我們開始一些實際的動作:
/*
* 打開一個使用由endpoint_my指定的協議類型的接收器,這個protocol()函數會自動返回與endpoint_my關聯的協
* 議類型。
*/
acceptor_my.open(endpoint_my.protocol());

/*
* 設置選項允許socket綁定到一個已經正在被使用的地址。
*/
acceptor_my.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true);

/*
* 把接收器綁定到已經被設置過的endpoint_my。
*/
acceptor_my.bind(endpoint_my);

/*
* 接收器開始偵聽。
*/
acceptor_my.listen();

/*
* 以同步/異步方式開始接收連接。
*/
acceptor_my.accept(socket_my) //同步
acceptor_my.async_accept(socket_my,
  boost::bind(&handle_accept,boost::asio::placeholders::error));//異步
其中異步的偵聽函數原型是:
template<
    typename SocketService,
    typename AcceptHandler>
void async_accept(
    basic_socket< protocol_type, SocketService > & peer,
    AcceptHandler handler);
handler所對應的函數在新連接接收完成后會被調用。這種異步方式實現回調的方法也類似於使用boost::asio::io_service::post(boost::bind(&handle_accept));
注意到bind函數中的&handle_accept,這是函數handle_accept的入口地址,也就是在接收完成后會調用的函數在這里我們可以繼續進行下一步的處理,從socket_my中讀取或者寫
入數據。

/*
* 調用異步讀函數,把接收的數據拷貝到buffer緩存中,其中buffer是由參數buffer_my構造,
* 而buffer_my本身可以是一個容器例如boost::array<char,8192> buffer_my,表示申請了
* 一個8K個char字符型空間。也可以使用例外一種方法實現buffer的構造例如
* boost::asio::buffer(data,size_t);其中data表示指向某種數據類型的指針,
* size_t則表示包含多少個該數據類型的元素。
*/
void handle_accept(boost::system::error_code error)
{
if(!error)
{
  boost::array<char, 8192> buffer_my;
  boost::asio::async_read(socket_my, boost::asio::buffer(buffer_my),
   boost::bind(&handle_read, boost::asio::placeholders::error));

}
類似的寫程序如下
boost::asio::async_write(socket_my, boost::asio::buffer(buffer_my),
   boost::bind(&handle_write, 
     boost::asio::placeholders::error,
     boost::asio::placeholders::bytes_transferred));

最后在所有連接完成之后或是服務器停止的時候別忘記關掉連接。例如
socket_my.close();
acceptor_my.close();
至此一個基於boost::asio庫的網絡程序的框架就出來了,至於具體的設計類實現可以視需求而定。

 

usidc5 2011-06-25 15:40
摘要:本文通過形像而活潑的語言簡單地介紹了Boost::asio庫的使用,作為asio的一個入門介紹是非常合適的,可以給人一種新鮮的感覺,同時也能讓體驗到asio的主要內容。本文來自網絡,原文在這里。


目錄 [隱藏]
ASIO的同步方式
自我介紹
示例代碼
小結
ASIO的異步方式
自我介紹
示例代碼
小結
ASIO的“便民措施”
端點
超時
統一讀寫接口
基於流的操作
用ASIO編寫UDP通信程序
用ASIO讀寫串行口
演示代碼
Boost.Asio是一個跨平台的網絡及底層IO的C++編程庫,它使用現代C++手法實現了統一的異步調用模型。


ASIO的同步方式
ASIO庫能夠使用TCP、UDP、ICMP、串口來發送/接收數據,下面先介紹TCP協議的讀寫操作。對於讀寫方式,ASIO支持同步和異步兩種方式,首先登場的是同步方式,下面請同步方式自我介紹一下。


自我介紹
大家好!我是同步方式!


我的主要特點就是執着!所有的操作都要完成或出錯才會返回,不過偶的執着被大家稱之為阻塞,實在是郁悶~~(場下一片噓聲),其實這樣 也是有好處的,比如邏輯清晰,編程比較容易。


在服務器端,我會做個socket交給acceptor對象,讓它一直等客戶端連進來,連上以后再通過這個socket與客戶端通信, 而所有的通信都是以阻塞方式進行的,讀完或寫完才會返回。


在客戶端也一樣,這時我會拿着socket去連接服務器,當然也是連上或出錯了才返回,最后也是以阻塞的方式和服務器通信。


有人認為同步方式沒有異步方式高效,其實這是片面的理解。在單線程的情況下可能確實如此,我不能利用耗時的網絡操作這段時間做別的事 情,不是好的統籌方法。不過這個問題可以通過多線程來避免,比如在服務器端讓其中一個線程負責等待客戶端連接,連接進來后把socket交給另外的線程去 和客戶端通信,這樣與一個客戶端通信的同時也能接受其它客戶端的連接,主線程也完全被解放了出來。


我的介紹就有這里,謝謝大家!


示例代碼
好,感謝同步方式的自我介紹,現在放出同步方式的演示代碼(起立鼓掌!)。


服務器端


#include <iostream>
#include <boost/asio.hpp>


int main(int argc, char* argv[])
{
        using namespace boost::asio;
        // 所有asio類都需要io_service對象
        io_service iosev;
        ip::tcp::acceptor acceptor(iosev, 
        ip::tcp::endpoint(ip::tcp::v4(), 1000));
        for(;;)
        {
                // socket對象
                ip::tcp::socket socket(iosev);
                // 等待直到客戶端連接進來
                acceptor.accept(socket);
                // 顯示連接進來的客戶端
                std::cout << socket.remote_endpoint().address() << std::endl;
                // 向客戶端發送hello world!
                boost::system::error_code ec;
                socket.write_some(buffer("hello world!"), ec);


                // 如果出錯,打印出錯信息
                if(ec)
                {
                        std::cout << 
                                boost::system::system_error(ec).what() << std::endl;
                        break;
                }
                // 與當前客戶交互完成后循環繼續等待下一客戶連接
        }
        return 0;
}
客戶端


#include <iostream>
#include <boost/asio.hpp>


int main(int argc, char* argv[])
{
        using namespace boost::asio;


        // 所有asio類都需要io_service對象
        io_service iosev;
        // socket對象
        ip::tcp::socket socket(iosev);
        // 連接端點,這里使用了本機連接,可以修改IP地址測試遠程連接
        ip::tcp::endpoint ep(ip::address_v4::from_string("127.0.0.1"), 1000);
        // 連接服務器
        boost::system::error_code ec;
        socket.connect(ep,ec);
        // 如果出錯,打印出錯信息
        if(ec)
        {
                std::cout << boost::system::system_error(ec).what() << std::endl;
                return -1;
        }
        // 接收數據
        char buf[100];
        size_t len=socket.read_some(buffer(buf), ec);
        std::cout.write(buf, len);


        return 0;
}
小結
從演示代碼可以得知


ASIO的TCP協議通過boost::asio::ip名 空間下的tcp類進行通信。
IP地址(address,address_v4,address_v6)、 端口號和協議版本組成一個端點(tcp:: endpoint)。用於在服務器端生成tcp::acceptor對 象,並在指定端口上等待連接;或者在客戶端連接到指定地址的服務器上。
socket是 服務器與客戶端通信的橋梁,連接成功后所有的讀寫都是通過socket對 象實現的,當socket析 構后,連接自動斷 開。
ASIO讀寫所用的緩沖區用buffer函 數生成,這個函數生成的是一個ASIO內部使用的緩沖區類,它能把數組、指針(同時指定大 小)、std::vector、std::string、boost::array包裝成緩沖區類。
ASIO中的函數、類方法都接受一個boost::system::error_code類 型的數據,用於提供出錯碼。它可以轉換成bool測試是否出錯,並通過boost::system::system_error類 獲得詳細的出錯信息。另外,也可以不向ASIO的函數或方法提供 boost::system::error_code,這時如果出錯的話就會直 接拋出異常,異常類型就是boost::system:: system_error(它是從std::runtime_error繼承的)。
ASIO的異步方式
嗯?異步方式好像有點坐不住了,那就請異步方式上場,大家歡迎...


自我介紹
大家好,我是異步方式


和同步方式不同,我從來不花時間去等那些龜速的IO操作,我只是向系統說一聲要做什么,然后就可以做其它事去了。如果系統完成了操作, 系統就會通過我之前給它的回調對象來通知我。


在ASIO庫中,異步方式的函數或方法名稱前面都有“async_” 前綴,函數參數里會要求放一個回調函數(或仿函數)。異步操作執行 后不管有沒有完成都會立即返回,這時可以做一些其它事,直到回調函數(或仿函數)被調用,說明異步操作已經完成。


在ASIO中很多回調函數都只接受一個boost::system::error_code參數,在實際使用時肯定是不夠的,所以一般 使用仿函數攜帶一堆相關數據作為回調,或者使用boost::bind來綁定一堆數據。


另外要注意的是,只有io_service類的run()方法運行之后回調對象才會被調用,否則即使系統已經完成了異步操作也不會有任 務動作。


示例代碼
好了,就介紹到這里,下面是我帶來的異步方式TCP Helloworld 服務器端:


#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>


using namespace boost::asio;
using boost::system::error_code;
using ip::tcp;


struct CHelloWorld_Service
{
        CHelloWorld_Service(io_service &iosev)
                :m_iosev(iosev),m_acceptor(iosev, tcp::endpoint(tcp::v4(), 1000))
        {}


        void start()
        {
                // 開始等待連接(非阻塞)
                boost::shared_ptr<tcp::socket> psocket(new tcp::socket(m_iosev));
                // 觸發的事件只有error_code參數,所以用boost::bind把socket綁定進去
                m_acceptor.async_accept(*psocket,
                        boost::bind(&CHelloWorld_Service::accept_handler, this, psocket, _1));
        }


        // 有客戶端連接時accept_handler觸發
        void accept_handler(boost::shared_ptr<tcp::socket> psocket, error_code ec)
        {
                if(ec) return;
                // 繼續等待連接
                start();
                // 顯示遠程IP
                std::cout << psocket->remote_endpoint().address() << std::endl;
                // 發送信息(非阻塞)
                boost::shared_ptr<std::string> pstr(new std::string("hello async world!"));
                psocket->async_write_some(buffer(*pstr),
                        boost::bind(&CHelloWorld_Service::write_handler, this, pstr, _1, _2));
        }


        // 異步寫操作完成后write_handler觸發
        void write_handler(boost::shared_ptr<std::string> pstr, error_code ec,
                size_t bytes_transferred)
        {
                if(ec)
                std::cout<< "發送失敗!" << std::endl;
                else
                std::cout<< *pstr << " 已發送" << std::endl;
        }


        private:
                io_service &m_iosev;
                ip::tcp::acceptor m_acceptor;
};


int main(int argc, char* argv[])
{
        io_service iosev;
        CHelloWorld_Service sev(iosev);
        // 開始等待連接
        sev.start();
        iosev.run();


        return 0;
}
小結
在這個例子中,首先調用sev.start()開 始接受客戶端連接。由於async_accept調 用后立即返回,start()方 法 也就馬上完成了。sev.start()在 瞬間返回后iosev.run()開 始執行,iosev.run()方法是一個循環,負責分發異步回調事件,只 有所有異步操作全部完成才會返回。


這里有個問題,就是要保證start()方法中m_acceptor.async_accept操 作所用的tcp::socket對 象 在整個異步操作期間保持有效(不 然系統底層異步操作了一半突然發現tcp::socket沒了,不是拿人家開涮嘛-_-!!!),而且客戶端連接進來后這個tcp::socket對象還 有用呢。這里的解決辦法是使用一個帶計數的智能指針boost::shared_ptr,並把這個指針作為參數綁定到回調函數上。


一旦有客戶連接,我們在start()里給的回調函數accept_handler就會被 調用,首先調用start()繼續異步等待其 它客戶端的連接,然后使用綁定進來的tcp::socket對象與當前客戶端通信。


發送數據也使用了異步方式(async_write_some), 同樣要保證在整個異步發送期間緩沖區的有效性,所以也用boost::bind綁定了boost::shared_ptr。


對於客戶端也一樣,在connect和read_some方法前加一個async_前綴,然后加入回調即可,大家自己練習寫一寫。


ASIO的“便民措施”
asio中提供一些便利功能,如此可以實現許多方便的操作。


端點
回到前面的客戶端代碼,客戶端的連接很簡單,主要代碼就是兩行:


...
// 連接
socket.connect(endpoint,ec);
...
// 通信
socket.read_some(buffer(buf), ec);
不過連接之前我們必須得到連接端點endpoint,也就是服務器地址、端口號以及所用的協議版本。


前面的客戶端代碼假設了服務器使用IPv4協議,服務器IP地址為127.0.0.1,端口號為1000。實際使用的情況是,我們經常只能知道服務器網絡ID,提供的服務類型,這時我們就得使用ASIO提供的tcp::resolver類來取得服務器的端點了。





比如我們要取得163網站的首頁,首先就要得到“www.163.com”服務器的HTTP端點:


io_service iosev;
ip::tcp::resolver res(iosev);
ip::tcp::resolver::query query("www.163.com","80"); //www.163.com 80端口
ip::tcp::resolver::iterator itr_endpoint = res.resolve(query);
這里的itr_endpoint是一個endpoint的迭代器,服務器的同一端口上可能不止一個端點,比如同時有IPv4和IPv6 兩種。現在,遍歷這些端點,找到可用的:


// 接上面代碼
ip::tcp::resolver::iterator itr_end; //無參數構造生成end迭代器
ip::tcp::socket socket(iosev);
boost::system::error_code ec = error::host_not_found;
for(;ec && itr_endpoint!=itr_end;++itr_endpoint)
{
        socket.close();
        socket.connect(*itr_endpoint, ec);
}
如果連接上,錯誤碼ec被清空,我們就可以與服務器通信了:


if(ec)
{
        std::cout << boost::system::system_error(ec).what() << std::endl;
        return -1;
}
// HTTP協議,取根路徑HTTP源碼
socket.write_some(buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
for(;;)
{
        char buf[128];
        boost::system::error_code error;
        size_t len = socket.read_some(buffer(buf), error);
        // 循環取數據,直到取完為止
        if(error == error::eof)
        break;
        else if(error)
        {
                std::cout << boost::system::system_error(error).what() << std::endl;
                return -1;
        }


        std::cout.write(buf, len);
}
當所有HTTP源碼下載了以后,服務器會主動斷開連接,這時客戶端的錯誤碼得到boost::asio::error::eof,我們 要根據它來判定是否跳出循環。


ip::tcp::resolver::query的構造函數接受服務器名和服務名。前面的服務名我們直接使用了端口號"80",有時 我們也可以使用別名,用記事本打開%windir%\system32\drivers\etc\services文件(Windows環境),可以看到 一堆別名及對應的端口,如:


echo           7/tcp                 # Echo
ftp           21/tcp                 # File Transfer Protocol (Control)
telnet        23/tcp                 # Virtual Terminal Protocol
smtp          25/tcp                 # Simple Mail Transfer Protocol
time          37/tcp  timeserver     # Time
比如要連接163網站的telnet端口(如果有的話),可以這樣寫:


ip::tcp::resolver::query query("www.163.com","telnet");
ip::tcp::resolver::iterator itr_endpoint = res.resolve(query);
超時
在網絡應用里,常常要考慮超時的問題,不然連接后半天沒反應誰也受不了。


ASIO庫提供了deadline_timer類來支持定時觸發,它的用法是:


// 定義定時回調
void print(const boost::system::error_code& /*e*/)
{
        std::cout << "Hello, world! ";
}

deadline_timer timer;
// 設置5秒后觸發回調
timer.expires_from_now(boost::posix_time::seconds(5));
timer.async_wait(print);
這段代碼執行后5秒鍾時打印Hello World!


我們可以利用這種定時機制和異步連接方式來實現超時取消:


deadline_timer timer;
// 異步連接
socket.async_connect(my_endpoint, connect_handler/*連接回調*/);
// 設置超時
timer.expires_from_now(boost::posix_time::seconds(5));
timer.async_wait(timer_handler);
...
// 超時發生時關閉socket
void timer_handler()
{
        socket.close();
}
最后不要忘了io_service的run()方法。


統一讀寫接口
除了前面例子所用的tcp::socket讀寫方法(read_some, write_some等)以外,ASIO也提供了幾個讀寫函數,主要有這么幾個:


read、write、read_until、write_until
當然還有異步版本的


async_read、async_write、async_read_until、async_write_until
這些函數可以以統一的方式讀寫TCP、串口、HANDLE等類型的數據流。


我們前面的HTTP客戶端代碼可以這樣改寫:


...
//socket.write_some(buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
write(socket,buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
...
//size_t len = socket.read_some(buffer(buf), error);
size_t len = read(socket, buffer(buf), transfer_all() ,error);
if(len) std::cout.write(buf, len);
這個read和write有多個重載,同樣,有錯誤碼參數的不會拋出異常而無錯誤碼參數的若出錯則拋出異常。


本例中read函數里的transfer_all()是一個稱為CompletionCondition的對象,表示讀取/寫入直接緩 沖區裝滿或出錯為止。另一個可選的是transfer_at_least(size_t),表示至少要讀取/寫入多少個字符。


read_until和write_until用於讀取直到某個條件滿足為止,它接受的參數不再是buffer,而是boost::asio:: streambuf。


比如我們可以把我們的HTTP客戶端代碼改成這樣:


boost::asio::streambuf strmbuf;
size_t len = read_until(socket,strmbuf," ",error);
std::istream is(&strmbuf);
is.unsetf(std::ios_base::skipws);
// 顯示is流里的內容
std::copy(std::istream_iterator<char>(is),
    std::istream_iterator<char>(),
    std::ostream_iterator<char>(std::cout));
基於流的操作
對於TCP協議來說,ASIO還提供了一個tcp::iostream。用它可以更簡單地實現我們的HTTP客戶端:


ip::tcp::iostream stream("www.163.com", "80");
if(stream)
{
        // 發送數據
        stream << "GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 ";
        // 不要忽略空白字符
        stream.unsetf(std::ios_base::skipws);
        // 顯示stream流里的內容
        std::copy(std::istream_iterator<char>(stream),
        std::istream_iterator<char>(),
        std::ostream_iterator<char>(std::cout));
}
用ASIO編寫UDP通信程序
ASIO的TCP協議通過boost::asio::ip名空間下的tcp類進行通信,舉一返三:ASIO的UDP協議通過boost::asio::ip名空間下的udp類進行通信。


我們知道UDP是基於數據報模式的,所以事先不需要建立連接。就象寄信一樣,要寄給誰只要寫上地址往門口的郵箱一丟,其它的事各級郵局 包辦;要收信用只要看看自家信箱里有沒有信件就行(或問門口傳達室老大爺)。在ASIO里,就是udp::socket的send_to和receive_from方法(異步版本是async_send_to和asnync_receive_from)。


下面的示例代碼是從ASIO官方文檔里拿來的(實在想不出更好的例子了:-P):


服務器端代碼


//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2008 Christopher M. Kohlhoff 
// (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. 
// (See accompanying
// file LICENSE_1_0.txt or 
// copy at <a href="http://www.boost.org/LICENSE_1_0.txt" title="http://www.boost.org/LICENSE_1_0.txt">http://www.boost.org/LICENSE_1_0.txt</a>)
//


#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/asio.hpp>


using boost::asio::ip::udp;


std::string make_daytime_string()
{
        using namespace std; // For time_t, time and ctime;
        time_t now = time(0);
        return ctime(&now);
}


int main()
{
        try
        {
                boost::asio::io_service io_service;
                // 在本機13端口建立一個socket
                udp::socket socket(io_service, udp::endpoint(udp::v4(), 13));


                for (;;)
                {
                        boost::array<char, 1> recv_buf;
                        udp::endpoint remote_endpoint;
                        boost::system::error_code error;
                        // 接收一個字符,這樣就得到了遠程端點(remote_endpoint)
                        socket.receive_from(boost::asio::buffer(recv_buf),
                        remote_endpoint, 0, error);


                        if (error && error != boost::asio::error::message_size)
                                throw boost::system::system_error(error);


                        std::string message = make_daytime_string();
                        // 向遠程端點發送字符串message(當前時間)    
                        boost::system::error_code ignored_error;
                        socket.send_to(boost::asio::buffer(message),
                        remote_endpoint, 0, ignored_error);
                }
        }
        catch (std::exception& e)
        {
                std::cerr << e.what() << std::endl;
        }


        return 0;
}
客戶端代碼


//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2008 Christopher M. Kohlhoff
// (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. 
// (See accompanying file LICENSE_1_0.txt or
//  copy at <a href="http://www.boost.org/LICENSE_1_0.txt" title="http://www.boost.org/LICENSE_1_0.txt">http://www.boost.org/LICENSE_1_0.txt</a>)
//


#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>


using boost::asio::ip::udp;


int main(int argc, char* argv[])
{
        try
        {
                if (argc != 2)
                {
                        std::cerr << "Usage: client <host>" << std::endl;
                        return 1;
                }


                boost::asio::io_service io_service;
                // 取得命令行參數對應的服務器端點
                udp::resolver resolver(io_service);
                udp::resolver::query query(udp::v4(), argv[1], "daytime");
                udp::endpoint receiver_endpoint = *resolver.resolve(query);


                udp::socket socket(io_service);
                socket.open(udp::v4());
                // 發送一個字節給服務器,讓服務器知道我們的地址
                boost::array<char, 1> send_buf  = { 0 };
                socket.send_to(boost::asio::buffer(send_buf), receiver_endpoint);
                // 接收服務器發來的數據
                boost::array<char, 128> recv_buf;
                udp::endpoint sender_endpoint;
                size_t len = socket.receive_from(
                boost::asio::buffer(recv_buf), sender_endpoint);


                std::cout.write(recv_buf.data(), len);
        }
        catch (std::exception& e)
        {
                std::cerr << e.what() << std::endl;
        }


        return 0;
}
用ASIO讀寫串行口
ASIO不僅支持網絡通信,還能支持串口通信。要讓兩個設備使用串口通信,關鍵是要設置好正確的參數,這些參數是:波特率、奇偶校驗 位、停止位、字符大小和流量控制。兩個串口設備只有設置了相同的參數才能互相交談。


ASIO提供了boost::asio::serial_port類,它有一個set_option(const SettableSerialPortOption& option)方法就是用於設置上面列舉的這些參數的,其中的option可以是:


serial_port::baud_rate 波特率,構造參數為unsigned int
serial_port::parity 奇偶校驗,構造參數為serial_port::parity::type,enum類型,可以是none, odd, even。
serial_port::flow_control 流量控制,構造參數為serial_port::flow_control::type,enum類型,可以是none software hardware
serial_port::stop_bits 停止位,構造參數為serial_port::stop_bits::type,enum類型,可以是one onepointfive two
serial_port::character_size 字符大小,構造參數為unsigned int
演示代碼
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


using namespace std;
using namespace boost::asio;


int main(int argc, char* argv[])
{
        io_service iosev;
        // 串口COM1, Linux下為“/dev/ttyS0”
        serial_port sp(iosev, "COM1");
        // 設置參數
        sp.set_option(serial_port::baud_rate(19200));
        sp.set_option(serial_port::flow_control(serial_port::flow_control::none));
        sp.set_option(serial_port::parity(serial_port::parity::none));
        sp.set_option(serial_port::stop_bits(serial_port::stop_bits::one));
        sp.set_option(serial_port::character_size(8));
        // 向串口寫數據
        write(sp, buffer("Hello world", 12));


        // 向串口讀數據
        char buf[100];
        read(sp, buffer(buf));


        iosev.run();
        return 0;
}
上面這段代碼有個問題,read(sp, buffer(buf))非得讀滿100個字符才會返回,串口通信有時我們確實能知道對方發過來的字符長度,有時候是不能的。


如果知道對方發過來的數據里有分隔符的話(比如空格作為分隔),可以使用read_until來讀,比如:


boost::asio::streambuf buf;
// 一直讀到遇到空格為止
read_until(sp, buf, ' ');
copy(istream_iterator<char>(istream(&buf)>>noskipws),
        istream_iterator<char>(),
        ostream_iterator<char>(cout));
另外一個方法是使用前面說過的異步讀寫+超時的方式,代碼如下:


#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


using namespace std;
using namespace boost::asio;
void handle_read(char *buf,boost::system::error_code ec,
std::size_t bytes_transferred)
{
        cout.write(buf, bytes_transferred);
}


int main(int argc, char* argv[])
{
        io_service iosev;
        serial_port sp(iosev, "COM1");
        sp.set_option(serial_port::baud_rate(19200));
        sp.set_option(serial_port::flow_control());
        sp.set_option(serial_port::parity());
        sp.set_option(serial_port::stop_bits());
        sp.set_option(serial_port::character_size(8));


        write(sp, buffer("Hello world", 12));


        // 異步讀
        char buf[100];
        async_read(sp, buffer(buf), boost::bind(handle_read, buf, _1, _2));
        // 100ms后超時
        deadline_timer timer(iosev);
        timer.expires_from_now(boost::posix_time::millisec(100));
        // 超時后調用sp的cancel()方法放棄讀取更多字符
        timer.async_wait(boost::bind(&serial_port::cancel, boost::ref(sp)));


        iosev.run();
        return 0;
}


 

usidc5 2011-07-08 18:32

asio自帶的例子里是用deadline_timer的async_wait方法來實現超時的,這種方法需要單獨寫一個回調函數,不利於把連接和超時封裝到單個函數里。傳統的Winsock編程可以先把socket設為非阻塞,然后connect,再用select來判斷超時,asio也可以這樣做,唯一“非主流”的是asio里沒有一個類似select的函數,所以得調用原始的Winsock API,也就犧牲了跨平台:

  1. #include <iostream>  
  2. #include <boost/asio.hpp>  
  3.    
  4. int main()  
  5. {  
  6.     boost::asio::io_service ios;  
  7.     boost::asio::ip::tcp::socket s(ios);  
  8.     boost::system::error_code ec;  
  9.    
  10.     s.open(boost::asio::ip::tcp::v4());  
  11.     // 設為非阻塞  
  12.     s.io_control(boost::asio::ip::tcp::socket::non_blocking_io(true));  
  13.     // connect時必須指定error_code參數,否則會有異常拋出  
  14.     s.connect(  
  15.         boost::asio::ip::tcp::endpoint(  
  16.         boost::asio::ip::address::from_string("192.168.1.1"), 80)  
  17.         , ec);  
  18.     fd_set fdWrite;  
  19.     FD_ZERO(&fdWrite);  
  20.     FD_SET(s.native(), &fdWrite);  
  21.     timeval tv = {5};    // 5秒超時  
  22.     if (select(0, NULL, &fdWrite, NULL, &tv) <= 0   
  23.         || !FD_ISSET(s.native(), &fdWrite))  
  24.     {  
  25.         std::cout << "超時/出錯啦" << std::endl;  
  26.         s.close();  
  27.         return 0;  
  28.     }  
  29.     // 設回阻塞  
  30.     s.io_control(boost::asio::ip::tcp::socket::non_blocking_io(false));  
  31.     std::cout << "連接成功" << std::endl;  
  32.     s.close();  
  33.    
  34.     return 0;  

 

usidc5 2011-07-08 18:34

所有的 asio 類都只要包含頭文件:   "asio.hpp"


例子1:   使用一個同步的定時器

#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>   //使用時間間隔
int main()
{
//所有使用 asio 的程序都必須至少擁有一個 io_service 類型的對象. 
//它提供 I/O 功能. 
boost::asio::io_service io;
//創建一個定時器 deadline_timer 的對象. 
//這種定時器有兩種狀態: 超時 和 未超時.
//在超時的狀態下. 調用它的 wait() 函數會立即返回. 
//在未超時的情況下則阻塞. 直到變為超時狀態.
//它的構造函數參數為: 一個 io_service 對象(asio中主要提供IO的類都用io_service對象做構造函數第一個參數).
//                     超時時間.
//從創建開始. 它就進入 "未超時"狀態. 且持續指定的時間. 轉變到"超時"狀態.
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
//這里等待定時器 t 超時. 所以會阻塞5秒.
t.wait();
std::cout << "Hello, world!\n";
return 0;
}






例子2: 使用一個異步的定時器
//一個將被定時器異步調用的函數. 
void print(const boost::system::error_code& /*e*/)
{
std::cout << "Hello, world!\n";
}
int main()
{
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
//和例子1不同. 這里調用 async_wait() 執行一個異步的等待. 它注冊一個可執行體(即此處的print函數).   //這里不懂的是: print的參數怎么傳入?
//實際上. 這個執行體被注冊到 deadline_timer 類的 io_service 成員上(即本例的 io 對象). 只有在以后調用 io.run() 時這些注冊的執行體才會被真正執行. 
t.async_wait(print);
//調用 io對象的 run() 函數執行那些被注冊的執行體. 
//這個函數不會立即返回. 除非和他相關的定時器對象超時並且在定時器超時后執行完所有注冊的執行體. 之后才返回. 
//所以它在這里阻塞一會兒. 等t超時后執行完print. 才返回.
//這里要注意的是. 調用 io.run() 可以放在其它線程中. 那樣所有的回調函數都在別的線程上運行.
io.run();
return 0;
}




例子3: 向超時回調函數綁定參數

// 這個例子中. 每次 定時器超時后. 都修改定時器的狀態到"未超時". 再注冊回調函數. 這樣循環 5 次. 所以 print 會被執行 5 次.
void print(const boost::system::error_code& /*e*/,
    boost::asio::deadline_timer* t, int* count)
{
if (*count < 5)
{
    std::cout << *count << "\n";
    ++(*count);
    //可以用 deadline_timer::expires_at() 來 獲取/設置 超時的時間點. 
    //在這里我們將超時的時間點向后推遲一秒. 
    t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
    //再次向 t 中的 io_service 對象注冊一個回掉函數. 
    // 注意這里綁定時. 指定了綁定到 print 的第一個參數為: boost::asio::placeholders::error //不懂. 這個error是什么東西. 為什么在例子2中不要綁定它?
    t->async_wait(boost::bind(print,
          boost::asio::placeholders::error, t, count));
}
}
int main()
{
boost::asio::io_service io;
int count = 0;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(1));
t.async_wait(boost::bind(print,
        boost::asio::placeholders::error, &t, &count));
io.run();
std::cout << "Final count is " << count << "\n";
return 0;
}



例子4: 多線程處理定時器的回掉函數. 同步的問題.
前面的例子都只在一個線程中調用 boost::asio::io_service::run() 函數. 
向定時器注冊的回掉函數都是在調用該 run() 的線程中執行.
但實際上這個 run() 函數可以在多個線程中同時被調用. 例如:
boost::asio::io_service io; 
//兩個定時器
boost::asio::deadline_timer t1(io, boost::posix_time::seconds(1));
t1.async_wait(func1);   
boost::asio::deadline_timer t2(io, boost::posix_time::seconds(1));
t2.async_wait(func2); 

由於向 io 注冊了多個cmd. 這里為了效率我們想讓這些cmd並行執行:
boost::thread thread1(bind(&boost::asio::io_service::run, &io);
boost::thread thread2(bind(&boost::asio::io_service::run, &io);
thread1.join();
thread2.join();
這里在多個線程中調用 io.run() 所以我們注冊的cmd可能在任何一個線程中運行. 
這些線程會一直等待io對象相關的定時器超時並執行相關的 cmd. 直到所有的定時器都超時. run函數才返回. 線程才結束.
但這里有一個問題: 我們向定時器注冊的 func1 和 func2 . 它們可能會同時訪問全局的對象(比如 cout ). 
這時我們希望對 func1 和 func2 的調用是同步的. 即執行其中一個的時候. 另一個要等待.
這就用到了 boost::asio::strand 類. 它可以把幾個cmd包裝成同步執行的. 例如前面我們向定時器注冊 func1 和 func2 時. 可以改為:
boost::asio::strand the_strand;
t1.async_wait(the_strand.wrap(func1));      //包裝為同步執行的
t2.async_wait(the_strand.wrap(func2)); 
這樣就保證了在任何時刻. func1 和 func2 都不會同時在執行.

 

usidc5 2011-07-08 18:35
 
 
  • // test.cpp : 定義控制台應用程序的入口點。  
 
  • //  
 
  •   
 
  • #include "stdafx.h"  
 
  • #include <boost/asio.hpp>  
 
  • #include <boost/bind.hpp>  
 
  • #include <boost/date_time/posix_time/posix_time_types.hpp>  
 
  • #include <iostream>  
 
  •   
 
  • using namespace boost::asio;  
 
  • using boost::asio::ip::tcp;  
 
  •   
 
  • class connect_handler  
 
  • {  
 
  • public:  
 
  •     connect_handler(io_service& ios)  
 
  •         : io_service_(ios),  
 
  •         timer_(ios),  
 
  •         socket_(ios)  
 
  •     {  
 
  •         socket_.async_connect(  
 
  •             tcp::endpoint(boost::asio::ip::address_v4::loopback(), 3212),  
 
  •             boost::bind(&connect_handler::handle_connect, this,  
 
  •             boost::asio::placeholders::error));  
 
  •   
 
  •         timer_.expires_from_now(boost::posix_time::seconds(5));  
 
  •         timer_.async_wait(boost::bind(&connect_handler::close, this));  
 
  •     }  
 
  •   
 
  •     void handle_connect(const boost::system::error_code& err)  
 
  •     {  
 
  •         if (err)  
 
  •         {  
 
  •             std::cout << "Connect error: " << err.message() << "\n";  
 
  •         }  
 
  •         else  
 
  •         {  
 
  •             std::cout << "Successful connection\n";  
 
  •         }  
 
  •     }  
 
  •   
 
  •     void close()  
 
  •     {  
 
  •         socket_.close();  
 
  •     }  
 
  •   
 
  • private:  
 
  •     io_service& io_service_;  
 
  •     deadline_timer timer_;  
 
  •     tcp::socket socket_;  
 
  • };  
 
  •   
 
  • int main()  
 
  • {  
 
  •     try  
 
  •     {  
 
  •         io_service ios;  
 
  •         tcp::acceptor a(ios, tcp::endpoint(tcp::v4(), 32123), 1);  
 
  •   
 
  •         // Make lots of connections so that at least some of them will block.  
 
  •         connect_handler ch1(ios);  
 
  •         //connect_handler ch2(ios);  
 
  •         //connect_handler ch3(ios);  
 
  •         //connect_handler ch4(ios);  
 
  •         //connect_handler ch5(ios);  
 
  •         //connect_handler ch6(ios);  
 
  •         //connect_handler ch7(ios);  
 
  •         //connect_handler ch8(ios);  
 
  •         //connect_handler ch9(ios);  
 
  •   
 
  •         ios.run();  
 
  •     }  
 
  •     catch (std::exception& e)  
 
  •     {  
 
  •         std::cerr << "Exception: " << e.what() << "\n";  
 
  •     }  
 
  •   
 
  •     return 0;  
 
  • }  
 

 

usidc5 2011-07-08 19:49
服務器代碼:


Servier.cpp
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using boost::asio::ip::tcp;
#define max_len 1024
class clientSession
:public boost::enable_shared_from_this<clientSession>
{
public:
clientSession(boost::asio::io_service& ioservice)
:m_socket(ioservice)
{
memset(data_,‘\0′,sizeof(data_));
}
~clientSession()
{}
tcp::socket& socket()
{
return m_socket;
}
void start()
{
boost::asio::async_write(m_socket,
boost::asio::buffer(“link successed!”),
boost::bind(&clientSession::handle_write,shared_from_this(),
boost::asio::placeholders::error));
/*async_read跟客戶端一樣,還是不能進入handle_read函數,如果你能找到問題所在,請告訴我,謝謝*/
// --已經解決,boost::asio::async_read(...)讀取的字節長度不能大於數據流的長度,否則就會進入
// ioservice.run()線程等待,read后面的就不執行了。
//boost::asio::async_read(m_socket,boost::asio::buffer(data_,max_len),


//         boost::bind(&clientSession::handle_read,shared_from_this(),


//         boost::asio::placeholders::error));
//max_len可以換成較小的數字,就會發現async_read_some可以連續接收未收完的數據


m_socket.async_read_some(boost::asio::buffer(data_,max_len),
boost::bind(&clientSession::handle_read,shared_from_this(),
boost::asio::placeholders::error));
}
private:
void handle_write(const boost::system::error_code& error)
{
if(error)
{
m_socket.close();
}
}
void handle_read(const boost::system::error_code& error)
{
if(!error)
{
std::cout << data_ << std::endl;
//boost::asio::async_read(m_socket,boost::asio::buffer(data_,max_len),


//     boost::bind(&clientSession::handle_read,shared_from_this(),


//     boost::asio::placeholders::error));


m_socket.async_read_some(boost::asio::buffer(data_,max_len),
boost::bind(&clientSession::handle_read,shared_from_this(),
boost::asio::placeholders::error));
}
else
{
m_socket.close();
}
}
private:
tcp::socket m_socket;
char data_[max_len];
};
class serverApp
{
typedef boost::shared_ptr<clientSession> session_ptr;
public:
serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint)
:m_ioservice(ioservice),
acceptor_(ioservice,endpoint)
{
session_ptr new_session(new clientSession(ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
~serverApp()
{
}
private:
void handle_accept(const boost::system::error_code& error,session_ptr& session)
{
if(!error)
{
std::cout << “get a new client!” << std::endl;
//實現對每個客戶端的數據處理


session->start();
//在這就應該看出為什么要封session類了吧,每一個session就是一個客戶端


session_ptr new_session(new clientSession(m_ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
}
private:
boost::asio::io_service& m_ioservice;
tcp::acceptor acceptor_;
};
int main(int argc , char* argv[])
{
boost::asio::io_service myIoService;
short port = 8100/*argv[1]*/;
//我們用的是inet4


tcp::endpoint endPoint(tcp::v4(),port);
//終端(可以看作sockaddr_in)完成后,就要accept了


serverApp sa(myIoService,endPoint);
//數據收發邏輯


myIoService.run();
return 0;
}
客戶端代碼:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
using boost::asio::ip::tcp;
class client
{
public:
client(boost::asio::io_service& io_service,tcp::endpoint& endpoint)
: socket(io_service)//這里就把socket實例化了
{
//連接服務端 connect
socket.async_connect(endpoint,
boost::bind(&client::handle_connect,this,boost::asio::placeholders::error)
);
memset(getBuffer,‘\0′,1024);
}
~client()
{}
private:
void handle_connect(const boost::system::error_code& error)
{
if(!error)
{
//一連上,就向服務端發送信息
boost::asio::async_write(socket,boost::asio::buffer(“hello,server!”),
boost::bind(&client::handle_write,this,boost::asio::placeholders::error));
/**讀取服務端發下來的信息
*這里很奇怪,用async_read根本就不能進入handle_read函數
**/
// --已經解決,boost::asio::async_read(...)讀取的字節長度不能大於數據流的長度,否則就會進入
// ioservice.run()線程等待,read后面的就不執行了。
//boost::asio::async_read(socket,
//     boost::asio::buffer(getBuffer,1024),
//     boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
//    );
socket.async_read_some(boost::asio::buffer(getBuffer,1024),
boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
);
}
else
{
socket.close();
}
}
void handle_read(const boost::system::error_code& error)
{
if(!error)
{
std::cout << getBuffer << std::endl;
//boost::asio::async_read(socket,
//         boost::asio::buffer(getBuffer,1024),
//         boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
//        );
//這樣就可以實現循環讀取了,相當於while(1)
//當然,到了這里,做過網絡的朋友就應該相當熟悉了,一些邏輯就可以自行擴展了
//想做聊天室的朋友可以用多線程來實現
socket.async_read_some(
boost::asio::buffer(getBuffer,1024),
boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
);
}
else
{
socket.close();
}
}
void handle_write(const boost::system::error_code& error)
{
}
private:
tcp::socket socket;
char getBuffer[1024];
};
int main(int argc,char* argv[])
{
//if(argc != 3)
//{
// std::cerr << “Usage: chat_client <host> <port>\n”;
//    return 1;
//}
//我覺IO_SERVICE是一個基本性的接口,基本上通常用到的類實例都需要通過它來構造
//功能我們可以看似socket
boost::asio::io_service io_service;
//這個終端就是服務器
//它的定義就可以看作時sockaddr_in,我們用它來定義IP和PORT
tcp::endpoint endpoint(boost::asio::ip::address_v4::from_string("192.168.1.119"/*argv[1]*/),8100/*argv[2]*/);
//既然socket和sockaddr_in已經定義好了,那么,就可以CONNECT了
//之所以為了要把連接和數據處理封成一個類,就是為了方便管理數據,這點在服務端就會有明顯的感覺了
boost::shared_ptr<client> client_ptr(new client(io_service,endpoint));
//執行收發數據的函數
io_service.run();
return 0;
}
修改192.168.1.119為127.0.0.1,然后先運行server,再運行client,一切ok.

 

usidc5 2011-07-08 19:50
理論基礎
許多應用程序以某種方式和外界交互,例如文件,網絡,串口或者終端。某些情況下例如網絡,獨立IO操作需要很長時間才能完成,這對程序開發形成了一個特殊的挑戰。


Boost.Asio庫提供管理這些長時間操作的工具,並且不需要使用基於線程的並發模型和顯式的鎖。


Asio庫致力於如下幾點:


移植性


高負載


效率


基於已知API例如BSD sockets的模型概念


易於使用


作為進一步抽象的基礎


雖然asio主要關注網絡,它的異步概念也擴展到了其他系統資源,例如串口,文件等等。


主要概念和功能
基本架構(略)
Proactor設計模式:無需額外線程的並發機制(略)
這種模型感覺很像aio或者iocp,而select,epoll則應該類似於Reactor。


線程和Asio


線程安全


一般來說,並發使用不同的對象是安全的。但並發使用同一個對象是不安全的。不過io_service等類型的並發使用是安全的。


線程池


多個線程可以同時調用io_service::run,多個線程是平等的。


內部線程


為了某些特定功能,asio內部使用了thread以模擬異步,這些thread對用戶而言是不可見的。它們都符合如下原則:


它們不會直接調用任何用戶代碼
他們不會被任何信號中斷。
注意,如下幾種情況違背了原則1。


ip::basic_resolver::async_resolve() 所有平台


basic_socket::async_connect() windows平台


涉及null_buffers()的任何操作 windows平台


以上是容易理解的,asio本身盡可能不創建thread,某些情況下,例如connect,由於windows 2k平台下並不提供異步connect,所以asio只能用select模擬,這種情況下不得不創建新線程。windows xp下提供connectex,但考慮到兼容性,asio似乎並未使用。


asio完全保證然后異步完成函數都僅在運行io_service::run的線程中被調用。


同時,創建並且管理運行io_service::run的線程是用戶的責任。


Strands:使用多線程且無需顯式鎖


有3種方式可以顯式或隱式使用鎖。


只在一個線程中調用io_service::run,那么所有異步完成函數都會在該線程中串行化調用
應用邏輯保證
直接使用strand
strand::wrap可以創建一個包裹handler用於post或其他異步調用。


Buffers
Asio支持多個buffer同時用於讀寫,類似於WSARecv里面的WSABUF數組。mutable_buffer和const_buffer類似於WSABUF,MutableBufferSequence和ConstBufferSequence類似於WSABUF的容器。


Buffer本身不分配釋放內存,該數據結構很簡單。


vc8及以上的編譯器在debug編譯時缺省支持檢查越界等問題。其他編譯器可以用BOOST_ASIO_DISABLE_BUFFER_DEBUGGING打開這個開關。


流,不完全讀和不完全寫
許多io對象是基於流的,這意味着:


沒有消息邊界,數據是連續的字節。
讀或者寫操作可能僅傳送了要求的部分字節,這稱之為不完全讀/寫。
read_some,async_read_some,write_some,async_write_some則為這種不完全讀/寫。


系統API一般均為這種不完全讀寫。例如WSASend,WSARecv等等。


一般來說都需要讀/寫特定的字節。可以用read,async_read,write,async_write。這些函數在未完成任務之前會持續調用不完全函數。


EOF
read,async_read,read_until,async_read_until在遇到流結束時會產生一個錯誤。這是很合理的,例如要求讀4個字節,但僅讀了1個字節socket就關閉了。在handle_read中error_code將提示一個錯誤。


Reactor類型的操作
有些應用程序必須集成第3方的庫,這些庫希望自己執行io操作。


這種操作類似於select,考察select和aio的區別,前者是得到完成消息,然后再執行同步讀操作,aio是預發異步讀操作,在完成消息到來時,讀操作已經完成。


null_buffer設計用來實現這類操作。


ip::tcp::socket socket(my_io_service);
...
ip::tcp::socket::non_blocking nb(true);
socket.io_control(nb);
...
socket.async_read_some(null_buffers(), read_handler);
...
void read_handler(boost::system::error_code ec)
{
  if (!ec)
  {
    std::vector<char> buf(socket.available());
    socket.read_some(buffer(buf));
  }
}
注意一般asio的用法和這明顯不同。以上代碼非常類似select的方式。
常規代碼是:
boost::asio::async_read(socket_,boost::asio::buffer(data,length),handle_read);
void handle_read(){…}
行操作
許多應用協議都是基於行的,例如HTTP,SMTP,FTP。為了簡化這類操作,Asio提供read_until以及async_read_until。
例如:
class http_connection
{
  ...


  void start()
  {
    boost::asio::async_read_until(socket_, data_, "/r/n",
        boost::bind(&http_connection::handle_request_line, this, _1));
  }


  void handle_request_line(boost::system::error_code ec)
  {
    if (!ec)
    {
      std::string method, uri, version;
      char sp1, sp2, cr, lf;
      std::istream is(&data_);
      is.unsetf(std::ios_base::skipws);
      is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
      ...
    }
  }


  ...


  boost::asio::ip::tcp::socket socket_;
  boost::asio::streambuf data_;
};
read_until,async_read_until支持的判斷類型可以是char,string以及boost::regex,它還支持自定義匹配函數。
以下例子是持續讀,一直到讀到空格為止:
typedef boost::asio::buffers_iterator<
    boost::asio::streambuf::const_buffers_type> iterator;


std::pair<iterator, bool>
match_whitespace(iterator begin, iterator end)
{
  iterator i = begin;
  while (i != end)
    if (std::isspace(*i++))
      return std::make_pair(i, true);
  return std::make_pair(i, false);
}
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_whitespace);



以下例子是持續讀,直到讀到特定字符為止:
class match_char
{
public:
  explicit match_char(char c) : c_(c) {}


  template <typename Iterator>
  std::pair<Iterator, bool> operator()(
      Iterator begin, Iterator end) const
  {
    Iterator i = begin;
    while (i != end)
      if (c_ == *i++)
        return std::make_pair(i, true);
    return std::make_pair(i, false);
  }


private:
  char c_;
};


namespace boost { namespace asio {
  template <> struct is_match_condition<match_char>
    : public boost::true_type {};
} } // namespace boost::asio
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_char('a'));



自定義內存分配
Asio很多地方都需要復制拷貝handlers,缺省情況下,使用new/delete,如果handlers提供


void* asio_handler_allocate(size_t, ...);
void asio_handler_deallocate(void*, size_t, ...);
則會調用這兩個函數來進行分配和釋放。
The implementation guarantees that the deallocation will occur before the associated handler is invoked, which means the memory is ready to be reused for any new asynchronous operations started by the handler.


如果在完成函數中再發起一個異步請求,那么這塊內存可以重用,也就是說,如果永遠僅有一個異步請求在未完成的狀態,那么僅需要一塊內存就足夠用於asio的handler copy了。


The custom memory allocation functions may be called from any user-created thread that is calling a library function. The implementation guarantees that, for the asynchronous operations included the library, the implementation will not make concurrent calls to the memory allocation functions for that handler. The implementation will insert appropriate memory barriers to ensure correct memory visibility should allocation functions need to be called from different threads.


以上這段不很清楚,不明白多線程環境下,asio_handler_allocate是否要考慮同步問題。


Custom memory allocation support is currently implemented for all asynchronous operations with the following exceptions:


ip::basic_resolver::async_resolve() on all platforms.
basic_socket::async_connect() on Windows.
Any operation involving null_buffers() on Windows, other than an asynchronous read performed on a stream-oriented socket.

 

usidc5 2011-07-08 22:50
boost::asio是一個高性能的網絡開發庫,Windows下使用IOCP,Linux下使用epoll。與ACE不同的是,它並沒有提供一個網絡框架,而是采取組件的方式來提供應用接口。但是對於常見的情況,采用一個好用的框架還是能夠簡化開發過程,特別是asio的各個異步接口的用法都相當類似。
  受到 SP Server 框架的影響,我使用asio大致實現了一個多線程的半異步半同步服務器框架,以下是利用它來實現一個Echo服務器:

1. 實現回調:

    static void onSessionStarted(RequestPtr const& request, ResponsePtr const& response) {   request->setReadMode(Session::READ_LN); // 設置為行讀取}static void onSession(RequestPtr const& request, ResponsePtr const& response) {   print(request->message()); //打印收到的消息   response->addReply(request->message()); //回送消息   response->close();}

復制代碼
說明:close()是一個關閉請求,它並不馬上關閉Session,而是等待所有與該Session相關的異步操作全部結束后才關閉。

2. 一個單線程的Echo服務器:
    void server_main() {unsigned short port = 7;AsioService svc;AsioTcpServer tcp(svc, port);svc.callbacks().sessionStarted = &onSessionStarted;svc.callbacks().sessionHandle = &onSession;svc.run();}

復制代碼
3. 一個多線程的Echo服務器(半異步半同步:一個主線程,4個工作者線程)
    void server_main2() {unsigned short port = 7;int num_threads = 4;AsioService svc;AsioService worker(AsioService::HAS_WORK);AsioTcpServer tcp(svc, port);svc.callbacks().sessionStarted = worker.wrap(&onSessionStarted);svc.callbacks().sessionHandle = worker.wrap(&onSession);AsioThreadPool thr(worker, num_threads);svc.run();}

復制代碼
  有了這樣一個思路,實現起來就很容易了。重點是以下兩點:
  1。緩沖區的管理與內存池的使用
  2。為了保證Session的線程安全,必須要設置一個掛起狀態。
      
     還有一個好處,就是完全隔絕了asio的應用接口,不用再忍受asio漫長的編譯時間了。代碼就不貼在這里了,有興趣的可以通過email 探討。(說明,這里只提出一個思路,不再提供源代碼,請各位見諒)

 

usidc5 2011-07-08 22:54
2. 同步Timer
本章介紹asio如何在定時器上進行阻塞等待(blocking wait). 
實現,我們包含必要的頭文件. 
所有的asio類可以簡單的通過include "asio.hpp"來調用.
#include <iostream>
#include <boost/asio.hpp>
此外,這個示例用到了timer,我們還要包含Boost.Date_Time的頭文件來控制時間.
#include <boost/date_time/posix_time/posix_time.hpp>
使用asio至少需要一個boost::asio::io_service對象.該類提供了訪問I/O的功能.我們首先在main函數中聲明它.
int main()
{
    boost::asio::io_service io;
下一步我們聲明boost::asio::deadline_timer對象.這個asio的核心類提供I/O的功能(這里更確切的說是定時功能),總是把一個io_service對象作為他的第一個構造函數,而第二個構造函數的參數設定timer會在5秒后到時(expired).
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
這個簡單的示例中我們演示了定時器上的一個阻塞等待.就是說,調用boost::asio::deadline_timer::wait()的在創建后5秒內(注意:不是等待開始后),timer到時之前不會返回任何值. 
一個deadline_timer只有兩種狀態:到時,未到時. 
如果boost::asio::deadline_timer::wait()在到時的timer對象上調用,會立即return.
t.wait();
最后,我們輸出理所當然的"Hello, world!"來演示timer到時了.
    std::cout << "Hello, world! ";
    return 0;
}
完整的代碼:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
int main()
{
    boost::asio::io_service io;
    boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
    t.wait();
    std::cout << "Hello, world! ";
    return 0;
}

3. 異步Timer
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
asio的異步函數會在一個異步操作完成后被回調.這里我們定義了一個將被回調的函數.
void print(const asio::error& /*e*/)
{
    std::cout << "Hello, world! ";
}
int main()
{
    asio::io_service io;
    asio::deadline_timer t(io, boost::posix_time::seconds(5));
這里我們調用asio::deadline_timer::async_wait()來異步等待
t.async_wait(print);
最后,我們必須調用asio::io_service::run(). 
asio庫只會調用那個正在運行的asio::io_service::run()的回調函數. 
如果asio::io_service::run()不被調用,那么回調永遠不會發生. 
asio::io_service::run()會持續工作到點,這里就是timer到時,回調完成. 
別忘了在調用 asio::io_service::run()之前設置好io_service的任務.比如,這里,如果我們忘記先調用asio::deadline_timer::async_wait()則asio::io_service::run()會在瞬間return.
    io.run();
    return 0;
}
完整的代碼:
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/)
{
    std::cout << "Hello, world! ";
}
int main()
{
    asio::io_service io;
    asio::deadline_timer t(io, boost::posix_time::seconds(5));
    t.async_wait(print);
    io.run();
    return 0;
}
4. 回調函數的參數
這里我們將每秒回調一次,來演示如何回調函數參數的含義
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
首先,調整一下timer的持續時間,開始一個異步等待.顯示,回調函數需要訪問timer來實現周期運行,所以我們再介紹兩個新參數
指向timer的指針
一個int*來指向計數器
void print(const asio::error& /*e*/,
    asio::deadline_timer* t, int* count)
{
我們打算讓這個函數運行6個周期,然而你會發現這里沒有顯式的方法來終止io_service.不過,回顧上一節,你會發現當 asio::io_service::run()會在所有任務完成時終止.這樣我們當計算器的值達到5時(0為第一次運行的值),不再開啟一個新的異步等待就可以了.
    if (*count < 5)
    {
        std::cout << *count << " ";
        ++(*count);
...
然后,我們推遲的timer的終止時間.通過在原先的終止時間上增加延時,我們可以確保timer不會在處理回調函數所需時間內的到期. 
(原文:By calculating the new expiry time relative to the old, we can ensure that the timer does not drift away from the whole-second mark due to any delays in processing the handler.)
t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
然后我們開始一個新的同步等待.如您所見,我們用把print和他的多個參數用boost::bind函數合成一個的形為void(const asio::error&)回調函數(准確的說是function object). 
在這個例子中, boost::bind的asio::placeholders::error參數是為了給回調函數傳入一個error對象.當進行一個異步操作,開始 boost::bind時,你需要使用它來匹配回調函數的參數表.下一節中你會學到回調函數不需要error參數時可以省略它.
     t->async_wait(boost::bind(print,
        asio::placeholders::error, t, count));
    }
}
int main()
{
    asio::io_service io;
    int count = 0;
    asio::deadline_timer t(io, boost::posix_time::seconds(1));
和上面一樣,我們再一次使用了綁定asio::deadline_timer::async_wait()
t.async_wait(boost::bind(print,
    asio::placeholders::error, &t, &count));
io.run();
在結尾,我們打印出的最后一次沒有設置timer的調用的count的值
    std::cout << "Final count is " << count << " ";
    return 0;
}
完整的代碼:
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/,
  bsp;     asio::deadline_timer* t, int* count)
{
    if (*count < 5)
    {
        std::cout << *count << " ";
        ++(*count);
        t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
        t->async_wait(boost::bind(print,
                    asio::placeholders::error, t, count));
    }
}
int main()
{
    asio::io_service io;
    int count = 0;
    asio::deadline_timer t(io, boost::posix_time::seconds(1));
    t.async_wait(boost::bind(print,
                asio::placeholders::error, &t, &count));
    io.run();
    std::cout << "Final count is " << count << " ";
    return 0;
}

5. 成員函數作為回調函數
本例的運行結果和上一節類似
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
我們先定義一個printer類
class printer
{
public:
//構造函數有一個io_service參數,並且在初始化timer_時用到了它.用來計數的count_這里同樣作為了成員變量
    printer(boost::asio::io_service& io)
        : timer_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
boost::bind 同樣可以出色的工作在成員函數上.眾所周知,所有的非靜態成員函數都有一個隱式的this參數,我們需要把this作為參數bind到成員函數上.和上一節類似,我們再次用bind構造出void(const boost::asio::error&)形式的函數. 
注意,這里沒有指定boost::asio::placeholders::error占位符,因為這個print成員函數沒有接受一個error對象作為參數.
timer_.async_wait(boost::bind(&printer::print, this));

在類的折構函數中我們輸出最后一次回調的count的值
~printer()
{
    std::cout << "Final count is " << count_ << " ";
}


print函數於上一節的十分類似,但是用成員變量取代了參數.
    void print()
    {
        if (count_ < 5)
        {
            std::cout << count_ << " ";
            ++count_;
            timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&printer::print, this));
        }
    }
private:
    boost::asio::deadline_timer timer_;
    int count_;
};

現在main函數清爽多了,在運行io_service之前只需要簡單的定義一個printer對象.
int main()
{
    boost::asio::io_service io;
    printer p(io);
    io.run();
    return 0;
}
完整的代碼:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
    public:
        printer(boost::asio::io_service& io)
            : timer_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
        timer_.async_wait(boost::bind(&printer::print, this));
    }
        ~printer()
        {
            std::cout << "Final count is " << count_ << " ";
        }
        void print()
        {
            if (count_ < 5)
            {
                std::cout << count_ << " ";
                ++count_;
                timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
                timer_.async_wait(boost::bind(&printer::print, this));
            }
        }
    private:
        boost::asio::deadline_timer timer_;
        int count_;
};
int main()
{
    boost::asio::io_service io;
    printer p(io);
    io.run();
    return 0;
}


6. 多線程回調同步
本節演示了使用boost::asio::strand在多線程程序中進行回調同步(synchronise). 
先前的幾節闡明了如何在單線程程序中用boost::asio::io_service::run()進行同步.如您所見,asio庫確保 僅當當前線程調用boost::asio::io_service::run()時產生回調.顯然,僅在一個線程中調用 boost::asio::io_service::run() 來確保回調是適用於並發編程的. 
一個基於asio的程序最好是從單線程入手,但是單線程有如下的限制,這一點在服務器上尤其明顯:
當回調耗時較長時,反應遲鈍.
在多核的系統上無能為力
如果你發覺你陷入了這種困擾,可以替代的方法是建立一個boost::asio::io_service::run()的線程池.然而這樣就允許回調函數並發執行.所以,當回調函數需要訪問一個共享,線程不安全的資源時,我們需要一種方式來同步操作.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
在上一節的基礎上我們定義一個printer類,此次,它將並行運行兩個timer
class printer
{
public:
除了聲明了一對boost::asio::deadline_timer,構造函數也初始化了類型為boost::asio::strand的strand_成員. 
boost::asio::strand 可以分配的回調函數.它保證無論有多少線程調用了boost::asio::io_service::run(),下一個回調函數僅在前一個回調函數完成后開始,當然回調函數仍然可以和那些不使用boost::asio::strand分配,或是使用另一個boost::asio::strand分配的回調函數一起並發執行.
printer(boost::asio::io_service& io)
    : strand_(io),
    timer1_(io, boost::posix_time::seconds(1)),
    timer2_(io, boost::posix_time::seconds(1)),
    count_(0)
{
當一個異步操作開始時,用boost::asio::strand來 "wrapped(包裝)"回調函數.boost::asio::strand::wrap()會返回一個由boost::asio::strand分配的新的handler(句柄),這樣,我們可以確保它們不會同時運行.
    timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
    timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
~printer()
{
    std::cout << "Final count is " << count_ << " ";
}


多線程程序中,回調函數在訪問共享資源前需要同步.這里共享資源是std::cout 和count_變量. 
    void print1()
    {
        if (count_ < 10)
        {
            std::cout << "Timer 1: " << count_ << " ";
            ++count_;
            timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
            timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
        }
    }
    void print2()
    {
        if (count_ < 10)
        {
            std::cout << "Timer 2: " << count_ << " ";
            ++count_;
            timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
            timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
        }
    }
private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer1_;
    boost::asio::deadline_timer timer2_;
    int count_;
};
main函數中boost::asio::io_service::run()在兩個線程中被調用:主線程、一個boost::thread線程. 
正如單線程中那樣,並發的boost::asio::io_service::run()會一直運行直到完成任務.后台的線程將在所有異步線程完成后終結. 
int main()
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
    io.run();
    t.join();
    return 0;
}
完整的代碼:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
public:
        printer(boost::asio::io_service& io)
            : strand_(io),
            timer1_(io, boost::posix_time::seconds(1)),
            timer2_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
        timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
        timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
    }
        ~printer()
        {
            std::cout << "Final count is " << count_ << " ";
        }
        void print1()
        {
            if (count_ < 10)
            {
                std::cout << "Timer 1: " << count_ << " ";
                ++count_;
                timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
                timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
            }
        }
        void print2()
        {
            if (count_ < 10)
            {
                std::cout << "Timer 2: " << count_ << " ";
                ++count_;
                timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
                timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
            }
        }
private:
        boost::asio::strand strand_;
        boost::asio::deadline_timer timer1_;
        boost::asio::deadline_timer timer2_;
        int count_;
};
int main()
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
    io.run();
    t.join();
    return 0;
}



7. TCP客戶端:對准時間
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
本程序的目的是訪問一個時間同步服務器,我們需要用戶指定一個服務器(如time-nw.nist.gov),用IP亦可. 
(譯者注:日期查詢協議,這種時間傳輸協議不指定固定的傳輸格式,只要求按照ASCII標准發送數據。)
using boost::asio::ip::tcp;
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
            }
用asio進行網絡連接至少需要一個boost::asio::io_service對象
boost::asio::io_service io_service;


我們需要把在命令行參數中指定的服務器轉換為TCP上的節點.完成這項工作需要boost::asio::ip::tcp::resolver對象
tcp::resolver resolver(io_service);


一個resolver對象查詢一個參數,並將其轉換為TCP上節點的列表.這里我們把argv[1]中的sever的名字和要查詢字串daytime關聯.
tcp::resolver::query query(argv[1], "daytime");


節點列表可以用 boost::asio::ip::tcp::resolver::iterator 來進行迭代.iterator默認的構造函數生成一個end iterator.
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
現在我們建立一個連接的sockert,由於獲得節點既有IPv4也有IPv6的.所以,我們需要依次嘗試他們直到找到一個可以正常工作的.這步使得我們的程序獨立於IP版本
tcp::socket socket(io_service);
boost::asio::error error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
    socket.close();
    socket.connect(*endpoint_iterator++, boost::asio::assign_error(error));
}
if (error)
    throw error;
連接完成,我們需要做的是讀取daytime服務器的響應. 
我們用boost::array來保存得到的數據,boost::asio::buffer()會自動根據array的大小暫停工作,來防止緩沖溢出.除了使用boost::array,也可以使用char [] 或std::vector.
for (;;)
{
    boost::array<char, 128> buf;
    boost::asio::error error;
    size_t len = socket.read_some(
        boost::asio::buffer(buf), boost::asio::assign_error(error));
當服務器關閉連接時,boost::asio::ip::tcp::socket::read_some()會用boost::asio::error::eof標志完成, 這時我們應該退出讀取循環了.
if (error == boost::asio::error::eof)
    break; // Connection closed cleanly by peer.
else if (error)
    throw error; // Some other error.
std::cout.write(buf.data(), len);

如果發生了什么異常我們同樣會拋出它
}
catch (std::exception& e)
{
    std::cerr << e.what() << std::endl;
}


運行示例:在windowsXP的cmd窗口下 
輸入:upload.exe time-a.nist.gov
輸出:54031 06-10-23 01:50:45 07 0 0 454.2 UTC(NIST) *
完整的代碼:
#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>
using asio::ip::tcp;
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_service io_service;
        tcp::resolver resolver(io_service);
        tcp::resolver::query query(argv[1], "daytime");
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::resolver::iterator end;
        tcp::socket socket(io_service);
        asio::error error = asio::error::host_not_found;
        while (error && endpoint_iterator != end)
        {
            socket.close();
            socket.connect(*endpoint_iterator++, asio::assign_error(error));
        }
        if (error)
            throw error;
        for (;;)
        {
            boost::array<char, 128> buf;
            asio::error error;
            size_t len = socket.read_some(
                    asio::buffer(buf), asio::assign_error(error));
            if (error == asio::error::eof)
                break; // Connection closed cleanly by peer.
            else if (error)
                throw error; // Some other error.
            std::cout.write(buf.data(), len);
        }
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

8. TCP同步時間服務器
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
我們先定義一個函數返回當前的時間的string形式.這個函數會在我們所有的時間服務器示例上被使用.
std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}
int main()
{
    try
    {
        asio::io_service io_service;
新建一個asio::ip::tcp::acceptor對象來監聽新的連接.我們監聽TCP端口13,IP版本為V4
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));


這是一個iterative server,也就是說同一時間只能處理一個連接.建立一個socket來表示一個和客戶端的連接, 然后等待客戶端的連接.
for (;;)
{
    tcp::socket socket(io_service);
    acceptor.accept(socket);
當客戶端訪問服務器時,我們獲取當前時間,然后返回它.
        std::string message = make_daytime_string();
        asio::write(socket, asio::buffer(message),
            asio::transfer_all(), asio::ignore_error());
    }
}
最后處理異常
catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;

運行示例:運行服務器,然后運行上一節的客戶端,在windowsXP的cmd窗口下 
輸入:client.exe 127.0.0.1 
輸出:Mon Oct 23 09:44:48 2006
完整的代碼:
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}
int main()
{
    try
    {
        asio::io_service io_service;
        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));
        for (;;)
        {
            tcp::socket socket(io_service);
            acceptor.accept(socket);
            std::string message = make_daytime_string();
            asio::write(socket, asio::buffer(message),
                    asio::transfer_all(), asio::ignore_error());
        }
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

 

usidc5 2011-07-08 22:55


構造函數


構造函數的主要動作就是調用CreateIoCompletionPort創建了一個初始iocp。


Dispatch和post的區別


Post一定是PostQueuedCompletionStatus並且在GetQueuedCompletionStatus 之后執行。


Dispatch會首先檢查當前thread是不是io_service.run/runonce/poll/poll_once線程,如果是,則直接運行。


poll和run的區別


兩者代碼幾乎一樣,都是首先檢查是否有outstanding的消息,如果沒有直接返回,否則調用do_one()。唯一的不同是在調用size_t do_one(bool block, boost::system::error_code& ec)時前者block = false,后者block = true。


該參數的作用體現在:


BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,


&completion_key, &overlapped, block ? timeout : 0);


因此可以看出,poll處理的是已經完成了的消息,也即GetQueuedCompletionStatus立刻能返回的。而run則會導致等待。


poll 的作用是依次處理當前已經完成了的消息,直到所有已經完成的消息處理完成為止。如果沒有已經完成了得消息,函數將退出。poll不會等待。這個函數有點類似於PeekMessage。鑒於PeekMessage很少用到,poll的使用場景我也有點疑惑。poll的一個應用場景是如果希望handler的處理有優先級,也即,如果消息完成速度很快,同時可能完成多個消息,而消息的處理過程可能比較耗時,那么可以在完成之后的消息處理函數中不真正處理數據,而是把handler保存在隊列中,然后按優先級統一處理。代碼如下:


while (io_service.run_one()) { 
    // The custom invocation hook adds the handlers to the priority queue 
    // rather than executing them from within the poll_one() call. 
    while (io_service.poll_one())      ;
    pri_queue.execute_all(); }


循環執行poll_one讓已經完成的消息的wrap_handler處理完畢,也即插入一個隊列中,然后再統一處理之。這里的wrap_handler是一個class,在post的時候,用如下代碼:


io_service.post(pri_queue.wrap(0, low_priority_handler));或者 acceptor.async_accept(server_socket, pri_queue.wrap(100, high_priority_handler));


template <typename Handler> wrapped_handler<Handler> handler_priority_queue::wrap(int priority, Handler handler) 
{    return wrapped_handler<Handler>(*this, priority, handler); }


參見boost_asio/example/invocation/prioritised_handlers.cpp


這個sample也同時表現了wrap的使用場景。


也即把handler以及參數都wrap成一個object,然后把object插入一個隊列,在pri_queue.execute_all中按優先級統一處理。


run的作用是處理消息,如果有消息未完成將一直等待到所有消息完成並處理之后才退出。


reset和stop


文檔中reset的解釋是重置io_service以便下一次調用。


當 run,run_one,poll,poll_one是被stop掉導致退出,或者由於完成了所有任務(正常退出)導致退出時,在調用下一次 run,run_one,poll,poll_one之前,必須調用此函數。reset不能在run,run_one,poll,poll_one正在運行時調用。如果是消息處理handler(用戶代碼)拋出異常,則可以在處理之后直接繼續調用 io.run,run_one,poll,poll_one。 例如:


boost::asio::io_service io_service;  
...  
for (;;){  
  try 
  {  
    io_service.run();  
    break; // run() exited normally  
  }  
  catch (my_exception& e)  
  {  
    // Deal with exception as appropriate.  
  }  

在拋出了異常的情況下,stopped_還沒來得及被asio設置為1,所以無需調用reset。
reset函數的代碼僅有一行:


void reset()  
{  
::InterlockedExchange(&stopped_, 0);  

也即,當io.stop時,會設置stopped_=1。當完成所有任務時,也會設置。


總的來說,單線程情況下,不管io.run是如何退出的,在下一次調用io.run之前調用一次reset沒有什么壞處。例如:


for(;;)  
{  
try 
{  
io.run();  
}  
catch(…)  
{  
}  
io.reset();  
}  

如果是多線程在運行io.run,則應該小心,因為reset必須是所有的run,run_one,poll,poll_one退出后才能調用。


文檔中的stop的解釋是停止io_service的處理循環。


此函數不是阻塞函數,也即,它僅僅只是給iocp發送一個退出消息而並不是等待其真正退出。因為poll和poll_one本來就不等待(GetQueuedCompletionStatus時timeout = 0),所以此函數對poll和poll_one無意義。對於run_one來說,如果該事件還未完成,則run_one會立刻返回。如果該事件已經完成,並且還在處理中,則stop並無特殊意義(會等待handler完成后自然退出)。對於run來說,stop的調用會導致run中的 GetQueuedCompletionStatus立刻返回。並且由於設置了stopped = 1,此前完成的消息的handlers也不會被調用。考慮一下這種情況:在io.stop之前,有1k個消息已經完成但尚未處理,io.run正在依次從 GetQueuedCompletionStatus中獲得信息並且調用handlers,調用io.stop設置stopped=1將導致后許 GetQueuedCompletionStatus返回的消息直接被丟棄,直到收到退出消息並退出io.run為止。


void stop()  
{  
if (::InterlockedExchange(&stopped_, 1) == 0)  
{  
if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))  
{  
DWORD last_error = ::GetLastError();  
boost::system::system_error e(  
boost::system::error_code(last_error,  
boost::asio::error::get_system_category()),  
"pqcs");  
boost::throw_exception(e);  
}  
}  

注意除了讓當前代碼退出之外還有一個副作用就是設置了stopped_=1。這個副作用導致在stop之后如果不調用reset,所有run,run_one,poll,poll_one都將直接退出。


另一個需要注意的是,stop會導致所有未完成的消息以及完成了但尚未處理得消息都直接被丟棄,不會導致handlers倍調用。


注意這兩個函數都不會CloseHandle(iocp.handle_),那是析構函數干的事情。


注意此處有個細節:一次PostQueuedCompletionStatus僅導致一次 GetQueuedCompletionStatus返回,那么如果有多個thread此時都在io.run,並且block在 GetQueuedCompletionStatus時,調用io.stop將PostQueuedCompletionStatus並且導致一個 thread的GetQueuedCompletionStatus返回。那么其他的thread呢?進入io_service的do_one(由run 函數調用)代碼可以看到,當GetQueuedCompletionStatus返回並且發現是退出消息時,會再發送一次 PostQueuedCompletionStatus。代碼如下:


else 
{  
    // Relinquish responsibility for dispatching timers. If the io_service  
    // is not being stopped then the thread will get an opportunity to  
    // reacquire timer responsibility on the next loop iteration.  
    if (dispatching_timers)  
    {  
      ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);  
    }  
    // The stopped_ flag is always checked to ensure that any leftover  
    // interrupts from a previous run invocation are ignored.  

    if (::InterlockedExchangeAdd(&stopped_, 0) != 0)  
    {  
      // Wake up next thread that is blocked on GetQueuedCompletionStatus.  
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))  
      {  
        last_error = ::GetLastError();  
        ec = boost::system::error_code(last_error,  
            boost::asio::error::get_system_category());  
        return 0;  
      }  
      ec = boost::system::error_code();  
      return 0;  
    }  
}  

Wrap


這個函數是一個語法糖。


Void func(int a);


io_service.wrap(func)(a);


相當於io_service.dispatch(bind(func,a));


可以保存io_service.wrap(func)到g,以便在稍后某些時候調用g(a);


例如:


socket_.async_read_some(boost::asio::buffer(buffer_),      strand_.wrap( 
        boost::bind(&connection::handle_read, shared_from_this(), 
          boost::asio::placeholders::error, 
          boost::asio::placeholders::bytes_transferred)));


這是一個典型的wrap用法。注意async_read_some要求的參數是一個handler,在read_some結束后被調用。由於希望真正被調用的handle_read是串行化的,在這里再post一個消息給io_service。以上代碼類似於:


void A::func(error,bytes_transferred)  
{  
strand_.dispatch(boost::bind(handle_read,shared_from_this(),error,bytes_transferred);  
}  
socket_.async_read_some(boost::asio::buffer(buffer_), func); 
注意1點:


io_service.dispatch(bind(func,a1,…an)),這里面都是傳值,無法指定bind(func,ref(a1)…an)); 所以如果要用ref語義,則應該在傳入wrap時顯式指出。例如:


void func(int& i){i+=1;}  
void main()  
{  
int i = 0;  
boost::asio::io_service io;  
io.wrap(func)(boost::ref(i));  
io.run();  
printf("i=%d/n");  

當然在某些場合下,傳遞shared_ptr也是可以的(也許更好)。


從handlers拋出的異常的影響


當handlers拋出異常時,該異常會傳遞到本線程最外層的io.run,run_one,poll,poll_one,不會影響其他線程。捕獲該異常是程序員自己的責任。


例如:





boost::asio::io_service io_service;  

Thread1,2,3,4()  
{  
for (;;)  
{  
try 
{  
io_service.run();  
break; // run() exited normally  
}  
catch (my_exception& e)  
{  
// Deal with exception as appropriate.  
}  
}  
}  

Void func(void)  
{  
throw 1;  
}  

Thread5()  
{  
io_service.post(func);  

注意這種情況下無需調用io_service.reset()。


這種情況下也不能調用reset,因為調用reset之前必須讓所有其他線程正在調用的io_service.run退出。(reset調用時不能有任何run,run_one,poll,poll_one正在運行)


Work


有些應用程序希望在沒有pending的消息時,io.run也不退出。比如io.run運行於一個后台線程,該線程在程序的異步請求發出之前就啟動了。


可以通過如下代碼實現這種需求:


main()  
{  
boost::asio::io_service io_service;  
boost::asio::io_service::work work(io_service);  
Create thread 
Getchar();  
}  

Thread()  
{  
Io_service.run();  

這種情況下,如果work不被析構,該線程永遠不會退出。在work不被析構得情況下就讓其退出,可以調用io.stop。這將導致 io.run立刻退出,所有未完成的消息都將丟棄。已完成的消息(但尚未進入handler的)也不會調用其handler函數(由於在stop中設置了 stopped_= 1)。


如果希望所有發出的異步消息都正常處理之后io.run正常退出,work對象必須析構,或者顯式的刪除。


boost::asio::io_service io_service;  
auto_ptr<boost::asio::io_service::work> work(  
new boost::asio::io_service::work(io_service));  

...  

work.reset(); // Allow run() to normal exit. 
work是一個很小的輔助類,只支持構造函數和析構函數。(還有一個get_io_service返回所關聯的io_service)


代碼如下:


inline io_service::work::work(boost::asio::io_service& io_service)  
: io_service_(io_service)  
{  
io_service_.impl_.work_started();  
}  

inline io_service::work::work(const work& other)  
: io_service_(other.io_service_)  
{  
io_service_.impl_.work_started();  
}  

inline io_service::work::~work()  
{  
io_service_.impl_.work_finished();  
}  

void work_started()  
{  
::InterlockedIncrement(&outstanding_work_);  
}  

// Notify that some work has finished.  
void work_finished()  
{  
if (::InterlockedDecrement(&outstanding_work_) == 0)  
stop();  
}  
可以看出構造一個work時,outstanding_work_+1,使得io.run在完成所有異步消息后判斷outstanding_work_時不會為0,因此會繼續調用GetQueuedCompletionStatus並阻塞在這個函數上。


而析構函數中將其-1,並判斷其是否為0,如果是,則post退出消息給GetQueuedCompletionStatus讓其退出。


因此work如果析構,則io.run會在處理完所有消息之后正常退出。work如果不析構,則io.run會一直運行不退出。如果用戶直接調用io.stop,則會讓io.run立刻退出。


特別注意的是,work提供了一個拷貝構造函數,因此可以直接在任意地方使用。對於一個io_service來說,有多少個work實例關聯,則outstanding_work_就+1了多少次,只有關聯到同一個io_service的work全被析構之后,io.run才會在所有消息處理結束之后正常退出。


strand


strand是另一個輔助類,提供2個接口dispatch和post,語義和io_service的dispatch和post類似。區別在於,同一個strand所發出的dispatch和post絕對不會並行執行,dispatch和post所包含的handlers也不會並行。因此如果希望串行處理每一個tcp連接,則在accept之后應該在該連接的數據結構中構造一個strand,並且所有dispatch/post(recv /send)操作都由該strand發出。strand的作用巨大,考慮如下場景:有多個thread都在執行async_read_some,那么由於線程調度,很有可能后接收到的包先被處理,為了避免這種情況,就只能收完數據后放入一個隊列中,然后由另一個線程去統一處理。


void connection::start()   
{   
socket_.async_read_some(boost::asio::buffer(buffer_),   
strand_.wrap(   
boost::bind(&connection::handle_read, shared_from_this(),   
boost::asio::placeholders::error,   
boost::asio::placeholders::bytes_transferred)));   

不使用strand的處理方式:


前端tcp iocp收包,並且把同一個tcp連接的包放入一個list,如果list以前為空,則post一個消息給后端vnn iocp。后端vnn iocp收到post的消息后循環從list中獲取數據,並且處理,直到list為空為止。處理結束后重新調用 GetQueuedCompletionStatus進入等待。如果前端tcp iocp發現list過大,意味着處理速度小於接收速度,則不再調用iocpRecv,並且設置標志,當vnn iocp thread處理完了當前所有積壓的數據包后,檢查這個標志,重新調用一次iocpRecv。


使用strand的處理方式:


前端tcp iocp收包,收到包后直接通過strand.post(on_recved)發給后端vnn iocp。后端vnn iocp處理完之后再調用一次strand.async_read_some。


這兩種方式我沒看出太大區別來。如果對數據包的處理的確需要阻塞操作,例如db query,那么使用后端iocp以及后端thread是值得考慮的。這種情況下,前端iocp由於僅用來異步收發數據,因此1個thread就夠了。在確定使用2級iocp的情況下,前者似乎更為靈活,也沒有增加什么開銷。


值得討論的是,如果后端多個thread都處於db query狀態,那么實際上此時依然沒有thread可以提供數據處理服務,因此2級iocp意義其實就在於在這種情況下,前端tcp iocp依然可以accept,以及recv第一次數據,不會導致用戶connect不上的情況。在后端thread空閑之后會處理這期間的recv到的數據並在此async_read_some。


如果是單級iocp(假定handlers沒有阻塞操作),多線程,那么strand的作用很明顯。這種情況下,很明顯應該讓一個tcp連接的數據處理過程串行化。


Strand的實現原理


Strand內部實現機制稍微有點復雜。每次發出strand請求(例如 async_read(strand_.wrap(funobj1))),strand再次包裹了一次成為funobj2。在async_read完成時,系統調用funobj2,檢查是否正在執行該strand所發出的完成函數(檢查該strand的一個標志位),如果沒有,則直接調用 funobj2。如果有,則檢查是否就是當前thread在執行,如果是,則直接調用funobj2(這種情況可能發生在嵌套調用的時候,但並不產生同步問題,就像同一個thread可以多次進入同一個critical_session一樣)。如果不是,則把該funobj2插入到strand內部維護的一個隊列中。

 

usidc5 2011-07-13 18:18


最近在設計一個多線程分塊支持續傳的http的異步客戶端時, 測試部門經常發現http下載模
塊退出時偶爾會卡住, 在win7系統上由為明顯. 反復檢查代碼, 並未明顯問題, 於是專門寫
了一個反復退出的單元測試, 立即發現問題, 並定位在io_service的析構函數中, 奇怪的是, 
我的投遞io的所有socket都早已經關閉, run線程也已經退出, 按理說, 這時的io_service的
outstanding_work_應該為0才是, 可我一看它卻是1, 於是始終在win_iocp_io_service.hpp的
shutdown_service里一直循環調用GetQueuedCompletionStatus, 從而導致無法正常退出...
很明顯, 這是asio對outstanding_work_計數維護的有問題, 為了解決
問題, 於是我很快想到不使用iocp, 添加宏BOOST_ASIO_DISABLE_IOCP一切就正常了...
由於自己使用的是boost.1.45版本, 於是換了個boost.1.46.1再試試, 結果一樣. 難道這么嚴
重的bug跨在了這兩個非常重要的發行版本上而沒人知道?
在官方的郵件列表中細節檢查, 終於看到了某人的bug報告和我描述的情況差不多, 而且
在那個人報告了bug的第二天, asio作者就發布了補丁, 但這個補丁並未更新到boost.1.45
和boost.1.46中, 唉, 這兩個版本可是大版本啊, 估計受害人不少...
不過幸運的是, 我在boost的主分枝中看到了修正的代碼.
下面是這個補丁內容:


From 81a6a51c0cb66de6bc77e1fa5dcd46b2794995e4 Mon Sep 17 00:00:00 2001
From: Christopher Kohlhoff <chris@kohlhoff.com>
Date: Wed, 23 Mar 2011 15:03:56 +1100
Subject: [PATCH] On Windows, ensure the count of outstanding work is decremented for
abandoned operations (i.e. operations that are being cleaned up within
the io_service destructor).


---
asio/include/asio/detail/impl/dev_poll_reactor.ipp |    2 ++
asio/include/asio/detail/impl/epoll_reactor.ipp    |    2 ++
asio/include/asio/detail/impl/kqueue_reactor.ipp   |    2 ++
asio/include/asio/detail/impl/select_reactor.ipp   |    2 ++
.../asio/detail/impl/signal_set_service.ipp        |    2 ++
asio/include/asio/detail/impl/task_io_service.ipp  |    7 +++++++
.../asio/detail/impl/win_iocp_io_service.ipp       |   11 +++++++++++
asio/include/asio/detail/task_io_service.hpp       |    4 ++++
asio/include/asio/detail/win_iocp_io_service.hpp   |    4 ++++
9 files changed, 36 insertions(+), 0 deletions(-)


diff --git a/asio/include/asio/detail/impl/dev_poll_reactor.ipp b/asio/include/asio/detail/impl/dev_poll_reactor.ipp
index 06d89ea..2a01993 100644
--- a/asio/include/asio/detail/impl/dev_poll_reactor.ipp
+++ b/asio/include/asio/detail/impl/dev_poll_reactor.ipp
@@ -63,6 +63,8 @@ void dev_poll_reactor::shutdown_service()
     op_queue_.get_all_operations(ops);

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);


// Helper class to re-register all descriptors with /dev/poll.
diff --git a/asio/include/asio/detail/impl/epoll_reactor.ipp b/asio/include/asio/detail/impl/epoll_reactor.ipp
index 22f567a..d08dedb 100644
--- a/asio/include/asio/detail/impl/epoll_reactor.ipp
+++ b/asio/include/asio/detail/impl/epoll_reactor.ipp
@@ -84,6 +84,8 @@ void epoll_reactor::shutdown_service()
   }

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void epoll_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/kqueue_reactor.ipp b/asio/include/asio/detail/impl/kqueue_reactor.ipp
index f0cdf73..45aff60 100644
--- a/asio/include/asio/detail/impl/kqueue_reactor.ipp
+++ b/asio/include/asio/detail/impl/kqueue_reactor.ipp
@@ -74,6 +74,8 @@ void kqueue_reactor::shutdown_service()
   }

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void kqueue_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/select_reactor.ipp b/asio/include/asio/detail/impl/select_reactor.ipp
index f4e0314..00fd9fc 100644
--- a/asio/include/asio/detail/impl/select_reactor.ipp
+++ b/asio/include/asio/detail/impl/select_reactor.ipp
@@ -81,6 +81,8 @@ void select_reactor::shutdown_service()
     op_queue_.get_all_operations(ops);

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void select_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/signal_set_service.ipp b/asio/include/asio/detail/impl/signal_set_service.ipp
index f0f0e78..4cde184 100644
--- a/asio/include/asio/detail/impl/signal_set_service.ipp
+++ b/asio/include/asio/detail/impl/signal_set_service.ipp
@@ -145,6 +145,8 @@ void signal_set_service::shutdown_service()
       reg = reg->next_in_table_;
     }
   }
+
+  io_service_.abandon_operations(ops);
}

void signal_set_service::fork_service(
diff --git a/asio/include/asio/detail/impl/task_io_service.ipp b/asio/include/asio/detail/impl/task_io_service.ipp
index cb585d5..0a2c6fa 100644
--- a/asio/include/asio/detail/impl/task_io_service.ipp
+++ b/asio/include/asio/detail/impl/task_io_service.ipp
@@ -230,6 +230,13 @@ void task_io_service::post_deferred_completions(
   }
}

+void task_io_service::abandon_operations(
+    op_queue<task_io_service::operation>& ops)
+{
+  op_queue<task_io_service::operation> ops2;
+  ops2.push(ops);
+}
+
std::size_t task_io_service::do_one(mutex::scoped_lock& lock,
     task_io_service::idle_thread_info* this_idle_thread)
{
diff --git a/asio/include/asio/detail/impl/win_iocp_io_service.ipp b/asio/include/asio/detail/impl/win_iocp_io_service.ipp
index ca3125e..7aaa6b8 100644
--- a/asio/include/asio/detail/impl/win_iocp_io_service.ipp
+++ b/asio/include/asio/detail/impl/win_iocp_io_service.ipp
@@ -262,6 +262,17 @@ void win_iocp_io_service::post_deferred_completions(
   }
}

+void win_iocp_io_service::abandon_operations(
+    op_queue<win_iocp_operation>& ops)
+{
+  while (win_iocp_operation* op = ops.front())
+  {
+    ops.pop();
+    ::InterlockedDecrement(&outstanding_work_);
+    op->destroy();
+  }
+}
+
void win_iocp_io_service::on_pending(win_iocp_operation* op)
{
   if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
diff --git a/asio/include/asio/detail/task_io_service.hpp b/asio/include/asio/detail/task_io_service.hpp
index 285d83e..654f83c 100644
--- a/asio/include/asio/detail/task_io_service.hpp
+++ b/asio/include/asio/detail/task_io_service.hpp
@@ -105,6 +105,10 @@ public:
   // that work_started() was previously called for each operation.
   ASIO_DECL void post_deferred_completions(op_queue<operation>& ops);

+  // Process unfinished operations as part of a shutdown_service operation.
+  // Assumes that work_started() was previously called for the operations.
+  ASIO_DECL void abandon_operations(op_queue<operation>& ops);
+
private:
   // Structure containing information about an idle thread.
   struct idle_thread_info;
diff --git a/asio/include/asio/detail/win_iocp_io_service.hpp b/asio/include/asio/detail/win_iocp_io_service.hpp
index a562834..b5d7f0b 100644
--- a/asio/include/asio/detail/win_iocp_io_service.hpp
+++ b/asio/include/asio/detail/win_iocp_io_service.hpp
@@ -126,6 +126,10 @@ public:
   ASIO_DECL void post_deferred_completions(
       op_queue<win_iocp_operation>& ops);

+  // Enqueue unfinished operation as part of a shutdown_service operation.
+  // Assumes that work_started() was previously called for the operations.
+  ASIO_DECL void abandon_operations(op_queue<operation>& ops);
+
   // Called after starting an overlapped I/O operation that did not complete
   // immediately. The caller must have already called work_started() prior to
   // starting the operation.
-- 
1.7.0.1


注: 在boost.asio中, 使用這個補丁時, 需要將ASIO_DECL 改成 BOOST_ ASIO_DECL 


這是我第二次在使用asio的過程中, 發現的比較嚴重的bug了, 不過幸運的是, 每一次都能在官方的論壇 
或郵件列表中找到解決方案. 


結論, 再牛的人寫的代碼也會有bug, 個人非常崇拜asio的作者. 

 

usidc5 2011-09-30 22:59
在win32平台上,asio是基於IOCP技術實現的,我以前也用過IOCP,卻沒想到居然能擴展成這樣,真是神奇!在其他平台下還會有別的方法去實現,具體見io_service類下面這部分的源碼:
  // The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
   typedef detail::win_iocp_io_service impl_type;
   friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
   typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
   typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
   typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
   typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif
這部分代碼其實就在boost::asio::io_service類聲明中的最前面幾行,可以看見在不同平台下,io_service類的實現將會不同。很顯然,windows平台下當然是win_iocp_io_service類為實現了(不過我一開始還以為win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎么有移植性呢?官方文檔也對該類只字不提,其實我卡殼就是卡在這里了,差點就直接用這個類了^_^!)。

那么就分析一下win_iocp_io_service的代碼吧,這里完全是用IOCP來路由各種任務,大家使用post來委托任務,內部調用的其實是IOCP的PostQueuedCompletionStatus函數,然后線程們用run來接受任務,內部其實是阻塞在IOCP的GetQueuedCompletionStatus函數上,一旦有了任務就立即返回,執行完后再一個循環,繼續阻塞在這里等待下一個任務的到來,這種設計思想堪稱神奇,對線程、服務以及任務完全解耦,靈活度達到了如此高度,不愧為boost庫的東西!我只能有拜的份了...

說一下總體的設計思想,其實io_service就像是勞工中介所,而一個線程就是一個勞工,而調用post的模塊相當於富人們,他們去中介所委托任務,而勞工們就聽候中介所的調遣去執行這些任務,任務的內容就寫在富人們給你的handler上,也就是函數指針,指針指向具體實現就是任務的實質內容。其實在整個過程中,富人們都不知道是哪個勞工幫他們做的工作,只知道是中介所負責完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個不恰當的地方,如果硬要這樣比喻的話,我只能說:其實勞工里面也有很多富人的^o^! 。很多勞工在完成任務的過程中自己也托給中介所一些任務,然后這些任務很可能還是自己去完成。這也難怪,運行代碼的總是這些線程,那么調用post的肯定也會有這些線程了,不過不管怎么說,如此循環往復可以解決問題就行,比喻不見得就得恰當,任何事物之間都不可能完全相同,只要能闡述思想就行。

最后還要說明的一點就是:委托的任務其實可以設定執行的時間的,很不錯的設定,內部實現則是通過定時器原理,GetQueuedCompletionStatus有一個等待時間的參數似乎被用在這方面,還有源碼中的定時器線程我並沒有過多的去理解,總之大體原理已基本掌握,剩下的就是使勁的用它了!!!

另外為了方便人交流,在這里插入一些代碼可能更容易讓人理解吧,
下面這個是啟動服務時的代碼:
void ServerFramework::run()
{
     boost::thread_group workers;
    for (uint32 i = 0; i < mWorkerCount; ++i)
         workers.create_thread(
             boost::bind(&boost::asio::io_service::run, &mIoService));
     workers.join_all();
}

在打開前就得分配好任務,否則線程們運行起來就退出了,阻塞不住,任務的分配就交給open函數了,它是分配了監聽端口的任務,一旦有了連接就會拋出一個任務,其中一個線程就會開始行動啦。
void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /*= DEFAULT_WORKER_COUNT*/)
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
     boost::asio::ip::tcp::resolver resolver(mIoService);
     boost::asio::ip::tcp::resolver::query query(address, port);
     boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

     mAcceptor.open(endpoint.protocol());
     mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
     mAcceptor.bind(endpoint);
     mAcceptor.listen();

     mNextConnection = new Connection(this);
     mAcceptor.async_accept(mNextConnection->getSocket(),
         boost::bind(&ServerFramework::__onConnect, this,
         boost::asio::placeholders::error));

     mWorkerCount = nWorkers;
    if (mWorkerCount == DEFAULT_WORKER_COUNT)
    {
         mWorkerCount = 4;
     }
}

open函數中給io_service的一個任務就是在有鏈接訪問服務器端口的情況下執行ServerFramework::__onConnect函數,有一點需要格外注意的,io_service必須時刻都有任務存在,否則線程io_service::run函數將返回,於是線程都會結束並銷毀,程序將退出,所以,你必須保證無論何時都有任務存在,這樣線程們即使空閑了也還是會繼續等待,不會銷毀。所以,我在ServerFramework::__onConnect函數中又一次給了io_service相同的任務,即:繼續監聽端口,有鏈接了還是調用ServerFramework::__onConnect函數。如果你在ServerFramework::__onConnect執行完了還沒有給io_service任務的話,那么一切都晚了...... 代碼如下:
void ServerFramework::__onConnect(const BoostSysErr& e)
{
    if (e)
    {
         MOELOG_DETAIL_WARN(e.message().c_str());
     }

     Connection* p = mNextConnection;
     mNextConnection = new Connection(this);

    // 再次進入監聽狀態
     mAcceptor.async_accept(mNextConnection->getSocket(),
         boost::bind(&ServerFramework::__onConnect, this,
         boost::asio::placeholders::error));

    // 處理當前鏈接
     __addConnection(p);
     p->start();
}


最后,展示一下這個類的所有成員變量吧:

 

    // 用於線程池異步處理的核心對象
     boost::asio::io_service mIoService;

    // 網絡鏈接的接收器,用於接收請求進入的鏈接
     boost::asio::ip::tcp::acceptor mAcceptor;

    // 指向下一個將要被使用的鏈接對象
     Connection* mNextConnection;

    // 存儲服務器鏈接對象的容器
     ConnectionSet mConnections;

    //// 為鏈接對象容器准備的strand,防止並行調用mConnections
    //boost::asio::io_service::strand mStrand_mConnections;

    // 為鏈接對象容器准備的同步鎖,防止並行調用mConnections
     boost::mutex mMutex4ConnSet;

    // 為控制台輸出流准備的strand,防止並行調用std::cout
     AsioService::strand mStrand_ConsoleIostream;

    // 工作線程的數量
     uint32 mWorkerCount;

 

usidc5 2013-10-07 16:41
花了足足3天時間,外加1天心情休整,終於在第5天編寫出了一個能運行的基於asio和thread_group的框架,差點沒氣暈過去,把源碼都看懂了才感覺會用了。
測試了一下,debug下一萬次回應耗時800+毫秒,release下是200+毫秒,機器配置雙核2.5G英特爾,4個線程並行工作,無錯的感覺真好,再也不用擔心iocp出一些奇怪的問題啦,因為是巨人們寫的實現,呵呵。


進入正題,簡要說一下asio的實現原理吧。在win32平台上,asio是基於IOCP技術實現的,我以前也用過IOCP,卻沒想到居然能擴展成這樣,真是神奇!在其他平台下還會有別的方法去實現,具體見io_service類下面這部分的源碼:
  // The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
  typedef detail::win_iocp_io_service impl_type;
  friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
  typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
  typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
  typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
  typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif


這部分代碼其實就在boost::asio::io_service類聲明中的最前面幾行,可以看見在不同平台下,io_service類的實現將會不同。很顯然,windows平台下當然是win_iocp_io_service類為實現了(不過我一開始還以為win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎么有移植性呢?官方文檔也對該類只字不提,其實我卡殼就是卡在這里了,差點就直接用這個類了^_^!)。


那么就分析一下win_iocp_io_service的代碼吧,這里完全是用IOCP來路由各種任務,大家使用post來委托任務,內部調用的其實是IOCP的PostQueuedCompletionStatus函數,然后線程們用run來接受任務,內部其實是阻塞在IOCP的GetQueuedCompletionStatus函數上,一旦有了任務就立即返回,執行完后再一個循環,繼續阻塞在這里等待下一個任務的到來,這種設計思想堪稱神奇,對線程、服務以及任務完全解耦,靈活度達到了如此高度,不愧為boost庫的東西!我只能有拜的份了...


說一下總體的設計思想,其實io_service就像是勞工中介所,而一個線程就是一個勞工,而調用post的模塊相當於富人們,他們去中介所委托任務,而勞工們就聽候中介所的調遣去執行這些任務,任務的內容就寫在富人們給你的handler上,也就是函數指針,指針指向具體實現就是任務的實質內容。其實在整個過程中,富人們都不知道是哪個勞工幫他們做的工作,只知道是中介所負責完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個不恰當的地方,如果硬要這樣比喻的話,我只能說:其實勞工里面也有很多富人的^o^! 。很多勞工在完成任務的過程中自己也托給中介所一些任務,然后這些任務很可能還是自己去完成。這也難怪,運行代碼的總是這些線程,那么調用post的肯定也會有這些線程了,不過不管怎么說,如此循環往復可以解決問題就行,比喻不見得就得恰當,任何事物之間都不可能完全相同,只要能闡述思想就行。


最后還要說明的一點就是:委托的任務其實可以設定執行的時間的,很不錯的設定,內部實現則是通過定時器原理,GetQueuedCompletionStatus有一個等待時間的參數似乎被用在這方面,還有源碼中的定時器線程我並沒有過多的去理解,總之大體原理已基本掌握,剩下的就是使勁的用它了!!!


另外為了方便人交流,在這里插入一些代碼可能更容易讓人理解吧,
下面這個是啟動服務時的代碼:
void ServerFramework::run()
{
    boost::thread_group workers;
    for (uint32 i = 0; i < mWorkerCount; ++i)
        workers.create_thread(
            boost::bind(&boost::asio::io_service::run, &mIoService));
    workers.join_all();
}


在打開前就得分配好任務,否則線程們運行起來就退出了,阻塞不住,任務的分配就交給open函數了,它是分配了監聽端口的任務,一旦有了連接就會拋出一個任務,其中一個線程就會開始行動啦。
void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /*= DEFAULT_WORKER_COUNT*/)
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
    boost::asio::ip::tcp::resolver resolver(mIoService);
    boost::asio::ip::tcp::resolver::query query(address, port);
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);


    mAcceptor.open(endpoint.protocol());
    mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    mAcceptor.bind(endpoint);
    mAcceptor.listen();


    mNextConnection = new Connection(this);
    mAcceptor.async_accept(mNextConnection->getSocket(),
        boost::bind(&ServerFramework::__onConnect, this,
        boost::asio::placeholders::error));


    mWorkerCount = nWorkers;
    if (mWorkerCount == DEFAULT_WORKER_COUNT)
    {
        mWorkerCount = 4;
    }
}


open函數中給io_service的一個任務就是在有鏈接訪問服務器端口的情況下執行ServerFramework::__onConnect函數,有一點需要格外注意的,io_service必須時刻都有任務存在,否則線程io_service::run函數將返回,於是線程都會結束並銷毀,程序將退出,所以,你必須保證無論何時都有任務存在,這樣線程們即使空閑了也還是會繼續等待,不會銷毀。所以,我在ServerFramework::__onConnect函數中又一次給了io_service相同的任務,即:繼續監聽端口,有鏈接了還是調用ServerFramework::__onConnect函數。如果你在ServerFramework::__onConnect執行完了還沒有給io_service任務的話,那么一切都晚了...... 代碼如下:
void ServerFramework::__onConnect(const BoostSysErr& e)
{
    if (e)
    {
        MOELOG_DETAIL_WARN(e.message().c_str());
    }


    Connection* p = mNextConnection;
    mNextConnection = new Connection(this);


    // 再次進入監聽狀態
    mAcceptor.async_accept(mNextConnection->getSocket(),
        boost::bind(&ServerFramework::__onConnect, this,
        boost::asio::placeholders::error));


    // 處理當前鏈接
    __addConnection(p);
    p->start();
}


最后,展示一下這個類的所有成員變量吧:
    // 用於線程池異步處理的核心對象
    boost::asio::io_service mIoService;


    // 網絡鏈接的接收器,用於接收請求進入的鏈接
    boost::asio::ip::tcp::acceptor mAcceptor;


    // 指向下一個將要被使用的鏈接對象
    Connection* mNextConnection;


    // 存儲服務器鏈接對象的容器
    ConnectionSet mConnections;


    //// 為鏈接對象容器准備的strand,防止並行調用mConnections
    //boost::asio::io_service::strand mStrand_mConnections;


    // 為鏈接對象容器准備的同步鎖,防止並行調用mConnections
    boost::mutex mMutex4ConnSet;


    // 為控制台輸出流准備的strand,防止並行調用std::cout
    AsioService::strand mStrand_ConsoleIostream;


    // 工作線程的數量
    uint32 mWorkerCount;

 

usidc5 2013-10-07 16:41
boost的官方例子,有單線程的網絡框架,httpserver2是線程池的。下面參照網上某人的代碼修改了一點(忘了哪位大仙的代碼了)
測試工具,適用stressmark,測試效果非常好, 9000個/s
復制代碼
#include <stdio.h>
#include "AuthenHandle.h"
#include "configure.h"
#ifdef WIN32 //for windows nt/2000/xp


#include <winsock.h>
#include <windows.h>
#include "gelsserver.h"
#pragma comment(lib,"Ws2_32.lib")
#else         //for unix




#include <sys/socket.h>
//    #include <sys/types.h>


//    #include <sys/signal.h>


//    #include <sys/time.h>


#include <netinet/in.h>     //socket


//    #include <netdb.h>


#include <unistd.h>            //gethostname


// #include <fcntl.h>


#include <arpa/inet.h>


#include <string.h>            //memset


typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef struct sockaddr SOCKADDR;
#ifdef M_I386
typedef int socklen_t;
#endif


#define BOOL             int
#define INVALID_SOCKET    -1
#define SOCKET_ERROR     -1
#define TRUE             1
#define FALSE             0
#endif        //end #ifdef WIN32








static int count111 = 0;
static time_t oldtime = 0, nowtime = 0;




#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>


using namespace std;
using boost::asio::ip::tcp;


class io_service_pool
    : public boost::noncopyable
{
public:


    explicit io_service_pool(std::size_t pool_size)
        : next_io_service_(0)
    { 
        for (std::size_t i = 0; i < pool_size; ++ i)
        {
            io_service_sptr io_service(new boost::asio::io_service);
            work_sptr work(new boost::asio::io_service::work(*io_service));
            io_services_.push_back(io_service);
            work_.push_back(work);
        }
    }


    void start()
    { 
        for (std::size_t i = 0; i < io_services_.size(); ++ i)
        {
            boost::shared_ptr<boost::thread> thread(new boost::thread(
                boost::bind(&boost::asio::io_service::run, io_services_)));
            threads_.push_back(thread);
        }
    }


    void join()
    {
        for (std::size_t i = 0; i < threads_.size(); ++ i)
        {
            threads_->join();
        } 
    }


    void stop()
    { 
        for (std::size_t i = 0; i < io_services_.size(); ++ i)
        {
            io_services_->stop();
        }
    }


    boost::asio::io_service& get_io_service()
    {
        boost::mutex::scoped_lock lock(mtx);
        boost::asio::io_service& io_service = *io_services_[next_io_service_];
        ++ next_io_service_;
        if (next_io_service_ == io_services_.size())
        {
            next_io_service_ = 0;
        }
        return io_service;
    }


private:
    typedef boost::shared_ptr<boost::asio::io_service> io_service_sptr;
    typedef boost::shared_ptr<boost::asio::io_service::work> work_sptr;
    typedef boost::shared_ptr<boost::thread> thread_sptr;


    boost::mutex mtx;


    std::vector<io_service_sptr> io_services_;
    std::vector<work_sptr> work_;
    std::vector<thread_sptr> threads_; 
    std::size_t next_io_service_;
};


boost::mutex cout_mtx;
int packet_size = 0;
enum {MAX_PACKET_LEN = 4096};


class session
{
public:
    session(boost::asio::io_service& io_service)
        : socket_(io_service)
        , recv_times(0)
    {
    }


    virtual ~session()
    {
        boost::mutex::scoped_lock lock(cout_mtx);
    }


    tcp::socket& socket()
    {
        return socket_;
    }


    inline void start()
    {


        socket_.async_read_some(boost::asio::buffer(data_, MAX_PACKET_LEN),
            boost::bind(&session::handle_read, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    }


    void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
    {
        if (!error)
        {
            ++ recv_times;




            count111 ++;


            struct tm *today;
            time_t ltime;
            time( &nowtime );


            if(nowtime != oldtime){
                printf("%d\n", count111);
                oldtime = nowtime;
                count111 = 0;
            }




            boost::asio::async_write(socket_, boost::asio::buffer(data_, bytes_transferred),
                boost::bind(&session::handle_write, this, boost::asio::placeholders::error));






        }
        else
        {
            delete this;
        }
    }


    void handle_write(const boost::system::error_code& error)
    {
        if (!error)
        {
            start();
        }
        else
        {
            delete this;
        }
    }


private:
    tcp::socket socket_;
    char data_[MAX_PACKET_LEN];
    int recv_times;
};


class server
{
public:
    server(short port, int thread_cnt)
        : io_service_pool_(thread_cnt)
        , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
    {
        session* new_session = new session(io_service_pool_.get_io_service());
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }


    void handle_accept(session* new_session, const boost::system::error_code& error)
    {
        if (!error)
        {
            new_session->start();
        }
        else
        {
            delete new_session;
        }


        new_session = new session(io_service_pool_.get_io_service());
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }


    void run()
    {
        io_service_pool_.start();
        io_service_pool_.join();
    }


private:


    io_service_pool io_service_pool_;
    tcp::acceptor acceptor_;
};






int main()
{


    //boost


    server s(port, 50);
    s.run();


    while(true)
    {
        sleep(1000);




     }


    return 0;
}
復制代碼


 

usidc5 2013-10-07 16:42
網上大部分人都講boost.asio用完成端口實現,並且實現了線程池,所以效率非常的高。
      我在應用asio的時候發現完成端口是有,但是線程池確並不存在,而且在現有的架構下,要想用線程池來實現對數據的處理,可能寫出來不是很好看。
asio通過開啟線程調用io_service::run再調用win_iocp_io_service::run來處理收到的事件。
  size_t run(boost::system::error_code& ec)
  {
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
    {
      ec = boost::system::error_code();
      return 0;
    }

    call_stack<win_iocp_io_service>::context ctx(this);

    size_t n = 0;
    while (do_one(true, ec))
      if (n != (std::numeric_limits<size_t>::max)())
        ++n;
    return n;
  }
do_one里面為
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
          &completion_key, &overlapped, block ? timeout : 0);
        operation* op = static_cast<operation*>(overlapped);
        op->do_completion(last_error, bytes_transferred);
實際上如果op->do_completion里面有時間比較長的操作,這個線程同樣為死在這個地方。
因為只有一個線程在驅動前面的run函數。
當然你也可以通過同時啟動幾個線程來調用run函數,這樣是可行的,但是這種手法確很笨拙,因為你可能一下啟動10個線程,卻只有一個線程比較忙,
或者你的10個線程根本就忙不過來,這根有沒有使用iocp完全沒什么兩樣。
     做事情要力求完美,不要以為NB的大師不提供的東西,你就不能自已弄一個。其實我覺得asio里面c++的運用,非常的完美,但是從實用的角度來說,
還不如我以前一個同事寫的iocp寫得好。

我們怎么對asio這部分進行改良,讓他支持線程池的方式呢。
實際上我們只需要對win_iocp_io_service進行一些加工即可。
在do_one里面
op->do_completion(last_error, bytes_transferred);
之前auto_work work(*this);
這個地方,實際上就是來計算當前有多少工作要做,
這個地方調用work_started
  ::InterlockedIncrement(&outstanding_work_);
只需要在這按照你的需求加入一個線程就可以了。
算法自已想吧,還存在work_finished函數,可以用來減少線程。
需要給win_iocp_io_service類增加一個thread_group成員變量,供上面使用。
改良的方式不是很好,也比較不好看,
唉,完美只存在心里,適可而止吧。

 

usidc5 2013-10-07 16:43
正如其名字,asio是一個異步網絡庫。但第一次使用它卻是把它作為一個線程池的實現。下面是一段實驗代碼。


#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
void foo() {
  sleep(1);
  printf("foo: %d\n", (int)pthread_self());
}




class TaskPool {
  typedef boost::shared_ptr<boost::thread> Thread;
  public:
  TaskPool(std::size_t num_workers) : num_workers_(num_workers) {
  }
  void Start() {
    manage_thread_.reset(new boost::thread(boost::bind(TaskPool::_Start, this)));
  }
  void Post() {
    ios_.post(foo);
  }
  private:
    static void _Start(TaskPool* pool) {
    for (std::size_t i = 0; i < pool->num_workers_; ++i) {
      pool->workers_.push_back(Thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &pool->ios_))));
    }
    for (std::size_t i = 0; i < pool->workers_.size(); ++i) {
      pool->workers_->join();
    }


  }
  private:
  std::size_t num_workers_;
  boost::asio::io_service ios_;
  std::vector<Thread> workers_;
  Thread manage_thread_;
};


int main()
{
  TaskPool pool(8);
  pool.Start();
  for (int i = 0; i < 1000; ++i) {
  pool.Post();
  }
  printf("post finished\n");
  sleep(10);
  return 0;
}

 

usidc5 2013-11-16 22:52
ACE是一個很成熟的中間件產品,為自適應通訊環境,但它過於宏大,一堆的設計模式,架構是一層又一層,對初學者來說,有點困難。
ASIO是基本Boost開發的異步IO庫,封裝了Socket,簡化基於socket程序的開發。


最近分析ASIO的源代碼,讓我無不驚呀於它設計。在ACE中開發中的內存管理一直讓人頭痛,ASIO的出現,讓我看到新的曙光,成為我新的好伙伴。簡單地與ACE做個比較。


1.層次架構:
ACE底層是C風格的OS適配層,上一層基於C++的wrap類,再上一層是一些框架(Accpetor, Connector,Reactor等),最上一層是框架上服務。
ASIO與之類似,底層是OS的適配層,上一層一些模板類,再上一層模板類的參數化(TCP/UDP),再上一層是服務,它只有一種框架為io_service。


2.涉及范圍:
ACE包含了日志,IPC,線程,共享內存,配置服務等。
ASIO只涉及到Socket,提供簡單的線程操作。


3.設計模式:
ACE主要應用了Reactor,Proactor等。
而ASIO主要應用了Proactor。


4.線程調度:
ACE的Reactor是單線程調度,Proactor支持多線程調度。
ASIO支持單線程與多線程調度。


5.事件分派處理:
ACE主要是注冊handler類,當事件分派時,調用其handler的虛掛勾函數。實現ACE_Handler/ACE_Svc_Handler/ACE_Event_handler等類的虛函數。
ASIO是基於函數對象的hanlder事件分派。任何函數都可能成功hanlder,少了一堆虛表的維護,調度上優於ACE。


6.發布方式:
ACE是開源免費的,不依賴於第3方庫, 一般應用使用它時,以動態鏈接的方式發布動態庫。
ASIO是開源免費的,依賴Boost,應用使用時只要include頭文件,不需動態庫。


7.可移植性:
ACE支持多種平台,可移植性不存在問題,據說socket編程在linux下有不少bugs。
ASIO支持多種平台,可移植性不存在問題。


8.開發難度:
基於ACE開發應用,對程序員要求比較高,要用好它,必須非常了解其框架。在其框架下開發,往往new出一個對象,不知在什么地方釋放好。
基於ASIO開發應用,要求程序員熟悉函數對象,函數指針,熟悉boost庫中的boost::bind。內存管理控制方便。




我個人覺得,如果應用socket編程,使用ASIO開發比較好,開發效率比較高。ACE適合於理論研究,它本來就是源於Douglas的學術研究。

 

usidc5 2013-11-16 22:53
在使用IOCP時,最重要的幾個API就是GetQueueCompeltionStatus、WSARecv、WSASend,數據的I/O及其完成狀態通過這幾個接口獲取並進行后續處理。


GetQueueCompeltionStatus attempts to dequeue an I/O completion packet from the specified I/O completion port. If there is no completion packet queued, the function waits for a pending I/O operation associated with the completion port to complete.


BOOL WINAPI GetQueuedCompletionStatus(
  __in   HANDLE CompletionPort,
  __out  LPDWORD lpNumberOfBytes,
  __out  PULONG_PTR lpCompletionKey,
  __out  LPOVERLAPPED *lpOverlapped,
  __in   DWORD dwMilliseconds
);
If the function dequeues a completion packet for a successful I/O operation from the completion port, the return value is nonzero. The function stores information in the variables pointed to by the lpNumberOfBytes, lpCompletionKey, and lpOverlapped parameters.


除了關心這個API的in & out(這是MSDN開頭的幾行就可以告訴我們的)之外,我們更加關心不同的return & out意味着什么,因為由於各種已知或未知的原因,我們的程序並不總是有正確的return & out。


If *lpOverlapped is NULL and the function does not dequeue a completion packet from the completion port, the return value is zero. The function does not store information in the variables pointed to by the lpNumberOfBytes and lpCompletionKey parameters. To get extended error information, call GetLastError. If the function did not dequeue a completion packet because the wait timed out, GetLastError returns WAIT_TIMEOUT.


假設我們指定dwMilliseconds為INFINITE。


這里常見的幾個錯誤有:


WSA_OPERATION_ABORTED (995): Overlapped operation aborted.


由於線程退出或應用程序請求,已放棄I/O 操作。


MSDN: An overlapped operation was canceled due to the closure of the socket, or the execution of the SIO_FLUSH command in WSAIoctl. Note that this error is returned by the operating system, so the error number may change in future releases of Windows.


成因分析:這個錯誤一般是由於peer socket被closesocket或者WSACleanup關閉后,針對這些socket的pending overlapped I/O operation被中止。


解決方案:針對socket,一般應該先調用shutdown禁止I/O操作后再調用closesocket關閉。


嚴重程度:輕微易處理。


WSAENOTSOCK (10038): Socket operation on nonsocket.


MSDN: An operation was attempted on something that is not a socket. Either the socket handle parameter did not reference a valid socket, or for select, a member of an fd_set was not valid.


成因分析:在一個非套接字上嘗試了一個操作。


使用closesocket關閉socket之后,針對該invalid socket的任何操作都會獲得該錯誤。


解決方案:如果是多線程存在對同一socket的操作,要保證對socket的I/O操作邏輯上的順序,做好socket的graceful disconnect。


嚴重程度:輕微易處理。


WSAECONNRESET (10054): Connection reset by peer.


遠程主機強迫關閉了一個現有的連接。


MSDN: An existing connection was forcibly closed by the remote host. This normally results if the peer application on the remote host is suddenly stopped, the host is rebooted, the host or remote network interface is disabled, or the remote host uses a hard close (see setsockopt for more information on the SO_LINGER option on the remote socket). This error may also result if a connection was broken due to keep-alive activity detecting a failure while one or more operations are in progress. Operations that were in progress fail with WSAENETRESET. Subsequent operations fail with WSAECONNRESET.


成因分析:在使用WSAAccpet、WSARecv、WSASend等接口時,如果peer application突然中止(原因如上所述),往其對應的socket上投遞的operations將會失敗。


解決方案:如果是對方主機或程序意外中止,那就只有各安天命了。但如果這程序是你寫的,而你只是hard close,那就由不得別人了。至少,你要知道這樣的錯誤已經出現了,就不要再費勁的繼續投遞或等待了。


嚴重程度:輕微易處理。


WSAECONNREFUSED (10061): Connection refused.


由於目標機器積極拒絕,無法連接。


MSDN: No connection could be made because the target computer actively refused it. This usually results from trying to connect to a service that is inactive on the foreign host—that is, one with no server application running.


成因分析:在使用connect或WSAConnect時,服務器沒有運行或者服務器的監聽隊列已滿;在使用WSAAccept時,客戶端的連接請求被condition function拒絕。


解決方案:Call connect or WSAConnect again for the same socket. 等待服務器開啟、監聽空閑或查看被拒絕的原因。是不是長的丑或者錢沒給夠,要不就是服務器拒絕接受天價薪酬自主創業去了?


嚴重程度:輕微易處理。


WSAENOBUFS (10055): No buffer space available.


由於系統緩沖區空間不足或列隊已滿,不能執行套接字上的操作。


MSDN: An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full.


成因分析:這個錯誤是我查看錯誤日志后,最在意的一個錯誤。因為服務器對於消息收發有明確限制,如果緩沖區不足應該早就處理了,不可能待到send/recv失敗啊。而且這個錯誤在之前的版本中幾乎沒有出現過。這也是這篇文章的主要內容。像connect和accept因為緩沖區空間不足都可以理解,而且危險不高,但如果send/recv造成擁堵並惡性循環下去,麻煩就大了,至少說明之前的驗證邏輯有疏漏。


WSASend失敗的原因是:The Windows Sockets provider reports a buffer deadlock. 這里提到的是buffer deadlock,顯然是由於多線程I/O投遞不當引起的。


解決方案:在消息收發前,對最大掛起的消息總的數量和容量進行檢驗和控制。


嚴重程度:嚴重。


本文主要參考MSDN。

 

usidc5 2013-11-16 22:59
1:在IOCP中投遞WSASend返回WSA_IO_PENDING的時候,表示異步投遞已經成功,但是稍后發送才會完成。這其中涉及到了三個緩沖區。
網卡緩沖區,TCP/IP層緩沖區,程序緩沖區。
情況一:調用WSASend發送正確的時候(即立即返回,且沒有錯誤),TCP/IP將數據從程序緩沖區中拷貝到TCP/IP層緩沖區中,然后不鎖定該程序緩沖區,由上層程序自己處理。TCP/IP層緩沖區在網絡合適的時候,將其數據拷貝到網卡緩沖區,進行真正的發送。
情況二:調用WSASend發送錯誤,但是錯誤碼是WSA_IO_PENDING的時候,表示此時TCP/IP層緩沖區已滿,暫時沒有剩余的空間將程序緩沖區的數據拷貝出來,這時系統將鎖定用戶的程序緩沖區,按照書上說的WSASend指定的緩沖區將會被鎖定到系統的非分頁內存中。直到TCP/IP層緩沖區有空余的地方來接受拷貝我們的程序緩沖區數據才拷貝走,並將給IOCP一個完成消息。
情況三:調用WSASend發送錯誤,但是錯誤碼不是WSA_IO_PENDING,此時應該是發送錯誤,應該釋放該SOCKET對應的所有資源。


2:在IOCP中投遞WSARecv的時候,情況相似。
情況一:調用WSARecv正確,TCP/IP將數據從TCP/IP層緩沖區拷貝到緩沖區,然后由我們的程序自行處理了。清除TCP/IP層緩沖區數據。
情況二:調用WSARecv錯誤,但是返回值是WSA_IO_PENDING,此時是因為TCP/IP層緩沖區中沒有數據可取,系統將會鎖定我們投遞的WSARecv的buffer,直到TCP/IP層緩沖區中有新的數據到來。
情況三:調用WSARecv錯誤,錯誤值不是WSA_IO_PENDING,此時是接收出錯,應該釋放該SOCKET對應的所有資源。


在以上情況中有幾個非常要注意的事情:
系統鎖定非分頁內存的時候,最小的鎖定大小是4K(當然,這個取決於您系統的設置,也可以設置小一些,在注冊表里面可以改,當然我想這些數值微軟應該比我們更知道什么合適了),所以當我們投遞了很多WSARecv或者WSASend的時候,不管我們投遞的Buffer有多大(0除外),系統在出現IO_PENGDING的時候,都會鎖定我們4K的內存。這也就是經常有開發者出現WSANOBUF的情況原因了。


我們在解決這個問題的時候,要針對WSASend和WSARecv做處理
1:投遞WSARecv的時候,可以采用一個巧妙的設計,先投遞0大小Buf的WSARecv,如果返回,表示有數據可以接收,我們開啟真正的recv將數據從TCP/IP層緩沖區取出來,直到WSA_IO_PENGDING.
2:對投遞的WSARecv以及WSASend進行計數統計,如果超過了我們預定義的值,就不進行WSASend或者WSARecv投遞了。
3:現在我們應該就可以明白為什么WSASend會返回小於我們投遞的buffer空間數據值了,是因為TCP/IP層緩沖區小於我們要發送的緩沖區,TCP/IP只會拷貝他剩余可被Copy的緩沖區大小的數據走,然后給我們的WSASend的已發送緩沖區設置為移走的大小,下一次投遞的時候,如果TCP/IP層還未被發送,將返回WSA_IO_PENGDING。
4:在很多地方有提到,可以關閉TCP/IP層緩沖區,可以提高一些效率和性能,這個從上面的分析來看,有這個可能,要實際的網絡情況去實際分析了。







==================


關於數據包在應用層亂序問題就不多說了(IOCP荒廢了TCP在傳輸層辛辛苦苦保證的有序)。


這無關緊要,因為iocp要管理上千個SOCKET,每個SOCKET的讀請求、寫請求分別保證串行即可。





=============


關於GetQueuedCompletionStatus的返回值判斷:


我給超時值傳的是0,直接測試,無須等待。


這里我們關心這幾個值:


第二個參數所傳回的byte值


第三個參數所傳回的complete key值 ——PER HANDLE DATA


第四個參數所傳回的OVERLAPPED結構指針 ——PER IO DATA


系統設置的ERROR值。





在超時情況下,byte值返回0,per handle data值是-1,per io data為NULL





1.如果返回FALSE


    one : iocp句柄在外部被關閉。


   WSAGetLastError返回6(無效句柄),byte值返回0,per handle data值是-1,per io data為NULL





    two: 我們主動close一個socket句柄,或者CancelIO(socket)(且此時有未決的操作)


    WSAGetLastError返回995(由於線程退出或應用程序請求,已放棄 I/O 操作)


   byte值為0,


   per handle data與per io data正確傳回。





   three:對端強退(且此時本地有未決的操作)


   WSAGetLastError返回64(指定的網絡名不再可用)


  byte值為0,per handle data與per io data正確傳回 





2.如果返回TRUE【此時一定得到了你投遞的OVERLAP結構】


    one:  我接收到對端數據,然后准備再投遞接收請求;但此期間,對端關閉socket。


   WSARecv返回錯誤碼10054:遠程主機強迫關閉了一個現有的連接。


TODO TODO


   從網上搜到一個做法,感覺很不錯:


如果返回FALSE, 那么:如果OVERLAP為空,那一定是發生了錯誤(注意:請排除TIMEOUT錯誤);


如果OVERLAP不為空,有可能發生錯誤。不用管它,這里直接投遞請求;如果有錯,WSARecv將返回錯誤。關閉連接即可。








============


關於closesocket操作:





The closesocket function will initiate cancellation on the outstanding I/O operations, but that does not mean that an application will receive I/O completion for these I/O operations by the time the closesocket function returns. Thus, an application should not cleanup any resources (WSAOVERLAPPED structures, for example) referenced by the outstanding I/O requests until the I/O requests are indeed completed.





在IOCP模式下,如果調用closesocket時有未決的pending   IO,將導致socket被重置,所以有時會出現數據丟失。正統的解決方式是使用shutdown函數(指定SD_SEND標志),注意這時可能有未完成的發送pengding   IO,所以你應該監測是否該連接的所有是否已完成(也許你要用一個計數器來跟蹤這些pending   IO),僅在所有send   pending   IO完成后調用shutdown。





MSDN推薦的優雅關閉socket:




Call WSAAsyncSelect to register for FD_CLOSE notification.
Call shutdown with how=SD_SEND.
When FD_CLOSE received, call recv until zero returned, or SOCKET_ERROR.
Call closesocket.



FD_CLOSE being posted after all data is read from a socket. An application should check for remaining data upon receipt of FD_CLOSE to avoid any possibility of losing data.


























對每個使用AcceptEx接受的連接套結字使用setsockopt設置SO_UPDATE_ACCEPT_CONTEXT選項,這個選項原義是把listen套結字一些屬性(包括socket內部接受/發送緩存大小等等)拷貝到新建立的套結字,卻可以使后續的shutdown調用成功。



/* SO_UPDATE_ACCEPT_CONTEXT is required for shutdown() to work fine*/
       setsockopt( sockClient,
                            SOL_SOCKET,
                            SO_UPDATE_ACCEPT_CONTEXT,
                            (char*)&m_sockListen,
                            sizeof(m_sockListen) ) ;
如果是調用AcceptEX接收的連接 不設置該選項的話,隨后的shutdown調用
將返回失敗, WSAGetLastError() returns 10057 -- WSANOTCONN 




2012.10.24


用智能指針重構了網絡庫,替換了裸指針。


但是發現IOCP如下一個問題:




1. 收到14字節數據
2012-10-25[02_02_05_906[DBG]:OnRecv : Worker thread [6268], socket = 11256, bytes = 14


2.再次投遞RECV請求,發生錯誤,因為對端已經關閉
2012-10-25[02_02_05_906[DBG]:Fatal error when post recv, error 10054, socket = 11256
2012-10-25[02_02_05_906[DBG]:Socket is set invalid 11256


3.於是准備回收資源,結束RECV請求;
2012-10-25[02_02_05_906[DBG]:EndRecv : Worker thread [6268], socket = 11256


4.但此時overlap結構仍然是掛起狀態?
2012-10-25[02_02_05_906[DBG]:2EndRecv socket 11256, now recv overlappe is complete ? 0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM