zeromq學習記錄(八)負載均衡 附ZMQ_ROUTER的流程分析


/**************************************************************
技術博客
http://www.cnblogs.com/itdef/
 
技術交流群
群號碼:324164944
 
歡迎c c++ windows驅動愛好者 服務器程序員溝通交流
**************************************************************/
ROUTER 與 REQ通訊
#include "stdafx.h"
#include "zhelpers.hpp"
#include <thread>

void worker_thread(void *arg) {
    zmq::context_t context(1);
    zmq::socket_t worker(context, ZMQ_REQ);

    s_set_id(worker, (intptr_t)arg);
    worker.connect("tcp://localhost:5671"); // "ipc" doesn't yet work on windows.


    int total = 0;
    while (1) {
        //  Tell the broker we're ready for work
        s_send(worker, "Hi Boss");

        //  Get workload from broker, until finished
        std::string workload = s_recv(worker);
        if ("Fired!" == workload) {
            std::cout << "Processed: " << total << " tasks" << std::endl;
            break;
        }
        total++;

        //  Do some random work
        s_sleep(within(500) + 1);
    }
    return;
}

int main() {
    zmq::context_t context(1);
    zmq::socket_t broker(context, ZMQ_ROUTER);

    broker.bind("tcp://*:5671"); // "ipc" doesn't yet work on windows.

    const int NBR_WORKERS = 10;
    std::thread workers[NBR_WORKERS];
    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        workers[worker_nbr]= std::thread( worker_thread, (void *)(intptr_t)worker_nbr);
    }

    //  Run for five seconds and then tell workers to end
    int64_t end_time = s_clock() + 5000;
    int workers_fired = 0;
    while (1) {
        //  Next message gives us least recently used worker
        std::string identity = s_recv(broker);
        s_recv(broker);     //  Envelope delimiter
        s_recv(broker);     //  Response from worker       

        s_sendmore(broker, identity);
        s_sendmore(broker, "");
        //  Encourage workers until it's time to fire them
        if (s_clock() < end_time)
            s_send(broker, "Work harder");
        else {
            s_send(broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }

    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        workers[worker_nbr].join();
    }
    return 0;
}
View Code

 ROUTER 與 DEALER通訊

// rtdealer_cpp.cpp : 定義控制台應用程序的入口點。
//

#include "stdafx.h"
//
//  Custom routing Router to Dealer
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>

#include "zhelpers.hpp"
#include <thread>

static void *
worker_task(void *args)
{
    zmq::context_t context(1);
    zmq::socket_t worker(context, ZMQ_DEALER);

#if (defined (WIN32))
    s_set_id(worker, (intptr_t)args);
#else
    s_set_id(worker);          //  Set a printable identity
#endif

    worker.connect("tcp://localhost:5671");

    int total = 0;
    while (1) {
        //  Tell the broker we're ready for work
        s_sendmore(worker, "");
        s_send(worker, "Hi Boss");

        //  Get workload from broker, until finished
        s_recv(worker);     //  Envelope delimiter
        std::string workload = s_recv(worker);
        //  .skip
        if ("Fired!" == workload) {
            std::cout << "Completed: " << total << " tasks" << std::endl;
            break;
        }
        total++;

        //  Do some random work
        s_sleep(within(500) + 1);
    }

    return NULL;
}

//  .split main task
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. Each thread has its own
//  context and conceptually acts as a separate process.
int main() {
    zmq::context_t context(1);
    zmq::socket_t broker(context, ZMQ_ROUTER);

    broker.bind("tcp://*:5671");
    srandom((unsigned)time(NULL));

    const int NBR_WORKERS = 10;
    std::thread workers[NBR_WORKERS];
    int worker_nbr = 0;
    for (; worker_nbr < NBR_WORKERS; ++worker_nbr) {
        workers[worker_nbr] = std::thread( worker_task, (void *)(intptr_t)worker_nbr);
    }


    //  Run for five seconds and then tell workers to end
    int64_t end_time = s_clock() + 5000;
    int workers_fired = 0;
    while (1) {
        //  Next message gives us least recently used worker
        std::string identity = s_recv(broker);
        {
            s_recv(broker);     //  Envelope delimiter
            s_recv(broker);     //  Response from worker
        }

        s_sendmore(broker, identity);
        s_sendmore(broker, "");

        //  Encourage workers until it's time to fire them
        if (s_clock() < end_time)
            s_send(broker, "Work harder");
        else {
            s_send(broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }

    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) {
        workers[worker_nbr].join();
    }

    return 0;
}
View Code

DEALER REQ區別在於 REQ模式socket發送信息時候會自動在信息前添加“”空白信息作為隔離 

DEALER模式socket需要手動添加 s_sendmore(socket, "");

 

 

而ROUTER 在接受信息前會受到來源socket的地址信息 而發送時會將地址信息添加到socket的信息之前 同時還要在在信息前添加“”空白信息作為隔離 

s_sendmore(broker, identity);
s_sendmore(broker, "");

 

均衡負載 此段代碼比較復雜 理解較難 

我將代碼設置為單線程 然后調試 糾正了一些我理解的router模式

稍后畫圖

// lbbroker_cpp.cpp : 定義控制台應用程序的入口點。
//

#include "stdafx.h"
#include "zhelpers.hpp"
#include <thread>
#include <queue>

void client_thread(void* arg) {
    zmq::context_t context(1);
    zmq::socket_t client(context, ZMQ_REQ);

    s_set_id(client, (intptr_t)1);
    client.connect("tcp://localhost:5672");

    s_send(client, "HELLO");
    std::string reply = s_recv(client);
    std::cout << "Client: " << reply << std::endl;
    return;
}


void worker_thread(void* arg){
    zmq::context_t context(1);
    zmq::socket_t worker(context, ZMQ_REQ);

    s_set_id(worker, (intptr_t)9);
    worker.connect("tcp://localhost:5673");

    s_send(worker, "READY");
    while (1) {
        std::string address = s_recv(worker);
        {
            std::string empty = s_recv(worker);
            assert(empty.size() == 0);
        }
        std::string request = s_recv(worker);
        std::cout << "Worker: " << request << std::endl;
    
        s_sendmore(worker, address);
        s_sendmore(worker, "");
        s_send(worker, "OK");
    }
    return;
}


int main()
{
    zmq::context_t context(1);
    zmq::socket_t frontend(context, ZMQ_ROUTER);
    zmq::socket_t backend(context, ZMQ_ROUTER);

    frontend.bind("tcp://*:5672");
    backend.bind("tcp://*:5673");

    int client_nbr;
    std::thread client[1];
    for (client_nbr = 0; client_nbr < 1; client_nbr++)
    {
        client[client_nbr] = std::thread(client_thread,(void *)(intptr_t)client_nbr);
    }

    int worker_nbr;
    std::thread worker[1];
    for (worker_nbr = 0; worker_nbr < 1; worker_nbr++)
    {
        worker[worker_nbr] = std::thread(worker_thread, (void *)(intptr_t)worker_nbr);
    }

    std::queue<std::string> worker_queue;

    while (1)
    {
        zmq::pollitem_t items[] = {
            {backend,0,ZMQ_POLLIN,0},
            {frontend,0,ZMQ_POLLIN,0}
        };
        if (worker_queue.size())
            zmq::poll(&items[0], 2, -1);
        else
            zmq::poll(&items[0], 1, -1);

        if (items[0].revents & ZMQ_POLLIN) {

            //  Queue worker address for LRU routing
            worker_queue.push(s_recv(backend));

            {
                //  Second frame is empty
                std::string empty = s_recv(backend);
                assert(empty.size() == 0);
            }

            //  Third frame is READY or else a client reply address
            std::string client_addr = s_recv(backend);

            //  If client reply, send rest back to frontend
            if (client_addr.compare("READY") != 0) {

                {
                    std::string empty = s_recv(backend);
                    assert(empty.size() == 0);
                }

                std::string reply = s_recv(backend);
                s_sendmore(frontend, client_addr);
                s_sendmore(frontend, "");
                s_send(frontend, reply);

                if (--client_nbr == 0)
                    break;
            }
        }
        if (items[1].revents & ZMQ_POLLIN) {

            //  Now get next client request, route to LRU worker
            //  Client request is [address][empty][request]
            std::string client_addr = s_recv(frontend);

            {
                std::string empty = s_recv(frontend);
                assert(empty.size() == 0);
            }

            std::string request = s_recv(frontend);

            std::string worker_addr = worker_queue.front();//worker_queue [0];
            worker_queue.pop();

            s_sendmore(backend, worker_addr);
            s_sendmore(backend, "");
            s_sendmore(backend, client_addr);
            s_sendmore(backend, "");
            s_send(backend, request);
        }
    }

    for (client_nbr = 0; client_nbr < 1; client_nbr++)
    {
        client[client_nbr].join();
    }


    for (worker_nbr = 0; worker_nbr < 1; worker_nbr++)
    {
        worker[worker_nbr].join();
    }

    return 0;
}

 此處圖僅為個人領悟 不能保證完全正確 請謹慎參考

開始看官方的圖示 我以為上述代碼是如官方所示

但是對於router來說

connect到router的客戶端或者工作者(client worker)

與router直接send到router的信息處理上是有區別的

下圖用直線和箭頭來區分

 

 

我將代碼設置為單線程 然后調試 得出流程如圖:

由於新建線程的隨機性

步驟1 2 是的先后次序是隨機的

1 client 發送信息到frontend router

信息格式為

"0001"

""

"HELLO"

 

2 worker發送信息到backend router

信息格式為

"0009"

""

"READY"

 

3函數主體進入POLL循環

根據代碼

if (worker_queue.size())
zmq::poll(&items[0], 2, -1);
else
zmq::poll(&items[0], 1, -1);

由於空閑worker隊列中暫時無記錄

所以僅僅對backend 進行poll輪詢

首先接受信息記錄的通訊ID 為9 並push進worker隊列

worker_queue.push(s_recv(backend));

然后接受空字節分隔符 在接受發送的正文“READY”

接收信息格式如下:

"0009"

""

"READY"

由於此次接受的正文是“READY” 根據代碼不進入發送流程

if (client_addr.compare("READY") != 0) 

步驟3結束

 

4 主體代碼進入第二次POLL輪詢

frontend 接收信息如下

信息格式為

"0001"

""

"HELLO"

彈出之前接收空閑worker的ID

std::string worker_addr = worker_queue.front();

然后向backend發送信息格式如下:

"0009"

""

"0001"

""

"HELLO"

 

5 backend接收信息后直接路由到ID為0009的worker

 

6 worker接收到信息格式為

"0001"

""

"HELLO"

然后發送信息格式:

"0001"

""

"OK"

 

7 backend接收到步驟6的信息:

將步驟6的worker的id 0009 push僅worker隊列

worker_queue.push(s_recv(backend));

此次接受的信息格式為

"0009"

""

"001"

""

"OK"

由於此次接受正文不是"READY"

進入發送模式 發送信息到frontend

發送信息格式為

"0001"

""

"OK"

 

8 frontend直接路由此信息到ID 0001的client

最后client接收到的信息格式為

"OK"


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM