視頻電警開發,是基於ACE框架上的一次重復開發,本文檔擬對ACE框架做一個梳理,以期對他人進行基於ace的開發有所幫助。
*二、系統安裝
ACE的安裝是一件比較麻煩的事情,這里簡單的記錄了我在VS2005下安裝ACE的過程,希望能給大家一個參考。
安裝環境:
l 編譯環境:VS2005中文版
l ACE版本:ACE-5.5.1
安裝過程:
a) 下載安裝包。Ace的安裝文件可以在http://download.dre.vanderbilt.edu/中下載到,由於我是在windows環境下安裝並且不需要TAO等其它庫,便下載了ACE-5.5.1.zip。
b) 下載完成后將其解壓。我的解壓路徑為D:\Develop\ACE_wrappers。
c) 設置環境變量
d) 在操作系統添加一個名為ACE_ROOT的用戶環境變量,值為剛才ace的解壓路徑D:\Develop\ACE_wrappers。
e) 添加用戶的Path環境變量,值為%ACE_ROOT%\lib,這樣才能保證系統能找到ace生成的動態連接庫。
f) 設置VS2005的C++開發項目信息,依次打開菜單 工具-選項-項目和解決方案-VC++目錄 ,在右側目錄列表中選擇"包含目錄",添加$(ACE_ROOT),在右側目錄列表中選擇"庫文件",添加$(ACE_ROOT)\lib。
g) 編譯ACE,在ACE_ROOT\ace目錄創建一個名為 config.h的文件。編輯文件並加入以下內容:
#define ACE_HAS_STANDARD_CPP_LIBRARY 1
#include "ace/config-win32.h"
其中第一行是因為我想用標准C++跨平台,第二行則是必須要的,表明當前是在win32的環境下進行ace的項目。
h) 進入ACE_ROOT\ace目錄中,能發現ACE現在已經帶VS2005的編譯項目了,直接打開ace_vc8.sln,直接生成ACE項目的Debug版和Release版,編譯過程還比較快,大概就幾分鍾的樣子。編譯鏈接完成后,在ACE_ROOT\lib中一共生成了四個文件,分別是"ACE.dll","ACE.lib", "ACEd.dll","ACEd.lib",其中帶"d"表示的是Debug版本。
i) 檢驗 ACE
j) 打開VS2005,建立一個空項目,將ACE程序員手冊中的第一個程序拷入其中。
k) 配置屬性->鏈接器->常規->附加依賴項,添入ACEd.lib。
l) 編譯,如果不出意外的話就能看到你的ace版的" hello world"啦。
注意:
ACE項目的字符集設置是"未設置",而VS2005的c++項目默認字符集是"使用 Unicode 字 符集",如果用到了ACE鏈接庫時需要將字符集改為"未設置"(在"項目屬性->配置屬性->項目默認值->字符集"中配置),否則可能出現鏈接錯誤。
至此,ACE的安裝工作便算完成.下面是完成unicode編譯的ACE設置:
*三、ACE的使用及其核心模塊講解等
下面為本人在使用ACE中遇到的一些問題的匯總,只介紹了大體的思路,具體的細節還需進佐證。
1. ACE配置模塊的使用
就一個正常項目而言,一個配置文件是必不可少的,那就先從這里入手了。linux/unix 程序可能經常用到命令行方式,不過我還是比較喜歡windows 的 ini 格式的,當然,有xml 的更好,不過 ACE 里暫時沒有提供。配置文件的使用很簡單,ACE 提供的類也很友好。代碼如下:

2. ACE的互斥管理機制
2.1、ACE Lock類屬
鎖類屬包含的類包裝簡單的鎖定機制,比如互斥體、信號量、讀/寫互斥體和令牌等。這里我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進行一些簡單的說明。
互斥體的使用
互斥體用於保護共享的易變代碼,也就是全局或靜態數據。這樣的數據必須通過互斥體進行保護,以防止它們在多個線程同時訪問時損壞。在ACE中可以通過ACE_Thread_Mutex實現線程的訪問互斥,下面的例子演示ACE_Thread_Mutex類的使用。
#include "ace/Thread.h"
#include "ace/Synch.h"
#include <iostream>
using namespace std;
ACE_Thread_Mutex mutex;
void* Thread1(void *arg)
{
mutex.acquire();
ACE_OS::sleep(3);
cout<<endl<<"hello thread1"<<endl;
mutex.release();
return NULL;
}
void* Thread2(void *arg)
{
mutex.acquire();
cout<<endl<<"hello thread2"<<endl;
mutex.release();
return NULL;
}
int main(int argc, char *argv[])
{
ACE_Thread::spawn((ACE_THR_FUNC)Thread1);
//Thread2 比Thread1晚創建1秒鍾,故后嘗試獲取互斥體
ACE_OS::sleep(1);
ACE_Thread::spawn((ACE_THR_FUNC)Thread2);
while(true)
ACE_OS::sleep(10); return 0;
}
ACE_Thread_Mutex主要有兩個方法:
acquire():用來獲取互斥體,如果無法獲取,將阻塞至獲取到為止。
release():用來釋放互斥體,從而使自己或者其它線程能夠獲取互斥體。
當線程要訪問共享資源時,首先調用acquire()方法獲取互斥體,從而獲取對改互斥體所保護的共享資源的唯一訪問權限,訪問結束時調用釋放互斥體,使得其它線程能獲取共享資源的訪問權限。
在此例中,本來Thread2的打印消息在Thread1之前,但由於Thread1先獲得互斥體,故Thread2只有待Thread1結束后才能進入臨界區。讀者朋友們可以通過將ACE_Thread_Mutex替換為ACE_NULL_Mutex看一下不加鎖的執行結果。
ACE Lock類屬簡介,列表如下:
名字
描述
ACE_Mutex
封裝互斥機制(根據平台,可以是mutex_t、pthread_mutex_t等等)的包裝類,用於提供簡單而有效的機制來使對共享資源的訪問序列化。它與二元信號量(binary semaphore)的功能相類似。可被用於線程和進程間的互斥。
ACE_Thread_Mutex
ACE_Process_Mutex
可用於替換ACE_Mutex,專用於進程同步。
ACE_NULL_Mutex
提供了ACE_Mutex接口的"無為"(do-nothing)實現,可在不需要同步時用作替換。
ACE_RW_Mutex
封裝讀者/作者鎖的包裝類。它們是分別為讀和寫進行獲取的鎖,在沒有作者在寫的時候,多個讀者可以同時進行讀取。
ACE_RW_Thread_Mutex
可用於替換ACE_RW_Mutex,專用於線程同步。
ACE_RW_Process_Mutex
可用於替換ACE_RW_Mutex,專用於進程同步。
ACE_Semaphore
這些類實現計數信號量,在有固定數量的線程可以同時訪問一個資源時很有用。在OS不提供這種同步機制的情況下,可通過互斥體來進行模擬。
ACE_Thread_Semaphore
應被用於替換ACE_Semaphore,專用於線程同步。
ACE_Process_Semaphore
應被用於替換ACE_Semaphore,專用於進程同步。
ACE_Token
提供"遞歸互斥體"(recursive mutex),也就是,當前持有某令牌的線程可以多次重新獲取它,而不會阻塞。而且,當令牌被釋放時,它確保下一個正阻塞並等待此令牌的線程就是下一個被放行的線程。
ACE_Null_Token
令牌接口的"無為"(do-nothing)實現,在你知道不會出現多個線程時使用。
ACE_Lock
定義鎖定接口的接口類。一個純虛類,如果使用的話,必須承受虛函數調用開銷。
ACE_Lock_Adapter
基於模板的適配器,允許將前面提到的任意一種鎖定機制適配到ACE_Lock接口。
可以簡單的分為以下幾類:
· 互斥鎖
互斥鎖(通常稱為"互斥體"或"二元信號量")用於保護多線程控制並發訪問的共享資源的完整性。互斥體通過定義臨界區來序列化多線程控制的執行,在臨界區中每一時刻只有一個線程在執行它的代碼。互斥體簡單而高效(時間和空間)。
ACE線程庫提供了Mutex式的類(是一組互斥體對象,擁有類似的接口),他是一種簡單而高效的類型是"非遞歸"互斥體。非遞歸互斥體不允許當前擁有互斥體的線程在釋放它之前重新獲取它。否則,將會立即發生死鎖。遞歸互斥體在ACE Recursive_Thread_Mutex類中可移植地實現。
· 讀者/作者鎖
讀者/作者鎖與互斥體相類似。例如,獲取讀者/作者鎖的線程也必須釋放它。多個線程可同時獲取一個讀者/作者鎖用於讀,但只有一個線程可以獲取該鎖用於寫。當互斥體保護的資源用於讀遠比用於寫要頻繁時,讀者/作者互斥體有助於改善並發的執行。
ACE線程庫提供了一個叫作RW_Mutex的類,在C++封裝類中可移植地實現了讀者/作者鎖的語義。讀者/作者鎖將優先選擇權給作者。因而,如果有多個讀者和一個作者在鎖上等待,作者將會首先獲取它。
計數信號量
在概念上,計數信號量是可以原子地增減的整數。如果線程試圖減少一個值為零的信號量的值,它就會阻塞,直到另一個線程增加該信號量的值。
計數信號量用於追蹤共享程序狀態的變化。它們記錄某種特定事件的發生。因為信號量維護狀態,它們允許線程根據該狀態來作決定,即使事件是發生在過去。
信號量比互斥體效率要低,但是,它們要更為通用,因為它們無需被最初獲取它們的同一線程獲取和釋放。這使得它們能夠用於異步的執行上下文中(比如信號處理器)。ACE線程庫提供一個叫作Semaphore的類來可移植地在C++包裝類中實現信號量語義。
2.2、ACE Guard類屬
與C一級的互斥體API相比較,Mutex包裝為同步多線程控制提供了一種優雅的接口。但是,Mutex潛在地容易出錯,因為程序員有可能忘記調用release方法(當然,C級的互斥體API更容易出錯)。這可能由於程序員的疏忽或是C++異常的發生而發生,然而,其導致及其嚴重的后果--死鎖。
因此,為改善應用的健壯性,ACE同步機制有效地利用C++類構造器和析構器的語義來確保Mutex鎖被自動獲取和釋放。
ACE提供了一個稱為Guard、Write_Guard和Read_Guard的類族,確保在進入和退出C++代碼塊時分別自動獲取和釋放鎖。
Guard類是最基本的守衛機制,定義可以簡化如下(實際定義比這相對要復雜而完善一點):
template <class LOCK>
class Guard
{
public:
Guard (LOCK &l): lock_ (&l){ lock_.acquire (); }
˜Guard (void) { lock_.release (); }
private:
LOCK lock_;
}
Guard類的對象定義一"塊"代碼,在其上鎖被自動獲取,並在退出塊時自動釋放,即使是程序拋異常也能保證自動解鎖。這種機制也能為Mutex、RW_Mutex和Semaphore同步封裝工作。
對於讀寫鎖,由於加鎖接口不一樣,ace也提供了相應的Read_Guard和Write_Guard類,Read_Guard和Write_Guard類有着與Guard類相同的接口。但是,它們的acquire方法分別對鎖進行讀和寫。
缺省地, Guard類構造器將會阻塞程序,直到鎖被獲取。會有這樣的情況,程序必須使用非阻塞的acquire調用(例如,防止死鎖)。因此,可以傳給ACE Guard的構造器第二個參數(請參看原始代碼,而不是我這里的簡化代碼),指示它使用鎖的try_acquire方法,而不是acquire。隨后調用者可以使用Guard的locked方法來原子地測試實際上鎖是否已被獲取。
用Guard重寫上一節的Thread1方法如下(注釋了的部分是原有代碼):
void* Thread1(void *arg)
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex);
//mutex.acquire();
ACE_OS::sleep(3);
cout<<endl<<"hello thread1"<<endl;
//mutex.release();
return NULL;
}
相比較而言,使用Guard更加簡潔,並且會自動解鎖,免除了一部分后顧之憂。
注意:
Guard只能幫你自動加解鎖,並不能解決死鎖問題,特別是對於那些非遞歸的互斥體來說使用Guard尤其要注意防止死鎖。
Guard是在Guard變量析構時解鎖,如果在同一函數中兩次對同一互斥體變量使用Guard要注意其對象生命周期,否則容易造成死鎖。
2.3、ACE Condition類屬
ACE Condition類屬(條件變量)提供風格與互斥體、讀者/作者鎖和計數信號量不同的鎖定機制。當持有鎖的線程在臨界區執行代碼時,這三種機制讓協作線程進行等待。相反,條件變量通常被一個線程用於使自己等待,直到一個涉及共享數據的條件表達式到達特定的狀態。當另外的協作線程指示共享數據的狀態已發生變化,調度器就喚醒一個在該條件變量上掛起的線程。於是新喚醒的線程重新對它的條件表達式進行求值,如果共享數據已到達合適狀態,就恢復處理。
ACE線程庫提供一個叫作Condition的類來可移植地在C++包裝類中實現條件變量語義。定義方式如下:
ACE_Thread_Mutex mutex;
ACE_Condition<ACE_Thread_Mutex> cond(mutex);
該對象有兩個常用方法。
signal()//向使用該條件變量的其它線程發送滿足條件信號。
wait()//查詢是否滿足條件,如果滿足,則繼續往下執行;如果不滿足條件,主線程就等待在此條件變量上。條件變量隨即自動釋放互斥體,並使主線程進入睡眠。
條件變量總是與互斥體一起使用。這是一種可如下描述的一般模式:
while( expression NOT TRUE ) wait on condition variable;
條件變量不是用於互斥,往往用於線程間的協作,下面例子演示了通過條件變量實現線程協作。
#include "ace/Thread.h"
#include "ace/Synch.h"
#include <iostream>
using namespace std;
ACE_Thread_Mutex mutex;
ACE_Condition<ACE_Thread_Mutex> cond(mutex);
void* worker(void *arg)
{
ACE_OS::sleep(2); //保證eater線程的cond.wait()在worker線程的cond.signal()先執行
mutex.acquire();
ACE_OS::sleep(1);
cout<<endl<<"produce"<<endl;
cond.signal();
mutex.release();
return NULL;
}
void* eater(void *arg)
{
mutex.acquire();
cond.wait();
cout<<endl<<"eat"<<endl;
mutex.release();
return NULL;
}
int main(int argc, char *argv[])
{
ACE_Thread::spawn((ACE_THR_FUNC)worker);
ACE_OS::sleep(1);
ACE_Thread::spawn((ACE_THR_FUNC)eater);
while(true)
ACE_OS::sleep(10);
return 0;
}
這個例子中,首先創建了一個生產者線程worker和一個消費者線程eater,消費者線程執行比生產者快,兩個線程不加限制並發執行會導致先消費,后生產的情況(只是加互斥鎖也不能很好的解決,以為無法保證生產者一定先獲得互斥體)。所以這里通過條件變量的通知方式保證線程的順序執行:
a) 消費者線程獲取互斥體,等待條件滿足(生產者生產了食品)。同時釋放互斥體,進入休眠狀態。
b) 生產者獲取互斥體(雖然是消費者先獲取的互斥體,但消費者調用的wait函數會釋放消費者的互斥體),生產商品后,通過條件變量發送信號(調用signal函數)通知消費者生產完成,結束生產過程,釋放互斥體。
c) 消費者收到信號后,重新獲取互斥體,完成消費過程。
使用條件變量的注意事項:
l 條件變量必須和互斥體一起使用,也就是說使用前必須加鎖(調用互斥體acquire函數),使用完后需釋放互斥體。
條件變量中的wait()和signal()成對使用的話,必須保證wait()函數在signal()之前執行,這樣才能保證wait()能收到條件滿足通知,不至於一直等待下去,形成死鎖(worker線程中的第一句話就是起的這個作用)。
3. ACE的線程管理機制
2.1、ACE Lock類屬
不同的操作系統下用c++進行過多線程編程的朋友對那些線程處理的API可能深有體會,這些API提供了相同或是相似的功能,但是它們的API的差別卻極為懸殊。
ACE_Thread提供了對不同OS的線程調用的簡單包裝,通過一個通用的接口進行處理線程創建、掛起、取消和刪除等問題。
一. 線程入口函數
所有線程必須從一個指定的函數開始執行,該函數稱為線程函數,它必須具有下列原型:
void* worker(void *arg) {}
該函數輸入一個void *型的參數,可以在創建線程時傳入。
注意:
所有的線程啟動函數(方法)必須是靜態的或全局的(就如同直接使用OS線程API時所要求的一樣)。
二.線程基本操作
1.創建一個線程
一個進程的主線程是由操作系統自動生成,如果你要讓一個主線程創建額外的線程,可以通過ACE_Thread::spawn()實現,該函數一般的使用方式如下:
ACE_thread_t threadId;
ACE_hthread_t threadHandle;
ACE_Thread::spawn(
(ACE_THR_FUNC)worker, //線程執行函數
NULL, //執行函數參數
THR_JOINABLE | THR_NEW_LWP,
&threadId,
&threadHandle
);
為了簡化,也可以使用其默認參數使用ACE_Thread::spawn((ACE_THR_FUNC)worker) 來創建一個worker的線程。
另外,ACE還提供了ACE_Thread::spawn_n函數來創建多個線程。
2.終止線程
在線程函數體中ACE_Thread::exit()調用即可終止線程執行。
3.設定線程的相對優先級
當一個線程被首次創建時,它的優先級等同於它所屬進程的優先級。一個線程的優先級是相對於其所屬的進程的優先級而言的。可以通過調用ACE_Thread::setprio函數改變線程的相對優先級,該函數的調用方式如下:
ACE_Thread::setprio(threadHandle,ACE_DEFAULT_THREAD_PRIORITY)
4.掛起及恢復線程
掛起線程可以通過來實現,它能暫停一個線程的執行,其調用方式如下ACE_Thread::suspend(threadHandle) 。
相應的,可以通過ACE_Thread::resume(threadHandle) 恢復被掛起的線程的執行。
5.等待線程結束
在主函數中調用ACE_Thread::join(threadHandle)可阻塞主函數,直道線程結束才能繼續執行。
6.停止線程
在主函數中調用ACE_Thread::cancel (threadHandle)可停止線程的執行(在Unix底下可以,而在windows下好像不起作用,有待檢驗)。
三.程序示例
下面例子演示了如何用ace創建一個線程。
#include "ace/Thread.h"
#include "ace/Synch.h"
#include <iostream>
using namespace std;
void* worker(void *arg)
{
for(int i=0;i<10;i++)
{
ACE_OS::sleep(1);
cout<<endl<<"hello world"<<endl;
}
return NULL;
}
int main(int argc, char *argv[])
{
ACE_thread_t threadId;
ACE_hthread_t threadHandle;
ACE_Thread::spawn(
(ACE_THR_FUNC)worker, //線程執行函數
NULL, //執行函數參數
THR_JOINABLE | THR_NEW_LWP,
&threadId,
&threadHandle
);
ACE_Thread::join(threadHandle);
return 0;
}
在這個簡單的例子中,創建了1個工作者線程,執行程序中定義的worker()函數。然后阻塞主函數,待線程結束后退出程序。
4. ACE的網絡通訊機制
4.1、TCP通訊
傳輸控制協議TCP(Transmission Control Protocol):TCP提供可靠的、面向連接的運輸服務,用於高可靠性數據的傳輸。TCP協議的可靠性是指保證每個tcp報文能按照發送順序到達客戶端。
Tcp通信過程一般為如下步驟:
b) 客戶端通過服務器的ip和服務器綁定的端口連接服務器。
c) 服務器和客戶端通過網絡建立一條數據通路,通過這條數據通路進行數據交互。
常用API:
1. ACE_INET_Addr類。
ACE"地址"類ACE_Addr的子類,表示TCP/IP和UDP/IP的地址。它通常包含機器的ip和端口信息,通過它可以定位到所通信的進程。
定義方式:
ACE_INET_Addr addInfo(3000,"192.168.1.100");
常用方法:
l get_host_name 獲取主機名
l get_ip_address 獲取ip地址
l get_port_number 獲取端口號
2. ACE_SOCK_Acceptor類。
服務期端使用,用於綁定端口和被動地接受連接。
常用方法:
l open 綁定端口
l accept建立和客戶段的連接
3. ACE_SOCK_Connector類。
客戶端使用,用於主動的建立和服務器的連接。
常用方法:
l connect() 建立和服務期的連接。
4. ACE_SOCK_Stream類。
客戶端和服務器都使用,表示客戶段和服務器之間的數據通路。
常用方法:
l send () 發送數據
l recv () 接收數據
l close() 關閉連接(實際上就是斷開了socket連接)。
代碼示例:
下面例子演示了如何如何用ACE創建TCP通信的Server端。
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
#include <string>
#include <iostream>
using namespace std;
int main(int argc, char *argv[])
{
ACE_INET_Addr port_to_listen(3000); //綁定的端口
ACE_SOCK_Acceptor acceptor;
if (acceptor.open (port_to_listen, 1) == -1) //綁定端口
{
cout<<endl<<"bind port fail"<<endl;
return -1;
}
while(true)
{
ACE_SOCK_Stream peer; //和客戶端的數據通路
ACE_Time_Value timeout (10, 0);
if (acceptor.accept (peer) != -1) //建立和客戶端的連接
{
cout<<endl<<endl<<"client connect. "<<endl;
char buffer[1024];
ssize_t bytes_received;
ACE_INET_Addr raddr;
peer.get_local_addr(raddr);
cout<<endl<<"local port\t"<<raddr.get_host_name()<<"\t"<<raddr.get_port_number()<<endl;
while ((bytes_received =
peer.recv (buffer, sizeof(buffer))) != -1) //讀取客戶端發送的數據
{
peer.send(buffer, bytes_received); //對客戶端發數據
}
peer.close ();
}
}
return 0;
}
這個例子實現的功能很簡單,服務器端綁定3000號端口,等待一個客戶端的連接,然后將從客戶端讀取的數據再次轉發給客戶端,也就是實現了一個EchoServer的功能。
相應的客戶端程序也比較簡單,代碼如下:
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Connector.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>
#include <string>
#include <iostream>
using namespace std;
int main(int argc, char *argv[])
{
ACE_INET_Addr addr(3000,"127.0.0.1");
ACE_SOCK_Connector connector;
ACE_Time_Value timeout(5,0);
ACE_SOCK_Stream peer;
if(connector.connect(peer,addr,&timeout) != 0)
{
cout<<"connection failed !"<<endl;
return 1;
}
cout<<"conneced !"<<endl;
string s="hello world";
peer.send(s.c_str(),s.length()); //發送數據
cout<<endl<<"send:\t"<<s<<endl;
ssize_t bc=0; //接收的字節數
char buf[1024];
bc=peer.recv(buf,1024,&timeout); //接收數據
if(bc>=0)
{
buf[bc]='\0';
cout<<endl<<"rev:\t"<<buf<<endl;
}
peer.close();
return 0;
}
下表給出了服務器端和客戶端的傳輸過程的比較:
操作
客戶端
服務器端
初始化
不需要
調用acceptor.open()綁定端口
建立連接
調用connector.connect()方法
調用acceptor.accept()方法
傳輸數據
發送:調用peer.recv()方法
接收:調用peer.send()方法
關閉連接
調用peer.close()方法
4.2、UDP服務。
在ace中,通過ACE_SOCK_Dgram類提供udp通信服務,ACE_SOCK_Dgram和ACE_SOCK_Stream的API非常類似,一樣提供了send,recv及close等常用操作,這里就不再累述了。
udp通信時無需像tcp那樣建立連接和關閉連接,tcp編程時需要通過accept和connect來建立連接,而udp通信省略了這一步驟,相對來說編程更為簡單。
由於udp通信時無建立連接,服務器端不能像Tcp通信那樣在建立連接的時候就獲得客戶端的地址信息,故服務器端不能主動對客戶端發送信息(不知道客戶端的地址),只有等到收到客戶端發送的udp信息時才能確定客戶端的地址信息,從而進行通信。
udp通信過程如下:
l 服務器端綁定一固定udp端口,等待接收客戶端的通信。
l 客戶端通過服務器的ip和地址信息直接對服務器端發送消息。
l 服務器端收到客戶端發送的消息后獲取客戶端的ip和端口信息,通過該地址信息和客戶端通信。
下面代碼為EchoServer的udp版:
//server.cpp
#include <ace/SOCK_Dgram.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>
#include <string>
#include <iostream>
using namespace std;
int main(int argc, char *argv[])
{
ACE_INET_Addr port_to_listen(3000); //綁定的端口
ACE_SOCK_Dgram peer(port_to_listen); //通信通道 char buf[100];
while(true)
{
ACE_INET_Addr remoteAddr; //所連接的遠程地址
int bc = peer.recv(buf,100,remoteAddr); //接收消息,獲取遠程地址信息
if( bc != -1)
{
string s(buf,bc);
cout<<endl<<"rev:\t"<<s<<endl;
}
peer.send(buf,bc,remoteAddr); //和遠程地址通信
} return 0;
}
相應的客戶端程序如下:
//client.cpp
#include <ace/SOCK_Dgram.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>
#include <string>
#include <iostream>
using namespace std;
int main(int argc, char *argv[])
{
ACE_INET_Addr remoteAddr(3000,"127.0.0.1"); //所連接的遠程地址
ACE_INET_Addr localAddr; //本地地址信息
ACE_SOCK_Dgram peer(localAddr); //通信通道
peer.send("hello",5,remoteAddr); //發送消息
char buf[100];
int bc = peer.recv(buf,100,remoteAddr); //接收消息
if( bc != -1)
{
string s(buf,bc);
cout<<endl<<"rev:\t"<<s<<endl;
}
return 0;
}
和tcp編程相比,udp無需通過acceptor,connector來建立連接,故代碼相對tcp編程來說要簡單許多。另外,由於udp是一種無連接的通信方式,ACE_SOCK_Dgram的實例對象中無法保存遠端地址信息(保存了本地地址信息),故通信的時候需要加上遠端地址信息。
5. ACE的設計模式
5.1、主動對象模式
主動對象模式用於降低方法執行和方法調用之間的耦合。該模式描述了另外一種更為透明的任務間通信方法。
傳統上,所有的對象都是被動的代碼段,對象中的代碼是在對它發出方法調用的線程中執行的,當方法被調用時,調用線程將阻塞,直至調用結束。而主動對象卻不一樣。這些對象具有自己的命令執行線程,主動對象的方法將在自己的執行線程中執行,不會阻塞調用方法。
例如,設想對象"A"已在你的程序的main()函數中被實例化。當你的程序啟動時,OS創建一個線程,以從main()函數開始執行。如果你調用對象A的任何方法,該線程將"流過"那個方法,並執行其中的代碼。一旦執行完成,該線程返回調用該方法的點並繼續它的執行。但是,如果"A"是主動對象,事情就不是這樣了。在這種情況下,主線程不會被主動對象借用。相反,當"A"的方法被調用時,方法的執行發生在主動對象持有的線程中。另一種思考方法:如果調用的是被動對象的方法(常規對象),調用會阻塞(同步的);而另一方面,如果調用的是主動對象的方法,調用不會阻塞(異步的)。
由於主動對象的方法調用不會阻塞,這樣就提高了系統響應速度,在網絡編程中是大有用武之地的。
在這里我們將一個"Logger"(日志記錄器)對象對象為例來介紹如何將一個傳統對象改造為主動對象,從而提高系統響應速度。
Logger的功能是將一些系統事件的記錄在存儲器上以備查詢,由於Logger使用慢速的I/O系統來記錄發送給它的消息,因此對Logger的操作將會導致系統長時間的等待。
其功能代碼簡化如下:
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
void LogMsg(const string& msg)
{
cout<<endl<<msg<<endl;
ACE_OS::sleep(2);
}
};
為了實現記錄日志操作的主動執行,我們需要用命令模式將其封裝,從而使得記錄日志的方法能在合適的時間和地方主動執行,封裝方式如下:
class LogMsgCmd: public ACE_Method_Object
{
public:
LogMsgCmd(Logger *plog,const string& msg)
{
this->log=plog;
this->msg=msg;
}
int call()
{
this->log->LogMsg(msg);
return 0;
}
private:
Logger *log;
string msg;
};
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
void LogMsg(const string& msg)
{
cout<<endl<<msg<<endl;
ACE_OS::sleep(2);
}
LogMsgCmd *LogMsgActive(const string& msg)
{
new LogMsgCmd(this,msg);
}
};
這里對代碼功能做一下簡單的說明:
ACE_Method_Object是ACE提供的命令模式借口,命令接口調用函數為int call(),在這里通過它可以把每個操作日志的調用封裝為一個LogMsgCmd對象,這樣,當原來需要調用LogMsg的方法的地方只要調用LogMsgActive即可生成一個LogMsgCmd對象,由於調用LogMsgActive方法,只是對命令進行了封裝,並沒有進行日志操作,所以該方法會立即返回。然后再新開一個線程,將LogMsgCmd對象作為參數傳入,在該線程中執行LogMsgCmd對象的call方法,從而實現無阻塞調用。
然而,每次對一個LogMsg調用都開啟一個新線程,無疑是對資源的一種浪費,實際上我們往往將生成的LogMsgCmd對象插入一個命令隊列中,只新開一個命令執行線程依次執行命令隊列中的所有命令。並且,為了實現對象的封裝,命令隊列和命令執行線程往往也封裝到Logger對象中,代碼如下所示:
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Method_Object.h"
#include "ace/Activation_Queue.h"
#include "ace/Auto_Ptr.h"
#include <string>
#include <iostream>
using namespace std;
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
Logger()
{
this->activate();
}
int svc();
void LogMsg(const string& msg);
void LogMsgActive (const string& msg);
private:
ACE_Activation_Queue cmdQueue; //命令隊列
};
class LogMsgCmd: public ACE_Method_Object
{
public:
LogMsgCmd(Logger *plog,const string& msg)
{
this->log=plog;
this->msg=msg;
}
int call()
{
this->log->LogMsg(msg);
return 0;
}
private:
Logger *log;
string msg;
};
void Logger::LogMsg(const string& msg)
{
cout<<endl<<msg<<endl;
ACE_OS::sleep(2);
}
//以主動的方式記錄日志
void Logger::LogMsgActive(const string& msg)
{
//生成命令對象,插入到命令隊列中
cmdQueue.enqueue(new LogMsgCmd(this,msg));
}
int Logger::svc()
{
while(true)
{
//遍歷命令隊列,執行命令
auto_ptr<ACE_Method_Object> mo
(this->cmdQueue.dequeue ());
if (mo->call () == -1)
break;
}
return 0;
}
int main (int argc, ACE_TCHAR *argv[])
{
Logger log;
log. LogMsgActive ("hello");
ACE_OS::sleep(1);
log.LogMsgActive("abcd");
while(true)
ACE_OS::sleep(1);
return 0;
}
在這里需要注意一下命令隊列ACE_Activation_Queue對象,它是線程安全的,使用方法比較簡單,這里我也不多介紹了。
主動對象的基本結構就是這樣,然而,由於主動對象是異步調用的,又引出了如下兩個新問題:
l 方法調用線程如何知道該方法已經執行完成?
l 如何或得方法的返回值?
要解決這兩個問題,首先得介紹一下ACE_Future對象,ACE_Future是表示一個會在將來被賦值的"期貨"對象,可以通過ready()函數查詢它是否已經被賦值。該對象創建的時候是未賦值的,后期可以通過set()函數來進行賦值,所賦的值可以通過get()函數來獲取。
下面代碼演示了它的基本用法:
#include "ace/Future.h"
#include <string>
#include <iostream>
using namespace std;
void get_info(ACE_Future<string> &fu)
{
string state = fu.ready()?"ready":"not ready";
cout<<endl<<state<<endl;
if(fu.ready())
{
string value;
fu.get(value);
cout<<"value:\t"<<value<<endl;
}
}
int main(int argc, char *argv[])
{
ACE_Future<string> fu;
get_info(fu);
fu.set("12345");
get_info(fu);
return 0;
}
通過ACE_Future對象來解決上述兩個問題的方法如下:
l 首先創建ACE_Future對象用以保留返回值。
l 調用主動命令時將ACE_Future對象作為參數傳入,生成的命令對象中保存ACE_Future對象的指針。
l 命令執行線程執行完命令后,將返回值通過set()函數設置到ACE_Future對象中。
l 調用線程可以通過ACE_Future對象的ready()函數查詢該命令是否執行完成,如果命令執行完成,則可通過get()函數來獲取返回值。
使用的時候要注意一下ACE_Future對象的生命周期。
為了演示了如何獲取主動命令的執行狀態和結果,我將上篇文章中的代碼改動了一下,日志類記錄日志后,會將記錄的內容作為返回值返回,該返回值會通過ACE_Future對象返回,代碼如下:
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Method_Object.h"
#include "ace/Activation_Queue.h"
#include "ace/Auto_Ptr.h"
#include "ace/Future.h"
#include <string>
#include <iostream>
using namespace std;
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
Logger()
{
this->activate();
}
int svc();
string LogMsg(const string& msg);
void LogMsgActive (const string& msg,ACE_Future<string> *result);
private:
ACE_Activation_Queue cmdQueue; //命令隊列
};
class LogMsgCmd: public ACE_Method_Object
{
public:
LogMsgCmd(Logger *plog,const string& msg,ACE_Future<string> *result)
{
this->log=plog;
this->msg=msg;
this->result=result;
}
int call()
{
string reply = this->log->LogMsg(msg);
result->set(reply);
return 0;
}
private:
ACE_Future<string> *result;
Logger *log;
string msg;
};
string Logger::LogMsg(const string& msg)
{
ACE_OS::sleep(2);
cout<<endl<<msg<<endl;
return msg;
}
//以主動的方式記錄日志
void Logger::LogMsgActive(const string& msg,ACE_Future<string> *result)
{
//生成命令對象,插入到命令隊列中
cmdQueue.enqueue(new LogMsgCmd(this,msg,result));
}
int Logger::svc()
{
while(true)
{
//遍歷命令隊列,執行命令
auto_ptr<ACE_Method_Object> mo
(this->cmdQueue.dequeue ());
if (mo->call () == -1)
break;
}
return 0;
}
void get_info(ACE_Future<string> &fu)
{
string state = fu.ready()?"ready":"not ready";
cout<<endl<<state<<endl;
if(fu.ready())
{
string value;
fu.get(value);
cout<<"value:\t"<<value<<endl;
}
}
int main (int argc, ACE_TCHAR *argv[])
{
ACE_Future<string> result;
Logger log;
log.LogMsgActive ("hello",&result);
while(true)
{
get_info(result);
if(result.ready())
break;
ACE_OS::sleep(1);
}
cout<<endl<<"cmd end"<<endl;
while(true)
ACE_OS::sleep(1);
return 0;
}
這種查詢模式比較簡單有效,但存在一個問題:調用線程必須不斷輪詢ACE_Future對象以獲取返回值,這樣的效率比較低。可以通過觀察者模式解決這個問題:在ACE_Future對象上注冊一個觀察者,當ACE_Future對象的值發生改變(異步命令執行完成)時主動通知該觀察者,從而獲取返回值。
ACE中的觀察者模式可以通過ACE_Future_Observer來實現,使用方法如下:
#include "ace/Future.h"
#include <string>
#include <iostream>
using namespace std;
class MyObserver:public ACE_Future_Observer<string>
{
virtual void update (const ACE_Future<string> &future)
{
string value;
future.get(value);
cout<<endl<<"change:\t"<<value<<endl;
}
};
int main(int argc, char *argv[])
{
MyObserver obv;
ACE_Future<string> fu;
fu.attach(&obv);
ACE_OS::sleep(3);
fu.set("12345");
while(true)
ACE_OS::sleep(3);
return 0;
}
通過觀察者模式,可以更有效,及時的獲取異步命令的返回值,但同時也增加了程序結構的復雜度並且難以調試,使用的時候應該根據需要選取合適的方式。
5.2、Reactor模式
主動對象模式用於降低方法執行和方法調用之間的耦合。該模式描述了另外一種更為透明的任務間通信方法。
反應器(Reactor):用於事件多路分離和分派的體系結構模式
通常的,對一個文件描述符指定的文件或設備, 有兩種工作方式: 阻塞與非阻塞。所謂阻塞方式的意思是指, 當試圖對該文件描述符進行讀寫時, 如果當時沒有東西可讀,或者暫時不可寫, 程序就進入等待狀態, 直到有東西可讀或者可寫為止。而對於非阻塞狀態, 如果沒有東西可讀, 或者不可寫, 讀寫函數馬上返回, 而不會等待。
在前面的章節中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:當接收tcp數據時,如果遠端沒有數據可以讀,則會一直阻塞到讀到需要的數據為止。這種方式的傳輸和傳統的被動方法的調用類似,非常直觀,並且簡單有效,但是同樣也存在一個效率問題,如果你是開發一個面對着數千個連接的服務器程序,對每一個客戶端都采用阻塞的方式通信,如果存在某個非常耗時的讀寫操作時,其它的客戶端通信將無法響應,效率非常低下。
一種常用做法是:每建立一個Socket連接時,同時創建一個新線程對該Socket進行單獨通信(采用阻塞的方式通信)。這種方式具有很高的響應速度,並且控制起來也很簡單,在連接數較少的時候非常有效,但是如果對每一個連接都產生一個線程的無疑是對系統資源的一種浪費,如果連接數較多將會出現資源不足的情況。
另一種較高效的做法是:服務器端保存一個Socket連接列表,然后對這個列表進行輪詢,如果發現某個Socket端口上有數據可讀時(讀就緒),則調用該socket連接的相應讀操作;如果發現某個Socket端口上有數據可寫時(寫就緒),則調用該socket連接的相應寫操作;如果某個端口的Socket連接已經中斷,則調用相應的析構方法關閉該端口。這樣能充分利用服務器資源,效率得到了很大提高。
在Socket編程中就可以通過select等相關API實現這一方式。但直接用這些API控制起來比較麻煩,並且也難以控制和移植,在ACE中可以通過Reactor模式簡化這一開發過程。
反應器本質上提供一組更高級的編程抽象,簡化了事件驅動的分布式應用的設計和實現。除此而外,反應器還將若干不同種類的事件的多路分離集成到易於使用的API中。特別地,反應器對基於定時器的事件、信號事件、基於I/O端口監控的事件和用戶定義的通知進行統一地處理。
ACE中的反應器與若干內部和外部組件協同工作。其基本概念是反應器框架檢測事件的發生(通過在OS事件多路分離接口上進行偵聽),並發出對預登記事件處理器(event handler)對象中的方法的"回調"(callback)。該方法由應用開發者實現,其中含有應用處理此事件的特定代碼。
使用ACE的反應器,只需如下幾步:
l 創建事件處理器,以處理他所感興趣的某事件。
l 在反應器上登記,通知說他有興趣處理某事件,同時傳遞他想要用以處理此事件的事件處理器的指針給反應器。
隨后反應器框架將自動地:
l 在內部維護一些表,將不同的事件類型與事件處理器對象關聯起來。
l 在用戶已登記的某個事件發生時,反應器發出對處理器中相應方法的回調。
反應器模式在ACE中被實現為ACE_Reactor類,它提供反應器框架的功能接口。
如上面所提到的,反應器將事件處理器對象作為服務提供者使用。反應器內部記錄某個事件處理器的特定事件的相關回調方法。當這些事件發生時,反應器會創建這種事件和相應的事件處理器的關聯。
l 事件處理器
事件處理器就是需要通過輪詢發生事件改變的對象列表中的對象,如在上面的例子中就是連接的客戶端,每個客戶端都可以看成一個事件處理器。
l 回調事件
就是反應器支持的事件,如Socket讀就緒,寫就緒。拿上面的例子來說,如果某個客戶端(事件處理器)在反應器中注冊了讀就緒事件,當客戶端給服務器發送一條消息的時候,就會觸發這個客戶端的數據可讀的回調函數。
在反應器框架中,所有應用特有的事件處理器都必須由ACE_Event_Handler的抽象接口類派生。可以通過重載相應的"handle_"方法實現相關的回調方法。
使用ACE_Reactor基本上有三個步驟:
l 創建ACE_Event_Handler的子類,並在其中實現適當的"handle_"方法,以處理你想要此事件處理器為之服務的事件類型。
l 通過調用反應器對象的register_handler(),將你的事件處理器登記到反應器。
l 在事件發生時,反應器將自動回調相應的事件處理器對象的適當的handle_"方法。
下面我就以一個Socket客戶端的例子為例簡單的說明反應器的基本用法。
#include <ace/OS.h>
#include <ace/Reactor.h>
#include <ace/SOCK_Connector.h>
#include <string>
#include <iostream>
using namespace std;
class MyClient:public ACE_Event_Handler
{
public:
bool open()
{
ACE_SOCK_Connector connector;
ACE_INET_Addr addr(3000,"127.0.0.1");
ACE_Time_Value timeout(5,0);
if(connector.connect(peer,addr,&timeout) != 0)
{
cout<<endl<<"connecetd fail";
return false;
}
ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
cout<<endl<<"connecetd ";
return true;
}
ACE_HANDLE get_handle(void) const
{
return peer.get_handle();
}
int handle_input (ACE_HANDLE fd)
{
int rev=0;
ACE_Time_Value timeout(5,0);
if((rev=peer.recv(buffer,1000,&timeout))>0)
{
buffer[rev]='\0';
cout<<endl<<"rev:\t"<<buffer<<endl;
}
return 3;
}
private:
ACE_SOCK_Stream peer;
char buffer[1024];
};
int main(int argc, char *argv[])
{
MyClient client;
client.open();
while(true)
{
ACE_Reactor::instance()->handle_events();
}
return 0;
}
在這個例子中,客戶端連接上服務器后,通過ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注冊了一個讀就緒的回調函數,當服務器端給客戶端發消息的時候,會自動觸發handle_input()函數,將接收到的信息打印出來。
下面對如何在Socket通信中使用反應器做進一步的介紹。
5.3、接收者-連接(Reactor-Connect)者模式
接受器-連接器設計模式(Acceptor-Connector)使分布式系統中的連接建立及服務初始化與一旦服務初始化后所執行的處理去耦合。
這樣的去耦合通過三種組件來完成:acceptor、connector 和 servicehandler(服務處理器)。
l 連接器主動地建立到遠地接受器組件的連接,並初始化服務處理器來處理在連接上交換的數據。
l 接受器被動地等待來自遠地連接器的連接請求,在這樣的請求到達時建立連接,並初始化服務處理器來處理在連接上交換的數據。
l 初始化的服務處理器執行應用特有的處理,並通過連接器和接受器組件建立的連接來進行通信。
5.3.1. 服務處理器(Service Handler):
Service Handler 實現應用服務,通常扮演客戶角色、服務器角色,或同時扮演這兩種角色。它提供掛鈎方法,由 Acceptor 或 Connector 調用,以在連接建立時啟用應用服務。此外,Service Handler 還提供數據模式傳輸端點,其中封裝了一個 I/O 句柄。一旦連接和初始化后,該端點被 Service Handler 用於與和其相連的對端交換數據。
5.3.2. 接受器(Acceptor):
Acceptor 是一個工廠,實現用於被動地建立連接並初始化與其相關聯的 Service Handler 的策略。此外,Acceptor 包含有被動模式的傳輸端點工廠,它創建新的數據模式端點,由 Service Handler 用於在相連的對端間傳輸數據。通過將傳輸端點工廠綁定到網絡地址,比如 Acceptor 在其上偵聽的 TCP 端口號,Acceptor的 open 方法對該工廠進行初始化。
一旦初始化后,被動模式的傳輸端點工廠偵聽來自對端的連接請求。當連接請求到達時,Acceptor 創建 Service Handler,並使用它的傳輸端點工廠來將新連接接受進Service Handler 中。
5.3.3. 連接器(Connector):
Connector 是一個工廠,實現用於主動地建立連接並初始化與其相關聯的 Service Handler 的策略。它提供方法,由其發起到遠地 Acceptor 的連接。同樣地,它還提供另一個方法,完成對 Service Handler 的啟用;該處理器的連接是被同步或異步地發起的。Connector 使用兩個分開的方法來透 明地支持異步連接建立。
5.3.4. 分派器(Dispatcher):
為 Acceptor,Dispatcher 將在一或多個傳輸端點上接收到的連接請求多路分離給適當的 Acceptor。Dispatcher允許多個 Acceptor 向其登記,以偵聽同時在不同端口上從不同對端而來的連接。 為 Connector,Dispatcher 處理異步發起的連接的完成。在這種情況下,當異步連接被建立時,Dispatcher 回調 Connector。Dispatcher 允許多個 Service Handler 通過一個 Connector 來異步地發起和完成它們 的連接。注意對於同步連接建立,Dispatcher 並不是必需的,因為發起連接的線程控制也完成服務服務處 理器的啟用。
Dispatcher 通常使用事件多路分離模式來實現,這些模式由反應器(Reactor)或前攝器(Proactor) 來提供,它們分別處理同步和異步的多路分離。同樣地,Dispatcher 也可以使用主動對象(Active Obj ect)模式來實現為單獨的線程或進程。
Acceptor 組件協作
Acceptor 和 Service Handler 之間的協作。這些協作被划分為三個階段:
1. 端點初始化階段:
為被動地初始化連接,應用調用 Acceptor 的 open 方法。該方法創建被動模式的傳 輸端點,將其綁定到網絡地址,例如,本地主機的 IP 地址和 TCP 端口號,並隨后偵聽來自對端 Connector 的連接請求。其次,open 方法將 Acceptor 對象登記到 Dispatcher,以使分派器能夠在連接事件 到達時回調 Acceptor。最后,應用發起 Dispatcher 的事件循環,等待連接請求從對端 Connector 到來。
2. 服務初始化階段:
當連接請求到達時,Dispatcher 回調 Acceptor 的accept 方法。該方法裝配以下活動 所必需的資源:
l 創建新的 Service Handler,
l 使用它的被動模式傳輸端點工廠來將連接接受進 該處理器的數據模式傳輸端點中,
l 通過調用 Service Handler 的 open 掛鈎將其啟用。Servic e Handler 的 open 掛鈎可以執行服務特有的初始化,比如分配鎖、派生線程、打開日志文件,和/或將 該 Service Handler 登記到 Dispatcher。
3. 服務處理階段:
在連接被動地建立和 Service Handler 被初始化后,服務處理階段開始了。在此階段, 應用級通信協議,比如 HTTP 或 IIOP,被用於在本地 Service Handler 和與其相連的遠地 Peer 之間、 經由前者的 peer_stream_端點交換數據。當交換完成,可關閉連接和 Service Handler,並釋放資源。
Connector 組件協作
Connector 組件可以使用同步和異步兩種方式來初始化它的 Service Handle,這里僅介紹一下同步時的協作情況。
同步的 Connector 情況中的參與者之間的協作可被划分為以下三個階段:
l 連接發起階段:
為在 Service Handler 和它的遠地 Peer 之間發起連接,應用調用 Connector 的 connect 方法。該方法阻塞調用線程的線程控制、直到連接同步完成,以主動地建立連接。
l 服務初始化階段:
在連接完成后,Connector 的 connect 方法調用 complete 方法來啟用 Service Handl er。complete 方法通過調用 Service_Handler 的 open 掛鈎方法來完成啟用;open 方法執行服務特有的 初始化。
l 服務處理階段:
此階段與 Service Handler 被 Acceptor 創建后所執行的服務處理階段相類似。特別地, 一旦 Service Handler 被啟用,它使用與和其相連接的遠地 Service Handler 交換的數據來執行應用特 有的服務處理。
實現及運行一般步驟:
l 創建 Service Handler;
l 被動地或主動地將 Service Handler 連接到它們的遠地對端;以及
l 一旦連接,啟用 Service Handler。
主要角色:Service Handler(服務處理器)、Acceptor 和 Connector。
服務處理器:該抽象類繼承自 Event_Handler,並為客戶、服務器或同時扮演兩種角色的組件所提供 的服務處理提供通用接口。應用必須通過繼承來定制此類,以執行特定類型的服務。Service Handler 接口如下所示:
template <class PEER_STREAM>
class Service_Handler : public Event_Handler
{
public:
//連接成功后的初始化入口函數 (子類定義).
virtual int open (void) = 0;
//返回通信流的引用
PEER_STREAM &peer (void)
{
return peer_stream_;
}
};
一旦 Acceptor 或 Connector 建立了連接,它們調用 Service Handler 的 open 掛鈎。該純虛方法必須被 Concrete Service Handler 子類定義;后者執行服務特有的初始化和后續處理。
連接器:該抽象類實現主動連接建立和初始化 Service Handler 的通用策略。它的接口如下所示:
template <class SERVICE_HANDLER,class PEER_CONNECTOR>
class Connector : public Event_Handler
{
public:
enum Connect_Mode
{
SYNC, //以同步方式連接
ASYNC //以異步方式連接
};
// 主動連接並激活服務處理器
int connect (SERVICE_HANDLER *sh,
const PEER_CONNECTOR::PEER_ADDR &addr,
Connect_Mode mode);
protected:
//定義連接激活策略
virtual int connect_service_handler(SERVICE_HANDLER *sh,
const PEER_CONNECTOR::PEER_ADDR &addr,
Connect_Mode mode);
// Defines the handler's concurrency strategy.
virtual int activate_service_handler(SERVICE_HANDLER *sh);
// 當以異步方式連接完成時激活服務處理器
virtual int complete (HANDLE handle);
private:
// IPC mechanism that establishes
// connections actively.
PEER_CONNECTOR connector_;
};
Conncetor 通過特定類型的 PEER CONNECTOR 和 SERVICE HANDLER 被參數化。PEER CONNECTO R 提供的傳輸機制被 Connector 用於主動地建立連接,或是同步地、或是異步地。SERVICE HANDLER提供的服務對與相連的對端交換的數據進行處理。C++參數化類型被用於使(1)連接建立策略與(2)服務處理器類型、網絡編程接口和傳輸層連接協議去耦合。
參數化類型是有助於提高可移植性的實現決策。例如,它們允許整體地替換 Connector 所用的 IPC 機 制。這使得 Connector 的連接建立代碼可在含有不同網絡編程接口(例如,有 socket,但沒有 TLI;反之 亦然)的平台間進行移植。
Service Handler 的 open 掛鈎在連接成功建立時被調用。
接受器(Acceptor):該抽象類為被動連接建立和初始化 Service Handler 實現通用的策略。Acceptor 的接 口如下所示:
template <class SERVICE_HANDLER,
class PEER_ACCEPTOR>
class Acceptor : public Event_Handler
{
public:
// Initialize local_addr transport endpoint factory
// and register with Initiation_Dispatcher Singleton.
virtual int open(const PEER_ACCEPTOR::PEER_ADDR &local_addr);
// Factory Method that creates, connects, and
// activates SERVICE_HANDLER's.
virtual int accept (void);
protected:
//定義服務處理器的創建策略
virtual SERVICE_HANDLER *make_service_handler (void);
// 定義服務處理器的連接策略
virtual int accept_service_handler(SERVICE_HANDLER *);
//定義服務處理器的激活策略
virtual int activate_service_handler(SERVICE_HANDLER *);
// Demultiplexing hooks inherited from Event_Handler,
// which is used by Initiation_Dispatcher for
// callbacks.
virtual HANDLE get_handle (void) const;
virtual int handle_close (void);
private:
// IPC mechanism that establishes
// connections passively.
PEER_ACCEPTOR peer_acceptor_;
};
Acceptor 通過特定類型的 PEER ACCEPTOR 和 SERVICE HANDLER 被參數化。PEER ACCEPTOR 提供的傳輸機制被 Acceptor 用於被動地建立連接。SERVICE HANDLER 提供的服務對與遠地對端交換的 數據進行處理。注意 SERVICE HANDLER 是由應用層提供的具體的服務處理器。
參數化類型使 Acceptor 的連接建立策略與服務處理器的類型、網絡編程接口及傳輸層連接發起協議去耦合。就如同 Connector 一樣,通過允許整體地替換 Acceptor 所用的機制,參數化類型的使用有助於提高可移植性。這使得連接建立代碼可在含有不同網絡編程接口(比如有 socket,但沒有 TLI;反之亦然)的平台間移植。
make_service_handler 工廠方法定義 Acceptor 用於創建 SERVICE HANDLER 的缺省策略。如下所示:
template <class SH, class PA> SH *
Acceptor<SH, PA>::make_service_handler (void)
{
return new SH;
}
缺省行為使用了"請求策略"(demand strategy),它為每個新連接創建新的 SERVICE HANDLER。但是, Acceptor 的子類可以重定義這一策略,以使用其他策略創建 SERVICE HANDLE,比如創建單獨的單體 (Singleton)[10]或從共享庫中動態鏈接 SERVICE HANDLER。
accept_service_handler 方法在下面定義 Acceptor 所用的 SERVICE HANDLER 連接接受策略:
template <class SH, class PA> int
Acceptor<SH, PA>::accept_service_handler(SH *handler)
{
peer_acceptor_->accept (handler->peer ());
}
缺省行為委托 PEER ACCEPTOR 所提供的 accept 方法。子類可以重定義 accept_service_handler 方法,以 執行更為復雜的行為,比如驗證客戶的身份,以決定是接受還是拒絕連接。
Activate_service_handler 定義 Acceptor 的 SERVICE HANDLER 並發策略:
程序示例:
在ACE中,默認的服務處理器是ACE_Svc_Handler,這也是一個模版類,可以通過相關的參數特化。由於ACE_Svc_Handler繼承自ACE_Task和ACE_Event_Handler,功能相當強大,同時也存在一定開銷,如果需要減小開銷可以自己寫一個僅繼承自ACE_Event_Handler的服務處理器。
為了演示簡單,我這里就以一個EchoServer的服務器端和客戶端為例,其中接收器和連接器都采用缺省策略,並沒有進行重載。
服務器端:
#include "ace/Reactor.h"
#include "ace/Svc_Handler.h"
#include "ace/Acceptor.h"
#include "ace/Synch.h"
#include "ace/SOCK_Acceptor.h"
class My_Svc_Handler;
typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;
class My_Svc_Handler:
public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
public:
int open(void*)
{
ACE_OS::printf("\nConnection established\n");
//注冊相應事件
ACE_Reactor::instance()->register_handler(this,
ACE_Event_Handler::READ_MASK);
return 0;
}
int handle_input(ACE_HANDLE)
{
int rev = peer().recv(data,1024);
if(rev == 0)
{
delete this;
}
else
{
data[rev]='\0';
ACE_OS::printf("<<rev:\t %s\n",data);
peer().send(data,rev+1);
return 0;
}
}
private:
char data[1024];
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(3000);
MyAcceptor acceptor(addr,ACE_Reactor::instance());
while(1)
ACE_Reactor::instance()->handle_events();
}
客戶端:
#include "ace/Reactor.h"
#include "ace/Svc_Handler.h"
#include "ace/Connector.h"
#include "ace/Synch.h"
#include "ace/SOCK_Connector.h"
class My_Svc_Handler;
typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;
class My_Svc_Handler:
public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
public:
int open(void*)
{
ACE_OS::printf("\nConnection established\n");
//注冊相應事件
ACE_Reactor::instance()->register_handler(this,
ACE_Event_Handler::READ_MASK);
return 0;
}
int handle_input(ACE_HANDLE)
{
int rev = peer().recv(data,1024);
if(rev == 0)
{
delete this;
}
else
{
data[rev]='\0';
ACE_OS::printf("<<rev:\t %s\n",data);
return 0;
}
}
int sendData(char *msg)
{
ACE_OS::printf("<<send:\t %s\n",msg);
return peer().send(msg,strlen(msg));
}
private:
char data[1024];
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(3000,"192.168.1.142");
My_Svc_Handler *svchandler = new My_Svc_Handler();
MyConnector connector;
if(connector.connect(svchandler,addr)==-1)
{
ACE_OS::printf("Connect fail");
}
svchandler->sendData("hello wrold");
while(1)
ACE_Reactor::instance()->handle_events();
}
5.4、Proactor模式
當 OS 平台支持異步操作時,一種高效而方便的實現高性能 Web 服務器的方法是使用前攝式事件分派。使用前攝式事件分派模型設計的 Web 服務器通過一或多個線程控制來處理異步操作的完成。這樣,通過集成完成事件多路分離(completion event demultiplexing)和事件處理器分派,前攝器模式簡化了異步的 Web 服務器。
異步的 Web 服務器將這樣來利用前攝器模式:首先讓 Web 服務器向 OS 發出異步操作,並將回調方法登記到 Completion Dispatcher(完成分派器),后者將在操作完成時通知 Web 服務器。於是 OS 代表 Web 服務器執行操作,並隨即在一個周知的地方將結果排隊。Completion Dispatcher 負責使完成通知出隊,並執行適當的、含有應用特有的 Web 服務器代碼的回調。
使用前攝器模式的主要優點是可以啟動多個並發操作,並可並行運行,而不要求應用必須擁有多個線程。操作被應用異步地啟動,它們在 OS 的 I/O 子系統中運行直到完成。發起操作的線程現在可以服務 另外的請求了。
在ACE中,可以通過ACE_Proactor實現前攝器模式。實現方式如下。
5.4.1、創建服務處理器:
Proactor框架中服務處理器均派生自ACE_Service_Handler,它和Reactor框架的事件處理器非常類似。當發生IO操作完成事件時,會觸發相應的事件完成會調函數。
5.4.2、實現服務處理器IO操作
Proactor框架中所有的IO操作都由相應的異步操作類來完成,這些異步操作類都繼承自ACE_Asynch_Operation。常用的有以下幾種。
l ACE_Asynch_Read_Stream, 提供從TCP/IP socket連接中進行異步讀操作.
l ACE_Asynch_Write_Stream, 提供從TCP/IP socket連接中進行異步寫操作.
使用這些操作類的一般方式如下:
l 初始化
將相關的操作注冊到服務處理器中,一般可通過調用其open方法實現。
l 發出IO操作
發出異步IO操作請求,該操作不會阻塞,具體的IO操作過程由操作系統異步完成。
l IO操作完成回調處理
異步IO操作完成后,OS會觸發服務處理器中的相應回調函數,可通過該函數的ACE_Asynch_Result參數獲取相應的返回值。
5.2.3、使用連接器或接受器和遠端進行連接
ACE為Proactor框架提供了兩個工廠類來建立TCP/IP連接。
l ACE_Asynch_Acceptor, 用於被動地建立連接
l ACE_Asynch_Connector 用於主動地建立連接
當遠端連接建立時,連接器或接受器便會創建相應的服務處理器,從而可以實現服務處理。
5.2.4、啟動Proactor事件分發處理
啟動事件分發處理只需如下調用:
while(true)
ACE_Proactor::instance()->handle_events();
2.4.5、程序示例
服務器端:
服務器端簡單的實現了一個EchoServer,流程如下:當客戶端建立連接時,首先發出一個異步讀的異步請求,當讀完成時,將所讀的數據打印出來,並發出一個新的異步請求。
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
}
virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
this->handle (h);
if (this->reader_.open (*this) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("HA_Proactive_Service open")));
delete this;
return;
}
ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024);
if (this->reader_.read (*mb, mb->space ()) != 0)
{
ACE_OS::printf("Begin read fail\n");
delete this;
return;
}
return;
}
//異步讀完成后會調用此函數
virtual void handle_read_stream
(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)
{
mb.release ();
delete this;
return;
}
mb.copy(""); //為字符串添加結束標記'\0'
ACE_OS::printf("rev:\t%s\n",mb.rd_ptr());
mb.release();
ACE_Message_Block *nmb = new ACE_Message_Block(buffer,1024);
if (this->reader_.read (*nmb, nmb->space ()) != 0)
return;
}
private:
ACE_Asynch_Read_Stream reader_;
char buffer[1024];
};
int main(int argc, char *argv[])
{
int port=3000;
ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
if (acceptor.open (ACE_INET_Addr (port)) == -1)
return -1;
while(true)
ACE_Proactor::instance ()->handle_events ();
return 0;
}
客戶端:
客戶端代碼比較簡單,就是每隔1秒鍾將當前的系統時間轉換為字符串形式通過異步形式發送給服務器,發送完成后,釋放時間字符的內存空間。
#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
}
virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
this->handle (h);
if (this->writer_.open (*this) != 0 )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("HA_Proactive_Service open")));
delete this;
return;
}
ACE_OS::printf("connceted");
for(int i=0;i<10;i++) //每隔秒中發送時間至服務器
{
ACE_OS::sleep(1);
time_t now = ACE_OS::gettimeofday().sec();
char *time = ctime(&now); //獲取當前時間的字符串格式
ACE_Message_Block *mb = new ACE_Message_Block(100);
mb->copy(time);
if (this->writer_.write(*mb,mb->length()) !=0)
{
ACE_OS::printf("Begin read fail\n");
delete this;
return;
}
}
return;
}
//異步寫完成后會調用此函數
virtual void handle_write_dgram
(const ACE_Asynch_Write_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
mb.release();
return;
}
private:
ACE_Asynch_Write_Stream writer_;
};
int main(int argc, char *argv[])
{
ACE_INET_Addr addr(3000,"192.168.1.142");
HA_Proactive_Service *client = new HA_Proactive_Service();
ACE_Asynch_Connector<HA_Proactive_Service> connector;
connector.open();
if (connector.connect(addr) == -1)
return -1;
while(true)
ACE_Proactor::instance ()->handle_events ();
return 0;
}
6. ACE的消息存放對象
2.1、ACE Lock類屬
鎖類屬包含的類包裝簡單的鎖定機制,比如互斥體、信號量、讀/寫互斥體和令牌等。這里我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進行一些簡單的說明。
ACE_Message_Block在Ace中用來表示消息的存放空間,可用做網絡通信中的消息緩沖區,使用非常頻繁,下面將在如下方簡單的介紹一下ACE_Message_Block相關功能。
l 創建消息塊
l 釋放消息塊
l 從消息塊中讀寫數據
l 數據的拷貝
l 其它常用函數
6.1、創建消息塊
創建消息塊的方式比較靈活,常用的有以下幾種方式 :
1、直接給消息塊分配內存空間創建。
ACE_Message_Block *mb = new ACE_Message_Block (30);
2、共享底層數據塊創建。
char buffer[100];
ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);
這種方式共享底層的數據塊,被創建的消息塊並不拷貝該數據,也不假定自己擁有它的所有權。在消息塊mb被銷毀時,相關聯的數據緩沖區data將不會被銷毀。這是有意義的:消息塊沒有拷貝數據,因此內存也不是它分配的,這樣它也不應該負責銷毀它。
3、通過duplicate()函數從已有的消息塊中創建副本。
ACE_Message_Block *mb = new ACE_Message_Block (30);
ACE_Message_Block *mb2 = mb->duplicate();
這種方式下,mb2和mb共享同一數據空間,使用的是ACE_Message_Block的引用計數機制。它返回指向要被復制的消息塊的指針,並在內部增加內部引用計數。
4、通過clone()函數從已有的消息塊中復制。
ACE_Message_Block *mb = new ACE_Message_Block (30);
ACE_Message_Block *mb2 = mb->clone();
clone()方法實際地創建整個消息塊的新副本,包括它的數據塊和附加部分;也就是說,這是一次"深拷貝"。
6.2、釋放消息塊
一旦使用完消息塊,程序員可以調用它的release()方法來釋放它。
l 如果消息數據內存是由該消息塊分配的,調用release()方法就也會釋放此內存。
l 如果消息塊是引用計數的,release()就會減少計數,直到到達0為止;之后消息塊和與它相關聯的數據塊才從內存中被移除。
l 如果消息塊是通過共享已分配的底層數據塊創建的,底層數據塊不會被釋放。
無論消息塊是哪種方式創建的,只要在使用完后及時調用release()函數,就能確保相應的內存能正確的釋放。
6.3、從消息塊中讀寫數據
ACE_Message_Block提供了兩個指針函數以供程序員進行讀寫操作,rd_ptr()指向可讀的數據塊地址,wr_ptr()指向可寫的數據塊地址,默認情況下都執行數據塊的首地址。下面的例子簡單了演示它的使用方法。
#include "ace/Message_Queue.h"
#include "ace/OS.h"
int main(int argc, char *argv[])
{
ACE_Message_Block *mb = new ACE_Message_Block (30);
ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");
ACE_OS::printf("%s\n",mb->rd_ptr ());
mb->release();
return 0;
}
注意:這兩個指針所指向的位置並不會自動移動,在上面的例子中,函數執行完畢后,執行的位置仍然是最開始的0,而不是最新的可寫位置5,程序員需要通過wr_ptr(5)函數手動移動寫指針的位置。
6.4、數據的拷貝
一般的數據的拷貝可以通過函數來實現數據的拷貝,copy()還會保證wr_ptr()的更新,使其指向緩沖區的新末尾處。
下面的例子演示了copy()函數的用法。
mb->copy("hello");
mb->copy("123",4);
注意:由於c++是以'\0'作為字符串結束標志的,對於上面的例子,底層數據塊中保存的是"hello\0123\0",而用ACE_OS::printf("%s\n",mb->rd_ptr ());打印出來的結果是"hello",使用copy函數進行字符串連接的時候需要注意。
6.5、其它常用函數
length() 返回當前的數據長度
next() 獲取和設置下一個ACE_Message_Block的鏈接。(用來建立消息隊列非常有用)
space() 獲取剩余可用空間大小
size() 獲取和設置數據存儲空間大小。
注意:
這里說一下ACE::read_n 的行為:
ACE::read_n 會試圖讀取buf長度的數據.如果遇到文件結束(EOF)或者錯誤則返回 0 或 -1;如果先到達了buf長度則返回數據區長度;問題來了:如果數據讀取成功,但是沒有到達buf長度怎么辦? 如何拿到已讀數據的長度? 這就要用到ACE::read_n的第4個參數,這個參數記錄了實際讀取的數據長度.
在上面的code里還用到了幾個函數:
ACE_Message_Block::size 指數據區的長度, 就是初始化時指定的長度,這里是10;
ACE_Message_Block::length 指數據的長度, 是 wr_ptr() - rd_ptr()的結果.
注意數據區和數據的區別....
ACE_Message_Block::cont ACE_Message_Block還實現了內存的鏈表結構;
7. 總結
一般文章整理的只是ACE的基礎部分,如果需要深入了解ACE還需要通過查看源代碼以進一步了解。可分為如下模塊:
1. 並發和同步
2. 進程間通信(IPC)
3. 內存管理
4. 定時器
5. 信號
6. 文件系統管理
7. 線程管理
8. 事件多路分離和處理器分派
9. 連接建立和服務初始化
10. 軟件的靜態和動態配置、重配置
11. 分層協議構建和流式框架
12. 分布式通信服務:名字、日志、時間同步、事件路由和網絡鎖定。
本文無法一一獵及,望諒。
