無鎖隊列的實現-循環數組


通過CAS操作免鎖設計:

  • CAS原子 操作(Compare & Set):包含三個操作數,內存值V、舊的預期值 oldval、要修改的新值newval,當且僅當內存V中的值和舊值oldval相同時,將內存V修改為newval。
  • 數組隊列是一個循環數組,隊列少用一個元素,當頭等於尾標示隊空,尾加1等於頭標示隊滿。
  • 數組的元素用EMPTY(無數據,標示可以入隊)和FULL(有數據,標示可以出隊)標記指示,數組一開始全部初始化成 EMPTY標示空隊列。
  • EnQue 操作:如果當前隊尾位置為EMPTY,標示線程可以在當前位置入隊,通過CAS原子操作把該位置設置為FULL,避免其它線程操作這個位置,操作完后修改隊尾位置。各個線程競爭新的隊尾位置。如下圖所示:

  1. 線程T1/T2競爭隊尾位置。
  2. T1競爭成功,首先設置FULL標記,然后對該位置進行操作。
  3. T2輪詢該位置標識為FULL繼續輪詢。
  4. T1操作完成后將隊尾位置后移。
  5. T1/T2又開始競爭新的隊尾。
  • DeQue 操作:如果當前隊頭位置為FULL,標示線程可以在當前位置出隊,通過CAS原子操作把該位置設置為EMPTY,避免其它線程操作這個位置,操作完后修改隊頭位置。各個線程競爭新的隊頭位置。
  • 操作沒有加鎖,每個線程都假設沒有沖突的去完成操作,如果因為沖突失敗就重試。
#include "stdlib.h"
#include "stdio.h"
#include <pthread.h>
#define MAXLEN 2
#define CAS __sync_bool_compare_and_swap

typedef struct
{
    int elem;
    int status;//用於狀態監測
}node;

typedef struct
{
    node elePool[MAXLEN];
    int front;
    int rear;
}queue;

enum
{
    EMPTY =1,
    FULL,
};

queue g_que;

void initQue()
{
    int i = 0;
    g_que.front = 0;
    g_que.rear  = 0;
    
    for(i=0;i<MAXLEN;i++)
    {
        g_que.elePool[i].status = EMPTY;
    }
    return;
}

int enque(int elem)
{
    do
    {
        if((g_que.rear+1)%MAXLEN == g_que.front)
        {
            return -1;
        }
    }while(!CAS(&(g_que.elePool[g_que.rear].status),EMPTY,FULL));
    g_que.elePool[g_que.rear].elem = elem;
    printf("in--%d(%lu)\n",elem,pthread_self());
    CAS(&(g_que.rear),g_que.rear,(g_que.rear+1)%MAXLEN);
    
    return 0;
}

int deque(int* pElem)
{
    do
    {
        if(g_que.rear == g_que.front)
        {
            return -1;
        }
    }while(!CAS(&(g_que.elePool[g_que.front].status),FULL,EMPTY));
    *pElem = g_que.elePool[g_que.front].elem;
    printf("out--%d(%lu)\n",*pElem,pthread_self());
    CAS(&(g_que.front),g_que.front,(g_que.front+1)%MAXLEN);
    return 0;
}
View Code

通過CAS、FAA、FAS操作免鎖設計:

  • FAA操作:原子加1操作,返回更新前的值。
  • FAS操作:原子減1操作,返回更新前的值。
  • 增加writeableCnt指示隊列還可以寫入元素個數,readableCnt指示隊列中存在的元素個數。用來控制可以並發操作的線程個數。
  • EnQue 操作:通過原子加操作給每個要求操作的線程分配為唯一一個位置信息存放在局部變量pos中,各個線程並行的操作對應位置的信息,不再需要輪詢等待。如下圖所示:

  1. T1/T2線程初始操作隊尾的兩個位置。
  2. T1操作完后直接操作下一個隊尾位置。
  • DeQue 操作:如果當前隊頭位置為FULL,標示線程可以在當前位置出隊,通過CAS原子操作把該位置設置為EMPTY,避免其它線程操作這個位置,操作完后修改隊頭位置。各個線程競爭新的隊頭位置。
  • 多個線程可以同時進行入隊,避免了在同一個位置等待輪詢,對效率有明顯提升。
#include "stdlib.h"
#include "stdio.h"
#include <pthread.h>
#define MAXLEN 2000
 
#define NUM_THREADS 8
#define NUM_MSG        500
#define CAS __sync_bool_compare_and_swap
#define FAA __sync_fetch_and_add
#define FAS __sync_fetch_and_sub
#define VCAS __sync_val_compare_and_swap

int g_inputOver = 0;
typedef struct
{
    int elem;
    long threadId;
    int status;//indicate whether the node can be read
}node;

typedef struct
{
    node elePool[MAXLEN];
    int front;
    int rear;
    int writeableCnt;//the number of node that can be written
    int readableCnt; //the number of node that have been written
}queue;

enum
{
    EMPTY =1,
    FULL,
};

queue g_que;

void initQue()
{
    int i = 0;
    g_que.front = 0;
    g_que.rear  = 0;
    g_que.readableCnt  = 0;
    g_que.writeableCnt = MAXLEN;
    
    for(i=0;i<MAXLEN;i++)
    {
        g_que.elePool[i].status = EMPTY;
    }
    return;
}

int enque(int elem)
{    
    int pos = 0;   
    if(FAS(&(g_que.writeableCnt),1) <= 0)
    {
        printf("dis-%d(%u)\n",elem,pthread_self());
        FAA(&(g_que.writeableCnt),1);
        return -1;
    } 
    //CAS(&(g_que.rear),g_que.rear,g_que.rear%MAXLEN);
    CAS(&(g_que.rear),MAXLEN,0);
    pos = FAA(&(g_que.rear),1)%MAXLEN;
    g_que.elePool[pos].elem = elem;
    g_que.elePool[pos].threadId = pthread_self();
    printf("in-%d(%u),inpos=(%d),rear=(%d)\n",elem,pthread_self(),pos,g_que.rear);
    CAS(&(g_que.elePool[pos].status),EMPTY,FULL);
    FAA(&(g_que.readableCnt),1);
    return 0;
}
int deque(int* pElem, int* pThreadId)
{   
    //printf("readableCnt--%d,pos = %d\n",g_que.readableCnt,g_que.front);
    do
    {
        if(g_que.readableCnt == 0)
        {
            return -1;
        }
    }while(!CAS(&(g_que.elePool[g_que.front].status),FULL,EMPTY));
    *pElem = g_que.elePool[g_que.front].elem;
    *pThreadId = g_que.elePool[g_que.front].threadId;
    CAS(&(g_que.front),g_que.front,(g_que.front+1)%MAXLEN);
    FAS(&(g_que.readableCnt),1);
    FAA(&(g_que.writeableCnt),1);
    printf("out-%d(%u)(%u)\n",*pElem,*pThreadId,pthread_self());
    return 0;
}

void* SendMsg(void *arg) 
{
    int msgNo = 0;
    for ( msgNo = 0; msgNo < NUM_MSG; msgNo++ )
    {
        usleep(1000);
        enque(msgNo);
    }
    g_inputOver++;
    return NULL;
}

int main(void)
{
 
    int rc,i;     
    pthread_t thread[NUM_THREADS];
    int elem,threadId;

    initQue();
    for( i = 0; i < NUM_THREADS; i++ )  
    {
        printf("Creating thread %i\n", i);   
        rc = pthread_create(&thread[i], NULL, SendMsg, NULL);  
        if(rc)
        {   
            printf("ERROR; return code is %d\n", rc); 
        return -1;
        }
    }
    while((NUM_THREADS != g_inputOver) || (g_que.readableCnt != 0))
    {
        //printf("readableCnt--%d,g_inputOver--%d\n",g_que.readableCnt,g_inputOver);
        rc = deque(&elem,&threadId);
        if (0 == rc)    
        {
            usleep(100);
        }
    }
    printf("WCnt-%d\n",g_que.writeableCnt);
    printf("RCnt-%d\n",g_que.readableCnt);
    return 0;
}
View Code

 

轉載請注明原始出處http://www.cnblogs.com/chencheng/p/3527692.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM