NetMQ(四): 推拉模式 Push-Pull


ZeroMQ系列 之NetMQ

一:zeromq簡介

二:NetMQ 請求響應模式 Request-Reply

三:NetMQ 發布訂閱模式 Publisher-Subscriber

四:NetMQ 推拉模式 Push-Pull

NetMQ 推拉模式 Push-Pull

1:簡介

推拉模式,也叫 管道模式”Parallel Pipeline”。想象一下這樣的場景,如果需要統計各個機器的日志,我們需要將統計任務分發到各個節點機器上,最后收集統計結果,做一個匯總。PipeLine比較適合於這種場景,他的結構圖,如圖1所示

圖1 官方圖

Ventilator,在管道中生產任務;
Worker ,處理任務;
Sink,收集Worker處理的結果。

2:案例

下面有三個對象Ventilator 消息分發者,Worker 消息處理者,Sink 接受Worker處理消息后返回的結果,耗時的計算處理工作是交給Worker的,如果開多個Worker.exe,可以提升處理速度,Worker的最終目的是分布式計算,部署到多台PC上面,把計算工作交給他們去做(在分布式爬蟲上面,每個Worker相當於一個爬蟲)。
下面案例結構,如圖2所示:


圖2

源碼:

Ventilator

    static void Main(string[] args)
    {
        // Task Ventilator
        // Binds PUSH socket to tcp://localhost:5557
        // Sends batch of tasks to workers via that socket
        Console.WriteLine("====== VENTILATOR ======");


        //socket to send messages on
        using (NetMQSocket sender = new DealerSocket())
        {
            sender.Bind("tcp://*:5557");

            using (var sink = new DealerSocket())
            {
                sink.Connect("tcp://localhost:5558");

                Console.WriteLine("Press enter when worker are ready");
                Console.ReadLine();

                //the first message it "0" and signals start of batch
                //see the Sink.csproj Program.cs file for where this is used
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("0");

                Console.WriteLine("Sending tasks to workers");

                //initialise random number generator
                Random rand = new Random(0);

                //expected costs in Ms
                int totalMs = 0;

                //send 100 tasks (workload for tasks, is just some random sleep time that
                //the workers can perform, in real life each work would do more than sleep
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    //Random workload from 1 to 100 msec
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }
                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }  

Worker

    static void Main(string[] args)
    {
        // Task Worker
        // Connects PULL socket to tcp://localhost:5557
        // collects workload for socket from Ventilator via that socket
        // Connects PUSH socket to tcp://localhost:5558
        // Sends results to Sink via that socket
        Console.WriteLine("====== WORKER ======");


        //socket to receive messages on
        using (var receiver = new DealerSocket())
        {
            receiver.Connect("tcp://localhost:5557");

            //socket to send messages on
            using (var sender = new DealerSocket())
            {
                sender.Connect("tcp://localhost:5558");

                //process tasks forever
                while (true)
                {
                    //workload from the vetilator is a simple delay
                    //to simulate some work being done, see
                    //Ventilator.csproj Proram.cs for the workload sent
                    //In real life some more meaningful work would be done
                    string workload = receiver.ReceiveString();

                    //simulate some work being done
                    Thread.Sleep(int.Parse(workload));

                    //send results to sink, sink just needs to know worker
                    //is done, message content is not important, just the precence of
                    //a message means worker is done. 
                    //See Sink.csproj Proram.cs 
                    Console.WriteLine("Sending to Sink");
                    sender.SendFrame(string.Empty);
                }
            }
        }
    }

Sink

    static void Main(string[] args)
    {
        // Task Sink
        // Bindd PULL socket to tcp://localhost:5558
        // Collects results from workers via that socket
        Console.WriteLine("====== SINK ======");

        //socket to receive messages on
        using (var receiver = new DealerSocket())
        {
            receiver.Bind("tcp://localhost:5558");

            //wait for start of batch (see Ventilator.csproj Program.cs)
            var startOfBatchTrigger = receiver.ReceiveString();
            Console.WriteLine("Seen start of batch");

            //Start our clock now
            Stopwatch watch = new Stopwatch();
            watch.Start();

            for (int taskNumber = 0; taskNumber < 100; taskNumber++)
            {
                var workerDoneTrigger = receiver.ReceiveString();
                if (taskNumber % 10 == 0)
                {
                    Console.Write(":");
                }
                else
                {
                    Console.Write(".");
                }
            }
            watch.Stop();
            //Calculate and report duration of batch
            Console.WriteLine();
            Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
            Console.ReadLine();
        }
    }  

效果圖:

處理一個Ventilator任務,可以使用數量不同的worker:

一個worker:
在我本地計算機上,耗時 5566 mesc

二個worker:
在我本地計算機上,耗時2917 mesc

三個worker:
在我本地計算機上,耗時2031 msec

3:總結

  1. 使用的NetMQ版本是3.3.3.1,實例化DealerSocket,來創建socket。
  2. Ventilator分發工作到不同的Worker,實現負載均衡。
  3. Ventilator和Sink是靜態部分,Worker是動態的。開啟更多的Worker,理論上完成工作更快。
  4. Sink收集Worker處理的結果.

4:下載

NetMQ3.3.3.1例子
NetMQ3.3.2.2例子


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM