/**************************************************************
技術博客
http://www.cnblogs.com/itdef/
技術交流群
群號碼:324164944
歡迎c c++ windows驅動愛好者 服務器程序員溝通交流
**************************************************************/

#include "stdafx.h" #include "zhelpers.hpp" #include <thread> void worker_thread(void *arg) { zmq::context_t context(1); zmq::socket_t worker(context, ZMQ_REQ); s_set_id(worker, (intptr_t)arg); worker.connect("tcp://localhost:5671"); // "ipc" doesn't yet work on windows. int total = 0; while (1) { // Tell the broker we're ready for work s_send(worker, "Hi Boss"); // Get workload from broker, until finished std::string workload = s_recv(worker); if ("Fired!" == workload) { std::cout << "Processed: " << total << " tasks" << std::endl; break; } total++; // Do some random work s_sleep(within(500) + 1); } return; } int main() { zmq::context_t context(1); zmq::socket_t broker(context, ZMQ_ROUTER); broker.bind("tcp://*:5671"); // "ipc" doesn't yet work on windows. const int NBR_WORKERS = 10; std::thread workers[NBR_WORKERS]; for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { workers[worker_nbr]= std::thread( worker_thread, (void *)(intptr_t)worker_nbr); } // Run for five seconds and then tell workers to end int64_t end_time = s_clock() + 5000; int workers_fired = 0; while (1) { // Next message gives us least recently used worker std::string identity = s_recv(broker); s_recv(broker); // Envelope delimiter s_recv(broker); // Response from worker s_sendmore(broker, identity); s_sendmore(broker, ""); // Encourage workers until it's time to fire them if (s_clock() < end_time) s_send(broker, "Work harder"); else { s_send(broker, "Fired!"); if (++workers_fired == NBR_WORKERS) break; } } for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { workers[worker_nbr].join(); } return 0; }
ROUTER 與 DEALER通訊

// rtdealer_cpp.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" // // Custom routing Router to Dealer // // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> #include "zhelpers.hpp" #include <thread> static void * worker_task(void *args) { zmq::context_t context(1); zmq::socket_t worker(context, ZMQ_DEALER); #if (defined (WIN32)) s_set_id(worker, (intptr_t)args); #else s_set_id(worker); // Set a printable identity #endif worker.connect("tcp://localhost:5671"); int total = 0; while (1) { // Tell the broker we're ready for work s_sendmore(worker, ""); s_send(worker, "Hi Boss"); // Get workload from broker, until finished s_recv(worker); // Envelope delimiter std::string workload = s_recv(worker); // .skip if ("Fired!" == workload) { std::cout << "Completed: " << total << " tasks" << std::endl; break; } total++; // Do some random work s_sleep(within(500) + 1); } return NULL; } // .split main task // While this example runs in a single process, that is just to make // it easier to start and stop the example. Each thread has its own // context and conceptually acts as a separate process. int main() { zmq::context_t context(1); zmq::socket_t broker(context, ZMQ_ROUTER); broker.bind("tcp://*:5671"); srandom((unsigned)time(NULL)); const int NBR_WORKERS = 10; std::thread workers[NBR_WORKERS]; int worker_nbr = 0; for (; worker_nbr < NBR_WORKERS; ++worker_nbr) { workers[worker_nbr] = std::thread( worker_task, (void *)(intptr_t)worker_nbr); } // Run for five seconds and then tell workers to end int64_t end_time = s_clock() + 5000; int workers_fired = 0; while (1) { // Next message gives us least recently used worker std::string identity = s_recv(broker); { s_recv(broker); // Envelope delimiter s_recv(broker); // Response from worker } s_sendmore(broker, identity); s_sendmore(broker, ""); // Encourage workers until it's time to fire them if (s_clock() < end_time) s_send(broker, "Work harder"); else { s_send(broker, "Fired!"); if (++workers_fired == NBR_WORKERS) break; } } for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) { workers[worker_nbr].join(); } return 0; }
DEALER REQ區別在於 REQ模式socket發送信息時候會自動在信息前添加“”空白信息作為隔離
DEALER模式socket需要手動添加 s_sendmore(socket, "");
而ROUTER 在接受信息前會受到來源socket的地址信息 而發送時會將地址信息添加到socket的信息之前 同時還要在在信息前添加“”空白信息作為隔離
s_sendmore(broker, identity);
s_sendmore(broker, "");
均衡負載 此段代碼比較復雜 理解較難
我將代碼設置為單線程 然后調試 糾正了一些我理解的router模式
稍后畫圖
// lbbroker_cpp.cpp : 定義控制台應用程序的入口點。 // #include "stdafx.h" #include "zhelpers.hpp" #include <thread> #include <queue> void client_thread(void* arg) { zmq::context_t context(1); zmq::socket_t client(context, ZMQ_REQ); s_set_id(client, (intptr_t)1); client.connect("tcp://localhost:5672"); s_send(client, "HELLO"); std::string reply = s_recv(client); std::cout << "Client: " << reply << std::endl; return; } void worker_thread(void* arg){ zmq::context_t context(1); zmq::socket_t worker(context, ZMQ_REQ); s_set_id(worker, (intptr_t)9); worker.connect("tcp://localhost:5673"); s_send(worker, "READY"); while (1) { std::string address = s_recv(worker); { std::string empty = s_recv(worker); assert(empty.size() == 0); } std::string request = s_recv(worker); std::cout << "Worker: " << request << std::endl; s_sendmore(worker, address); s_sendmore(worker, ""); s_send(worker, "OK"); } return; } int main() { zmq::context_t context(1); zmq::socket_t frontend(context, ZMQ_ROUTER); zmq::socket_t backend(context, ZMQ_ROUTER); frontend.bind("tcp://*:5672"); backend.bind("tcp://*:5673"); int client_nbr; std::thread client[1]; for (client_nbr = 0; client_nbr < 1; client_nbr++) { client[client_nbr] = std::thread(client_thread,(void *)(intptr_t)client_nbr); } int worker_nbr; std::thread worker[1]; for (worker_nbr = 0; worker_nbr < 1; worker_nbr++) { worker[worker_nbr] = std::thread(worker_thread, (void *)(intptr_t)worker_nbr); } std::queue<std::string> worker_queue; while (1) { zmq::pollitem_t items[] = { {backend,0,ZMQ_POLLIN,0}, {frontend,0,ZMQ_POLLIN,0} }; if (worker_queue.size()) zmq::poll(&items[0], 2, -1); else zmq::poll(&items[0], 1, -1); if (items[0].revents & ZMQ_POLLIN) { // Queue worker address for LRU routing worker_queue.push(s_recv(backend)); { // Second frame is empty std::string empty = s_recv(backend); assert(empty.size() == 0); } // Third frame is READY or else a client reply address std::string client_addr = s_recv(backend); // If client reply, send rest back to frontend if (client_addr.compare("READY") != 0) { { std::string empty = s_recv(backend); assert(empty.size() == 0); } std::string reply = s_recv(backend); s_sendmore(frontend, client_addr); s_sendmore(frontend, ""); s_send(frontend, reply); if (--client_nbr == 0) break; } } if (items[1].revents & ZMQ_POLLIN) { // Now get next client request, route to LRU worker // Client request is [address][empty][request] std::string client_addr = s_recv(frontend); { std::string empty = s_recv(frontend); assert(empty.size() == 0); } std::string request = s_recv(frontend); std::string worker_addr = worker_queue.front();//worker_queue [0]; worker_queue.pop(); s_sendmore(backend, worker_addr); s_sendmore(backend, ""); s_sendmore(backend, client_addr); s_sendmore(backend, ""); s_send(backend, request); } } for (client_nbr = 0; client_nbr < 1; client_nbr++) { client[client_nbr].join(); } for (worker_nbr = 0; worker_nbr < 1; worker_nbr++) { worker[worker_nbr].join(); } return 0; }
此處圖僅為個人領悟 不能保證完全正確 請謹慎參考
開始看官方的圖示 我以為上述代碼是如官方所示
但是對於router來說
connect到router的客戶端或者工作者(client worker)
與router直接send到router的信息處理上是有區別的
下圖用直線和箭頭來區分
我將代碼設置為單線程 然后調試 得出流程如圖:
由於新建線程的隨機性
步驟1 2 是的先后次序是隨機的
1 client 發送信息到frontend router
信息格式為
"0001"
""
"HELLO"
2 worker發送信息到backend router
信息格式為
"0009"
""
"READY"
3函數主體進入POLL循環
根據代碼
if (worker_queue.size())
zmq::poll(&items[0], 2, -1);
else
zmq::poll(&items[0], 1, -1);
由於空閑worker隊列中暫時無記錄
所以僅僅對backend 進行poll輪詢
首先接受信息記錄的通訊ID 為9 並push進worker隊列
worker_queue.push(s_recv(backend));
然后接受空字節分隔符 在接受發送的正文“READY”
接收信息格式如下:
"0009"
""
"READY"
由於此次接受的正文是“READY” 根據代碼不進入發送流程
if (client_addr.compare("READY") != 0)
步驟3結束
4 主體代碼進入第二次POLL輪詢
frontend 接收信息如下
信息格式為
"0001"
""
"HELLO"
彈出之前接收空閑worker的ID
std::string worker_addr = worker_queue.front();
然后向backend發送信息格式如下:
"0009"
""
"0001"
""
"HELLO"
5 backend接收信息后直接路由到ID為0009的worker
6 worker接收到信息格式為
"0001"
""
"HELLO"
然后發送信息格式:
"0001"
""
"OK"
7 backend接收到步驟6的信息:
將步驟6的worker的id 0009 push僅worker隊列
worker_queue.push(s_recv(backend));
此次接受的信息格式為
"0009"
""
"001"
""
"OK"
由於此次接受正文不是"READY"
進入發送模式 發送信息到frontend
發送信息格式為
"0001"
""
"OK"
8 frontend直接路由此信息到ID 0001的client
最后client接收到的信息格式為
"OK"