最近的任務是寫一個多線程的東西,就得接觸多線程隊列了,我反正是沒學過分布式的,代碼全憑感覺寫出來的,不過運氣好,代碼能夠work= =
話不多說,直接給代碼吧,一個多消費者,多生產者的模式。假設我的任務是求隊列的中位數是啥,每消費10000次的時候,我要知道中位數是什么。
至於加不加鎖,這個看你了,我反正是加了,代碼里面沒寫……我反正是把寫的代碼單獨封裝了一個函數,然后加了個鎖
歡迎交流,這個代碼已經在實際任務上面上線了,希望不會有bug。
用的是boost::lockfree::queue,官方文檔:http://www.boost.org/doc/libs/1_55_0/boost/lockfree/queue.hpp
/*
關於鎖的代碼:
偉大的Boost庫給我們提供了 shared_mutex 類,結合 unique_lock 與 shared_lock 的使用,可以實現讀寫鎖。
通常讀寫鎖需要完成以下功能:
1.當 data 被線程A讀取時,其他線程仍可以進行讀取卻不能寫入
2.當 data 被線程A寫入時,其他線程既不能讀取也不能寫入
對應於功能1,2我們可以這樣來描述:
1.當線程A獲得共享鎖時,其他線程仍可以獲得共享鎖但不能獲得獨占鎖
2.當線程A獲得獨占鎖時,其他線程既不能獲得共享鎖也不能獲得獨占鎖
typedef boost::shared_lock<boost::shared_mutex> read_lock;
typedef boost::unique_lock<boost::shared_mutex> write_lock;
boost::shared_mutex read_write_mutex;
int32_t data = 1;
//線程A,讀data
{
read_lock rlock(read_write_mutex);
std::cout << data << std:; endl;
}
//線程B,讀data
{
read_lock rlock(read_write_mutex);
std::cout << data << std:; endl;
}
//線程C,寫data
{
write_lock rlock(read_write_mutex);
data = 2;
}
*/
#ifndef DYNAMIC_QUEUE_H_
#define DYNAMIC_QUEUE_H_
#include "boost/lockfree/queue.hpp"
#include "boost/thread/thread.hpp"
#include "boost/thread/mutex.hpp"
#include "abtest_parameters.h"
namespace un {
class DynamicController {
public:
boost::lockfree::queue<size_t,boost::lockfree::capacity<40000> > lockfree_queue;
// boost::lockfree::queue boost里面的無鎖隊列,唯一比較蛋疼的就是空間最大65536以及沒法輸出size,其他的就將就用吧。
// 隊列長度可以自定義,也可以不定義,會自增長的。
size_t num = 0;
void StartDaemonUpdater(){
boost::function0<void> f = boost::bind(&DynamicController::UpdaterWorker, this);
boost::thread thrd(f);
thrd.detach();
}
// 啟動消費者隊列
void Producer(size_t number){
bool succ = lockfree_queue.bounded_push(number);
// 如果用push的話,沒空間的話,會等待消費完。
// bounded_push的話,如果每空間會返回false,然后棄掉這個數。成功返回true
}
// 生產者
size_t GetNumber(
return num;
}
// get代碼
void UpdaterWorker(void){
std::vector<size_t> V;
while(1){//穩妥起見,這個while里面可以寫個sleep以至於不需要一直在消費。
size_t tmp_value;
while(lockfree_queue.pop(tmp_value)){
V.push_back(tmp_value);
// 更新條件,10000個數
// 用p99更新
if(V.size()>10000){
std::sort(V.begin(),V.end());
num = V[size_t(V.size()*0.5)];
V.clear();
}
}
}
}
// 消費者
};
}
#endif