在一個情形中遇到下面一個情況
簡述下該圖片,對sdk進行二次開發,通過第三方sdk接口獲取碼流信息。具體實現方式是通過回調函數CallBack_SDK來不停的回調第三方服務的視頻流。起初實現邏輯如下:
void CallBack_SDK(long flag, char* stream, int data,void* UsrData) { ... //必要的信息處理 CallBack_mainProgram(); //調用主程序的回調函數,將碼流回調給主程序 }
即回調函數中再調用主函數的回調函數。這樣就遇到一個問題,CallBack_SDK 函數需要很快返回,而CallBack_mainProgram則返回慢,造成了一個生產者消費者問題,消費者的速度跟不上生產者的速度,造成CallBack_SDK不能及時返回造成3rd sdk認為socket阻塞從而造成斷鏈。由於歷史原因,CallBack_mainProgram()沒有可能改動,只能CallBack_SDK動手腳。一共采取了以下這些方法:
1 有鎖隊列
構造一個有鎖的隊列,互斥鎖和deque構造一個隊列,互斥鎖類的代碼來自網絡(具體地址忘了)
#pragma once #ifndef _Lock_H #define _Lock_H #include <windows.h> //鎖接口類 class IMyLock { public: virtual ~IMyLock() {} virtual void Lock() const = 0; virtual void Unlock() const = 0; }; //互斥對象鎖類 class Mutex : public IMyLock { public: Mutex(); ~Mutex(); virtual void Lock() const; virtual void Unlock() const; private: HANDLE m_mutex; }; //鎖 class CLock { public: CLock(const IMyLock&); ~CLock(); private: const IMyLock& m_lock; }; #endif
|
#include "Lock.h" #include <cstdio> //創建一個匿名互斥對象 Mutex::Mutex() { m_mutex = ::CreateMutex(NULL, FALSE, NULL); } //銷毀互斥對象,釋放資源 Mutex::~Mutex() { ::CloseHandle(m_mutex); } //確保擁有互斥對象的線程對被保護資源的獨自訪問 void Mutex::Lock() const { DWORD d = WaitForSingleObject(m_mutex, INFINITE); } //釋放當前線程擁有的互斥對象,以使其它線程可以擁有互斥對象,對被保護資源進行訪問 void Mutex::Unlock() const { ::ReleaseMutex(m_mutex); } //利用C++特性,進行自動加鎖 CLock::CLock(const IMyLock& m) : m_lock(m) { m_lock.Lock(); // printf("Lock \n"); } //利用C++特性,進行自動解鎖 CLock::~CLock() { m_lock.Unlock(); //printf("UnLock \n"); }
|
上邊左右分別為互斥鎖類的頭文件和實現文件。
#pragma once #ifndef THREADSAFEDEQUE_H #define THREADSAFEDEQUE_H #include "Lock.h" #include <DEQUE> #include <vector> typedef struct CframeBuf { void* m_achframe; int m_nsize; bool m_bUsed; CframeBuf() { m_achframe = NULL; m_nsize = 0; m_bUsed = false; } }CFrameBuf; #define MAX_FRAME_BUF_SIZE 150 /* 定義最大緩存隊列長度 */ #define MEM_POOL_SLOT_SIZE 512 * 1024 /* 內存池最小單位 */ typedef struct mempool { std::vector<char*> m_memFragment; size_t m_nsize; size_t m_nused; mempool() { m_nused = 0; m_nsize = 0; } }MemPool; class SynchMemPool { public: SynchMemPool() { AllocatBuf(); } ~SynchMemPool() { EmptyBuf(); } bool WriteDate2Mempool(CframeBuf & In, CframeBuf & Out); /* 寫入一個結構體存儲 */ bool ReadDataFromMempool(CframeBuf & Out); /* 讀取datalen長數據 */ bool MinusOneMempool(); /* 減少一個結構體存儲 */ private: MemPool m_mempool; /*下載內存池 */ Mutex m_Lock; /* 互斥鎖 */ bool AllocatBuf(); /* 分配內存 */ bool EmptyBuf(); /* 釋放 */ }; class SynchDeque { private: std::deque<CFrameBuf> q; Mutex m_lock; SynchMemPool memorypool; public: SynchDeque() { } ~SynchDeque() { } void Enqueue( CFrameBuf & item ); void Dequeue( CFrameBuf & item ); void ClearDeque() { q.clear(); } int GetSize() { return(q.size() ); } }; #endif
|
#include "threadSafeDeque.h" bool SynchMemPool::AllocatBuf(){ CLock clock(m_Lock); for (int i = 0; i< MAX_FRAME_BUF_SIZE; ++i){ char* mem = new char[MEM_POOL_SLOT_SIZE]; if (mem != NULL) { m_mempool.m_memFragment.push_back(mem); } m_mempool.m_nsize++; } printf("分配內存池成功,大小為 %d\n",m_mempool.m_nsize); return TRUE; } bool SynchMemPool::EmptyBuf(){ CLock clock(m_Lock); for (size_t i = 0; i < m_mempool.m_memFragment.size();++i){ if (m_mempool.m_memFragment[i]) { delete [] m_mempool.m_memFragment[i]; } } m_mempool.m_nsize = 0; m_mempool.m_nused = 0; printf("釋放內存池成功\n"); return TRUE; } bool SynchMemPool::WriteDate2Mempool(CframeBuf & In, CframeBuf & Out) { if (In.m_achframe == NULL) { return false; } if (m_mempool.m_nsize == m_mempool.m_nused) { return false; } CLock clock(m_Lock); char* avaibleaddr = m_mempool.m_memFragment[m_mempool.m_nused]; if (avaibleaddr) { memcpy(avaibleaddr,In.m_achframe,In.m_nsize); Out.m_achframe = avaibleaddr; Out.m_nsize = In.m_nsize; ++m_mempool.m_nused; return true; } else return false; } bool SynchMemPool::ReadDataFromMempool(CframeBuf& Out){ if (Out.m_achframe != NULL && m_mempool.m_memFragment[m_mempool.m_nused -1] != NULL && Out.m_nsize < MEM_POOL_SLOT_SIZE) { memcpy(Out.m_achframe,m_mempool.m_memFragment[m_mempool.m_nused -1],Out.m_nsize); } return true; } bool SynchMemPool::MinusOneMempool(){ if (m_mempool.m_nused == 0) { return false; } CLock clock(m_Lock); if (m_mempool.m_memFragment[m_mempool.m_nused - 1] != NULL) { memset(m_mempool.m_memFragment[m_mempool.m_nused - 1],0,MEM_POOL_SLOT_SIZE); --m_mempool.m_nused; } return true; } void SynchDeque::Enqueue(CFrameBuf& item) { CLock clock(m_lock); { CFrameBuf tmpfram; memorypool.WriteDate2Mempool(item, tmpfram); q.push_back(tmpfram); } } void SynchDeque::Dequeue(CFrameBuf& item) { CLock clock(m_lock); // <= create critical block, based on q { if (q.size() != 0 && item.m_achframe && q.front().m_achframe) { memcpy(item.m_achframe,q.front().m_achframe,q.front().m_nsize); item.m_bUsed = q.front().m_bUsed; item.m_nsize = q.front().m_nsize; q.pop_front(); // memorypool.ReadDataFromMempool(item.m_achframe,item.m_nsize); //讀取一個數據 memorypool.MinusOneMempool(); //歸還一個slot給內存池 } } }
|
上邊為采用互斥鎖的隊列簡單實現。
但是問題又出現了,采用上述方式,一個線程寫,一個線程讀,但寫的速度更快了(3rd sdk似乎是當回調函數返回就立即寫入,並沒有次數或時長的調用限制),這就意味着隊列的鎖不停的由寫線程持有,導致讀線程餓死,或者隊列一直處於飽和狀態。
遂考慮無鎖隊列,網絡上有很多無鎖隊列的實現,我采用了環形隊列,這種原理在linux內核的網絡包處理中也有使用。下面給一個簡單的實現,帶內存形式:
/* 本文件實現了對於單個消費者和單個接收者之間無鎖環形隊列,注意,不適用於多線程(多消費者和多生產者) */ #pragma once #ifndef _RINGQUEUE_H #define _RINGQUEUE_H #include <cstring> #include <VECTOR> #define MAX_RING_SIXE 150 #define PACKAGE_SIZE 512*1024 //數據包和大小 typedef struct package{ char* rawdata; //數據包buf地址 int len; //數據包大小 int pos; //用於下載時獲取下載位置 package(){ rawdata = NULL; len = 0; pos = 0; } }TPackage,*PPackage; //實現一個環形隊列對象,注意,在構造和析構中創建和銷毀了內存池 class RingQue { public: RingQue(); ~RingQue(); RingQue(int memSize){ cap = memSize; } bool EnQue(TPackage& package); bool DeQue(TPackage& package); bool Empty(); bool Full(); size_t Size(); private: size_t head; size_t tail; size_t cap; std::vector<TPackage> memPool; // TPackage mem[MAX_RING_SIXE]; }; typedef struct TMemNode{ TMemNode *prev; //前一個內存節點 TMemNode *next; //后一個內存節點 size_t idataSize; //節點大小 bool bUsed; //節點是否正被使用 bool bMemBegin; //是否內存池分配的首地址 void *data; // 當前節點分配的地址 void **pUser; //使用者對象的地址 }TmemLinkNode; #endif // _RINGQUEUE_H
|
#include <new> #include "ringque.h" #include <exception> using namespace std; RingQue::RingQue(){ head = 0; tail = 0; cap = MAX_RING_SIXE; int i = 0; while (i < MAX_RING_SIXE) { TPackage tpack; char* mem = NULL; mem = new char[PACKAGE_SIZE]; if (NULL == mem) { continue; } tpack.rawdata = mem; memPool.push_back(tpack); ++i; } cap = memPool.size(); } RingQue::~RingQue(){ for (int i = 0;i <memPool.size();++i) { if (memPool[i].rawdata) { delete []memPool[i].rawdata; memPool[i].rawdata = NULL; } } } bool RingQue::Full(){ return (tail + 1) % cap == head; } bool RingQue::Empty(){ return (head + 1) % cap == tail; } bool RingQue::EnQue(TPackage& package){ if (Full()) { return false; } memcpy(memPool[tail].rawdata,package.rawdata,package.len); memPool[tail].len = package.len; memPool[tail].pos = package.pos; tail = (tail + 1) % cap; return true; } bool RingQue::DeQue(TPackage& package){ if (Empty()) { return false; } memcpy(package.rawdata,memPool[head].rawdata,memPool[head].len); package.len = memPool[head].len; package.pos = memPool[head].pos; head = (head + 1) % cap; return true; } size_t RingQue::Size(){ if (head > tail) { return cap + tail - head; } else return tail - head; }
|
這種方式實現了無鎖隊列,可以使用。
在讀線程中,使用了while(true) 不停的讀該無鎖隊列,但是該方式讀取會占用cpu過高,因為不停的查詢隊列中是否有數據到來,這樣很類似select的機制,當然有更先進的機制,或者也可以在隊列中實現CallBack_mainProgram的調用,這類似epoll的機制,但這個方法我沒有實現,而是采用了更為粗糙的方法,睡眠,這種方式帶來的后果是喚醒不及時或者回調不及時,會出現視頻碼流的拖影。只能將睡眠間隔調到一個折中的時間。這一塊還得深入分析。
另外一個值得使用的方法是,Sleep(0)。Sleep()函數的精確度不高,但Sleep(0)的作用是讓那些優先級高的線程獲得執行的機會,在單線程函數結尾使用該函數,能夠降低性能開銷。
https://stackoverflow.com/questions/1739259/how-to-use-queryperformancecounter