boost::lockfree::queue多線程讀寫實例


最近的任務是寫一個多線程的東西,就得接觸多線程隊列了,我反正是沒學過分布式的,代碼全憑感覺寫出來的,不過運氣好,代碼能夠work= =

話不多說,直接給代碼吧,一個多消費者,多生產者的模式。假設我的任務是求隊列的中位數是啥,每消費10000次的時候,我要知道中位數是什么。

至於加不加鎖,這個看你了,我反正是加了,代碼里面沒寫……我反正是把寫的代碼單獨封裝了一個函數,然后加了個鎖

歡迎交流,這個代碼已經在實際任務上面上線了,希望不會有bug。

用的是boost::lockfree::queue,官方文檔:http://www.boost.org/doc/libs/1_55_0/boost/lockfree/queue.hpp

/*
關於鎖的代碼:

偉大的Boost庫給我們提供了 shared_mutex  類,結合 unique_lock 與 shared_lock 的使用,可以實現讀寫鎖。



通常讀寫鎖需要完成以下功能:

1.當 data 被線程A讀取時,其他線程仍可以進行讀取卻不能寫入

2.當 data 被線程A寫入時,其他線程既不能讀取也不能寫入



對應於功能1,2我們可以這樣來描述:

1.當線程A獲得共享鎖時,其他線程仍可以獲得共享鎖但不能獲得獨占鎖

2.當線程A獲得獨占鎖時,其他線程既不能獲得共享鎖也不能獲得獨占鎖

typedef boost::shared_lock<boost::shared_mutex> read_lock;  
typedef boost::unique_lock<boost::shared_mutex> write_lock;  
  
boost::shared_mutex read_write_mutex;  
int32_t data = 1;  
  
//線程A,讀data  
{  
    read_lock rlock(read_write_mutex);  
    std::cout << data << std:; endl;  
}  
  
//線程B,讀data  
{  
    read_lock rlock(read_write_mutex);  
    std::cout << data << std:; endl;  
}  
  
//線程C,寫data  
{  
    write_lock rlock(read_write_mutex);  
    data = 2;  
}  
*/



#ifndef DYNAMIC_QUEUE_H_
#define DYNAMIC_QUEUE_H_

#include "boost/lockfree/queue.hpp"
#include "boost/thread/thread.hpp"
#include "boost/thread/mutex.hpp"
#include "abtest_parameters.h"

namespace un {
class DynamicController {

public:
boost::lockfree::queue<size_t,boost::lockfree::capacity<40000> > lockfree_queue;
// boost::lockfree::queue  boost里面的無鎖隊列,唯一比較蛋疼的就是空間最大65536以及沒法輸出size,其他的就將就用吧。
// 隊列長度可以自定義,也可以不定義,會自增長的。

size_t num = 0;

void StartDaemonUpdater(){
  boost::function0<void> f = boost::bind(&DynamicController::UpdaterWorker, this);
  boost::thread thrd(f);
  thrd.detach();
}
// 啟動消費者隊列

void Producer(size_t number){
  bool succ = lockfree_queue.bounded_push(number);
  // 如果用push的話,沒空間的話,會等待消費完。
  // bounded_push的話,如果每空間會返回false,然后棄掉這個數。成功返回true
}
// 生產者

size_t GetNumber(
  return num;
}
// get代碼

void UpdaterWorker(void){
  std::vector<size_t> V;
  while(1){//穩妥起見,這個while里面可以寫個sleep以至於不需要一直在消費。
    size_t tmp_value;
    while(lockfree_queue.pop(tmp_value)){
      V.push_back(tmp_value);
      // 更新條件,10000個數
      // 用p99更新
      if(V.size()>10000){
        std::sort(V.begin(),V.end());
        num = V[size_t(V.size()*0.5)];
        V.clear();
      }
    }
  }
}

// 消費者

};
}
#endif


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM