ZeroMQ示例(C/C++/PHP)詳解三種模式


源自:https://blog.csdn.net/qq_16836151/article/details/52108152
1、應答模式
2、均衡分配模式(推拉模式)
3、發布訂閱模式(天氣預報)

提問-回答

讓我們從簡單的代碼開始,一段傳統的Hello World程序。我們會創建一個客戶端和一個服務端,客戶端發送Hello給服務端,服務端返回World。下文是C語言編寫的服務端,它在5555端口打開一個ZMQ套接字,等待請求,收到后應答World。

hwserver.c: Hello World server

//
// Hello World 服務端
// 綁定一個REP套接字至tcp://*:5555
// 從客戶端接收Hello,並應答World
//
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

int main (void)
{
    void *context = zmq_init (1);

    // 與客戶端通信的套接字
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_bind (responder, "tcp://*:5555");

    while (1) {
        // 等待客戶端請求
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_recv (responder, &request, 0);
        printf ("收到 Hello\n");
        zmq_msg_close (&request);

        // 做些“處理”
        sleep (1);

        // 返回應答
        zmq_msg_t reply;
        zmq_msg_init_size (&reply, 5);
        memcpy (zmq_msg_data (&reply), "World", 5);
        zmq_send (responder, &reply, 0);
        zmq_msg_close (&reply);
    }
    // 程序不會運行到這里,以下只是演示我們應該如何結束
    zmq_close (responder);
    zmq_term (context);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

1

使用REQ-REP套接字發送和接受消息是需要遵循一定規律的。客戶端首先使用zmq_send()發送消息,再用zmq_recv()接收,如此循環。如果打亂了這個順序(如連續發送兩次)則會報錯。類似地,服務端必須先進行接收,后進行發送。

ZMQ使用C語言作為它參考手冊的語言,本指南也以它作為示例程序的語言。如果你正在閱讀本指南的在線版本,你可以看到示例代碼的下方有其他語言的實現。如以下是C++語言:

hwserver.cpp: Hello World server

//
// Hello World 服務端 C++語言版
// 綁定一個REP套接字至tcp://*:5555
// 從客戶端接收Hello,並應答World
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

int main () {
    // 准備上下文和套接字
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        // 等待客戶端請求
        socket.recv (&request);
        std::cout << "收到 Hello" << std::endl;

        // 做一些“處理”
        sleep (1);

        // 應答World
        zmq::message_t reply (5);
        memcpy ((void *) reply.data (), "World", 5);
        socket.send (reply);
    }
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

可以看到C語言和C++語言的API代碼差不多,而在PHP這樣的語言中,代碼就會更為簡潔:

hwserver.php: Hello World server

<?php
/** * Hello World 服務端 * 綁定REP套接字至 tcp://*:5555 * 從客戶端接收Hello,並應答World * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */

$context = new ZMQContext(1);

// 與客戶端通信的套接字
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while(true) {
    // 等待客戶端請求
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    // 做一些“處理”
    sleep (1);

    // 應答World
    $responder->send("World");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

下面是客戶端的代碼:

hwclient: Hello World client in C

//
// Hello World 客戶端
// 連接REQ套接字至 tcp://localhost:5555
// 發送Hello給服務端,並接收World
//
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
    void *context = zmq_init (1);

    // 連接至服務端的套接字
    printf ("正在連接至hello world服務端...\n");
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq_msg_t request;
        zmq_msg_init_size (&request, 5);
        memcpy (zmq_msg_data (&request), "Hello", 5);
        printf ("正在發送 Hello %d...\n", request_nbr);
        zmq_send (requester, &request, 0);
        zmq_msg_close (&request);

        zmq_msg_t reply;
        zmq_msg_init (&reply);
        zmq_recv (requester, &reply, 0);
        printf ("接收到 World %d\n", request_nbr);
        zmq_msg_close (&reply);
    }
    zmq_close (requester);
    zmq_term (context);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

這看起來是否太簡單了?ZMQ就是這樣一個東西,你往里加點兒料就能制作出一枚無窮能量的原子彈,用它來拯救世界吧!

2

理論上你可以連接千萬個客戶端到這個服務端上,同時連接都沒問題,程序仍會運作得很好。你可以嘗試一下先打開客戶端,再打開服務端,可以看到程序仍然會正常工作,想想這意味着什么。

讓我簡單介紹一下這兩段程序到底做了什么。首先,他們創建了一個ZMQ上下文,然后是一個套接字。不要被這些陌生的名詞嚇到,后面我們都會講到。服務端將REP套接字綁定到5555端口上,並開始等待請求,發出應答,如此循環。客戶端則是發送請求並等待服務端的應答。

這些代碼背后其實發生了很多很多事情,但是程序員完全不必理會這些,只要知道這些代碼短小精悍,極少出錯,耐高壓。這種通信模式我們稱之為請求-應答模式,是ZMQ最直接的一種應用。你可以拿它和RPC及經典的C/S模型做類比。

關於字符串

ZMQ不會關心發送消息的內容,只要知道它所包含的字節數。所以,程序員需要做一些工作,保證對方節點能夠正確讀取這些消息。如何將一個對象或復雜數據類型轉換成ZMQ可以發送的消息,這有類似Protocol Buffers的序列化軟件可以做到。但對於字符串,你也是需要有所注意的。

在C語言中,字符串都以一個空字符結尾,你可以像這樣發送一個完整的字符串:

zmq_msg_init_data (&request, "Hello", 6, NULL, NULL);
  • 1

但是,如果你用其他語言發送這個字符串,很可能不會包含這個空字節,如你使用Python發送:

socket.send ("Hello")
  • 1

實際發送的消息是:

3

如果你從C語言中讀取該消息,你會讀到一個類似於字符串的內容,甚至它可能就是一個字符串(第六位在內存中正好是一個空字符),但是這並不合適。這樣一來,客戶端和服務端對字符串的定義就不統一了,你會得到一些奇怪的結果。

當你用C語言從ZMQ中獲取字符串,你不能夠相信該字符串有一個正確的結尾。因此,當你在接受字符串時,應該建立多一個字節的緩沖區,將字符串放進去,並添加結尾。

所以,讓我們做如下假設:ZMQ的字符串是有長度的,且傳送時不加結束符。在最簡單的情況下,ZMQ字符串和ZMQ消息中的一幀是等價的,就如上圖所展現的,由一個長度屬性和一串字節表示。

下面這個功能函數會幫助我們在C語言中正確的接受字符串消息:

// 從ZMQ套接字中接收字符串,並轉換為C語言的字符串
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_recv (socket, &message, 0);
    int size = zmq_msg_size (&message);
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

這段代碼我們會在日后的示例中使用,我們可以順手寫一個s_send()方法,並打包成一個.h文件供我們使用。

這就誕生了zhelpers.h,一個供C語言使用的ZMQ功能函數庫。它的源代碼比較長,而且只對C語言程序員有用,你可以在閑暇時看一看

獲取版本號

ZMQ目前有多個版本,而且仍在持續更新。如果你遇到了問題,也許這在下一個版本中已經解決了。想知道目前的ZMQ版本,你可以在程序中運行如下:

version: ØMQ version reporting in C

//
// 返回當前ZMQ的版本號
//
#include "zhelpers.h"

int main (void)
{
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    printf ("當前ZMQ版本號為 %d.%d.%d\n", major, minor, patch);

    return EXIT_SUCCESS;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM