再談多線程模型之生產者消費者(多生產者和多消費者 )(c++11實現)


0.關於

為縮短篇幅,本系列記錄如下:
再談多線程模型之生產者消費者(基礎概念)(c++11實現)
再談多線程模型之生產者消費者(單一生產者和單一消費者)(c++11實現)
再談多線程模型之生產者消費者(單一生產者和多消費者)(c++11實現)
再談多線程模型之生產者消費者(多生產者和單一消費者 )(c++11實現)
再談多線程模型之生產者消費者(多生產者和多消費者 )(c++11實現)本文
再談多線程模型之生產者消費者(總結)(c++11實現)

本文涉及到的代碼演示環境: VS2017

歡迎留言指正

1. 多生產者&多消費者

  • 1.1 相對一對一一對多多對多則是一對一多對多的結合體。
  • 1.2 生產者有多個,且其相互之間存在競爭
  • 1.3 消費者有多個,其其相互之間存在競爭
  • 1.4 大家共用一個緩沖區,還要考慮生產者與消費者之間的T同步情況。
  • 1.5 結構體模型是這樣的:
template<typename T>
struct repo_
{
	// 用作互斥訪問緩沖區
	std::mutex				_mtx_queue;

	// 緩沖區最大size
	unsigned int			_count_max_queue_10 = 10;

	// 緩沖區
	std::queue<T>			_queue;

	// 緩沖區沒有滿,通知生產者繼續生產
	std::condition_variable _cv_queue_not_full;

	// 緩沖區不為空,通知消費者繼續消費
	std::condition_variable _cv_queue_not_empty;



	// 用於生產者之間的競爭
	std::mutex				_mtx_pro;
	// 計算當前已經生產了多少數據了
	unsigned int			_cnt_cur_pro = 0;


	// 用於消費者之間的競爭
	std::mutex				_mtx_con;
	// 計算當前已經消費多少數據了
	unsigned int			_cnt_cur_con = 0;


	repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
		, _cnt_cur_con(0)

	{
		;
	}

	repo_(const repo_&instance) = delete;
	repo_& operator = (const repo_& instance) = delete;
	repo_(const repo_&&instance) = delete;
	repo_& operator = (const repo_&& instance) = delete;

};

可見,相對單一消費者和單一生產者模型,多了下面的代碼,用於解決競爭的問題。


	// 用於生產者之間的競爭
	std::mutex				_mtx_pro;
	// 計算當前已經生產了多少數據了
	unsigned int			_cnt_cur_pro = 0;


	// 用於消費者之間的競爭
	std::mutex				_mtx_con;
	// 計算當前已經消費多少數據了
	unsigned int			_cnt_cur_con = 0;

  • 1.6 生產者線程
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo)
{
	if (nullptr == param_repo || NULL == param_repo)
		return;
	

	while (true)
	{
		bool is_running = true;

		{
			// 用於生產者之間競爭
			std::unique_lock<std::mutex> lock(param_repo->_mtx_pro);

			// 緩沖區沒有滿,繼續生產
			if (param_repo->_cnt_cur_pro < cnt_total_10)
			{
				thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro);
				++param_repo->_cnt_cur_pro;
			}
			else
				is_running = false;
		}

		std::this_thread::sleep_for(std::chrono::microseconds(16));
		if (!is_running)
			break;
	}
}
  • 1.7 消費者線程
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
	if (nullptr == param_repo || NULL == param_repo)
		return;

	while (true)
	{
		bool is_running = true;
		{
			std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
			// 還沒消費到指定的數目,繼續消費
			if (param_repo->_cnt_cur_con < cnt_total_10)
			{
				thread_consume_item<T>(thread_index, *param_repo);
				++param_repo->_cnt_cur_con;
			}
			else
				is_running = false;

		}

		std::this_thread::sleep_for(std::chrono::microseconds(16));

		// 結束線程
		if ((!is_running))
			break;
	}
}

1.8 完整源碼

#pragma once

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>



std::mutex _mtx;
std::condition_variable _cv_not_full;
std::condition_variable _cv_not_empty;

const int max_queue_size_10 = 10;

enum
{
	// 總生產數目
	cnt_total_10 = 10,
};


template<typename T>
struct repo_
{
	// 用作互斥訪問緩沖區
	std::mutex				_mtx_queue;

	// 緩沖區最大size
	unsigned int			_count_max_queue_10 = 10;

	// 緩沖區
	std::queue<T>			_queue;

	// 緩沖區沒有滿,通知生產者繼續生產
	std::condition_variable _cv_queue_not_full;

	// 緩沖區不為空,通知消費者繼續消費
	std::condition_variable _cv_queue_not_empty;



	// 用於生產者之間的競爭
	std::mutex				_mtx_pro;
	// 計算當前已經生產了多少數據了
	unsigned int			_cnt_cur_pro = 0;


	// 用於消費者之間的競爭
	std::mutex				_mtx_con;
	// 計算當前已經消費多少數據了
	unsigned int			_cnt_cur_con = 0;


	repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
		, _cnt_cur_con(0)

	{
		;
	}

	repo_(const repo_&instance) = delete;
	repo_& operator = (const repo_& instance) = delete;
	repo_(const repo_&&instance) = delete;
	repo_& operator = (const repo_&& instance) = delete;

};

template <typename T>
using repo = repo_<T>;





//----------------------------------------------------------------------------------------

// 生產者生產數據
template <typename T>
void thread_produce_item(const int &thread_index, repo<T>& param_repo, const T& repo_item)
{
	std::unique_lock<std::mutex> lock(param_repo._mtx_queue);

	// 1. 生產者只要發現緩沖區沒有滿, 就繼續生產
	param_repo._cv_queue_not_full.wait(lock, [&] { return param_repo._queue.size() < param_repo._count_max_queue_10; });

	// 2. 將生產好的商品放入緩沖區
	param_repo._queue.push(repo_item);

	// log to console
	std::cout << "生產者" << thread_index << "生產數據:" << repo_item << "\n";

	// 3. 通知消費者可以消費了
	//param_repo._cv_queue_not_empty.notify_one();
	param_repo._cv_queue_not_empty.notify_one();
}


//----------------------------------------------------------------------------------------
// 消費者消費數據

template <typename T>
T thread_consume_item(const int thread_index, repo<T>& param_repo)
{
	std::unique_lock<std::mutex> lock(param_repo._mtx_queue);

	// 1. 消費者需要等待【緩沖區不為空】的信號
	param_repo._cv_queue_not_empty.wait(lock, [&] {return !param_repo._queue.empty(); });

	// 2. 拿出數據
	T item;
	item = param_repo._queue.front();
	param_repo._queue.pop();

	std::cout << "消費者" << thread_index << "從緩沖區中拿出一組數據:" << item << std::endl;

	// 3. 通知生產者,繼續生產
	param_repo._cv_queue_not_full.notify_one();

	return item;
}


//----------------------------------------------------------------------------------------

/**
*  @ brief: 生產者線程
*  @ thread_index - 線程標識,區分是哪一個線程
*  @ count_max_produce - 最大生產次數
*  @ param_repo - 緩沖區
*  @ return - void

*/
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo)
{
	if (nullptr == param_repo || NULL == param_repo)
		return;
	

	while (true)
	{
		bool is_running = true;

		{
			// 用於生產者之間競爭
			std::unique_lock<std::mutex> lock(param_repo->_mtx_pro);

			// 緩沖區沒有滿,繼續生產
			if (param_repo->_cnt_cur_pro < cnt_total_10)
			{
				thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro);
				++param_repo->_cnt_cur_pro;
			}
			else
				is_running = false;
		}

		std::this_thread::sleep_for(std::chrono::microseconds(16));
		if (!is_running)
			break;
	}
}



/**
*  @ brief: 消費者線程
*  @ thread_index - 線程標識,區分線程
*  @ param_repo - 緩沖區
*  @ return - void

*/
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
	if (nullptr == param_repo || NULL == param_repo)
		return;

	while (true)
	{
		bool is_running = true;
		{
			std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
			// 還沒消費到指定的數目,繼續消費
			if (param_repo->_cnt_cur_con < cnt_total_10)
			{
				thread_consume_item<T>(thread_index, *param_repo);
				++param_repo->_cnt_cur_con;
			}
			else
				is_running = false;

		}

		std::this_thread::sleep_for(std::chrono::microseconds(16));

		// 結束線程
		if ((!is_running))
			break;
	}
}


// 入口函數
//----------------------------------------------------------------------------------------

int main(int argc, char *argv[], char *env[])
{
	// 緩沖區
	repo<int> repository;
	// 線程池
	std::vector<std::thread> vec_thread;

	// 生產者
	vec_thread.push_back(std::thread(thread_pro<int>, 1, cnt_total_10, &repository));
	vec_thread.push_back(std::thread(thread_pro<int>, 2, cnt_total_10, &repository));

	// 消費者
	vec_thread.push_back(std::thread(thread_con<int>, 1, &repository));
	vec_thread.push_back(std::thread(thread_con<int>, 2, &repository));



	for (auto &item : vec_thread)
	{
		item.join();
	}

	return 0;
}
  • 1.9 可能的結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM