IO模型
io_service對象是asio框架中的調度器,所有異步io事件都是通過它來分發處理的(io對象的構造函數中都需要傳入一個io_service對象)。
asio::io_service io_service;
asio::ip::tcp::socket socket(io_service);
在asio框架中,同步的io主要流程如下:
-
應用程序調用IO對象成員函數執行IO操作
-
IO對象向io_service 提出請求.
-
io_service 調用操作系統的功能執行連接操作.
-
操作系統向io_service 返回執行結果.
-
io_service將錯誤的操作結果翻譯為boost::system::error_code類型,再傳遞給IO對象.
-
如果操作失敗,IO對象拋出boost::system::system_error類型的異常.
而異步IO的處理流程則有些不同:
-
應用程序調用IO對象成員函數執行IO操作
-
IO對象請求io_service的服務
-
io_service 通知操作系統其需要開始一個異步連接.
-
操作系統指示連接操作完成, io_service從隊列中獲取操作結果
-
應用程序必須調用io_service::run()以便於接收結果
-
調用io_service::run()后,io_service返回一個操作結果,並將其翻譯為error_code,傳遞到事件回調函數中
io_service對象
io_servuce的作用: io_servie 實現了一個任務隊列,這里的任務就是void(void)的函數。Io_servie最常用的兩個接口是post和run,post向任務隊列中投遞任務,run是執行隊列中的任務,直到全部執行完畢,並且run可以被N個線程調用。Io_service是完全線程安全的隊列。
io_service對象提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop:
-
post用於發布io事件,如timer,socket讀寫等,一般由asio框架相應對象調用,無需我們顯式調用。
-
run用於監聽io事件響應,並執行響應回調,對於異步io操作需要在代碼中顯式調用,對於同步io操作則由io對象隱式調用(並不是run函數,不過也是等待io事件)。
可見,io_service提供的是一個生產者消費者模型。在異步io操作中需要我們手動控制消費者,調用run函數,它的基本工作模式如下:
-
等待io事件響應,如果所有io事件響應完成則退出
-
等待到io事件響應后,執行其對應的回調
-
繼續等待下一個io事件,重復1-2
Io_servie 實現代碼的基本類結構:
l Io_servie是接口類,為實現跨平台,采用了策略模式,所有接口均有impl_type實現。根據平台不同impl_type分為
n win_iocp_io_service Win版本的實現,這里主要分析Linux版本。
n task_io_service 非win平台下的實現,其代碼結構為:
u detail/task_io_service_fwd.hpp 簡單聲明task_io_service名稱
u detail/task_io_service.hpp 聲明task_io_service的方法和屬性
u detail/impl/task_io_service.ipp 具體實現文件
u 隊列中的任務類型為opertioan,原型其實是typedef task_io_service_operation operation,其實現文件在detail/task_io_service_operation.hpp中,當隊列中的任務被執行時,就是task_io_service_operation:: complete被調用的時候。
Io_servie::Post方法的實現
Post向隊列中投遞任務,然后激活空閑線程執行任務。其實現流程如下:
l Post接收handler作為參數,實際上是個仿函數,通過此仿函數構造出completion_handler對象,completion_handler繼承自operation。然后調用post_immediate_completion。
l post_immediate_completion首先將outstanding_work_增加,然后調用post_deferred_completion。
l post_deferred_completion首先加鎖將任務入列,然后調用wake_one_thread_and_unlock
l wake_one_thread_and_unlock嘗試喚醒當前空閑的線程,其實現中特別之處在於,若沒有空閑線程,但是有線程在執行task->run,即阻塞在epoll_wait上,那么先中斷epoll_wait執行任務隊列完成后再執行epoll_wait。
l first_idle_thread_維護了所有當前空閑線程,實際上使用了Leader/Follower模式,每次喚醒時只喚醒空閑線程的第一個。
Io_servie::run方法的實現
Run方法執行隊列中的所有任務,直到任務執行完畢。
l run方法首先構造一個idle_thread_info,和first_idle_thread_類型相同,即通過first_idle_thread_將所有線程串聯起來,它這個串聯不是立即串聯的,當該線程無任務可做是加入到first_idle_thread_的首部,有任務執行時,從first_idle_thread_中斷開。這很正常,因為first_idle_thread_維護的是當前空閑線程。
l 加鎖,循環執行do_one方法,直到do_one返回false
l do_one每次執行一個任務。首先檢查隊列是否為空,若空將此線程追加到first_idle_thread_的首部,然后阻塞在條件變量上,直到被喚醒。
l 當被喚醒或是首次執行,若stopped_為true(即此時stop方法被調用了),返回0
l 隊列非空,pop出一個任務,檢查隊列無任務那么簡單的解鎖,若仍有,調用wake_one_thread_and_unlock嘗試喚醒其他空閑線程執行。然后執行該任務,返回1.
l 實際上在執行隊列任務時有一個特別的判斷if (o == &task_operation_),那么將會執行task_->run,task_變量類型為reactor,在linux平台實現為epoll_reactor,實現代碼文件為detail/impl/epoll_reactor.ipp,run方法實際上執行的是epoll_wait,run阻塞在epoll_wait上等待事件到來,並且處理完事件后將需要回調的函數push到io_servie的任務隊列中,雖然epoll_wait是阻塞的,但是它提供了interrupt函數,該interrupt是如何實現的呢,它向epoll_wait添加一個文件描述符,該文件描述符中有8個字節可讀,這個文件描述符是專用於中斷epoll_wait的,他被封裝到select_interrupter中,select_interrupter實際上實現是eventfd_select_interrupter,在構造的時候通過pipe系統調用創建兩個文件描述符,然后預先通過write_fd寫8個字節,這8個字節一直保留。在添加到epoll_wait中采用EPOLLET水平觸發,這樣,只要select_interrupter的讀文件描述符添加到epoll_wait中,立即中斷epoll_wait。很是巧妙。!!!實際上就是因為有了這個reactor,它才叫io_servie,否則就是一個純的任務隊列了。
l Run方法的原則是:
n 有任務立即執行任務,盡量使所有的線程一起執行任務
n 若沒有任務,阻塞在epoll_wait上等待io事件
n 若有新任務到來,並且沒有空閑線程,那么先中斷epoll_wait,先執行任務
n 若隊列中有任務,並且也需要epoll_wait監聽事件,那么非阻塞調用epoll_wait(timeout字段設置為0),待任務執行完畢在阻塞在epoll_wait上。
n 幾乎對線程的使用上達到了極致。
n 從這個函數中可以知道,在使用ASIO時,io_servie應該盡量多,這樣可以使其epoll_wait占用的時間片最多,這樣可以最大限度的響應IO事件,降低響應時延。但是每個io_servie::run占用一個線程,所以io_servie最佳應該和CPU的核數相同。
Io_servie::stop的實現
l 加鎖,調用stop_all_threads
l 設置stopped_變量為true,遍歷所有的空閑線程,依次喚醒
l task_interrupted_設置為true,調用task_的interrupt方法
l task_的類型為reactor,在run方法中已經做了分析
從中可以看出,io_service是一個工作隊列的模型。在使用過程中一般有如下幾個需要注意的地方:
1. run函數在io事件完成后會退出,導致后續基於該對象的異步io任務無法執行
由於io_service並不會主動常見調度線程,需要我們手動分配,常見的方式是給其分配一個線程,然后執行run函數。但run函數在io事件完成后會退出,線程會終止,后續基於該對象的異步io任務無法得到調度。
解決這個問題的方法是通過一個asio::io_service::work對象來守護io_service。這樣,即使所有io任務都執行完成,也不會退出,繼續等待新的io任務。
boost::asio::io_service io;
boost::asio::io_service::work work(io);
io.run();
2. 回調在run函數的線程中同步執行,當回調處理時間較長時阻塞后續io響應
解決這個問題的方法有兩種:1. 啟動多線程執行run函數(run函數是線程安全的),2. 新啟動一個線程(或通過線程池)來執行回調函數。一般來講,如果回調處理事件不是特別短,應該使用在線程池中處理回調的方式。
3. 回調在run函數的線程中同步執行,io事件較多的時候得不到及時響應
這個其實是性能問題了,在多核cpu上可以通過在多個線程中執行run函數來解決這一問題。這種方式也只能充分利用cpu性能,本身性能問題就不是光靠軟件就能解決的。
.net中的異步io調度方式
和io_service這種手動控制的方式比起來,.net則是純粹的自動檔了。IO調度由CLR托管了,無需手動控制。回調也是在線程池中執行,無需擔心影響后續IO響應。
正是由於CLR的托管,在.net 的異步IO框架中,就沒有類似io_service的調度對象存在,這也符合.net的一貫簡潔做法。
◆boost::asio::io_service使用時的注意事項:
①請讓boost::asio::io_service和boost::asio::io_service::work搭配使用。
②想讓event按照進入(strand)時的順序被執行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。
③一般情況下,io_service的成員函數的使用順序:
boost::asio::io_service構造,
boost::asio::io_service::run(),
boost::asio::io_service::stop(),
boost::asio::io_service::reset(),
boost::asio::io_service::run(),
......
boost::asio::io_service析構,
④不論有沒有使用io_service::work,run()都會執行完io_service里面的event,(若沒有用work,run就會退出)。
⑤一個新創建的io_service不需要執行reset()函數。
⑥在調用stop()后,在調用run()之前,請先調用reset()函數。
⑦函數stop()和reset()並不能清除掉io_service里面尚未執行的event。
我個人認為,也只有析構掉io_service,才能清空它里面的那些尚未執行的event了。(可以用智能指針)。
⑧函數stop(),stopped(),reset(),很簡單,請單步調試,以明白它在函數里做了什么。
⑨boost的.hpp文件里面(一般情況下)有各個函數的使用說明,你可以隨時查看。
◆下面是boost::asio::io_service的stop()和reset()函數的注釋的翻譯:
void boost::asio::io_service::stop(); BOOST_ASIO_DECL void stop(); /// Stop the io_service object's event processing loop. /// 停止io_service對象的事件處理循環。 /** * This function does not block, but instead simply signals the io_service to * stop. All invocations of its run() or run_one() member functions should * return as soon as possible. Subsequent calls to run(), run_one(), poll() * or poll_one() will return immediately until reset() is called. */ /** 這個函數不阻塞,而是僅僅表示io_service停止了。 它的run()或run_one()成員函數的調用應當盡快返回。 對run()、run_one()、poll()、poll_one()的隨后的調用將會立即返回直到reset()函數被調用了。 */ void boost::asio::io_service::reset(); BOOST_ASIO_DECL void reset(); /// Reset the io_service in preparation for a subsequent run() invocation. /// 重置io_service對象,為隨后的run()調用做准備。 /** * This function must be called prior to any second or later set of * invocations of the run(), run_one(), poll() or poll_one() functions when a * previous invocation of these functions returned due to the io_service * being stopped or running out of work. After a call to reset(), the * io_service object's stopped() function will return @c false. * * This function must not be called while there are any unfinished calls to * the run(), run_one(), poll() or poll_one() functions. */ /** io_service被停止,或者執行完handler而缺乏工作時,run()、run_one()、poll()、poll_one()函數的調用會被返回。 這些函數在被調用之前,必須先調用reset函數。 在reset函數被調用后,io_service對象的stopped函數將會返回false。 當run()、run_one()、poll()、poll_one()函數的任何的調用未結束時,這個函數一定不能被調用。 */
◆對stop()和reset()函數的一點說明(是我單步調試時看到的):
在Windows下,boost::asio::io_service類里面有一個數據成員為"stopped_"(Flag to indicate whether the event loop has been stopped.)。它是一個標志,它標志着事件循環是不是被stopped了。而boost::asio::io_service::reset()函數僅僅是賦值"stopped_=0"。boost::asio::io_service::stopped()函數僅僅是判斷"0!=stopped_"的真假。你單步調試一下,就什么都知道了。
◆下面是我驗證boost::asio::io_service的一個例子:
1 #include <boost/asio.hpp> 2 #include <boost/thread.hpp> 3 #include <boost/atomic.hpp> 4 #include <boost/shared_ptr.hpp> 5 #include <boost/date_time/posix_time/ptime.hpp> 6 #include <boost/date_time.hpp>//boost::posix_time::to_iso_extended_string()需要此頭文件。 7 8 //boost::atomic_bool coutFlag = false; 9 //error C2440: 'initializing' : cannot convert from 'bool' to 'boost::atomics::atomic<bool>' 10 //故意寫錯,可以根據錯誤信息知道某類型的詳細信息。 11 boost::atomic_bool g_coutFlag(false); 12 boost::atomic_int g_numIn(0); 13 boost::atomic_int g_numOut(0); 14 15 boost::thread_group g_thgp; 16 boost::asio::io_service g_io; 17 boost::shared_ptr<boost::asio::io_service::work> g_pWork = \ 18 boost::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(g_io)); 19 boost::asio::io_service::strand g_strand(g_io); 20 std::vector<boost::posix_time::ptime> g_vecTimes; 21 22 void my_run_4_io_service(boost::asio::io_service& _io, int _idx) 23 { 24 _io.run(); 25 //想得到boost::asio::io_service::run()退出時的時刻,只能對io_service進行封裝了。 26 g_vecTimes[_idx] = boost::posix_time::microsec_clock::local_time(); 27 } 28 29 void outFun(int idx) 30 {// io_service執行的handler。 31 ++g_numOut; 32 if (g_coutFlag.load()) 33 std::cout << "outFun: index=" << idx << std::endl; 34 boost::this_thread::sleep_for(boost::chrono::milliseconds(500)); 35 } 36 37 void inFun() 38 { 39 for (int i = 1; i <= 10; ++i) 40 { 41 g_strand.post(boost::bind(outFun, i)); 42 ++g_numIn; 43 boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 44 } 45 g_coutFlag = true; 46 g_io.stop();//調用它后,不論io_service有沒有使用io_service::work類,各個線程的run()都會立即返回。 47 g_vecTimes[0] = boost::posix_time::microsec_clock::local_time(); 48 int numDelta = g_numIn - g_numOut; 49 std::cout << "inFun: numDelta=" << numDelta << std::endl;//還剩多少event沒有被執行。 50 } 51 52 int main() 53 { 54 int vecNum = 5; 55 g_vecTimes.reserve(vecNum); g_vecTimes.resize(vecNum); 56 //一個容納 void fun(int i) 函數的 function對象。 57 boost::function<void(int)> my_lambda_function_object = [vecNum](int secs) 58 { 59 boost::this_thread::sleep_for(boost::chrono::microseconds(1000 * 1000 * secs)); 60 std::cout << "now, time is " << boost::posix_time:: 61 to_iso_extended_string(boost::posix_time::microsec_clock::local_time()) << std::endl; 62 for (int i = 0; i < vecNum; ++i) 63 std::cout << i << " : " << boost::posix_time::to_iso_extended_string(g_vecTimes[i]) << std::endl; 64 }; 65 66 for (int i = 1; i < vecNum; ++i) 67 g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i)); 68 g_thgp.create_thread(inFun); 69 //等待5秒,確保執行完畢我設計的那些操作。 70 my_lambda_function_object(5); 71 //析構掉io_service對應的io_service::work對象,此時io_service里面還有event。 72 g_pWork = nullptr; 73 boost::this_thread::sleep_for(boost::chrono::milliseconds(1000 * 1)); 74 g_io.reset(); 75 boost::this_thread::sleep_for(boost::chrono::seconds(1)); 76 //因為work被析構掉了,所以啟動的那些線程在執行完event后,都自行退出了。 77 for (int i = 1; i < vecNum; ++i) 78 g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i)); 79 //等待6秒,確保io_service中剩余的event被執行完畢。 80 my_lambda_function_object(6); 81 std::cout << "done." << std::endl; 82 int cmd_val = getchar(); 83 return 0; 84 }