ZeroMQ_04 發布訂閱模式


簡單來說,就是服務端不斷發布消息,客戶端訂閱了就會收到消息。

下面我們看個簡單的實例:

Server:

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 

#define buffersize 4096
#define randof(num)  (int) ((float) (num) * random () / (RAND_MAX + 1.0))

int main(int argc, char* argv[])
{
    // [0]創建對象
    void* ctx = zmq_ctx_new();
    void* publisher = zmq_socket(ctx, ZMQ_PUB);
    // [1]綁定到5566端口
    zmq_bind(publisher, "tcp://*:5566");

     //  初始化隨機數生成器
    srandom ((unsigned) time (NULL));
    while (1) {
       int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        printf("server send: %s\n", update);
        //s_send (publisher, update);
        zmq_send (publisher, update, strlen (update), 0);
        sleep(1);
    }
    zmq_close(publisher);
    zmq_ctx_destroy(ctx);
    return 0;
}

Client:

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 
#include <assert.h>

static char *s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    buffer[size] = '\0';

    return strndup (buffer, sizeof(buffer) - 1);
}

int main (int argc, char *argv [])
{
    //  [0]創建對象,連接到5566端口
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5566");
    assert (rc == 0);

    //  [1]設置過濾條件,設置為空,表示全訂閱,這里“1”表示匹配開頭為“1”的數據
    const char *filter =  "1";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));
    assert (rc == 0);
    //  [2]接受數據
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        
        char *string = s_recv (subscriber);
        printf ("client: %s\n", string);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

 

out:

// server
server send: 43345 -41 19
server send: 44203 110 59
server send: 78038 2 25
server send: 55377 59 18
server send: 40135 -65 36
server send: 37950 43 10

// client
zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client
Collecting updates from weather server...
client....
client: 10057 67 11
client: 16839 94 25

 總的來說就是發布者不斷發布消息,訂閱者可以有選擇的訂閱消息,訂閱規則可以設置多個。這里我們再加個訂閱規則看一下。

//  [1]設置過濾條件,設置為空,表示全訂閱,這里“1”表示匹配開頭為“1”的數據
    const char *filter =  "1";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));
    assert (rc == 0);
    const char *filter2 =  "2";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter2, strlen (filter2));
    assert (rc == 0);
zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client
Collecting updates from weather server...
client: 25113 56 58
client: 29957 62 21
client: 17914 -14 55
client: 15522 -80 57
client: 13588 34 53
client: 13291 -22 58
client: 18876 -62 13
client: 25827 38 54
client: 13747 -55 23

我們可以看到訂閱規則可以設置多個,就是訂閱多了,就收的多了。

注意: 

需要注意的是,在使用SUB套接字時,必須使用zmq_setsockopt()方法來設置訂閱的內容。如果你不設置訂閱內容,那將什么消息都收不到,新手很容易犯這個錯誤。訂閱信息可以是任何字符串,可以設置多次。只要消息滿足其中一條訂閱信息,SUB套接字就會收到。訂閱者可以選擇不接收某類消息,也是通過zmq_setsockopt()方法實現的。

PUB-SUB套接字組合是異步的。客戶端在一個循環體中使用zmq_recv()接收消息,如果向SUB套接字發送消息則會報錯;類似地,服務端可以不斷地使用zmq_send()發送消息,但不能在PUB套接字上使用zmq_recv()。

關於PUB-SUB套接字,還有一點需要注意:你無法得知SUB是何時開始接收消息的。就算你先打開了SUB套接字,后打開PUB發送消息,這時SUB還是會丟失一些消息的,因為建立連接是需要一些時間的。很少,但並不是零。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM