ZMQ多線程編程基本規則:
-
不要在不同的線程之間訪問同一份數據,如果要用到傳統編程中的互斥機制,那就有違ZMQ的思想了。唯一的例外是ZMQ上下文對象,它是線程安全的。
-
必須為進程創建ZMQ上下文,並將其傳遞給所有你需要使用inproc協議進行通信的線程;
-
你可以將線程作為單獨的任務來對待,使用自己的上下文,但是這些線程之間就不能使用inproc協議進行通信了。這樣做的好處是可以在日后方便地將程序拆分為不同的進程來運行。
-
不要在不同的線程之間傳遞套接字對象,這些對象不是線程安全的。從技術上來說,你是可以這樣做的,但是會用到互斥和鎖的機制,這會讓你的應用程序變得緩慢和脆弱。唯一合理的情形是,在某些語言的ZMQ類庫內部,需要使用垃圾回收機制,這時可能會進行套接字對象的傳遞。
下面我們看一個多線程的Hello word服務
Client:
#include "../zhelpers.h" #include <stdio.h> int main (void) { void *context = zmq_ctx_new (); // Socket to talk to server void *requester = zmq_socket (context, ZMQ_REQ); zmq_connect (requester, "tcp://localhost:5555"); int request_nbr; for (request_nbr = 0; request_nbr != 10; request_nbr++) { char strDst[256] = {0}; snprintf(strDst,256,"Hello %d",request_nbr); s_send (requester, strDst); char *string = s_recv (requester); printf ("Received reply %d [%s]\n", request_nbr, string); free (string); } zmq_close (requester); zmq_ctx_destroy (context); return 0; }
Server:
#include "../zhelpers.h" #include <pthread.h> #include <unistd.h> #include <sys/syscall.h> #define gettidv1() syscall(__NR_gettid) static void * worker_routine (void *context) { // Socket to talk to dispatcher void *receiver = zmq_socket (context, ZMQ_REP); zmq_connect (receiver, "inproc://workers"); while (1) { char *string = s_recv (receiver); printf ("[%ld]Received request: [%s]\n",(long int)gettidv1(), string); free (string); // Do some 'work' sleep (1); // Send reply back to client s_send (receiver, "World"); } zmq_close (receiver); return NULL; } int main (void) { void *context = zmq_ctx_new (); // Socket to talk to clients void *clients = zmq_socket (context, ZMQ_ROUTER); zmq_bind (clients, "tcp://*:5555"); // Socket to talk to workers void *workers = zmq_socket (context, ZMQ_DEALER); zmq_bind (workers, "inproc://workers"); // Launch pool of worker threads int thread_nbr; for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { pthread_t worker; pthread_create (&worker, NULL, worker_routine, context); } // Connect work threads to client threads via a queue proxy zmq_proxy (clients, workers, NULL); // We never get here, but clean up anyhow zmq_close (clients); zmq_close (workers); zmq_ctx_destroy (context); return 0; }
Out:
// client Received reply 0 [World] Received reply 1 [World] Received reply 2 [World] Received reply 3 [World] Received reply 4 [World] Received reply 5 [World] Received reply 6 [World] Received reply 7 [World] Received reply 8 [World] Received reply 9 [World] // server [17403]Received request: [Hello 0] [17402]Received request: [Hello 1] [17404]Received request: [Hello 2] [17406]Received request: [Hello 3] [17405]Received request: [Hello 4] [17403]Received request: [Hello 5] [17402]Received request: [Hello 6] [17404]Received request: [Hello 7] [17406]Received request: [Hello 8] [17405]Received request: [Hello 9]
代碼還是簡單的。
- 服務端啟動一組worker線程,每個worker創建一個REP套接字,並處理收到的請求。worker線程就像是一個單線程的服務,唯一的區別是使用了inproc而非tcp協議,以及綁定-連接的方向調換了。
- 服務端創建ROUTER套接字用以和client通信,因此提供了一個TCP協議的外部接口。
- 服務端創建DEALER套接字用以和worker通信,使用了內部接口(inproc)。
- 服務端啟動了QUEUE內部裝置,連接兩個端點上的套接字。QUEUE裝置會將收到的請求分發給連接上的worker,並將應答路由給請求方。