用C++11實現一個有界的阻塞隊列


對於一個無界的阻塞隊列而言,其實現非常簡單,即用一個鎖(鎖隊列)+ 一個條件變量(判空)即可。那么對於一個有界阻塞隊列而言,其隊列的容量有上限,其實只要再加一個條件變量用來判斷是否滿即可。
綜上,我們需要

  • mutex: 保護隊列的讀寫操作
  • notEmptyCV: 條件變量,在take時wait, 在put之后notify
  • notFullCV: 條件變量, 在put時wait, 在take之后notify.

C++11提供了豐富的多線程編程庫,包括加鎖解鎖、同步原語等封裝。我的實現如下:

#pragma once
// Must use it higher than C++11

#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <assert.h>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

template<typename T>
class BoundedBlockingQueue {
public:
	// make class non-copyable
	BoundedBlockingQueue(const BoundedBlockingQueue<T>&) = delete;
	BoundedBlockingQueue& operator=(const BoundedBlockingQueue<T>&) = delete;

	explicit BoundedBlockingQueue<T>(size_t maxSize)
		: mtx_(),
		maxSize_(maxSize)
	{

	}

	void put(const T& x) {
	//	std::cout << std::this_thread::get_id() << " puting" << x << std::endl;
		std::unique_lock<std::mutex> locker(mtx_);
		notFullCV_.wait(locker, [this]() {return queue_.size() < maxSize_; });			

		queue_.push(x);
		notEmptyCV_.notify_one();
	}

	T take() {
	//	std::cout << std::this_thread::get_id() << " taking" << std::endl;
		std::unique_lock<std::mutex> locker(mtx_);
		notEmptyCV_.wait(locker, [this]() {return !queue_.empty(); });

		T front(queue_.front());
		queue_.pop();
		notFullCV_.notify_one();

		return front;
	}

	// with time out
	// @param timeout: max wait time, ms
	// @param outRes: reference result if take successfully
	// @return take successfully or not
	bool take(int timeout, T& outRes) {
		std::unique_lock<std::mutex> locker(mtx_);
		notEmptyCV_.wait_for(locker, timeout*1ms, [this]() {return !queue_.empty(); });
		if(queue_.empty()) return false;
		
		outRes = queue_.front(); queue_.pop();
		notFullCV_.notify_one();

		return true;
	}

	// Checking BlockingQueue status from outside
	// DO NOT use it as internal call, which will cause DEADLOCK
	bool empty() const {
		std::unique_lock<std::mutex> locker(mtx_);
		return queue_.empty();
	}

	size_t size() const {
		std::unique_lock<std::mutex> locker(mtx_);
		return queue_.size();
	}

	size_t maxSize() const {
		return maxSize_;
	}

private:
	mutable std::mutex mtx_;
	std::condition_variable notEmptyCV_;
	std::condition_variable notFullCV_;
	size_t maxSize_;
	std::queue<T> queue_;
};

如果實現了一個阻塞隊列,其實寫一個生產者消費者模型也就變得十分簡單。甚至都不用去考慮線程同步的問題,就讓生產者put, 消費者take就可以了。

#include "BoundedBlockingQueue.h"
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <functional>
#include <cstdio>

template<typename T>
class Runable{
public:
    virtual void run(int ThreadId) = 0;

    static void stop() {
        running_ = false;
    }
    static void start() {
        running_ = true;
    }

    static std::atomic_bool running_;
    static BoundedBlockingQueue<T> blque_;
private:
    
};

template<typename T> std::atomic_bool Runable<T>::running_(false);
template<typename T> BoundedBlockingQueue<T> Runable<T>::blque_(3);

template <typename T>
class Producer : public Runable<T> {
public:
    explicit Producer(std::function<T()> createResource, std::function<bool()> stopPred) 
    : createResource_(createResource),
      stopPred_(stopPred)
    {

    }

    virtual void run(int threadId) {
        while(Runable<T>::running_) {
            T obj = createResource_();
            if(stopPred_()) {
                Runable<int>::stop();
                break;
            }            
            Runable<T>::blque_.put(obj);
            printf("Producer %d: put %d\n", threadId, obj);             
        }
    }

private:
    std::function<T()> createResource_;
    std::function<bool()> stopPred_;
};

template <typename T>
class Consumer : public Runable<T> {
public:
    explicit Consumer(std::function<void(T&)> useResource )
    : useResource_(useResource)
    {

    }

    virtual void run(int threadId) {
        T obj;
        while(Runable<T>::running_ || !Runable<T>::blque_.empty()) {
            if(Runable<T>::blque_.take(1000, obj)) { // try to take
                useResource_(obj);
                printf("Consumer %d: take %d\n", threadId, obj);
            }
            else {
                printf("Consumer %d: take timeout!\n", threadId);
            } 
        }   
    }

private:
    std::function<void(T&)> useResource_;
};

using namespace std;
using namespace std::placeholders;

volatile atomic<int> logId;

int create() {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    return logId++;
}

void useResource(int& obj) {
    //printf("got %d\n", T);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
} 

bool stopPred() {
    return logId >= 10;
}

int main() {
    int pSize = 10;
    int cSize = 3;

    Producer<int> producer(create, stopPred);
    Consumer<int> consumer(useResource);    

    vector<std::thread> producerThrs;
    vector<std::thread> consumerThrs;
    
    Runable<int>::start();
    for(int i = 0; i < pSize; i++) producerThrs.push_back(std::thread(&Producer<int>::run, &producer, i));
    for(int i = 0; i < cSize; i++) consumerThrs.push_back(std::thread(&Consumer<int>::run, &consumer, i));  
       
    for(int i = 0; i < pSize; i++) producerThrs[i].join();
    for(int i = 0; i < cSize; i++) consumerThrs[i].join();

    return 0;
}

輸出結果為

Producer 0: put 6
Producer 5: put 4
Producer 7: put 1
Producer 2: put 5
Producer 1: put 7
Producer 4: put 0
Consumer 1: take 6
Consumer 2: take 4
Consumer 0: take 1
Producer 9: put 8
Producer 6: put 2
Producer 3: put 3
Consumer 0: take 0
Consumer 1: take 5
Consumer 2: take 7
Consumer 1: take 2
Consumer 0: take 3
Consumer 2: take 8


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM