Linux進程間通信(二) - 消息隊列


消息隊列

消息隊列是Linux IPC中很常用的一種通信方式,它通常用來在不同進程間發送特定格式的消息數據。

消息隊列和之前討論過的管道和FIFO有很大的區別,主要有以下兩點(管道請查閱我的另一篇文章:http://www.cnblogs.com/linuxbug/p/4863724.html)

Ø  一個進程向消息隊列寫入消息之前,並不需要某個進程在該隊列上等待該消息的到達,而管道和FIFO是相反的,進程向其中寫消息時,管道和FIFO必須已經打開來讀,否則寫進程就會阻塞(默認情況下)。

Ø  IPC的持續性不同。管道和FIFO是隨進程的持續性,當管道和FIFO最后一次關閉發生時,仍在管道和FIFO中的數據會被丟棄。消息隊列是隨內核的持續性,即一個進程向消息隊列寫入消息后,然后終止,另外一個進程可以在以后某個時刻打開該隊列讀取消息。只要內核沒有重新自舉,消息隊列沒有被刪除。

消息隊列中的每條消息通常具有以下屬性:

Ø  一個表示優先級的整數;

Ø  消息的數據部分的長度;

Ø  消息數據本身;

下面我們分別闡述POSIX消息隊列和System V消息隊列,這2種消息隊列目前Linux都支持。

POSIX消息隊列

數據結構

先給出mq_attr 結構的定義

#include <bits/mqueue.h>

struct mq_attr

{

  long int mq_flags;      /* Message queue flags. 0 or O_NONBLOCK */

  long int mq_maxmsg;   /* Maximum number of messages.  */

  long int mq_msgsize;   /* Maximum message size.  */

  long int mq_curmsgs;   /* Number of messages currently queued.  */

  long int __pad[4];

};

函數說明

// 打開一個已經創建的消息隊列

mqd_t mq_open(const char *name, int oflag);

// 創建消息隊列

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

name:表示消息隊列的名字,它符合POSIX IPC的名字規則。

oflag:表示打開的方式,和 open函數的類似。有必須的選項:O_RDONLYO_WRONLYO_RDWR,還有可選的選項:O_NONBLOCKO_CREATO_EXCL

mode:是一個可選參數,在oflag中含有O_CREAT標志且消息隊列不存在時,才需要提供該參數。表示默認訪問權限。可以參考open

attr:也是一個可選參數,在 oflag中含有O_CREAT標志且消息隊列不存在時才需要。該參數用於給新隊列設定某些屬性,如果是空指針,那么就采用默認屬性。

mq_open返回值是mqd_t類型的值,被稱為消息隊列描述符。

Linux 2.6中該類型的定義為整型:

#include <bits/mqueue.h>

typedef int mqd_t;

 

// 關閉消息隊列

mqd_t mq_close(mqd_t mqdes);

mq_close用於關閉一個消息隊列,和文件的close類型一樣,關閉后,消息隊列並不從系統中刪除。一個進程結束,會自動調用關閉打開着的消息隊列。

 

// 刪除消息隊列

mqd_t mq_unlink(const char *name);

mq_unlink用於刪除一個消息隊列。消息隊列創建后只有通過調用該函數或者是內核自舉才能進行刪除。每個消息隊列都有一個保存當前打開着描述符數的引用計數器,和文件一樣,因此本函數能夠實現類似於unlink函數刪除一個文件的機制。

 

// 獲取消息隊列參數

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);

// 設置消息隊列參數

mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

mq_getattr用於獲取當前消息隊列的屬性,mq_setattr用於設置當前消息隊列的屬性。其中mq_setattr中的oldattr用於保存修改前的消息隊列的屬性,可以為空。

mq_setattr可以設置的屬性只有mq_flags,用來設置或清除消息隊列的非阻塞標志。newattr結構的其他屬性被忽略。mq_maxmsgmq_msgsize屬性只能在創建消息隊列時通過mq_open來設置。mq_open只會設置該兩個屬性,忽略另外兩個屬性。mq_curmsgs屬性只能被獲取而不能被設置。

 

// 發送接收消息

mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,

size_t msg_len, unsigned msg_prio); //成功返回0,出錯返回-1

mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,

unsigned *msg_prio); //成功返回接收到消息的字節數,出錯返回-1

如果mq為空,mq_receive默認阻塞,如果設置了O_NONBLOCKmq_receive立即返回,並將errno設置為EAGAIN

多進程情況下,如果多個進程阻塞在mq_receive調用,當消息到來時,具有最高優先級和等待時間最長的進程將得到這條消息。因此可以確認,mq接收消息在應用層看來是原子操作。

 

#ifdef __USE_XOPEN2K

mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,

                      size_t msg_len, unsigned msg_prio,

                      const struct timespec *abs_timeout);

mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,

                      size_t msg_len, unsigned *msg_prio,

                      const struct timespec *abs_timeout);

#endif

mq_send向消息隊列中寫入一條消息,mq_receive從消息隊列中讀取一條消息。

mqdes:消息隊列描述符;

msg_ptr:指向消息體緩沖區的指針;

msg_len:消息體的長度,其中mq_receive 的該參數不能小於能寫入隊列中消息的最大大小,即一定要大於等於該隊列的mq_attr結構中mq_msgsize的大小。如果 mq_receive中的msg_len小於該值,就會返回EMSGSIZE錯誤。POXIS消息隊列發送的消息長度可以為0

msg_prio:消息的優先級;它是一個小於 MQ_PRIO_MAX的數,數值越大,優先級越高。POSIX消息隊列在調用mq_receive時總是返回隊列中最高優先級的最早消息。如果消息不需要設定優先級,那么可以在 mq_send是置msg_prio0mq_receivemsg_prio置為NULL

還有兩個XSI定義的擴展接口限時發送和接收消息的函數:mq_timedsendmq_timedreceive函數。默認情況下mq_sendmq_receive是阻塞進行調用,可以通過mq_setattr來設置為O_NONBLOCK

mq使用詳解

創建一個mq
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_create <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDWR|O_CREAT|O_EXCL, 0666, NULL);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        printf("get the message queue attribute error\n");
        return -1;
    }
    
    printf("mq_flags = %d, mq_maxmsg = %d, mq_msgsize = %d, mq_curmsgs = %d\n",
        mqAttr.mq_flags, mqAttr.mq_maxmsg, mqAttr.mq_msgsize, mqAttr.mq_curmsgs);
    
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_create ipc_posix_mq_create.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue /a.txt error[File exists]

[root@rocket ipc]# ./ipc_posix_mq_create /b.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_create /b.txt

open message queue /b.txt error[File exists]

刪除一個mq
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_unlink <mq name>\n");
        exit(0);
    }
    
    mq_unlink(argv[optind]);
    printf("error = %s\n", strerror(errno));
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_unlink ipc_posix_mq_unlink.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue /a.txt error[File exists]

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = Success

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

獲取mq的屬性
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_getattr <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        printf("get the message queue attribute error\n");
        return -1;
    }
    
    printf("mq_flags = %d, mq_maxmsg = %d, mq_msgsize = %d, mq_curmsgs = %d\n",
        mqAttr.mq_flags, mqAttr.mq_maxmsg, mqAttr.mq_msgsize, mqAttr.mq_curmsgs);
    mq_close(mqID);
    
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_getattr ipc_posix_mq_getattr.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_getattr /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = Success

[root@rocket ipc]# ./ipc_posix_mq_getattr /a.txt

open message queue /a.txt error[No such file or directory]

設置mq屬性
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=4)
    {
        printf("usage: ./ipc_posix_mq_open_setattr <mq name> <max msg> <msgsize>\n");
        exit(0);
    }
    
    mq_attr mqAttr;
    mqAttr.mq_maxmsg = atoi(argv[2]);
    mqAttr.mq_msgsize = atoi(argv[3]);
    
    mqID = mq_open(argv[optind], O_RDWR|O_CREAT|O_EXCL, 0666, &mqAttr);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        printf("get the message queue attribute error\n");
        return -1;
    }
    
    printf("mq_flags = %d, mq_maxmsg = %d, mq_msgsize = %d, mq_curmsgs = %d\n",
        mqAttr.mq_flags, mqAttr.mq_maxmsg, mqAttr.mq_msgsize, mqAttr.mq_curmsgs);
    mq_close(mqID);
    
    return 0;
}

結果說明:

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = Success

[root@rocket ipc]# ./ipc_posix_mq_open_setattr /a.txt 100 1024

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 100, mq_msgsize = 1024, mq_curmsgs = 0

這里可以看出,屬性修改符合預期,已經和默認屬性不一樣了。

發送接收mq消息

發送mq消息

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    unsigned int iprio;
    if (argc!=4)
    {
        printf("usage: ./ipc_posix_mq_send <mq name> <message> <priority>\n");
        exit(0);
    }
    iprio = atoi(argv[3]);
    
    mqID = mq_open(argv[optind], O_WRONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    mq_send(mqID, argv[2], strlen(argv[2]), iprio);
    return 0;
}

接收mq消息

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    mq_attr mqAttr;
    unsigned int iprio;
    unsigned int n;
    char buff[8192];
    
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_recv <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    mq_getattr(mqID, &mqAttr);
    n = mq_receive(mqID, buff, mqAttr.mq_msgsize, &iprio);
    printf("read from mq`s msg = %s\n", buff);
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_recv ipc_posix_mq_recv.cpp -lrt

[root@rocket ipc]# g++ -g -o ipc_posix_mq_send ipc_posix_mq_send.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = No such file or directory

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "how are you?" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_recv /a.txt

read from mq`s msg = hello

[root@rocket ipc]# ./ipc_posix_mq_recv /a.txt

read from mq`s msg = how are you?

 

多進程阻塞接收mq消息,發送進程跟前面一樣,接收進程修改為循環接收消息

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    mq_attr mqAttr;
    unsigned int iprio;
    unsigned int n;
    char buff[8192];
    
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_recv <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    mq_getattr(mqID, &mqAttr);
    while(1)
    {
        n = mq_receive(mqID, buff, mqAttr.mq_msgsize, &iprio);
        printf("read from mq`s msg = %s\n", buff);
    }
    return 0;
}

結果說明:可以看到當2個進程調用mq_receive,當消息到來時,只有1個進程能接收到這條消息,2個進程輪流的接收mq_send發出的消息

tty1發送消息:

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello222" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello223" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello123" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello333" 10

open message queue succ, mqID = 3

tty2接收消息:

[root@rocket ipc]# ./ipc_posix_mq_recv_loop /a.txt

read from mq`s msg = hello

read from mq`s msg = hello222

read from mq`s msg = hello123

tty3接收消息:

[root@rocket ipc]# ./ipc_posix_mq_recv_loop /a.txt

read from mq`s msg = hello

read from mq`s msg = hello223

read from mq`s msg = hello333

POSIX消息隊列的限制

POSIX消息隊列本身的限制就是mq_attr中的mq_maxmsgmq_msgsize,分別用於限定消息隊列中的最大消息數和每個消息的最大字節數。在前面已經說過了,這兩個參數可以在調用mq_open創建一個消息隊列的時候設定。但這個設定是受到系統內核限制的。

下面是在Linux 2.6shell對啟動進程的POSIX消息隊列大小的限制:

[root@rocket ipc]# ulimit -a|grep message

POSIX message queues     (bytes, -q) 819200

當然你可以調大這個參數

[root@rocket ipc]# ulimit -q 8192000

[root@rocket ipc]# ulimit -a|grep message

POSIX message queues     (bytes, -q) 8192000

 

System V消息隊列

數據結構

控制結構:

struct msqid_ds {

   struct ipc_perm msg_perm;     /* Ownership and permissions */

   time_t          msg_stime;    /* Time of last msgsnd(2) */

   time_t          msg_rtime;    /* Time of last msgrcv(2) */

   time_t          msg_ctime;    /* Time of last change */

   unsigned long   __msg_cbytes; /* Current number of bytes in

                                                                                    queue (non-standard) */

   msgqnum_t       msg_qnum;     /* Current number of messages

                                                                                    in queue */

   msglen_t        msg_qbytes;   /* Maximum number of bytes

                                                                                    allowed in queue */

   pid_t           msg_lspid;    /* PID of last msgsnd(2) */

   pid_t           msg_lrpid;    /* PID of last msgrcv(2) */

};

發送接收數據:

struct msgbuf {

   long mtype;       /* message type, must be > 0 */

   char mtext[1];    /* message data */

};

函數說明

msgget函數

該函數用來創建和訪問一個消息隊列。它的原型為:

int msgget(key_t key, int msgflg);

與其他的IPC機制一樣,程序必須提供一個鍵來命名某個特定的消息隊列。msgflg是一個權限標志,表示消息隊列的訪問權限,它與文件的訪問權限一樣。msgflg可以與IPC_CREAT做或操作,表示當key所命名的消息隊列不存在時創建一個消息隊列,如果key所命名的消息隊列存在時,IPC_CREAT標志會被忽略,而只返回一個標識符。

它返回一個以key命名的消息隊列的標識符(非零整數),失敗時返回-1

 

msgsnd函數

該函數用來把消息添加到消息隊列中。它的原型為:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

msgid是由msgget函數返回的消息隊列標識符。

msg_ptr是一個指向准備發送消息的指針,但是消息的數據結構卻有一定的要求,指針msg_ptr所指向的消息結構一定要是以一個長整型成員變量開始的結構體,接收函數將用這個成員來確定消息的類型。所以消息結構要定義成這樣:

struct my_message{

long int message_type;

/* The data you wish to transfer*/

};

struct my_message{

    long int message_type;

    /* The data you wish to transfer*/

};

msg_szmsg_ptr指向的消息的長度,注意是消息的長度,而不是整個結構體的長度,也就是說msg_sz是不包括長整型消息類型成員變量的長度。

msgflg用於控制當前消息隊列滿或隊列消息到達系統范圍的限制時將要發生的事情。

如果調用成功,消息數據的一分副本將被放到消息隊列中,並返回0,失敗時返回-1.

 

msgrcv函數

該函數用來從一個消息隊列獲取消息,它的原型為

int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);

msgid, msg_ptr, msg_st的作用也函數msgsnd函數的一樣。

msgtype可以實現一種簡單的接收優先級。如果msgtype0,就獲取隊列中的第一個消息。如果它的值大於零,將獲取具有相同消息類型的第一個信息。如果它小於零,就獲取類型等於或小於msgtype的絕對值的第一個消息。

msgflg用於控制當隊列中沒有相應類型的消息可以接收時將發生的事情。

調用成功時,該函數返回放到接收緩存區中的字節數,消息被復制到由msg_ptr指向的用戶分配的緩存區中,然后刪除消息隊列中的對應消息。失敗時返回-1.

 

msgctl函數

該函數用來控制消息隊列,它與共享內存的shmctl函數相似,它的原型為:

int msgctl(int msgid, int command, struct msgid_ds *buf);

int msgctl(int msgid, int command, struct msgid_ds *buf);

command是將要采取的動作,它可以取3個值,

IPC_STAT:把msgid_ds結構中的數據設置為消息隊列的當前關聯值,即用消息隊列的當前關聯值覆蓋msgid_ds的值。

IPC_SET:如果進程有足夠的權限,就把消息列隊的當前關聯值設置為msgid_ds結構中給出的值

IPC_RMID:刪除消息隊列

 

buf是指向msgid_ds結構的指針,它指向消息隊列模式和訪問權限的結構。msgid_ds結構至少包括以下成員:

struct msgid_ds

{

uid_t shm_perm.uid;

uid_t shm_perm.gid;

mode_t shm_perm.mode;

};

struct msgid_ds

{

    uid_t shm_perm.uid;

    uid_t shm_perm.gid;

    mode_t shm_perm.mode;

};

成功時返回0,失敗時返回-1.

使用詳解

mq創建,代碼說明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define BUFF_SIZE 1024
struct mq_msg_st  
{  
    long msg_type;  
    char text[BUFF_SIZE];  
};  

int main(int argc, char** argv)  
{    
    int msgid = -1;  
    struct mq_msg_st data;  
    long msgtype = 0;
    int iret = 0;
  
    //建立消息隊列  
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT | IPC_EXCL);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    // 獲取消息隊列狀態
    struct msqid_ds ds;
    iret = msgctl(msgid, IPC_STAT, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_STAT failed\n");
        return -2;
    }
    
    // 設置消息隊列最大容量
    const unsigned int QBYTES_NUM = 10000000;
    ds.msg_qbytes = QBYTES_NUM;
    iret = msgctl(msgid, IPC_SET, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_SET failed\n");
        return -3;
    }
    
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_systemv_mq_create ipc_systemv_mq_create.cpp

[root@rocket ipc]# ./ipc_systemv_mq_create

msgget succ, msgid = 0

[root@rocket ipc]# ./ipc_systemv_mq_create

msgget failed with error: File exists

[root@rocket ipc]# ipcs

 

------ Shared Memory Segments --------

key        shmid      owner      perms      bytes      nattch     status     

0x00000000 0          gdm        600        393216     2          dest        

0x00000000 32769      gdm        600        393216     2          dest        

0x00000000 65538      gdm        600        393216     2          dest        

0x00000000 98307      gdm        600        393216     2          dest        

 

------ Semaphore Arrays --------

key        semid      owner      perms      nsems    

0x00000000 0          root       600        1        

0x00000000 32769      root       600        1        

 

------ Message Queues --------

key        msqid      owner      perms      used-bytes   messages   

0x000004d2 0          root       666        0            0    

這里看到已經創建了一個key1234(16進制為4d2)的消息隊列。

mq刪除,代碼說明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int main(int argc, char** argv)  
{    
    int msgid = -1;   
    
    //建立消息隊列  
    msgid = msgget((key_t)1234, 0666);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    if (msgctl(msgid, IPC_RMID, 0) == -1)  
    {  
        printf("msgctl IPC_RMID failed\n");  
        return -1;
    }  
    
    return 0;
}
mq發送,代碼說明:

 

#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>  

#define BUFF_SIZE 1024
struct mq_msg_st  
{  
    long msg_type;  
    char text[BUFF_SIZE];  
};

int main(int argc, char** argv)  
{    
    int msgid = -1;  
    struct mq_msg_st data;
    long msgtype = 0;
    int iret = 0;
    char buffer[BUFF_SIZE];
  
    //建立消息隊列  
    msgid = msgget((key_t)1234, 0666);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    // 獲取消息隊列狀態
    struct msqid_ds ds;
    iret = msgctl(msgid, IPC_STAT, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_STAT failed\n");
        return -2;
    }
    
    while(1)  
    {  
        //輸入數據  
        printf("Enter some text: ");  
        fgets(buffer, BUFF_SIZE, stdin);  
        data.msg_type = 1;      
        strcpy(data.text, buffer);  
        //向隊列發送數據  
        iret = msgsnd(msgid, (void*)&data, strlen(data.text)+1, IPC_NOWAIT);
        if(iret == -1)
        {  
            if (errno == EAGAIN)
            {
                continue;
            }
            else
            {
                printf("msgsnd failed, error = %s\n", strerror(errno));
            return -1;
            }
        }
        //輸入end結束輸入  
        if(strncmp(buffer, "end", 3) == 0)
        {
            break;
        }            
    }      
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_systemv_mq_send ipc_systemv_mq_send.cpp

[root@rocket ipc]# ./ipc_systemv_mq_send

msgget succ, msgid = 32768

Enter some text: hello

Enter some text: world

Enter some text: end

[root@rocket ipc]# ipcs

 

------ Shared Memory Segments --------

key        shmid      owner      perms      bytes      nattch     status     

0x00000000 0          gdm        600        393216     2          dest        

0x00000000 32769      gdm        600        393216     2          dest        

0x00000000 65538      gdm        600        393216     2          dest        

0x00000000 98307      gdm        600        393216     2          dest        

 

------ Semaphore Arrays --------

key        semid      owner      perms      nsems    

0x00000000 0          root       600        1        

0x00000000 32769      root       600        1        

 

------ Message Queues --------

key        msqid      owner      perms      used-bytes   messages   

0x000004d2 32768      root       666        19           3  

這里看到發送3條消息之后這里的messages3

mq接收,代碼說明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define BUFF_SIZE 1024
struct mq_msg_st  
{  
    long msg_type;  
    char text[BUFF_SIZE];  
};

int main(int argc, char** argv)  
{    
    int msgid = -1;  
    struct mq_msg_st data;
    long msgtype = 0;
    int iret = 0;
  
    //建立消息隊列  
    msgid = msgget((key_t)1234, 0666);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    // 獲取消息隊列狀態
    struct msqid_ds ds;
    iret = msgctl(msgid, IPC_STAT, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_STAT failed\n");
        return -2;
    }
    
    //從隊列中獲取消息,直到遇到end消息為止  
    while(1)
    {
        iret = msgrcv(msgid, (void*)&data, BUFF_SIZE, msgtype, IPC_NOWAIT);
        if (iret == -1)
        {  
            if (errno == ENOMSG)
            {
                usleep(100);
                continue;
            }
            else
            {
                printf("msgrcv failed, error = %s\n", strerror(errno));
                return -1;
            }
        }
        
        printf("get message: %s\n", data.text);  
        //遇到end結束  
        if(strncmp(data.text, "end", 3) == 0)
        {
            break;
        }  
    }
    
    return 0;
}

結果說明:

[root@rocket ipc]# g++ -g -o ipc_systemv_mq_recv ipc_systemv_mq_recv.cpp

[root@rocket ipc]# ./ipc_systemv_mq_recv

msgget succ, msgid = 32768

get message: hello

get message: world

get message: end

 

[root@rocket ipc]# ipcs

 

------ Shared Memory Segments --------

key        shmid      owner      perms      bytes      nattch     status     

0x00000000 0          gdm        600        393216     2          dest        

0x00000000 32769      gdm        600        393216     2          dest        

0x00000000 65538      gdm        600        393216     2          dest        

0x00000000 98307      gdm        600        393216     2          dest        

 

------ Semaphore Arrays --------

key        semid      owner      perms      nsems    

0x00000000 0          root       600        1        

0x00000000 32769      root       600        1        

 

------ Message Queues --------

key        msqid      owner      perms      used-bytes   messages   

0x000004d2 32768      root       666        0            0   

這里看到消息接收完了,messages0

msgrcv接收消息類型說明:

The argument msgtyp specifies the type of message requested as follows:

       * If msgtyp is 0, then the first message in the queue is read.

       * If  msgtyp  is  greater than 0, then the first message in the queue of type msgtyp is read, unless MSG_EXCEPT was specified in msgflg, in which case the first message in the queue of type not equal to msgtyp will be read.

       * If msgtyp is less than 0, then the first message in the queue with the lowest type less than or equal to the absolute value of msgtyp will be read.

這幾段也說得比較清楚了,這里就不翻譯了,在開發的過程中我們可以方便的使用msgtype來分發消息到不同的進程。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM