通信之消息隊列編程

1:生產者和消費者模式理解
(1) 生產者/消費者模式:需要使用到同步,以及線程,屬於多並發行列,產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。 單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。
(2) 解耦:假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。
(3) 支持並發:生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。 使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的並發主體(常見並發類型有進程和線程兩種)。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。其實當初這個模式,主要就是用來處理並發問題的。
(4) 支持忙閑不均:
緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
(5) 總共理解:生產者生產數據,消費者消費數據,生產者和消費者通過緩存聯系起來,是通過消息隊列來連接共享隊列里面的東西,在隊列為空的時候,消費者無法消費signal.notify() //通知生產者,反之對於生產者來是通知消費者來進行消費。
2:POSIX消息隊列
(1) POSIX消息隊列是獨立於XSI消息隊列的一套新的消息隊列API,讓進程可以用消息的方式進行數據交換。
//包含的頭文件
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
(2)打開或者創建消息隊列。
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
https://www.cnblogs.com/LubinLew/p/POSIX-mq_open.html
//解釋 mq_open這個函數的
(3)發送或者接受消息隊列。
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
(4)int mq_close(mqd_t mqdes); 關閉一個消息隊列。
我們可以使用mq_close來關閉一個消息隊列,這里的關閉並非刪除了相關文件,關閉之后消息隊列在系統中依然存在,我們依然可以繼續打開它使用。這跟文件的close和unlink的概念是類似的。
(5)int mq_unlink(const char *name); 使用mq_unlink真正刪除一個消息隊列。
(6)使用mq_getattr和mq_setattr來查看和設置消息隊列。
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
(7) mq_attr結構體是這樣的結構:
struct mq_attr {
long mq_flags; /* 只可以通過此參數將消息隊列設置為是否非阻塞O_NONBLOCK */
long mq_maxmsg; /* 消息隊列的消息數上限 */
long mq_msgsize; /* 消息最大長度 */
long mq_curmsgs; /* 消息隊列的當前消息個數 */
};
(8)//掛載消息隊列

編譯posix mqueue時,要連接運行時庫(runtime library),既-lrt選項,
//把消息隊列掛載到 /dev/mqueue 下面
gcc -lrt -lpthread recv.c //
編譯的時候需要鏈接一些庫,所以我們可以創建Makefile
CFLAGS+=-lrt –lpthread
(9)創建消息隊列和接收消息隊列
include <fcntl.h>
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main(int argc, char *argv[])
{
mqd_t mqd;
int ret;
if (argc != 3) {
fprintf(stderr, "Argument error!\n");
exit(1);
}
mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, NULL);
if (mqd == -1) {
perror("mq_open()");
exit(1);
}
ret = mq_send(mqd, argv[1], strlen(argv[1]), atoi(argv[2]));
if (ret == -1) {
perror("mq_send()");
exit(1);
}
exit(0);
}
// gcc -lrt –lpthread 動態鏈接庫來編譯
// 存入消息隊列
./send zorro 1
./send shrek 2
./send jerry 3
./send zzzzz 1
./send ssssss 2
./send jjjjj 3
cat /dev/mqueue/mqtest 查看消息隊列的狀態 QSIZE:31 NOTIFY:0 SIGNO:0 NOTIFY_PID:0
(10)接收消息:
#include <fcntl.h>
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{
mqd_t mqd;
int ret;
int val;
char buf[BUFSIZ];
mqd = mq_open(MQNAME, O_RDWR);
if (mqd == -1) {
perror("mq_open()");
exit(1);
}
ret = mq_receive(mqd, buf, BUFSIZ, &val);
if (ret == -1) {
perror("mq_send()");
exit(1);
}
ret = mq_close(mqd);
if (ret == -1) {
perror("mp_close()");
exit(1);
}
printf("msq: %s, prio: %d\n", buf, val);
exit(0);
}
(10)刪除這個消息隊列
#include <fcntl.h>
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{
int ret;
ret = mq_unlink(MQNAME); //刪除
if (ret == -1) {
perror("mp_unlink()");
exit(1);
}
exit(0);
}
(11)異步通知機制 使用這個機制,我們就可以讓隊列在由空變成不空的時候觸發一個異步事件,通知調用進程,以便讓進程可以在隊列為空的時候不用阻塞等待。
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
#include <pthread.h>
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
static mqd_t mqdes;
void mq_notify_proc(int sig_num)
{
/* mq_notify_proc()是信號處理函數,
當隊列從空變成非空時,會給本進程發送信號,
觸發本函數執行。 */
struct mq_attr attr;
void *buf;
ssize_t size;
int prio;
struct sigevent sev;
/* 我們約定使用SIGUSR1信號進行處理,
在此判斷發來的信號是不是SIGUSR1。 */
if (sig_num != SIGUSR1) {
return;
}
/* 取出當前隊列的消息長度上限作為緩存空間大小。 */
if (mq_getattr(mqdes, &attr) < 0) {
perror("mq_getattr()");
exit(1);
}
buf = malloc(attr.mq_msgsize);
if (buf == NULL) {
perror("malloc()");
exit(1);
}
/* 從消息隊列中接收消息。 */
size = mq_receive(mqdes, buf, attr.mq_msgsize, &prio);
if (size == -1) {
perror("mq_receive()");
exit(1);
}
/* 打印消息和其優先級。 */
printf("msq: %s, prio: %d\n", buf, prio);
free(buf);
/* 重新注冊mq_notify,以便下次可以出觸發。 */
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = SIGUSR1;
if (mq_notify(mqdes, &sev) == -1) {
perror("mq_notify()");
exit(1);
}
return;
}
int main(int argc, char *argv[])
{
struct sigevent sev;
if (argc != 2) {
fprintf(stderr, "Argument error!\n");
exit(1);
}
/* 注冊信號處理函數。 */
if (signal(SIGUSR1, mq_notify_proc) == SIG_ERR) {
perror("signal()");
exit(1);
}
/* 打開消息隊列,注意此隊列需要先創建。 */
mqdes = mq_open(argv[1], O_RDONLY);
if (mqdes == -1) {
perror("mq_open()");
exit(1);
}
/* 注冊mq_notify。 */
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = SIGUSR1;
if (mq_notify(mqdes, &sev) == -1) {
perror("mq_notify()");
exit(1);
}
/* 主進程每秒打印一行x,等着從消息隊列發來異步信號觸發收消息。 */
while (1) {
printf("x\n");
sleep(1);
}
}
