通信之消息隊列編程
1:生產者和消費者模式理解
(1) 生產者/消費者模式:需要使用到同步,以及線程,屬於多並發行列,產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。 單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。
(2) 解耦:假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。
(3) 支持並發:生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。 使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的並發主體(常見並發類型有進程和線程兩種)。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。其實當初這個模式,主要就是用來處理並發問題的。
(4) 支持忙閑不均:
緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
(5) 總共理解:生產者生產數據,消費者消費數據,生產者和消費者通過緩存聯系起來,是通過消息隊列來連接共享隊列里面的東西,在隊列為空的時候,消費者無法消費signal.notify() //通知生產者,反之對於生產者來是通知消費者來進行消費。
2:POSIX消息隊列
(1) POSIX消息隊列是獨立於XSI消息隊列的一套新的消息隊列API,讓進程可以用消息的方式進行數據交換。
//包含的頭文件
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
(2)打開或者創建消息隊列。
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
https://www.cnblogs.com/LubinLew/p/POSIX-mq_open.html
//解釋 mq_open這個函數的
(3)發送或者接受消息隊列。
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
(4)int mq_close(mqd_t mqdes); 關閉一個消息隊列。
我們可以使用mq_close來關閉一個消息隊列,這里的關閉並非刪除了相關文件,關閉之后消息隊列在系統中依然存在,我們依然可以繼續打開它使用。這跟文件的close和unlink的概念是類似的。
(5)int mq_unlink(const char *name); 使用mq_unlink真正刪除一個消息隊列。
(6)使用mq_getattr和mq_setattr來查看和設置消息隊列。
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
(7) mq_attr結構體是這樣的結構:
struct mq_attr {
long mq_flags; /* 只可以通過此參數將消息隊列設置為是否非阻塞O_NONBLOCK */
long mq_maxmsg; /* 消息隊列的消息數上限 */
long mq_msgsize; /* 消息最大長度 */
long mq_curmsgs; /* 消息隊列的當前消息個數 */
};
(8)//掛載消息隊列
編譯posix mqueue時,要連接運行時庫(runtime library),既-lrt選項,
//把消息隊列掛載到 /dev/mqueue 下面
gcc -lrt -lpthread recv.c //
編譯的時候需要鏈接一些庫,所以我們可以創建Makefile
CFLAGS+=-lrt –lpthread
(9)創建消息隊列和接收消息隊列
include <fcntl.h> #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <string.h> #define MQNAME "/mqtest" int main(int argc, char *argv[]) { mqd_t mqd; int ret; if (argc != 3) { fprintf(stderr, "Argument error!\n"); exit(1); } mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, NULL); if (mqd == -1) { perror("mq_open()"); exit(1); } ret = mq_send(mqd, argv[1], strlen(argv[1]), atoi(argv[2])); if (ret == -1) { perror("mq_send()"); exit(1); } exit(0); } // gcc -lrt –lpthread 動態鏈接庫來編譯 // 存入消息隊列 ./send zorro 1 ./send shrek 2 ./send jerry 3 ./send zzzzz 1 ./send ssssss 2 ./send jjjjj 3 cat /dev/mqueue/mqtest 查看消息隊列的狀態 QSIZE:31 NOTIFY:0 SIGNO:0 NOTIFY_PID:0
(10)接收消息:
#include <fcntl.h> #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <string.h> #define MQNAME "/mqtest" int main() { mqd_t mqd; int ret; int val; char buf[BUFSIZ]; mqd = mq_open(MQNAME, O_RDWR); if (mqd == -1) { perror("mq_open()"); exit(1); } ret = mq_receive(mqd, buf, BUFSIZ, &val); if (ret == -1) { perror("mq_send()"); exit(1); } ret = mq_close(mqd); if (ret == -1) { perror("mp_close()"); exit(1); } printf("msq: %s, prio: %d\n", buf, val); exit(0); }
(10)刪除這個消息隊列
#include <fcntl.h> #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <string.h> #define MQNAME "/mqtest" int main() { int ret; ret = mq_unlink(MQNAME); //刪除 if (ret == -1) { perror("mp_unlink()"); exit(1); } exit(0); }
(11)異步通知機制 使用這個機制,我們就可以讓隊列在由空變成不空的時候觸發一個異步事件,通知調用進程,以便讓進程可以在隊列為空的時候不用阻塞等待。
int mq_notify(mqd_t mqdes, const struct sigevent *sevp); #include <pthread.h> #include <mqueue.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <signal.h> static mqd_t mqdes; void mq_notify_proc(int sig_num) { /* mq_notify_proc()是信號處理函數, 當隊列從空變成非空時,會給本進程發送信號, 觸發本函數執行。 */ struct mq_attr attr; void *buf; ssize_t size; int prio; struct sigevent sev; /* 我們約定使用SIGUSR1信號進行處理, 在此判斷發來的信號是不是SIGUSR1。 */ if (sig_num != SIGUSR1) { return; } /* 取出當前隊列的消息長度上限作為緩存空間大小。 */ if (mq_getattr(mqdes, &attr) < 0) { perror("mq_getattr()"); exit(1); } buf = malloc(attr.mq_msgsize); if (buf == NULL) { perror("malloc()"); exit(1); } /* 從消息隊列中接收消息。 */ size = mq_receive(mqdes, buf, attr.mq_msgsize, &prio); if (size == -1) { perror("mq_receive()"); exit(1); } /* 打印消息和其優先級。 */ printf("msq: %s, prio: %d\n", buf, prio); free(buf); /* 重新注冊mq_notify,以便下次可以出觸發。 */ sev.sigev_notify = SIGEV_SIGNAL; sev.sigev_signo = SIGUSR1; if (mq_notify(mqdes, &sev) == -1) { perror("mq_notify()"); exit(1); } return; } int main(int argc, char *argv[]) { struct sigevent sev; if (argc != 2) { fprintf(stderr, "Argument error!\n"); exit(1); } /* 注冊信號處理函數。 */ if (signal(SIGUSR1, mq_notify_proc) == SIG_ERR) { perror("signal()"); exit(1); } /* 打開消息隊列,注意此隊列需要先創建。 */ mqdes = mq_open(argv[1], O_RDONLY); if (mqdes == -1) { perror("mq_open()"); exit(1); } /* 注冊mq_notify。 */ sev.sigev_notify = SIGEV_SIGNAL; sev.sigev_signo = SIGUSR1; if (mq_notify(mqdes, &sev) == -1) { perror("mq_notify()"); exit(1); } /* 主進程每秒打印一行x,等着從消息隊列發來異步信號觸發收消息。 */ while (1) { printf("x\n"); sleep(1); } }