碼流回調過快導致下方處理不及時socket阻塞問題


在一個情形中遇到下面一個情況

簡述下該圖片,對sdk進行二次開發,通過第三方sdk接口獲取碼流信息。具體實現方式是通過回調函數CallBack_SDK來不停的回調第三方服務的視頻流。起初實現邏輯如下:

void CallBack_SDK(long flag, char* stream, int data,void* UsrData)
{
      ...  //必要的信息處理
    CallBack_mainProgram();  //調用主程序的回調函數,將碼流回調給主程序
    
}

即回調函數中再調用主函數的回調函數。這樣就遇到一個問題,CallBack_SDK 函數需要很快返回,而CallBack_mainProgram則返回慢,造成了一個生產者消費者問題,消費者的速度跟不上生產者的速度,造成CallBack_SDK不能及時返回造成3rd sdk認為socket阻塞從而造成斷鏈。由於歷史原因,CallBack_mainProgram()沒有可能改動,只能CallBack_SDK動手腳。一共采取了以下這些方法:

1 有鎖隊列

  構造一個有鎖的隊列,互斥鎖和deque構造一個隊列,互斥鎖類的代碼來自網絡(具體地址忘了)

#pragma once
#ifndef _Lock_H
#define _Lock_H

#include <windows.h>

//鎖接口類
class IMyLock
{
public:
    virtual ~IMyLock() {}

    virtual void Lock() const = 0;
    virtual void Unlock() const = 0;
};

//互斥對象鎖類
class Mutex : public IMyLock
{
public:
    Mutex();
    ~Mutex();

    virtual void Lock() const;
    virtual void Unlock() const;

private:
    HANDLE m_mutex;
};

//
class CLock
{
public:
    CLock(const IMyLock&);
    ~CLock();

private:
    const IMyLock& m_lock;
};


#endif

 

#include "Lock.h"
#include <cstdio>

//創建一個匿名互斥對象
Mutex::Mutex()
{
    m_mutex = ::CreateMutex(NULL, FALSE, NULL);
}

//銷毀互斥對象,釋放資源
Mutex::~Mutex()
{
    ::CloseHandle(m_mutex);
}

//確保擁有互斥對象的線程對被保護資源的獨自訪問
void Mutex::Lock() const
{
    DWORD d = WaitForSingleObject(m_mutex, INFINITE);
}

//釋放當前線程擁有的互斥對象,以使其它線程可以擁有互斥對象,對被保護資源進行訪問
void Mutex::Unlock() const
{
    ::ReleaseMutex(m_mutex);
}

//利用C++特性,進行自動加鎖
CLock::CLock(const IMyLock& m) : m_lock(m)
{
    m_lock.Lock();
    //    printf("Lock \n");
}

//利用C++特性,進行自動解鎖
CLock::~CLock()
{
    m_lock.Unlock();
    //printf("UnLock \n");
}

 

上邊左右分別為互斥鎖類的頭文件和實現文件。

#pragma once
#ifndef THREADSAFEDEQUE_H
#define THREADSAFEDEQUE_H
#include "Lock.h"
#include <DEQUE>
#include <vector>

typedef struct  CframeBuf {
    void* m_achframe;
    int    m_nsize;
    bool    m_bUsed;
    CframeBuf()
    {
        m_achframe    = NULL;
        m_nsize        = 0;
        m_bUsed        = false;
    }
}CFrameBuf;


#define MAX_FRAME_BUF_SIZE    150             /* 定義最大緩存隊列長度 */
#define MEM_POOL_SLOT_SIZE    512 * 1024      /* 內存池最小單位 */

typedef struct mempool {
    std::vector<char*>    m_memFragment;
    size_t        m_nsize;
    size_t        m_nused;
    mempool()
    {
        m_nused = 0;
        m_nsize = 0;
    }
}MemPool;

class SynchMemPool {
public:

    SynchMemPool()
    {
        AllocatBuf();
    }

    ~SynchMemPool()
    {
        EmptyBuf();
    }

    bool WriteDate2Mempool(CframeBuf & In, CframeBuf & Out);      /* 寫入一個結構體存儲 */

    bool ReadDataFromMempool(CframeBuf & Out);                    /* 讀取datalen長數據 */

    bool MinusOneMempool();                                         /* 減少一個結構體存儲 */

private:
    MemPool m_mempool;                                              /*下載內存池 */
    Mutex    m_Lock;                                                 /* 互斥鎖 */

    bool AllocatBuf();                                              /* 分配內存 */


    bool EmptyBuf();                                                /* 釋放 */
};

class SynchDeque
{
private:
    std::deque<CFrameBuf>    q;
    Mutex            m_lock;
    SynchMemPool        memorypool;
public:

    SynchDeque()
    {
    }
    ~SynchDeque()
    {
    }
    void Enqueue( CFrameBuf & item );

    void Dequeue( CFrameBuf & item );

    void ClearDeque()
    {
        q.clear();
    }

    int GetSize()
    {
        return(q.size() );
    }
};
#endif

 

#include "threadSafeDeque.h"

bool SynchMemPool::AllocatBuf(){
    CLock clock(m_Lock);
    for (int i = 0; i< MAX_FRAME_BUF_SIZE; ++i){
        char* mem = new char[MEM_POOL_SLOT_SIZE];
        if (mem != NULL)
        {
            m_mempool.m_memFragment.push_back(mem);
        }
        m_mempool.m_nsize++;
    }
    printf("分配內存池成功,大小為 %d\n",m_mempool.m_nsize);
    return TRUE;
}

bool SynchMemPool::EmptyBuf(){
    CLock clock(m_Lock);
    for (size_t i = 0; i < m_mempool.m_memFragment.size();++i){
        if (m_mempool.m_memFragment[i])
        {
            delete [] m_mempool.m_memFragment[i];
        }        
    }
    m_mempool.m_nsize = 0;
    m_mempool.m_nused = 0;
    printf("釋放內存池成功\n");
    return TRUE;
}


bool SynchMemPool::WriteDate2Mempool(CframeBuf & In, CframeBuf & Out) {
    if (In.m_achframe == NULL)
    {
        return false;
    }
    if (m_mempool.m_nsize == m_mempool.m_nused)
    {
        return false;
    }
    CLock clock(m_Lock);
    char* avaibleaddr = m_mempool.m_memFragment[m_mempool.m_nused];
    if (avaibleaddr)
    {
        memcpy(avaibleaddr,In.m_achframe,In.m_nsize);
        Out.m_achframe = avaibleaddr;
        Out.m_nsize = In.m_nsize;
        ++m_mempool.m_nused;
        return true;
    }
    else
        return false;
        
}

bool SynchMemPool::ReadDataFromMempool(CframeBuf& Out){
    if (Out.m_achframe != NULL && m_mempool.m_memFragment[m_mempool.m_nused -1] != NULL && Out.m_nsize < MEM_POOL_SLOT_SIZE)
    {
        memcpy(Out.m_achframe,m_mempool.m_memFragment[m_mempool.m_nused -1],Out.m_nsize);
    }
    return true;
}

bool SynchMemPool::MinusOneMempool(){
    if (m_mempool.m_nused == 0)
    {
        return false;
    }
    CLock clock(m_Lock);
    if (m_mempool.m_memFragment[m_mempool.m_nused - 1] != NULL)
    {
        memset(m_mempool.m_memFragment[m_mempool.m_nused - 1],0,MEM_POOL_SLOT_SIZE);
        --m_mempool.m_nused;
    }
    return true;
}

void SynchDeque::Enqueue(CFrameBuf& item) {
    CLock clock(m_lock);
    {
        CFrameBuf tmpfram;
        memorypool.WriteDate2Mempool(item, tmpfram);
        q.push_back(tmpfram);
    }
}
void SynchDeque::Dequeue(CFrameBuf& item)
{

    CLock clock(m_lock); // <= create critical block, based on q
    {

        if (q.size() != 0 && item.m_achframe && q.front().m_achframe)
        {

            memcpy(item.m_achframe,q.front().m_achframe,q.front().m_nsize);

            item.m_bUsed = q.front().m_bUsed;
            item.m_nsize = q.front().m_nsize;
            q.pop_front();

        //    memorypool.ReadDataFromMempool(item.m_achframe,item.m_nsize);        //讀取一個數據
            memorypool.MinusOneMempool();        //歸還一個slot給內存池

        }            
    }
}

 

 

上邊為采用互斥鎖的隊列簡單實現。

但是問題又出現了,采用上述方式,一個線程寫,一個線程讀,但寫的速度更快了(3rd sdk似乎是當回調函數返回就立即寫入,並沒有次數或時長的調用限制),這就意味着隊列的鎖不停的由寫線程持有,導致讀線程餓死,或者隊列一直處於飽和狀態。

遂考慮無鎖隊列,網絡上有很多無鎖隊列的實現,我采用了環形隊列,這種原理在linux內核的網絡包處理中也有使用。下面給一個簡單的實現,帶內存形式:

/*
本文件實現了對於單個消費者和單個接收者之間無鎖環形隊列,注意,不適用於多線程(多消費者和多生產者)

*/

#pragma once
#ifndef _RINGQUEUE_H
#define _RINGQUEUE_H
#include <cstring>
#include <VECTOR>


#define MAX_RING_SIXE 150
#define PACKAGE_SIZE 512*1024

//數據包和大小
typedef struct package{
    char* rawdata;    //數據包buf地址
    int len;    //數據包大小
    int pos;    //用於下載時獲取下載位置
    package(){
        rawdata = NULL;
        len = 0;
        pos = 0;
    }
}TPackage,*PPackage;


//實現一個環形隊列對象,注意,在構造和析構中創建和銷毀了內存池
class RingQue {
public:
    RingQue();
    ~RingQue();

    RingQue(int memSize){
        cap = memSize;
    }
    bool EnQue(TPackage& package);
    bool DeQue(TPackage& package);
    bool Empty();
    bool Full();
    size_t Size();
private:
    size_t head;
    size_t tail;
    size_t cap;
    std::vector<TPackage> memPool;
//    TPackage mem[MAX_RING_SIXE];
};


typedef struct TMemNode{
    TMemNode *prev;        //前一個內存節點
    TMemNode *next;        //后一個內存節點
    size_t idataSize;    //節點大小
    bool bUsed;            //節點是否正被使用
    bool bMemBegin;        //是否內存池分配的首地址
    void *data;            //    當前節點分配的地址
    void **pUser;        //使用者對象的地址
}TmemLinkNode;

#endif // _RINGQUEUE_H

 

#include <new>
#include "ringque.h"
#include <exception>


using namespace std;

RingQue::RingQue(){
    head = 0;
    tail = 0;
    cap = MAX_RING_SIXE;
    int i = 0;
    while (i < MAX_RING_SIXE)
    {
        TPackage tpack;

        char* mem = NULL;
        mem    = new char[PACKAGE_SIZE];
        if (NULL == mem)
        {
            continue;
        }
        tpack.rawdata = mem;
        memPool.push_back(tpack);

        ++i;
    }
    cap = memPool.size();
}

RingQue::~RingQue(){
    for (int i = 0;i <memPool.size();++i)
    {
        if (memPool[i].rawdata)
        {
            delete []memPool[i].rawdata;
            memPool[i].rawdata = NULL;
        }
    }
}

bool RingQue::Full(){
    return (tail + 1) % cap == head;
}

bool RingQue::Empty(){
    return (head + 1) % cap == tail;
}

bool RingQue::EnQue(TPackage& package){
    if (Full())
    {
        return false;
    }
    memcpy(memPool[tail].rawdata,package.rawdata,package.len);
    memPool[tail].len = package.len;
    memPool[tail].pos = package.pos;
    tail = (tail + 1) % cap;
    return true;
}

bool RingQue::DeQue(TPackage& package){
    if (Empty())
    {
        return false;
    }
    memcpy(package.rawdata,memPool[head].rawdata,memPool[head].len);
    package.len = memPool[head].len;
    package.pos = memPool[head].pos;
    head = (head + 1) % cap;
    return true;
}

size_t RingQue::Size(){
    if (head > tail)
    {
        return cap + tail - head;
    }
    else
        return tail - head;
}

 

 

 

這種方式實現了無鎖隊列,可以使用。

在讀線程中,使用了while(true) 不停的讀該無鎖隊列,但是該方式讀取會占用cpu過高,因為不停的查詢隊列中是否有數據到來,這樣很類似select的機制,當然有更先進的機制,或者也可以在隊列中實現CallBack_mainProgram的調用,這類似epoll的機制,但這個方法我沒有實現,而是采用了更為粗糙的方法,睡眠,這種方式帶來的后果是喚醒不及時或者回調不及時,會出現視頻碼流的拖影。只能將睡眠間隔調到一個折中的時間。這一塊還得深入分析。

另外一個值得使用的方法是,Sleep(0)。Sleep()函數的精確度不高,但Sleep(0)的作用是讓那些優先級高的線程獲得執行的機會,在單線程函數結尾使用該函數,能夠降低性能開銷。

https://stackoverflow.com/questions/1739259/how-to-use-queryperformancecounter


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM