簡單的線程消息隊列實現


1. 線程使用場景
(1)流水線方式。根據業務特點,將一個流程的處理分割成多個線程,形成流水線的處理方式。產生的結果:延長單一流程的處理時間,提高系統整體的吞吐能力。
(2)線程池方式。針對處理時間比較長且沒有內蘊狀態的線程,使用線程池方式分流消息,加快對線程消息的處理,避免其成為系統瓶頸。
線程使用的關鍵是線程消息隊列、線程鎖、智能指針的使用。其中以線程消息隊列最為重要。

2. 線程消息隊列描述
所謂線程消息隊列,就是一個普通的循環隊列加上“多生產者-單(多)消費者的存/取操作”。流水線方式中的線程是單消費者,線程池方式中的線程是多消費者。
為了后文更好的描述問題,作如下說明:
(1)假定循環隊列queue中, 入隊操作put_queue, 出隊操作get_queue。
(2)生產者消費者:生產者線程生產消息,放在一個空緩沖區中,供消費者線程消費,生產者生產消息(put_queue),如果緩沖區滿,則被阻塞,消費者消費消息(get_queue),如果緩沖區空,則被阻塞。線程消息隊列就是生產者消費者問題中的緩沖區,而它的生產者是不限定的,任何線程都可以作為生產者向其中進行put_queue操作,消費線程則可能是一個,也可能是多個。因此對循環隊列的任何操作都要加鎖,以保證線程安全。

3. 線程相關的操作
(1)pthread_t類型的創建、屬性創建設置等。
這類具體可以: man pthread_creat; man pthread_attr_init; man pthread_detach; man pthread_join等查看
(2)pthread_mutex_t類型的操作。
這類具體可以: man pthread_mutex_init可以看到所有相關的操作。
(3)pthread_cond_t類型的操作。man pthread_cond_init。pthread_cond_t的wait和signal操作一定要和pthread_mutex_t的lock、unlock配合使用。類似於此:

4. linux的線程庫
2.6之后的內核的默認使用的是redhat公司的NPTL(原生posix線程庫),以前內核使用的是LinuxThreads庫,兩者的簡單介紹可以看http://www.ibm.com/developerworks/cn/linux/l-threading.html。不過對於應用者,分析兩者的區別和優劣也沒什么大意義。這里特別提下NPTL的futex機制。借助該機制,pthread_mutex的性能大大提高,只要不進入競爭態,進程就不會陷入內核態。這點可以自己寫示例程序,通過strace -c 跟蹤進程的系統調用,另外還可以證實總是進入內核態的操作有pthread_cond_signal和sem_post。

5. 通過上面的分析,我們可以有如下結論:
(1)減少pthread_cond_signal和sem_post的調用,只在有必要的時候調用;
(2)盡量避免pthread_mutex進入競爭態。增大消息隊列的大小,可以有效減少競態條件的出現。

6. 實用的線程消息隊列實現(msg_queue.h)

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

pthread_mutex_t mux;
pthread_cond_t cond_get, cond_put;

struct msg_queue {
    void** buffer; // 緩沖數據, .buffer = msg
    int size; // 隊列大小,使用的時候給出稍大的size,可以減少進入內核態的操作
    int lget; // 取隊列數據的偏移量
    int lput; // 放隊列數據的偏移量
    int nData; // 隊列中數據的個數,用來判斷隊列滿/空
    int nFullThread; // 由於隊列滿而阻塞在put_queue的線程個數
    int nEmptyThread; // 由於隊列空而阻塞在get_queue的線程個數
};

void* get_queue(struct msg_queue *q){
    void* data = NULL;
    pthread_mutex_lock(&mux);
    while(q->lget == q->lput && 0 == q->nData){
        // 此處循環判斷的原因是:假設2個消費者線程在get_queue阻塞,然后兩者都被激活,
        // 而其中一個線程運行比較塊,快速消耗了2個數據,另一個線程醒來的時候已
        // 經沒有新數據可以消耗了。這種情況是有可能的:比如,其它生產者線程快速
        // 調用put_queue兩次,如果有2個線程在get_queue處阻塞,就會被同時激活,
        // 而完全有可能,其中一個被激活的線程獲取到了cpu,快速處理了2個消息。

        // 對於循環隊列,如果lget與lput相等,那么只有兩種情況,
        // 1:nData不為0,隊列滿
        // 2:nData為0,隊列空
        q->nEmptyThread++;
        pthread_cond_wait(&cond_get, &mux);
        q->nEmptyThread--;
    }
#ifdef DEBUG
    printf("get data! lget:%d", q->lget);
#endif
    data = (q->buffer)[q->lget++];
    if(q->lget == q->size){
        // queue用作循環隊列
        q->lget = 0;
    }
    q->nData--;
#ifdef DEBUG
    printf(" nData:%d\n", q->nData);
#endif
    if(q->nFullThread){
        // 僅在必要時才調用pthread_cond_signal, 盡量少陷入內核態
        pthread_cond_signal(&cond_put);
    }
    pthread_mutex_unlock(&mux);
    return data;
}

void put_queue(struct msg_queue *q, void* data){
    pthread_mutex_lock(&mux);
    while(q->lget == q->lput && q->nData){
        q->nFullThread++;
        pthread_cond_wait(&cond_put, &mux);
        q->nFullThread--;
    }
#ifdef DEBUG
    printf("put data! lput:%d", q->lput);
#endif
    (q->buffer)[q->lput++] = data;
    if(q->lput == q->size){
        q->lput = 0;
    }
    q->nData++;
#ifdef DEBUG
    printf(" nData:%d\n", q->nData);
#endif
    if(q->nEmptyThread){
        pthread_cond_signal(&cond_get);
    }
    pthread_mutex_unlock(&mux);
}

7. demo程序(msg_queue.c)

#include "msg_queue.h"
struct msg_queue queue = {NULL, 10, 0, 0, 0, 0, 0};

void * produce(void * arg)
{
    pthread_detach(pthread_self());
    int i=0;
    while(1){
        put_queue(&queue, (void*)i++);
    }
}

void *consume(void *arg)
{
    int data;
    while(1){
        data = (int)(get_queue(&queue));
    }
}

int main()
{   
    pthread_t pid;
    int i=0;

    pthread_mutex_init(&mux, 0);
    pthread_cond_init(&cond_get, 0);
    pthread_cond_init(&cond_put, 0);

    queue.buffer = malloc(queue.size * sizeof(void*));
    if(queue.buffer == NULL){
        printf("malloc failed!\n");
        exit(-1);
    }

    pthread_create(&pid, 0, produce, 0);
    pthread_create(&pid, 0, produce, 0);
    pthread_create(&pid, 0, produce, 0);
    pthread_create(&pid, 0, consume, 0);
    pthread_create(&pid, 0, consume, 0);
    pthread_create(&pid, 0, consume, 0);

    sleep(60);

    free(queue.buffer);
    pthread_mutex_destroy(&mux);
    pthread_cond_destroy(&cond_get);
    pthread_cond_destroy(&cond_put);
}

Reference: http://www.cppblog.com/CppExplore/archive/2008/01/15/41175.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM