雙緩沖隊列,生產者消費者模式


#include "stdafx.h"
#include <windows.h>
#include <process.h>

template <typename T>
class CircularQueue
{
public:
explicit CircularQueue(int capacity) : m_capacity(capacity), m_head(0), m_tail(0)
{
m_array = new T[m_capacity + 1];
}

~CircularQueue()
{
delete[] m_array;
m_capacity = m_head = m_tail = 0;
}

bool IsFull()
{
int offset = (m_tail + 1) % (m_capacity + 1);

return (offset == m_head);
}

bool IsEmpty()
{
return (m_tail == m_head);
}

void Push(const T& item)
{
if ( !IsFull() )
{
m_array[m_tail] = item;
m_tail = (m_tail + 1) % (m_capacity + 1);
}
}

T Pop()
{
if ( IsEmpty() )
{
return T();
}

int index = m_head;
m_head = (m_head + 1) % (m_capacity + 1);

return m_array[index];
}

private:
T* m_array; // 緩沖區隊列
int m_capacity; // 隊列最大存儲容量
int m_head; // 隊列頭指針
int m_tail; // 隊列尾指針
};

// 同步隊列
template <typename T>
struct SynQueue
{
SynQueue(int size) : buffer(size)
{
synEventHandle = CreateEvent(NULL, FALSE, FALSE, 0);
}

~SynQueue()
{
CloseHandle(synEventHandle);
}

operator HANDLE()
{
return synEventHandle;
}

CircularQueue<T> buffer; // 緩沖區
HANDLE synEventHandle; // 同步事件
};

class PacketQueue
{
public:
explicit PacketQueue(int size) : m_readQueue(size),
m_sendQueue(size), m_bFreezeQueue(false)
{
}

// 刷新讀取緩沖區,使得發送線程有機會接管讀取緩沖區,在讀取完畢后調用
void Flush()
{
SetEvent(m_readQueue);
SetEvent(m_sendQueue);
}

// 凍結緩沖區
void FreezeQueue()
{
m_bFreezeQueue = true;
}

// 從發送緩沖區取出一個數據包
int Popup()
{
static SynQueue<int>* pSendQueue = &m_sendQueue;
static SynQueue<int>* pReadQueue = &m_readQueue;

// 判斷發送緩沖區數據包是否為空
if ( pSendQueue->buffer.IsEmpty() )
{
// 釋放當前發送緩沖區
SetEvent(pSendQueue->synEventHandle);

// 得到當前讀取緩沖區
pReadQueue = ExchangeQueue(pSendQueue);

// 接管當前讀取緩沖區的擁有權
WaitForSingleObject(pReadQueue->synEventHandle, INFINITE);

// 接管讀取緩沖區
pSendQueue = pReadQueue;
}

// 從緩沖區取出一個數據包
return pSendQueue->buffer.Pop();
}

bool Push(int item)
{
static SynQueue<int>* pReadQueue = &m_readQueue;
static SynQueue<int>* pSendQueue = &m_sendQueue;

if ( m_bFreezeQueue )
{
return false;
}

// 判斷讀取緩沖區是否數據已填滿
if ( pReadQueue->buffer.IsFull() )
{
// 釋放當前緩沖區擁有權
SetEvent(pReadQueue->synEventHandle);

pSendQueue = ExchangeQueue(pReadQueue);

// 等待接管另一個緩沖區的擁有權
WaitForSingleObject(pSendQueue->synEventHandle, INFINITE);

// 接管另一個緩沖區
pReadQueue = pSendQueue;
}

// 插入數據包
pReadQueue->buffer.Push(item);

return true;
}

private:
// 交換緩沖區
SynQueue<int>* ExchangeQueue(SynQueue<int>* queue)
{
if ( (queue != &m_readQueue) && (queue != &m_sendQueue) )
{
return 0;
}

return (queue == &m_readQueue ? &m_sendQueue : &m_readQueue);
}

private:
// 雙緩沖隊列
SynQueue<int> m_readQueue; // 數據讀取緩沖區
SynQueue<int> m_sendQueue; // 數據發送緩沖區

bool m_bFreezeQueue; // 發送線程出現致命錯誤,凍結緩沖區
};

// 讀取線程入口函數
unsigned WINAPI ReadThreadEntry( PVOID param )
{
PacketQueue* queue = (PacketQueue*)param;

// 插入100個數據包
for ( int i = 0; i < 10000; ++i )
{
Sleep(100); // 讀取間隔時間

if ( !queue->Push(i) )
{
goto exit;
}

printf("intput data: %d\n", i);
}

exit:

// 插入結束包
queue->Push(-1);
printf("input end data\n");

// 刷新讀緩沖區
queue->Flush();
printf("flush read buffer\n");

return 0;
}

// 發送線程入口函數
unsigned WINAPI SendThreadEntry( PVOID param )
{
PacketQueue* queue = (PacketQueue*)param;

// 循環讀取數據包
for (;;)
{
//queue->FreezeQueue();
//return 0;

int item = queue->Popup();
if ( -1 == item )
{
printf("send thread fetch end data\n");
break; // 遇到結束包
}

//Sleep(150);
printf("send thread fetch data: %d\n", item);
}

return 0;
}

int main(int argc, char* argv[])
{
PacketQueue queue(50);

// 啟動讀取線程
HANDLE hReadThread = (HANDLE)_beginthreadex(NULL, 0, ReadThreadEntry, (void*)&queue, 0, NULL);

// 啟動發送線程
HANDLE hSendThread = (HANDLE)_beginthreadex(NULL, 0, SendThreadEntry, (void*)&queue, 0, NULL);

DWORD T1 = GetTickCount();

WaitForSingleObject(hReadThread, INFINITE);
WaitForSingleObject(hSendThread, INFINITE);

printf("total time: %d", GetTickCount() - T1);

return 0;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM