boost.asio系列——io_service


IO模型

io_service對象是asio框架中的調度器,所有異步io事件都是通過它來分發處理的(io對象的構造函數中都需要傳入一個io_service對象)。

    asio::io_service io_service;
    asio::ip::tcp::socket socket(io_service);

在asio框架中,同步的io主要流程如下:

    

  1. 應用程序調用IO對象成員函數執行IO操作
  2. IO對象向io_service 提出請求.
  3. io_service 調用操作系統的功能執行連接操作.
  4. 操作系統向io_service 返回執行結果.
  5. io_service將錯誤的操作結果翻譯為boost::system::error_code類型,再傳遞給IO對象.
  6. 如果操作失敗,IO對象拋出boost::system::system_error類型的異常.

而異步IO的處理流程則有些不同:

    

  1. 應用程序調用IO對象成員函數執行IO操作
  2. IO對象請求io_service的服務
  3. io_service 通知操作系統其需要開始一個異步連接.
  4. 操作系統指示連接操作完成, io_service從隊列中獲取操作結果
  5. 應用程序必須調用io_service::run()以便於接收結果
  6. 調用io_service::run()后,io_service返回一個操作結果,並將其翻譯為error_code,傳遞到事件回調函數中

io_service對象

io_servuce的作用: io_servie 實現了一個任務隊列,這里的任務就是void(void)的函數。Io_servie最常用的兩個接口是post和run,post向任務隊列中投遞任務,run是執行隊列中的任務,直到全部執行完畢,並且run可以被N個線程調用。Io_service是完全線程安全的隊列。

io_service對象提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop:

  1. post用於發布io事件,如timer,socket讀寫等,一般由asio框架相應對象調用,無需我們顯式調用。
  2. run用於監聽io事件響應,並執行響應回調,對於異步io操作需要在代碼中顯式調用,對於同步io操作則由io對象隱式調用(並不是run函數,不過也是等待io事件)。

可見,io_service提供的是一個生產者消費者模型。在異步io操作中需要我們手動控制消費者,調用run函數,它的基本工作模式如下:

  1. 等待io事件響應,如果所有io事件響應完成則退出
  2. 等待到io事件響應后,執行其對應的回調
  3. 繼續等待下一個io事件,重復1-2

Io_servie 實現代碼的基本類結構:

l Io_servie是接口類,為實現跨平台,采用了策略模式,所有接口均有impl_type實現。根據平台不同impl_type分為

n win_iocp_io_service Win版本的實現,這里主要分析Linux版本。

n task_io_service 非win平台下的實現,其代碼結構為:

u detail/task_io_service_fwd.hpp 簡單聲明task_io_service名稱

u detail/task_io_service.hpp 聲明task_io_service的方法和屬性

u detail/impl/task_io_service.ipp 具體實現文件

u 隊列中的任務類型為opertioan,原型其實是typedef task_io_service_operation operation,其實現文件在detail/task_io_service_operation.hpp中,當隊列中的任務被執行時,就是task_io_service_operation:: complete被調用的時候。

Io_servie::Post方法的實現

Post向隊列中投遞任務,然后激活空閑線程執行任務。其實現流程如下:

l Post接收handler作為參數,實際上是個仿函數,通過此仿函數構造出completion_handler對象,completion_handler繼承自operation。然后調用post_immediate_completion。

l post_immediate_completion首先將outstanding_work_增加,然后調用post_deferred_completion。

l post_deferred_completion首先加鎖將任務入列,然后調用wake_one_thread_and_unlock

l wake_one_thread_and_unlock嘗試喚醒當前空閑的線程,其實現中特別之處在於,若沒有空閑線程,但是有線程在執行task->run,即阻塞在epoll_wait上,那么先中斷epoll_wait執行任務隊列完成后再執行epoll_wait。

l first_idle_thread_維護了所有當前空閑線程,實際上使用了Leader/Follower模式,每次喚醒時只喚醒空閑線程的第一個。

Io_servie::run方法的實現

Run方法執行隊列中的所有任務,直到任務執行完畢。

l run方法首先構造一個idle_thread_info,和first_idle_thread_類型相同,即通過first_idle_thread_將所有線程串聯起來,它這個串聯不是立即串聯的,當該線程無任務可做是加入到first_idle_thread_的首部,有任務執行時,從first_idle_thread_中斷開。這很正常,因為first_idle_thread_維護的是當前空閑線程。

l 加鎖,循環執行do_one方法,直到do_one返回false

l do_one每次執行一個任務。首先檢查隊列是否為空,若空將此線程追加到first_idle_thread_的首部,然后阻塞在條件變量上,直到被喚醒。

l 當被喚醒或是首次執行,若stopped_為true(即此時stop方法被調用了),返回0

l 隊列非空,pop出一個任務,檢查隊列無任務那么簡單的解鎖,若仍有,調用wake_one_thread_and_unlock嘗試喚醒其他空閑線程執行。然后執行該任務,返回1.

l 實際上在執行隊列任務時有一個特別的判斷if (o == &task_operation_),那么將會執行task_->run,task_變量類型為reactor,在linux平台實現為epoll_reactor,實現代碼文件為detail/impl/epoll_reactor.ipp,run方法實際上執行的是epoll_wait,run阻塞在epoll_wait上等待事件到來,並且處理完事件后將需要回調的函數push到io_servie的任務隊列中,雖然epoll_wait是阻塞的,但是它提供了interrupt函數,該interrupt是如何實現的呢,它向epoll_wait添加一個文件描述符,該文件描述符中有8個字節可讀,這個文件描述符是專用於中斷epoll_wait的,他被封裝到select_interrupter中,select_interrupter實際上實現是eventfd_select_interrupter,在構造的時候通過pipe系統調用創建兩個文件描述符,然后預先通過write_fd寫8個字節,這8個字節一直保留。在添加到epoll_wait中采用EPOLLET水平觸發,這樣,只要select_interrupter的讀文件描述符添加到epoll_wait中,立即中斷epoll_wait。很是巧妙。!!!實際上就是因為有了這個reactor,它才叫io_servie,否則就是一個純的任務隊列了。

l Run方法的原則是:

n 有任務立即執行任務,盡量使所有的線程一起執行任務

n 若沒有任務,阻塞在epoll_wait上等待io事件

n 若有新任務到來,並且沒有空閑線程,那么先中斷epoll_wait,先執行任務

n 若隊列中有任務,並且也需要epoll_wait監聽事件,那么非阻塞調用epoll_wait(timeout字段設置為0),待任務執行完畢在阻塞在epoll_wait上。

n 幾乎對線程的使用上達到了極致。

n 從這個函數中可以知道,在使用ASIO時,io_servie應該盡量多,這樣可以使其epoll_wait占用的時間片最多,這樣可以最大限度的響應IO事件,降低響應時延。但是每個io_servie::run占用一個線程,所以io_servie最佳應該和CPU的核數相同。

Io_servie::stop的實現

l 加鎖,調用stop_all_threads

l 設置stopped_變量為true,遍歷所有的空閑線程,依次喚醒

l task_interrupted_設置為true,調用task_的interrupt方法

l task_的類型為reactor,在run方法中已經做了分析

 

從中可以看出,io_service是一個工作隊列的模型。在使用過程中一般有如下幾個需要注意的地方:

1. run函數在io事件完成后會退出,導致后續基於該對象的異步io任務無法執行

由於io_service並不會主動常見調度線程,需要我們手動分配,常見的方式是給其分配一個線程,然后執行run函數。但run函數在io事件完成后會退出,線程會終止,后續基於該對象的異步io任務無法得到調度。

解決這個問題的方法是通過一個asio::io_service::work對象來守護io_service。這樣,即使所有io任務都執行完成,也不會退出,繼續等待新的io任務。

    boost::asio::io_service io;
    boost::asio::io_service::work work(io);
    io.run();

2. 回調在run函數的線程中同步執行,當回調處理時間較長時阻塞后續io響應

解決這個問題的方法有兩種:1. 啟動多線程執行run函數(run函數是線程安全的),2. 新啟動一個線程(或通過線程池)來執行回調函數。一般來講,如果回調處理事件不是特別短,應該使用在線程池中處理回調的方式。

3. 回調在run函數的線程中同步執行,io事件較多的時候得不到及時響應

這個其實是性能問題了,在多核cpu上可以通過在多個線程中執行run函數來解決這一問題。這種方式也只能充分利用cpu性能,本身性能問題就不是光靠軟件就能解決的。

.net中的異步io調度方式

和io_service這種手動控制的方式比起來,.net則是純粹的自動檔了。IO調度由CLR托管了,無需手動控制。回調也是在線程池中執行,無需擔心影響后續IO響應。

正是由於CLR的托管,在.net 的異步IO框架中,就沒有類似io_service的調度對象存在,這也符合.net的一貫簡潔做法。

 

◆boost::asio::io_service使用時的注意事項:

①請讓boost::asio::io_service和boost::asio::io_service::work搭配使用。

②想讓event按照進入(strand)時的順序被執行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用

③一般情況下,io_service的成員函數的使用順序:

boost::asio::io_service構造,
boost::asio::io_service::run(),
boost::asio::io_service::stop(),
boost::asio::io_service::reset(),
boost::asio::io_service::run(),
......
boost::asio::io_service析構,

④不論有沒有使用io_service::work,run()都會執行完io_service里面的event,(若沒有用work,run就會退出)。
⑤一個新創建的io_service不需要執行reset()函數。
⑥在調用stop()后,在調用run()之前,請先調用reset()函數。
⑦函數stop()和reset()並不能清除掉io_service里面尚未執行的event。
我個人認為,也只有析構掉io_service,才能清空它里面的那些尚未執行的event了。(可以用智能指針)。

⑧函數stop(),stopped(),reset(),很簡單,請單步調試,以明白它在函數里做了什么。

⑨boost的.hpp文件里面(一般情況下)有各個函數的使用說明,你可以隨時查看。

◆下面是boost::asio::io_service的stop()和reset()函數的注釋的翻譯:

void boost::asio::io_service::stop();
BOOST_ASIO_DECL void stop();
/// Stop the io_service object's event processing loop.
/// 停止io_service對象的事件處理循環。
/**
 * This function does not block, but instead simply signals the io_service to
 * stop. All invocations of its run() or run_one() member functions should
 * return as soon as possible. Subsequent calls to run(), run_one(), poll()
 * or poll_one() will return immediately until reset() is called.
 */
 /**
 這個函數不阻塞,而是僅僅表示io_service停止了。
 它的run()或run_one()成員函數的調用應當盡快返回。
 對run()、run_one()、poll()、poll_one()的隨后的調用將會立即返回直到reset()函數被調用了。
 */
void boost::asio::io_service::reset();
BOOST_ASIO_DECL void reset();
/// Reset the io_service in preparation for a subsequent run() invocation.
/// 重置io_service對象,為隨后的run()調用做准備。
/**
 * This function must be called prior to any second or later set of
 * invocations of the run(), run_one(), poll() or poll_one() functions when a
 * previous invocation of these functions returned due to the io_service
 * being stopped or running out of work. After a call to reset(), the
 * io_service object's stopped() function will return @c false.
 *
 * This function must not be called while there are any unfinished calls to
 * the run(), run_one(), poll() or poll_one() functions.
 */
 /**
 io_service被停止,或者執行完handler而缺乏工作時,run()、run_one()、poll()、poll_one()函數的調用會被返回。
 這些函數在被調用之前,必須先調用reset函數。
 在reset函數被調用后,io_service對象的stopped函數將會返回false。
 當run()、run_one()、poll()、poll_one()函數的任何的調用未結束時,這個函數一定不能被調用。
 */

◆對stop()和reset()函數的一點說明(是我單步調試時看到的):

在Windows下,boost::asio::io_service類里面有一個數據成員為"stopped_"(Flag to indicate whether the event loop has been stopped.)。它是一個標志,它標志着事件循環是不是被stopped了。而boost::asio::io_service::reset()函數僅僅是賦值"stopped_=0"。boost::asio::io_service::stopped()函數僅僅是判斷"0!=stopped_"的真假。你單步調試一下,就什么都知道了。

◆下面是我驗證boost::asio::io_service的一個例子:

 1 #include <boost/asio.hpp>  
 2 #include <boost/thread.hpp>  
 3 #include <boost/atomic.hpp>  
 4 #include <boost/shared_ptr.hpp>  
 5 #include <boost/date_time/posix_time/ptime.hpp>  
 6 #include <boost/date_time.hpp>//boost::posix_time::to_iso_extended_string()需要此頭文件。  
 7   
 8 //boost::atomic_bool coutFlag = false;  
 9 //error C2440: 'initializing' : cannot convert from 'bool' to 'boost::atomics::atomic<bool>'  
10 //故意寫錯,可以根據錯誤信息知道某類型的詳細信息。  
11 boost::atomic_bool g_coutFlag(false);  
12 boost::atomic_int g_numIn(0);  
13 boost::atomic_int g_numOut(0);  
14   
15 boost::thread_group g_thgp;  
16 boost::asio::io_service g_io;  
17 boost::shared_ptr<boost::asio::io_service::work> g_pWork = \  
18 boost::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(g_io));  
19 boost::asio::io_service::strand g_strand(g_io);  
20 std::vector<boost::posix_time::ptime> g_vecTimes;  
21   
22 void my_run_4_io_service(boost::asio::io_service& _io, int _idx)  
23 {  
24     _io.run();  
25     //想得到boost::asio::io_service::run()退出時的時刻,只能對io_service進行封裝了。  
26     g_vecTimes[_idx] = boost::posix_time::microsec_clock::local_time();  
27 }  
28   
29 void outFun(int idx)  
30 {// io_service執行的handler。  
31     ++g_numOut;  
32     if (g_coutFlag.load())  
33         std::cout << "outFun: index=" << idx << std::endl;  
34     boost::this_thread::sleep_for(boost::chrono::milliseconds(500));  
35 }  
36   
37 void inFun()  
38 {  
39     for (int i = 1; i <= 10; ++i)  
40     {  
41         g_strand.post(boost::bind(outFun, i));  
42         ++g_numIn;  
43         boost::this_thread::sleep_for(boost::chrono::milliseconds(100));  
44     }  
45     g_coutFlag = true;  
46     g_io.stop();//調用它后,不論io_service有沒有使用io_service::work類,各個線程的run()都會立即返回。  
47     g_vecTimes[0] = boost::posix_time::microsec_clock::local_time();  
48     int numDelta = g_numIn - g_numOut;  
49     std::cout << "inFun: numDelta=" << numDelta << std::endl;//還剩多少event沒有被執行。  
50 }  
51   
52 int main()  
53 {  
54     int vecNum = 5;  
55     g_vecTimes.reserve(vecNum); g_vecTimes.resize(vecNum);  
56     //一個容納 void fun(int i) 函數的 function對象。  
57     boost::function<void(int)> my_lambda_function_object = [vecNum](int secs)  
58     {  
59         boost::this_thread::sleep_for(boost::chrono::microseconds(1000 * 1000 * secs));  
60         std::cout << "now, time is " << boost::posix_time::  
61             to_iso_extended_string(boost::posix_time::microsec_clock::local_time()) << std::endl;  
62         for (int i = 0; i < vecNum; ++i)  
63             std::cout << i << " : " << boost::posix_time::to_iso_extended_string(g_vecTimes[i]) << std::endl;  
64     };  
65   
66     for (int i = 1; i < vecNum; ++i)  
67         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
68     g_thgp.create_thread(inFun);  
69     //等待5秒,確保執行完畢我設計的那些操作。  
70     my_lambda_function_object(5);  
71     //析構掉io_service對應的io_service::work對象,此時io_service里面還有event。  
72     g_pWork = nullptr;  
73     boost::this_thread::sleep_for(boost::chrono::milliseconds(1000 * 1));  
74     g_io.reset();  
75     boost::this_thread::sleep_for(boost::chrono::seconds(1));  
76     //因為work被析構掉了,所以啟動的那些線程在執行完event后,都自行退出了。  
77     for (int i = 1; i < vecNum; ++i)  
78         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
79     //等待6秒,確保io_service中剩余的event被執行完畢。  
80     my_lambda_function_object(6);  
81     std::cout << "done." << std::endl;  
82     int cmd_val = getchar();  
83     return 0;  
84 }  

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM