ZeroMQ_09 ZMQ多線程編程


ZMQ多線程編程基本規則:

  • 不要在不同的線程之間訪問同一份數據,如果要用到傳統編程中的互斥機制,那就有違ZMQ的思想了。唯一的例外是ZMQ上下文對象,它是線程安全的。

  • 必須為進程創建ZMQ上下文,並將其傳遞給所有你需要使用inproc協議進行通信的線程;

  • 你可以將線程作為單獨的任務來對待,使用自己的上下文,但是這些線程之間就不能使用inproc協議進行通信了。這樣做的好處是可以在日后方便地將程序拆分為不同的進程來運行。

  • 不要在不同的線程之間傳遞套接字對象,這些對象不是線程安全的。從技術上來說,你是可以這樣做的,但是會用到互斥和鎖的機制,這會讓你的應用程序變得緩慢和脆弱。唯一合理的情形是,在某些語言的ZMQ類庫內部,需要使用垃圾回收機制,這時可能會進行套接字對象的傳遞。

下面我們看一個多線程的Hello word服務

Client:

#include "../zhelpers.h"
#include <stdio.h>

int main (void) 
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        char strDst[256] = {0};
        snprintf(strDst,256,"Hello %d",request_nbr);
        s_send (requester, strDst);
        char *string = s_recv (requester);
        printf ("Received reply %d [%s]\n", request_nbr, string);
        free (string);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

Server:

#include "../zhelpers.h"
#include <pthread.h>
#include <unistd.h>

#include <sys/syscall.h>

#define gettidv1() syscall(__NR_gettid)
static void *
worker_routine (void *context) {
    //  Socket to talk to dispatcher
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");

    while (1) {
        char *string = s_recv (receiver);
        printf ("[%ld]Received request: [%s]\n",(long int)gettidv1(), string);
        free (string);
        //  Do some 'work'
        sleep (1);
        //  Send reply back to client
        s_send (receiver, "World");
    }
    zmq_close (receiver);
    return NULL;
}

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to clients
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");

    //  Socket to talk to workers
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");

    //  Launch pool of worker threads
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  Connect work threads to client threads via a queue proxy
    zmq_proxy (clients, workers, NULL);

    //  We never get here, but clean up anyhow
    zmq_close (clients);
    zmq_close (workers);
    zmq_ctx_destroy (context);
    return 0;
}

Out:

// client
Received reply 0 [World]
Received reply 1 [World]
Received reply 2 [World]
Received reply 3 [World]
Received reply 4 [World]
Received reply 5 [World]
Received reply 6 [World]
Received reply 7 [World]
Received reply 8 [World]
Received reply 9 [World]

// server
[17403]Received request: [Hello 0]
[17402]Received request: [Hello 1]
[17404]Received request: [Hello 2]
[17406]Received request: [Hello 3]
[17405]Received request: [Hello 4]
[17403]Received request: [Hello 5]
[17402]Received request: [Hello 6]
[17404]Received request: [Hello 7]
[17406]Received request: [Hello 8]
[17405]Received request: [Hello 9]

代碼還是簡單的。

  • 服務端啟動一組worker線程,每個worker創建一個REP套接字,並處理收到的請求。worker線程就像是一個單線程的服務,唯一的區別是使用了inproc而非tcp協議,以及綁定-連接的方向調換了。
  • 服務端創建ROUTER套接字用以和client通信,因此提供了一個TCP協議的外部接口。
  • 服務端創建DEALER套接字用以和worker通信,使用了內部接口(inproc)。
  • 服務端啟動了QUEUE內部裝置,連接兩個端點上的套接字。QUEUE裝置會將收到的請求分發給連接上的worker,並將應答路由給請求方。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM