Boost多線程編程
背景
• 今天互聯網應用服務程序普遍使用多線程來提高與多客戶鏈接時的效率;為了達到最大的吞吐量,事務服務器在單獨的線程上運行服務程序;
GUI應用程序將那些費時,復雜的處理以線程的形式單獨運行,以此來保證用戶界面能夠及時響應用戶的操作。這樣使用多線程的例子還有很多。
• 跨平台
創建線程
• 頭文件 <boost/thread/thread.hpp>
namespace boost {
class thread;
class thread_group;
}
• thread():構造一個表示當前執行線程的線程對象
• explicit thread(const boost::function0<void>& threadfunc)
注:boost::function0<void>可以簡單看為:一個無返回(返回void),無參數的函數。這里的函數也可以是類重載operator()構成的函數。
第一種方式:最簡單方法
• #include <boost/thread/thread.hpp>
• #include <iostream>
•
• void hello()
• {
• std::cout <<
• "Hello world, I''m a thread!"
• << std::endl;
• }
•
• int main(int argc, char* argv[])
• {
• boost::thread thrd(&hello);
• thrd.join();
• return 0;
• }
第二種方式:復雜類型對象作為參數來創建線程
• #include <boost/thread/thread.hpp>
• #include <boost/thread/mutex.hpp>
• #include <iostream>
•
• boost::mutex io_mutex;
•
• struct count
• {
• count(int id) : id(id) { }
•
• void operator()()
• {
• for (int i = 0; i < 10; ++i)
• {
• boost::mutex::scoped_lock
• lock(io_mutex);
• std::cout << id << ": "
• << i << std::endl;
• }
• }
•
• int id;
• };
•
• int main(int argc, char* argv[])
• {
• boost::thread thrd1(count(1));
• boost::thread thrd2(count(2));
• thrd1.join();
• thrd2.join();
• return 0;
• }
第三種方式:在類內部創建線程
• (1)類內部靜態方法啟動線程
• #include <boost/thread/thread.hpp>
• #include <iostream>
• class HelloWorld
• {
• public:
• static void hello()
• {
• std::cout <<
• "Hello world, I''m a thread!"
• << std::endl;
• }
• static void start()
• {
•
• boost::thread thrd( hello );
• thrd.join();
• }
•
• };
• int main(int argc, char* argv[])
• {
• HelloWorld::start();
•
• return 0;
• }
• 在這里start()和hello()方法都必須是static方法。
• (2)如果要求start()和hello()方法不能是靜態方法則采用下面的方法創建線程:
• #include <boost/thread/thread.hpp>
• #include <boost/bind.hpp>
• #include <iostream>
• class HelloWorld
• {
• public:
• void hello()
• {
• std::cout <<
• "Hello world, I''m a thread!"
• << std::endl;
• }
• void start()
• {
• boost::function0< void> f = boost::bind(&HelloWorld::hello,this);
• boost::thread thrd( f );
• thrd.join();
• }
•
• };
• int main(int argc, char* argv[])
• {
• HelloWorld hello;
• hello.start();
• return 0;
• }
• (3)在Singleton模式內部創建線程:
• #include <boost/thread/thread.hpp>
• #include <boost/bind.hpp>
• #include <iostream>
• class HelloWorld
• {
• public:
• void hello()
• {
• std::cout <<
• "Hello world, I''m a thread!"
• << std::endl;
• }
• static void start()
• {
• boost::thread thrd( boost::bind
• (&HelloWorld::hello,&HelloWorld::getInstance() ) ) ;
• thrd.join();
• }
• static HelloWorld& getInstance()
• {
• if ( !instance )
• instance = new HelloWorld;
• return *instance;
• }
• private:
• HelloWorld(){}
• static HelloWorld* instance;
•
• };
• HelloWorld* HelloWorld::instance = 0;
• int main(int argc, char* argv[])
• {
• HelloWorld::start();
• return 0;
• }
第四種方法:用類內部函數在類外部創建線程
• #include <boost/thread/thread.hpp>
• #include <boost/bind.hpp>
• #include <string>
• #include <iostream>
• class HelloWorld
• {
• public:
• void hello(const std::string& str)
• {
• std::cout <<str<< std::endl;
• }
• };
•
• int main(int argc, char* argv[])
• {
• HelloWorld obj;
• boost::thread thrd( boost::bind(&HelloWorld::hello,&obj,"Hello
• world, I''m a thread!" ) ) ;
• thrd.join();
• return 0;
• }
如果線程需要綁定的函數有參數則需要使用boost::bind。比如想使用 boost::thread創建一個線程來執行函數:void f(int i),
如果這樣寫:boost::thread thrd(f)是不對的,因為thread構造函數聲明接受的是一個沒有參數且返回類型為void的型別,而且
不提供參數i的值f也無法運行,這時就可以寫:boost::thread thrd(boost::bind(f,1))。涉及到有參函數的綁定問題基本上都
是boost::thread、boost::function、boost::bind結合起來使用。
互斥體
• 一個互斥體一次只允許一個線程訪問共享區。當一個線程想要訪問共享區時,首先要做的就是鎖住(lock)互斥體。
• Boost線程庫支持兩大類互斥體,包括簡單互斥體(simple mutex)和遞歸互斥體(recursive mutex)。
有了遞歸互斥體,單個線程就可以對互斥體多次上鎖,當然也必須解鎖同樣次數來保證其他線程可以對這個互斥體上鎖。
• Boost線程庫提供的互斥體類型:
boost::mutex,
boost::try_mutex,
boost::timed_mutex,
boost::recursive_mutex,
boost::recursive_try_mutex,
boost::recursive_timed_mutex,
boost::shared_mutex
• mutex類采用Scope Lock模式實現互斥體的上鎖和解鎖。即構造函數對互斥體加鎖,析構函數對互斥體解鎖。
• 對應現有的幾個mutex導入了scoped_lock,scoped_try_lock,scoped_timed_lock.
• scoped系列的特色就是析構時解鎖,默認構造時加鎖,這就很好的確定在某個作用域下某線程獨占某段代碼。
mutex+scoped_lock
• #include <boost/thread/thread.hpp>
• #include <boost/thread/mutex.hpp>
• #include <boost/bind.hpp>
• #include <iostream>
• boost::mutex io_mutex;
• void count(int id)
• {
• for (int i = 0; i < 10; ++i)
• {
• boost::mutex::scoped_lock lock(io_mutex);
• std::cout << id << ": " << i << std::endl;
• }
• }
• int main(int argc, char* argv[])
• {
• boost::thread thrd1(boost::bind(&count, 1));
• boost::thread thrd2(boost::bind(&count, 2));
• thrd1.join();
• thrd2.join();
• return 0;
• }
try_mutex+scoped_try_lock
• void loop(void)
• {
• bool running = true;
• while (running)
• {
• static boost::try_mutex iomutex;
• {
• boost::try_mutex::scoped_try_lock lock(iomutex);//鎖定mutex
• if (lock.owns_lock())
• {
• std::cout << "Get lock." << std::endl;
• }
• else
• {
• // To do
• std::cout << "Not get lock." << std::endl;
• boost::thread::yield(); //釋放控制權
• continue;
• }
• } //lock析構,iomutex解鎖
• }
• }
timed_mutex+scoped_timed_mutex
• void loop(void)
• {
• bool running = true;
• while (running)
• {
• typedef boost::timed_mutex MUTEX;
• typedef MUTEX::scoped_timed_lock LOCK;
• static MUTEX iomutex;
• {
• boost::xtime xt;
• boost::xtime_get(&xt,boost::TIME_UTC);
• xt.sec += 1; //超時時間秒
• LOCK lock(iomutex, xt); //鎖定mutex
• if (lock.owns_lock())
• {
• std::cout << "Get lock." << std::endl;
• }
• else
• {
• std::cout << "Not get lock." << std::endl;
• boost::thread::yield(); //釋放控制權
• }
• //::sleep(10000); //長時間
• } //lock析構,iomutex解鎖
• //::sleep(250);
• }
• }
shared_mutex
• 應用boost::thread的shared_mutex實現singled_write/multi_read的簡單例子
• #include <iostream>
• #include <boost/thread/thread.hpp>
• #include <boost/thread/shared_mutex.hpp>
• using namespace std;
• using namespace boost;
• boost::shared_mutex shr_mutex;
• /// 這個是輔助類,能夠保證log_info被完整的輸出
• class safe_log {
• public:
• static void log(const std::string& log_info) {
• boost::mutex::scoped_lock lock(log_mutex);
• cout << log_info << endl;
• }
• private:
• static boost::mutex log_mutex;
• };
• boost::mutex safe_log::log_mutex;
• void write_process() {
• shr_mutex.lock();
• safe_log::log("begin of write_process");
• safe_log::log("end of write_process");
• shr_mutex.unlock();
• }
• void read_process() {
• shr_mutex.lock_shared();
• safe_log::log("begin of read_process");
• safe_log::log("end of read_process");
• shr_mutex.unlock_shared();
• }
• int main() {
• thread_group threads;
• for (int i = 0; i < 10; ++ i) {
• threads.create_thread(&write_process);
• threads.create_thread(&read_process);
• }
• threads.join_all();
• ::system("PAUSE");
• return 0;
• }
條件變量
• 有的時候僅僅依靠鎖住共享資源來使用它是不夠的。有時候共享資源只有某些狀態的時候才能夠使用。
比方說,某個線程如果要從堆棧中讀取數據,那么如果棧中沒有數據就必須等待數據被壓棧。這種情
況下的同步使用互斥體是不夠的。另一種同步的方式--條件變量,就可以使用在這種情況下。
• boost::condition
typedef condition_variable_any condition;
void wait(unique_lock<mutex>& m);
• boost::condition_variable
template<typename lock_type>
void wait(lock_type& m);
• #include <boost/thread/thread.hpp>
• #include <boost/thread/mutex.hpp>
• #include <boost/thread/condition.hpp>
• #include <iostream>
• const int BUF_SIZE = 10;
• const int ITERS = 100;
• boost::mutex io_mutex;
• class buffer
• {
• public:
• typedef boost::mutex::scoped_lock scoped_lock;
• buffer()
• : p(0), c(0), full(0)
• {
• }
• void put(int m)
• {
• scoped_lock lock(mutex);
• if (full == BUF_SIZE)
• {
• {
• boost::mutex::scoped_lock lock(io_mutex);
• std::cout << "Buffer is full. Waiting..." << std::endl;
• }
• while (full == BUF_SIZE)
• cond.wait(lock);
• }
• buf[p] = m;
• p = (p+1) % BUF_SIZE;
• ++full;
• cond.notify_one();
• }
• int get()
• {
• scoped_lock lk(mutex);
• if (full == 0)
• {
• {
• boost::mutex::scoped_lock lock(io_mutex);
• std::cout << "Buffer is empty. Waiting..." << std::endl;
• }
• while (full == 0)
• cond.wait(lk);
• }
• int i = buf[c];
• c = (c+1) % BUF_SIZE;
• --full;
• cond.notify_one();
• return i;
• }
• private:
• boost::mutex mutex;
• boost::condition cond;
• unsigned int p, c, full;
• int buf[BUF_SIZE];
• };
• buffer buf;
• void writer()
• {
• for (int n = 0; n < ITERS; ++n)
• {
• {
• boost::mutex::scoped_lock lock(io_mutex);
• std::cout << "sending: " << n << std::endl;
• }
• buf.put(n);
• }
• }
• void reader()
• {
• for (int x = 0; x < ITERS; ++x)
• {
• int n = buf.get();
• {
• boost::mutex::scoped_lock lock(io_mutex);
• std::cout << "received: " << n << std::endl;
• }
• }
• }
• int main(int argc, char* argv[])
• {
• boost::thread thrd1(&reader);
• boost::thread thrd2(&writer);
• thrd1.join();
• thrd2.join();
• return 0;
• }
線程局部存儲
• 函數的不可重入。
• Boost線程庫提供了智能指針boost::thread_specific_ptr來訪問本地存儲線程(thread local storage)。
• #include <boost/thread/thread.hpp>
• #include <boost/thread/mutex.hpp>
• #include <boost/thread/tss.hpp>
• #include <iostream>
• boost::mutex io_mutex;
• boost::thread_specific_ptr<int> ptr;
• struct count
• {
• count(int id) : id(id) { }
• void operator()()
• {
• if (ptr.get() == 0)
• ptr.reset(new int(0));
• for (int i = 0; i < 10; ++i)
• {
• (*ptr)++; // 往自己的線程上加
• boost::mutex::scoped_lock lock(io_mutex);
• std::cout << id << ": " << *ptr << std::endl;
• }
• }
• int id;
• };
• int main(int argc, char* argv[])
• {
• boost::thread thrd1(count(1));
• boost::thread thrd2(count(2));
• thrd1.join();
• thrd2.join();
• return 0;
• }
僅運行一次的例程
• 如何使得初始化工作(比如說構造函數)也是線程安全的。
• “一次實現”(once routine)。“一次實現”在一個應用程序只能執行一次。
• Boost線程庫提供了boost::call_once來支持“一次實現”,並且定義了一個標志boost::once_flag及一個初始化這個標志的宏 BOOST_ONCE_INIT。
• #include <boost/thread/thread.hpp>
• #include <boost/thread/once.hpp>
• #include <iostream>
• int i = 0;
• boost::once_flag flag = BOOST_ONCE_INIT;
• void init()
• {
• ++i;
• }
• void thread()
• {
• boost::call_once(&init, flag);
• }
• int main(int argc, char* argv[])
• {
• boost::thread thrd1(&thread);
• boost::thread thrd2(&thread);
• thrd1.join();
• thrd2.join();
• std::cout << i << std::endl;
• return 0;
• }
Boost線程庫的未來
• Boost線程庫正在計划加入一些新特性。其中包括boost::read_write_mutex,它可以讓多個線程同時從共享區中讀取數據,
但是一次只可能有一個線程向共享區寫入數據;boost::thread_barrier,它使得一組線程處於等待狀態,知道所有得線程
都都進入了屏障區;boost::thread_pool,他允許執行一些小的routine而不必每一都要創建或是銷毀一個線程。
• Boost線程庫已經作為標准中的類庫技術報告中的附件提交給C++標准委員會,它的出現也為下一版C++標准吹響了第一聲號角。
委員會成員對 Boost線程庫的初稿給予了很高的評價,當然他們還會考慮其他的多線程庫。他們對在C++標准中加入對多線程的
支持非常感興趣。從這一點上也可以看出,多線程在C++中的前途一片光明。