0.關於
為縮短篇幅,本系列記錄如下:
再談多線程模型之生產者消費者(基礎概念)(c++11實現)
再談多線程模型之生產者消費者(單一生產者和單一消費者)(c++11實現)
再談多線程模型之生產者消費者(單一生產者和多消費者)(c++11實現)【本文】
再談多線程模型之生產者消費者(多生產者和單一消費者 )(c++11實現)
再談多線程模型之生產者消費者(多生產者和多消費者 )(c++11實現)
再談多線程模型之生產者消費者(總結)(c++11實現)
本文涉及到的代碼演示環境: VS2017
歡迎留言指正
1 單一生產者 & 多消費者
- 1.1 生產者和消費者存在互斥與同步
- 1.2 生產者只有一個,所以,不存在生產者之間互斥
- 1.3 消費者有多個,所以,消費者之間存在互斥。需要考慮到,消費者同時從緩沖區中拿出數據的情況,考慮吃水果的情況,當過盤中放入了多個水果,兒子和女兒就可以同時拿取。當過盤中只有一個水果時,兩個消費者,怎么拿?誰先拿誰先吃。類似線程中的鎖,誰先拿到鎖,誰就能用。 基於 單一生產者&單一消費者中結構體,一個互斥已經不夠用了,那就再來一個。
- 1.4 總結: 到底是生產的快還是消費的快?既然是快,那要怎么處理?如下:
情況 處理 生產者速率 > 消費者速率 最開始,生產者只有一個,生產一件商品放入緩沖區,但是此時存在多個消費者,處理方法和下面的情況是一致的。慢慢的,就會出現: 商品數量>消費者數量 和 商品數量 < 消費者數量 兩種情況出現。當出現商品數量>消費者數量時,需要保證多個消費者不能消費同一個數據,而且,既然有多個商品,那么,消費者之間消費就無需等待了 生產者速率 < 消費者速率 生產者只有一個,不存在生產者之間的沖突;然而消費者存在多個,多個消費者之間存在競爭,既然是競爭,那就需要鎖,哪個線程先拿到鎖,就先消費;因為生產速率跟不上消費速率,所以,消費者與生產者之間不存在沖突
2.源碼
根據上面可知道,對比 單一生產者&單一消費者 的代碼,可以知道,僅僅多了消費者之間的競爭。
- 2.1 結構體模型
template<typename T>
struct repo_
{
// 用作互斥訪問緩沖區
std::mutex _mtx_queue;
// 緩沖區最大size
unsigned int _count_max_queue_10 = 10;
// 緩沖區
std::queue<T> _queue;
// 緩沖區沒有滿,通知生產者繼續生產
std::condition_variable _cv_queue_not_full;
// 緩沖區不為空,通知消費者繼續消費
std::condition_variable _cv_queue_not_empty;
// 用於消費者之間的競爭
std::mutex _mtx_con;
// 計算當前已經消費多少數據了
unsigned int _cnt_cur_con = 0;
repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
, _cnt_cur_con(0)
{
;
}
repo_(const repo_&instance) = delete;
repo_& operator = (const repo_& instance) = delete;
repo_(const repo_&&instance) = delete;
repo_& operator = (const repo_&& instance) = delete;
};
結構體僅僅增加了下面的幾行代碼
// 用於消費者之間的競爭
std::mutex _mtx_con;
// 計算當前已經消費多少數據了
unsigned int _cnt_cur_con = 0;
- 2.2 消費者線程之間的競爭怎么實現?已經有了用於消費者之間的鎖,用法如下:
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return;
while (true)
{
bool is_running = true;
{
std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
// 還沒消費到指定的數目,繼續消費
if (param_repo->_cnt_cur_con < cnt_total_10)
{
thread_consume_item<T>(thread_index, *param_repo);
++param_repo->_cnt_cur_con;
}
else
is_running = false;
}
std::this_thread::sleep_for(std::chrono::microseconds(16));
// 結束線程
if ((!is_running))
break;
}
}
因為消費者之間存在競爭,所以,消費者一開始就需要競爭鎖,哪個先拿到鎖就先消費。
- 2.3 完整源碼
#pragma once
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
std::mutex _mtx;
std::condition_variable _cv_not_full;
std::condition_variable _cv_not_empty;
const int max_queue_size_10 = 10;
enum
{
// 總生產數目
cnt_total_10 = 10,
};
template<typename T>
struct repo_
{
// 用作互斥訪問緩沖區
std::mutex _mtx_queue;
// 緩沖區最大size
unsigned int _count_max_queue_10 = 10;
// 緩沖區
std::queue<T> _queue;
// 緩沖區沒有滿,通知生產者繼續生產
std::condition_variable _cv_queue_not_full;
// 緩沖區不為空,通知消費者繼續消費
std::condition_variable _cv_queue_not_empty;
// 用於消費者之間的競爭
std::mutex _mtx_con;
// 計算當前已經消費多少數據了
unsigned int _cnt_cur_con = 0;
repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
, _cnt_cur_con(0)
{
;
}
repo_(const repo_&instance) = delete;
repo_& operator = (const repo_& instance) = delete;
repo_(const repo_&&instance) = delete;
repo_& operator = (const repo_&& instance) = delete;
};
template <typename T>
using repo = repo_<T>;
//----------------------------------------------------------------------------------------
// 生產者生產數據
template <typename T>
void thread_produce_item(const int &thread_index, repo<T>& param_repo, const T& repo_item)
{
std::unique_lock<std::mutex> lock(param_repo._mtx_queue);
// 1. 生產者只要發現緩沖區沒有滿, 就繼續生產
param_repo._cv_queue_not_full.wait(lock, [&] { return param_repo._queue.size() < param_repo._count_max_queue_10; });
// 2. 將生產好的商品放入緩沖區
param_repo._queue.push(repo_item);
// log to console
std::cout << "生產者" << thread_index << "生產數據:" << repo_item << "\n";
// 3. 通知消費者可以消費了
//param_repo._cv_queue_not_empty.notify_one();
param_repo._cv_queue_not_empty.notify_one();
}
//----------------------------------------------------------------------------------------
// 消費者消費數據
template <typename T>
T thread_consume_item(const int thread_index, repo<T>& param_repo)
{
std::unique_lock<std::mutex> lock(param_repo._mtx_queue);
// 1. 消費者需要等待【緩沖區不為空】的信號
param_repo._cv_queue_not_empty.wait(lock, [&] {return !param_repo._queue.empty(); });
// 2. 拿出數據
T item;
item = param_repo._queue.front();
param_repo._queue.pop();
std::cout << "消費者" << thread_index << "從緩沖區中拿出一組數據:" << item << std::endl;
// 3. 通知生產者,繼續生產
param_repo._cv_queue_not_full.notify_one();
return item;
}
//----------------------------------------------------------------------------------------
/**
* @ brief: 生產者線程
* @ thread_index - 線程標識,區分是哪一個線程
* @ count_max_produce - 最大生產次數
* @ param_repo - 緩沖區
* @ return - void
*/
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return;
for (int item = 0; item < count_max_produce; ++item)
{
thread_produce_item<T>(thread_index, *param_repo, item);
std::this_thread::sleep_for(std::chrono::microseconds(16));
}
}
/**
* @ brief: 消費者線程
* @ thread_index - 線程標識,區分線程
* @ param_repo - 緩沖區
* @ return - void
*/
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return;
while (true)
{
bool is_running = true;
{
std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
// 還沒消費到指定的數目,繼續消費
if (param_repo->_cnt_cur_con < cnt_total_10)
{
thread_consume_item<T>(thread_index, *param_repo);
++param_repo->_cnt_cur_con;
}
else
is_running = false;
}
std::this_thread::sleep_for(std::chrono::microseconds(16));
// 結束線程
if ((!is_running))
break;
}
}
// 入口函數
//----------------------------------------------------------------------------------------
int main(int argc, char *argv[], char *env[])
{
// 緩沖區
repo<int> repository;
// 線程池
std::vector<std::thread> vec_thread;
// 生產者
vec_thread.push_back(std::thread(thread_pro<int>, 1, cnt_total_10, &repository));
// 消費者
vec_thread.push_back(std::thread(thread_con<int>, 1, &repository));
vec_thread.push_back(std::thread(thread_con<int>, 2, &repository));
for (auto &item : vec_thread)
{
item.join();
}
return 0;
}
入口函數創建了1個消費者和兩個消費者。 消費者代號分別為 1 和 2。
- 2.4 可能結果