參考:
基本上使用了“ ACE_Task介紹(生產者/消費者)v3.0 - CSDN博客 http://blog.csdn.net/calmreason/article/details/16922561/ ”中的例子和介紹
ACE_Task框架 與Windows 消息循環對比 - CSDN博客 http://blog.csdn.net/zzhongcy/article/details/41379917
ACE的Task框架 - CSDN博客 http://blog.csdn.net/dongyu_1989/article/details/72858166
一個很好的例子,其中ACE_MESSAGE_Block的用法完全正確,前面兩個例子里,有內存泄露之嫌:
用ACE實現的生產者和消費者模式 - CSDN博客 http://blog.csdn.net/colder2008/article/details/5838298
ACE_Thread_Manager(v3.12) - CSDN博客 http://blog.csdn.net/calmreason/article/details/36399697
ACE_Task框架結構:
Task框架與其他框架不同,它沒有對應的框架模式,可以將Task框架看成是ACE的多線程編程接口。Task框架分為3部分來分析:第一部分是ACE用於線程間通信的消息隊列;第二部分是ACE對操作系統多線程接口的面相對象的封裝,在linux環境下主要是對POSIX API的封裝;第三部分是ACE的多線程編程接口,它是第一,二部分更高層的封裝。
Task框架通過在對象的上下文創建線程,在獨立的線程間傳遞消息,來提高網絡並發程序的模塊性和可擴展性。Task框架類結構如下圖:

ACE_Message_Queue類則是ACE實現的一種消息隊列,用於線程間通信(TaskA將消息putq進入TaskB的消息隊列,TaskB從自己的消息隊列中getq獲得這個消息,從而實現通信),
當然也可以應用於其他場景,每個ACE_Task類都有一個消息隊列,將消息隊列和線程集成在一起,可以大大簡化線程間通信編程。
ACE_Thread_Manager類用於線程管理,它是ACE對各種平台下線程編程接口的封裝。它是一個線程管理倉庫,用來創建和銷毀線程,因此我們把它稱為線程管理器。
每一個通過Task框架創建的線程都有一個線程描述符對象保存在線程管理器的倉庫中,用於管理線程的運行狀態。
一個經典的 Task框架應用實例就是生產者,消費者模式,例子:
用ACE實現的生產者和消費者模式 - CSDN博客 http://blog.csdn.net/colder2008/article/details/5838298
這是一個很好的例子,其中ACE_MESSAGE_Block的用法完全正確,其他的例子里,有內存泄露之嫌。(詳細可百度ACE_MESSAGE_Block的用法)
ACE_Task面向對象的線程
ACE使用此類來實現主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向對象的線程。
ACE_Task可用作:
ACE消息隊列實現分析
ACE_Task封裝了任務,每個任務都含有一或多個線程,以及一個底層消息隊列。各個任務通過這些消息隊列進行通信。
發送任務用putq() 將消息插入到另一任務的消息隊列中,接收任務通過使用getq()將消息提取出來。
ACE並沒有使用普通的隊列機制,比如STL的隊列容器,而是設計了一個功能強大的消息隊列——ACE_Message_Queue。ACE_Message_Queue類提供了隊列的數據和操作接口,ACE_Message_Block類是消息隊列中的數據單元,ACE_Data_Block類用於封裝應用程序的數據。ACE_Date_Block類封裝的應用程序的數據既可以是數據,又可以是指針。
同時提供這兩種封裝方式可以提高框架的靈活性,而提高對指針的封裝,可以避免數據復制,提高框架的性能。
應用程序只需要關注ACE_Message_Block對象。ACE_Data_Block對象默認情況隱藏在ACE_Message_Block對象內部,
應用程序不需要關注,這樣可以簡化應用程序對隊列元素的操作。
ACE_Message_Block和ACE_Message_Queue
ACE Message_Queue由一或多個通過prev_和next_指針鏈接在一起的Message_Block組成。這樣的結構可以高效地操作任意大的消息,而不會導致巨大的內存拷貝開銷。
ace 之 ACE_Message_Block - CSDN博客 http://blog.csdn.net/wskdgv666/article/details/49536053
ACE_Message_Block消息數據類 - 熾離 - 博客園 http://www.cnblogs.com/hgwang/p/5940168.html
ACE: ACE_Message_Queue<> Class Template Reference http://www.dre.vanderbilt.edu/Doxygen/5.4.8/html/ace/classACE__Message__Queue.html
ACE_Message_Queue例子 - CSDN博客 http://blog.csdn.net/dongyu_1989/article/details/72868964
ACE_Message_Block例子 - CSDN博客 http://blog.csdn.net/dongyu_1989/article/details/72863942
ACE_Task的主要方法:
open():初始化資源,如果創建線程,在此方法里面調用 activate
close():釋放資源,svc退出之后會自動調用此方法,常在此釋放資源,線程數是多個的時候不可以直接在close里面delete this
svc():線程的啟動位置,線程運行就是執行此函數
putq():放置消息到任務的消息隊列中
getq():從任務的消息隊列中取出消息
thr_count():返回任務中線程的數目
last_thread():返回任務中將線程計數器從1降為0的線程的ID
PS: 由於ACE_Task對象一般是在堆中創建的,因此必須要進行釋放操作.
{
public:
virtual int open (void *args = 0)
{
activate( THR_NEW_LWP, 1 );
return 0;
}
virtual int close (u_long flags = 0)
{
if ( ACE_OS::thr_equal ( ACE_Thread::self (), this->last_thread () ) )
{
//釋放對象
delete this;
}
return 0;
}
virtual int svc (void)
{
return 0;
}
};
多線程的常用方法
等待所有線程退出
ACE_Thread_Manager::instance()->cancel_all();
ACE_Thread_Manager::instance()->wait();
退出當前線程:
下面的這句話寫在線程執行的地方- ACE_Thread_Manager::instance()->exit();
與ACE_Task_Base的關系
ACE_Task_Base是主動對象的基類,ACE_Task繼承了ACE_Task_Base的線程功能之后添加了具有同步策略功能的消息隊列ACE_Message_Queue。
如果你只需要一個線程對象,你可以直接使用ACE_Task_Base
同步模式
分兩種: ACE_MT_SYNCH(多線程)和ACE_NULL_SYNCH(單線程)。
多線程模式下線程的消息隊列會使用多線程同步策略,會造成線程的阻塞;單線程模式下不存在同步的額外開銷;多線程下保證一個線程對象在同一時刻只有一個方法在執行。
ACE_Task可以啟動一個或多個線程,以及一個底層消息隊列。各個任務通過消息隊列進行通信。至於消息隊列實現的內在細節程序員不必關注。
Message_Queue類包含在Task類中。Message_Queue可被同步策略類型參數化,以獲取所期望的並發控制級。缺省地,並發控制級是“線程安全”。
如果MT_Synch被用於實例化Message_Queue,所有的公共方法都將是線程安全的,但同時也帶來相應的開銷。相反,
如果Null_Synch類用於實例化Message_Queue,所有公共方法都不是線程安全的,同時也就沒有額外的開銷。
搭建ACE_TASK
要搭架一個基於ACE_Task的消息系統,通常要做如下的步驟:
- 編寫一個派生自ACE_Task的類,指定它的同步模式
ACE_Task的消息隊列可以由多個處理線程共享使用,所以需要提供同步模式,例如 ACE_MT_SYNCH和ACE_NULL_SYNCH分別表示基於多線程的同步和不使用同步,這個參數是ACE_Task的一個模板參數。 -
class My_Task : public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(); } - 重載 ACE_Task的 svc 方法,編寫消息循環相關的代碼,操作消息隊列。當然也可以不使用消息隊列,將svc作為一般線程函數使用也是可以的,例如使用recv方法接收udp、tcp數據等
int My_Task::svc() { ACE_Message_Block * msg; while(getq(msg) != -1) // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0); { // process msg here } }svc 方法相當與處理線程的入口方法。ACE_Task<ACE_MT_SYNCH> 類自帶一個消息隊列取消息的方法是:this->getq(blk);
放消息的方法是:this->putq(blk);//注意不是this->put(blk); - 假設 My_Task是一個基於ACE_Task的類,創建一個唯一的My_Task實例,這個可以通過
typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;
然后總是使用MYTASK::instance方法來獲取一個My_Task的指針來完成。 - 在適當位置(一般是程序開始的時候),讓My_Task開始工作
MYTASK::intance()->activate(
THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 線程創建的屬性
n_threads = 1, // 線程的數目,即有多少處理線程
...) -
上面5中的activaate方法一般在open中調用,調用完activate方法之后svc方法自動運行。而open方法可以像5中描述的那樣,在適當的位置調用
指定線程的創建標志:在activate方法執行的時候可以指定線程的內部類型,THR_DETACHED(分離的,可以直接被ACE_Thread_Manager::wait()方法來回收),
默認情況下線程的內部類型是THR_JOINABLE(可結合的,此線程退出的狀態會被其他線程捕獲並作出相應的處理);
THR_NEW_LWP (掛鈎到內核級線程,會創建一個內核線程);
你的線程如果在獨立運行一般你會使用:activate(THR_NEW_LWP | THR_BOUND | THR_DETACHED,1);來創建你的線程,1表示創建一個線程。 - 在有消息發生的時候發送消息
ACE_Message_Block * msg; // fill the msg ... MYTASK::intance()->putq(msg);
-
等待所有線程退出,通常寫在main函數的最后:ACE_Thread_Manager::instance()->wait();
最后考慮一個使用ACE_Task的實例,在一個編寫WEB服務器的項目中,類 Request_Handler負責處理HTTP請求,Request_Hanlder派生自ACE_Task,當有請求時,其他的代碼將Http請求構造成一個ACE_Message_Block,並調用Request_Handler的putq方法將請求插入消息隊列,Request_Handler配置為根據CPU的數目創建處理線程,Request_Handler的svc方法從隊列中獲取請求進行處理,然后將處理的結果構造成為一個ACE_Message_Block,插入到Response_Handler的消息隊列,Response_Handler也派生自ACE_Task,但它只有一個處理線程,它僅僅將相應的數據寫回給客戶端。
生產者消費者實例1:生產者和消費者共享同一個內部消息隊列
- #ifndef PRODUCEAUDIO_H
- #define PRODUCEAUDIO_H
- #include "ace/Task.h"
- class ProduceAudio : public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- ProduceAudio(ACE_Thread_Manager *thr_man=0,
- ACE_Message_Queue<ACE_MT_SYNCH> *mq=0);
- ~ProduceAudio(void);
- int open(void*);
- int svc(void);
- };
- #endif
- #include "ProduceAudio.h"
- #include "ace/Log_Msg.h"
- #include "ace/OS.h"
- #include "Converter.h"
- #include <string>
- using namespace std;
- ProduceAudio::ProduceAudio(ACE_Thread_Manager *thr_man,
- ACE_Message_Queue<ACE_MT_SYNCH> *mq)
- :ACE_Task<ACE_MT_SYNCH>(thr_man,mq)
- {
- }
- ProduceAudio::~ProduceAudio(void)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) ~ProduceAudio()\n"));
- }
- int ProduceAudio::open(void*)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio task opened\n"));
- activate(THR_NEW_LWP,1);
- return 0;
- }
- int ProduceAudio::svc(void)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio::svc() running\n"));
- string s("message");
- for ( int i=0;i<3;++i)
- {
- ACE_Message_Block * blk = new ACE_Message_Block(10);
- blk->copy( (s + lexical_cast<string>(i)).c_str());
- this->putq(blk);
- //this->put(blk);
- ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio::svc() put(%s),now msg_queue()->message_count()[%d]\n",blk->rd_ptr(),
- this->msg_queue()->message_count()));
- ACE_OS::sleep(1);
- }
- ACE_DEBUG((LM_DEBUG, "(%t) ProduceAudio::svc() return\n"));
- return 0;
- }
- #ifndef SENDTOSERVER_H
- #define SENDTOSERVER_H
- #include "ace/Task.h"
- class SendToServer : public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- SendToServer(ACE_Thread_Manager *thr_man=0,
- ACE_Message_Queue<ACE_MT_SYNCH> *mq=0);
- ~SendToServer(void);
- int open(void*);
- int svc(void);
- };
- #endif
- #include "SendToServer.h"
- #include "ace/OS.h"
- #include <string>
- using namespace std;
- SendToServer::SendToServer(ACE_Thread_Manager *thr_man,
- ACE_Message_Queue<ACE_MT_SYNCH> *mq)
- :ACE_Task<ACE_MT_SYNCH>(thr_man,mq)
- {
- }
- SendToServer::~SendToServer(void)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) ~SendToServer()\n"));
- }
- int SendToServer::open(void*)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) SendToServer task opened\n"));
- activate(THR_NEW_LWP,1);
- return 0;
- }
- int SendToServer::svc(void)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) SendToServer::svc() running\n"));
- ACE_Message_Block * blk = NULL;
- int count =0;
- for ( ; count<3;)
- {
- if (this->msg_queue()->message_count()>0)
- {
- this->getq(blk);
- ++count;
- ACE_DEBUG((LM_DEBUG,"SendToServer get :%s\n",blk->rd_ptr()));
- blk->release();
- }
- ACE_OS::sleep(1);
- }
- ACE_DEBUG((LM_DEBUG, "(%t) SendToServer::svc() return\n"));
- return 0;
- }
- #include "ace/Thread_Manager.h"
- #include "SendToServer.h"
- #include "ProduceAudio.h"
- #ifdef _DEBUG
- #pragma comment (lib,"ACEd.lib")
- #else
- #pragma comment (lib,"ACE.lib")
- #endif
- int main(int argc, char* argv[])
- {
- SendToServer consumer(NULL,NULL);
- ProduceAudio producer(NULL,consumer.msg_queue());
- producer.open(NULL);
- consumer.open(NULL);
- ACE_Thread_Manager::instance()->wait();
- return 0;
- }
生產者消費者實例2:生產者通過引用消費者,來操作消費者的內部消息隊列
- #ifdef _DEBUG
- #pragma comment (lib,"ACEd.lib")
- #else
- #pragma comment (lib,"ACE.lib")
- #endif
- #include "ace/Log_Msg.h"
- #include "ace/Task.h"
- #include "ace/OS.h"
- #include "ace/Message_Block.h"
- #include <stdio.h>
- #include <string.h>
- #include <iostream>
- #include <string>
- #include <sstream>
- using namespace std;
- class My_Data
- {
- public:
- My_Data(){key = ++id;cout<<"My_Data("<<id<<")\n";}
- ~My_Data(){cout<<"~My_Data("<<id<<")\n";}
- string data;
- int key;
- static int id;
- };
- int My_Data::id = 0;
- class Consumer:
- public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- //啟動Task消費線程
- int open(void*)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) Consumer task opened\n"));
- activate(THR_NEW_LWP,1);
- return 0;
- }
- int svc(void)
- {
- //Get ready to receive message from Producer
- do
- {
- ACE_Message_Block * msg =0;
- ACE_DEBUG((LM_DEBUG,"(%t)消費者開始取消息\n"));
- if (!this->msg_queue()->is_empty())//取消息的時候最好要判斷隊列是否為空,因為如果剛開始取就是空的,就會阻塞,后來沒有人喚醒的話就會一直阻塞
- {
- this->getq(msg);//從消息隊列中取出一個消息,這個消息的內存使用權就轉接到消息指針上面了。
- ACE_DEBUG((LM_DEBUG,"(%t)消費者收到消息: 內容[%s]\n",msg->rd_ptr()));
- msg->release();
- }else
- {
- cout<<"隊列空,等待10秒之后再取消息!"<<endl;
- ACE_OS::sleep(10);
- }
- }while(true);
- return 0;
- }
- int close(u_long)
- {
- ACE_DEBUG((LM_DEBUG,"Consumer closes down\n"));
- return 0;
- }
- };
- class Producer : public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- Producer(Consumer * consumer):consumer_(consumer){}
- int open(void*)
- {
- ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened\n"));
- activate(THR_NEW_LWP,1);
- return 0;
- }
- //The Service Processing routine
- int svc(void)
- {
- //生產者深入一個用戶名,放到消費者的隊列中
- do
- {
- My_Data one_data;
- ACE_OS::sleep(1);//防止CPU使用率過高
- ostringstream os;
- os<<one_data.key;
- one_data.data = "name" + os.str();
- ACE_Message_Block* mb = new ACE_Message_Block(100);
- mb->copy(one_data.data.c_str());
- cout<<"將"<<mb->rd_ptr()<<"放入到了隊列中\n";
- this->consumer_->putq(mb);
- } while (shutdown);
- return 0;
- }
- int close(u_long)
- {
- ACE_DEBUG((LM_DEBUG,"Producer closes down\n"));
- return 0;
- }
- private:
- Consumer * consumer_;
- };
- int main(int argc, char * argv[])
- {
- Consumer * consumer = new Consumer;
- Producer * producer = new Producer(consumer);
- producer->open(0);
- consumer->open(0);
- //Wait for all the tasks to exit.
- ACE_Thread_Manager::instance()->wait();
- ACE_OS::system("pause");
- delete producer;
- delete consumer;
- return 0;
- }

多個Task共用一個消息隊列
- SendToServer consumer(NULL,NULL);
- ProduceAudio producer(NULL,consumer.msg_queue());
- producer.msg_queue()->high_water_mark((size_t)(1024*1024*2));
- consumer.open(NULL);
- producer.open(NULL);
一個Task中開啟多個線程
例子1:
例子2:
#include "ace/proactor.h"
#include "ace/task_t.h"
#include "ace/thread_semaphore.h"
#include "ace/WIN32_Proactor.h"
class CProactorTask:public ACE_Task<ACE_MT_SYNCH>
{
public:
CProactorTask(void);
virtual ~CProactorTask(void);
int Start(const int nMax);
int Stop(void);
int Create(void);
int Release(void);
virtual int svc(void);
protected:
ACE_Thread_Semaphore m_sem; //信號量
ACE_Proactor *m_pProactor; // 完成端口對象指針
};
#include "CProactorTask.h"
CProactorTask::CProactorTask(void)
{
}
CProactorTask::~CProactorTask(void)
{
}
//創建完成端口對象
int CProactorTask::Create(void)
{
ACE_WIN32_Proactor *proactor_impl=0;
//新建
ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);
//關聯
ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);
//保存
ACE_Proactor::instance(this->m_pProactor, 1);
return 0;
}
//啟動線程池
int CProactorTask::Start(const int nMax)
{
//創建完成端口對象
Create();
//創建線程
this->activate(THR_NEW_LWP,nMax);
int i;
//保證所有線程已啟動
for(i=nMax;i>0;i--)
{
m_sem.acquire();
//Block the thread until the semaphore count becomes greater than 0, then decrement it.
}
printf("start\n");
return 0;
}
//刪除線程池
int CProactorTask::Stop(void)
{
ACE_Proactor::event_loop_done();
this->wait();
return 0;
}
//
//每個線程調用
//
int CProactorTask::svc(void)
{
ACE_DEBUG((LM_INFO,ACE_TEXT("create a new ACE_Proactor::run_event_loop!\n")));
//Increment the semaphore by 1
m_sem.release(1);
ACE_Proactor::run_event_loop();
return 0;
}
//
//釋放
//
int CProactorTask::Release(void)
{
ACE_Proactor::close_singleton();
m_pProactor = 0;
printf("release\n");
return 0;
}
int ACE_TMAIN(int ,char*[])
{
printf("*************Echo Server************\n");
//獲取CPU數量
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int threadNum=sysInfo.dwNumberOfProcessors<<1; //cpu*2
//開啟線程
CProactorTask task;
task.Start(threadNum);
ACE_INET_Addr serverAddr(5151);
ACE_Asynch_Acceptor<TCPTransfer> acceptor;
acceptor.open(serverAddr);
UDPTransfer UDPR;
UDPR.open_addr(serverAddr);
ACE_DEBUG ((LM_DEBUG,
"(%P | %t):Test ends\n"));
ACE_Thread_Manager::instance ()->wait ();
return 0;
}
