生產者與消費者模式(理解) 進程間通信之消息隊列編程


                                                                           

通信之消息隊列編程

1:生產者和消費者模式理解

(1)       生產者/消費者模式:需要使用到同步,以及線程,屬於多並發行列,產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。 單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。

(2)       解耦:假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。

(3)       支持並發:生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。 使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的並發主體(常見並發類型有進程和線程兩種)。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。其實當初這個模式,主要就是用來處理並發問題的。

 

(4)       支持忙閑不均:

緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。

 

(5)       總共理解:生產者生產數據,消費者消費數據,生產者和消費者通過緩存聯系起來,是通過消息隊列來連接共享隊列里面的東西,在隊列為空的時候,消費者無法消費signal.notify() //通知生產者,反之對於生產者來是通知消費者來進行消費。

 

2:POSIX消息隊列

(1) POSIX消息隊列是獨立於XSI消息隊列的一套新的消息隊列API,讓進程可以用消息的方式進行數據交換。

//包含的頭文件

#include <fcntl.h>           /* For O_* constants */

#include <sys/stat.h>        /* For mode constants */

#include <mqueue.h>

 

(2)打開或者創建消息隊列。

mqd_t mq_open(const char *name, int oflag);

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

https://www.cnblogs.com/LubinLew/p/POSIX-mq_open.html

//解釋 mq_open這個函數的

 

(3)發送或者接受消息隊列。

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);

 

int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);

 

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

 

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);

 

(4)int mq_close(mqd_t mqdes);  關閉一個消息隊列。

我們可以使用mq_close來關閉一個消息隊列,這里的關閉並非刪除了相關文件,關閉之后消息隊列在系統中依然存在,我們依然可以繼續打開它使用。這跟文件的close和unlink的概念是類似的。

 

(5)int mq_unlink(const char *name);  使用mq_unlink真正刪除一個消息隊列。

 

(6)使用mq_getattr和mq_setattr來查看和設置消息隊列。

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);

 

(7)   mq_attr結構體是這樣的結構:

struct mq_attr {

    long mq_flags;       /* 只可以通過此參數將消息隊列設置為是否非阻塞O_NONBLOCK */

    long mq_maxmsg;      /* 消息隊列的消息數上限 */

    long mq_msgsize;     /* 消息最大長度 */

    long mq_curmsgs;     /* 消息隊列的當前消息個數 */

};

(8)//掛載消息隊列

編譯posix mqueue時,要連接運行時庫(runtime library),既-lrt選項,

//把消息隊列掛載到 /dev/mqueue 下面

gcc -lrt -lpthread recv.c //

編譯的時候需要鏈接一些庫,所以我們可以創建Makefile

CFLAGS+=-lrt –lpthread

(9)創建消息隊列和接收消息隊列

include <fcntl.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define MQNAME "/mqtest"


int main(int argc, char *argv[])
{
    mqd_t mqd;
    int ret;
    if (argc != 3) {
        fprintf(stderr, "Argument error!\n");
        exit(1);
    }

mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, NULL);

    if (mqd == -1) {
        perror("mq_open()");
        exit(1);
    }

    ret = mq_send(mqd, argv[1], strlen(argv[1]), atoi(argv[2]));
    if (ret == -1) {
        perror("mq_send()");
        exit(1);
    }

    exit(0);
}

// gcc -lrt –lpthread 動態鏈接庫來編譯 
// 存入消息隊列
./send zorro 1
./send shrek 2
./send jerry 3
./send zzzzz 1
./send ssssss 2
./send jjjjj 3

cat /dev/mqueue/mqtest  查看消息隊列的狀態 QSIZE:31 NOTIFY:0     SIGNO:0     NOTIFY_PID:0

 

(10)接收消息:

#include <fcntl.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define MQNAME "/mqtest"


int main()
{

    mqd_t mqd;
    int ret;
    int val;
    char buf[BUFSIZ];

    mqd = mq_open(MQNAME, O_RDWR);
    if (mqd == -1) {
        perror("mq_open()");
        exit(1);
    }

    ret = mq_receive(mqd, buf, BUFSIZ, &val);
    if (ret == -1) {
        perror("mq_send()");
        exit(1);
    }

    ret = mq_close(mqd);
    if (ret == -1) {
        perror("mp_close()");
        exit(1);
    }

    printf("msq: %s, prio: %d\n", buf, val);

    exit(0);
}

  

(10)刪除這個消息隊列

#include <fcntl.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{

    int ret;
    ret = mq_unlink(MQNAME); //刪除
    if (ret == -1) {
        perror("mp_unlink()");
        exit(1);
    }
    exit(0);
}

(11)異步通知機制  使用這個機制,我們就可以讓隊列在由空變成不空的時候觸發一個異步事件,通知調用進程,以便讓進程可以在隊列為空的時候不用阻塞等待。

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

#include <pthread.h>
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>

static mqd_t mqdes;

void mq_notify_proc(int sig_num)
{
    /* mq_notify_proc()是信號處理函數,
    當隊列從空變成非空時,會給本進程發送信號,
    觸發本函數執行。 */

    struct mq_attr attr;
    void *buf;
    ssize_t size;
    int prio;
    struct sigevent sev;

    /* 我們約定使用SIGUSR1信號進行處理,
    在此判斷發來的信號是不是SIGUSR1。 */
    if (sig_num != SIGUSR1) {
        return;
    }

    /* 取出當前隊列的消息長度上限作為緩存空間大小。 */
    if (mq_getattr(mqdes, &attr) < 0) {
        perror("mq_getattr()");
        exit(1);
    }

    buf = malloc(attr.mq_msgsize);
    if (buf == NULL) {
        perror("malloc()");
        exit(1);
    }

    /* 從消息隊列中接收消息。 */
    size = mq_receive(mqdes, buf, attr.mq_msgsize, &prio);
    if (size == -1) {
        perror("mq_receive()");
        exit(1);
    }

    /* 打印消息和其優先級。 */
    printf("msq: %s, prio: %d\n", buf, prio);

    free(buf);

    /* 重新注冊mq_notify,以便下次可以出觸發。 */
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGUSR1;
    if (mq_notify(mqdes, &sev) == -1) {
        perror("mq_notify()");
        exit(1);
    }

    return;
}

int main(int argc, char *argv[])
{
    struct sigevent sev;

    if (argc != 2) {
        fprintf(stderr, "Argument error!\n");
        exit(1);
    }

    /* 注冊信號處理函數。 */
    if (signal(SIGUSR1, mq_notify_proc) == SIG_ERR) {
        perror("signal()");
        exit(1);
    }

    /* 打開消息隊列,注意此隊列需要先創建。 */
    mqdes = mq_open(argv[1], O_RDONLY);
    if (mqdes == -1) {
        perror("mq_open()");
        exit(1);
    }

    /* 注冊mq_notify。 */
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = SIGUSR1;
    if (mq_notify(mqdes, &sev) == -1) {
        perror("mq_notify()");
        exit(1);
    }

    /* 主進程每秒打印一行x,等着從消息隊列發來異步信號觸發收消息。 */
    while (1) {
        printf("x\n");
        sleep(1);
    }
}

  

  

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM