ZeroMQ Series: NetMQ
I: Introduction to ZeroMQ
II: NetMQ Request-Reply Pattern
III: NetMQ Publisher-Subscriber Pattern
IV: NetMQ Push-Pull Pattern
NetMQ Push-Pull Pattern
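1: Introduction
The push-pull pattern is also called the "Parallel Pipeline" pattern. Imagine you need to compute statistics over the logs of many machines: the statistics tasks are distributed to each node, and the results are then collected and aggregated. A pipeline fits this scenario well. Its structure is shown in Figure 1.
Figure 1 (official diagram)
Ventilator: produces tasks for the pipeline;
Worker: processes the tasks;
Sink: collects the results processed by the Workers.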
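To make the three roles concrete without any sockets, here is a minimal in-process sketch. It is a model under stated assumptions, not NetMQ code: each worker gets its own BlockingCollection<int> standing in for a connection, the ventilator deals tasks to those queues in turn (round-robin, which is how a PUSH or DEALER socket spreads outgoing messages over its connected peers), and one shared queue plays the Sink. The names PipelineSketch and Run are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical helper: runs Ventilator -> Workers -> Sink in one process.
    // 'work' is what each worker does with a task; its return value is the result.
    public static int[] Run(int[] workloads, int workerCount, Func<int, int> work)
    {
        // One queue per worker, standing in for each worker's connection.
        var workerQueues = Enumerable.Range(0, workerCount)
            .Select(_ => new BlockingCollection<int>())
            .ToArray();
        // Shared queue standing in for the Sink's socket.
        var sinkQueue = new BlockingCollection<int>();

        // Workers: take a task, process it, push the result to the Sink.
        var workers = workerQueues.Select(queue => Task.Run(() =>
        {
            foreach (int workload in queue.GetConsumingEnumerable())
                sinkQueue.Add(work(workload));
        })).ToArray();

        // Ventilator: task i goes to worker i % workerCount (round-robin).
        for (int i = 0; i < workloads.Length; i++)
            workerQueues[i % workerCount].Add(workloads[i]);
        foreach (var queue in workerQueues)
            queue.CompleteAdding(); // lets each worker's loop end

        // Sink: collect exactly one result per task, in arrival order.
        var results = new List<int>();
        for (int i = 0; i < workloads.Length; i++)
            results.Add(sinkQueue.Take());

        Task.WaitAll(workers);
        return results.ToArray();
    }
}
```

For example, PipelineSketch.Run(new[] { 1, 2, 3, 4, 5 }, 2, x => x * 10) returns the five results 10 through 50, in whatever order the workers finish them.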
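2: Example
The example below has three components: the Ventilator, which distributes the messages (tasks); the Worker, which processes them; and the Sink, which receives the results the Workers return after processing. The time-consuming computation is done by the Workers, so running several copies of Worker.exe speeds up processing. The end goal for the Workers is distributed computing: deploy them on several PCs and hand the computation to them (in a distributed web crawler, for example, each Worker would be one crawler).
The structure of the example is shown in Figure 2:
Figure 2
Source code: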
Ventilator
using System;
using NetMQ;
using NetMQ.Sockets;

class Program
{
    static void Main(string[] args)
    {
        // Task Ventilator
        // Binds a DealerSocket (acting as the PUSH end) to tcp://*:5557
        // Sends a batch of tasks to the workers via that socket
        Console.WriteLine("====== VENTILATOR ======");

        // socket to send tasks on
        using (NetMQSocket sender = new DealerSocket())
        {
            sender.Bind("tcp://*:5557");

            // socket used to tell the Sink that the batch has started
            using (var sink = new DealerSocket())
            {
                sink.Connect("tcp://localhost:5558");

                // Wait until every worker has connected; otherwise the first
                // worker to connect may be handed most of the tasks
                Console.WriteLine("Press Enter when the workers are ready");
                Console.ReadLine();

                // The first message is "0" and signals the start of the batch
                // (see Program.cs in Sink.csproj for where it is used)
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("0");

                Console.WriteLine("Sending tasks to workers");

                // initialise the random number generator with a fixed seed
                Random rand = new Random(0);

                // expected total cost in msec
                int totalMs = 0;

                // Send 100 tasks. The workload of each task is just a random sleep
                // time for the worker; in real life each worker would do more than sleep
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    // random workload from 0 to 99 msec
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }

                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }
}
Worker
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

class Program
{
    static void Main(string[] args)
    {
        // Task Worker
        // Connects a DealerSocket (acting as the PULL end) to tcp://localhost:5557
        // and receives workloads from the Ventilator via that socket
        // Connects a DealerSocket (acting as the PUSH end) to tcp://localhost:5558
        // and sends results to the Sink via that socket
        Console.WriteLine("====== WORKER ======");

        // socket to receive tasks on
        using (var receiver = new DealerSocket())
        {
            receiver.Connect("tcp://localhost:5557");

            // socket to send results on
            using (var sender = new DealerSocket())
            {
                sender.Connect("tcp://localhost:5558");

                // process tasks forever
                while (true)
                {
                    // The workload from the Ventilator is a simple delay that
                    // simulates work (see Program.cs in Ventilator.csproj for the
                    // workload sent); in real life more meaningful work would be done
                    string workload = receiver.ReceiveFrameString();

                    // simulate some work being done
                    Thread.Sleep(int.Parse(workload));

                    // Send the result to the Sink. The Sink only needs to know that
                    // the worker is done: the message content is not important, the
                    // presence of a message alone means the task is finished
                    // (see Program.cs in Sink.csproj)
                    Console.WriteLine("Sending to Sink");
                    sender.SendFrame(string.Empty);
                }
            }
        }
    }
}
Sink
using System;
using System.Diagnostics;
using NetMQ;
using NetMQ.Sockets;

class Program
{
    static void Main(string[] args)
    {
        // Task Sink
        // Binds a DealerSocket (acting as the PULL end) to tcp://*:5558
        // Collects results from the workers via that socket
        Console.WriteLine("====== SINK ======");

        // socket to receive messages on
        using (var receiver = new DealerSocket())
        {
            // bind on all interfaces so that workers on other machines can connect
            receiver.Bind("tcp://*:5558");

            // wait for the start of the batch (see Program.cs in Ventilator.csproj)
            var startOfBatchTrigger = receiver.ReceiveFrameString();
            Console.WriteLine("Seen start of batch");

            // start our clock now
            Stopwatch watch = new Stopwatch();
            watch.Start();

            // One (empty) message arrives per finished task; print ':' for every
            // tenth task and '.' otherwise as a progress indicator
            for (int taskNumber = 0; taskNumber < 100; taskNumber++)
            {
                var workerDoneTrigger = receiver.ReceiveFrameString();
                if (taskNumber % 10 == 0)
                {
                    Console.Write(":");
                }
                else
                {
                    Console.Write(".");
                }
            }
            watch.Stop();

            // calculate and report the duration of the batch
            Console.WriteLine();
            Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
            Console.ReadLine();
        }
    }
}
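The Sink's timing logic does not depend on the socket type, so it can be pulled out and checked on its own. In this sketch (the names SinkSketch and CollectBatch are hypothetical) the socket is replaced by a Func<string> receive delegate, which in the Sink program would wrap the call that receives one frame from the receiver socket. The clock starts only after the start-of-batch frame arrives, so the time spent waiting for someone to press Enter in the Ventilator is not counted.

```csharp
using System;
using System.Diagnostics;
using System.Text;

public static class SinkSketch
{
    // Hypothetical helper mirroring the Sink: wait for the start-of-batch frame,
    // then time how long it takes for taskCount result frames to arrive.
    public static (long ElapsedMs, string Progress) CollectBatch(Func<string> receive, int taskCount)
    {
        receive(); // start-of-batch frame ("0"); its content is not checked

        var watch = Stopwatch.StartNew(); // clock starts after the start frame
        var progress = new StringBuilder();
        for (int taskNumber = 0; taskNumber < taskCount; taskNumber++)
        {
            receive(); // one frame per finished task; content is ignored
            progress.Append(taskNumber % 10 == 0 ? ':' : '.');
        }
        watch.Stop();

        return (watch.ElapsedMilliseconds, progress.ToString());
    }
}
```

Passing a fake delegate that returns immediately makes the batch logic testable without a running Ventilator or Worker: for 100 tasks it is called 101 times and produces ten ':' marks among 100 characters.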
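Results:
The same Ventilator batch can be processed by different numbers of workers:
One worker:
On my local machine, this took 5566 msec
Two workers:
On my local machine, this took 2917 msec
Three workers:
On my local machine, this took 2031 msec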
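A rough model helps read these numbers. Assuming every worker was connected before the tasks were sent, the DealerSocket in the Ventilator deals tasks to the workers in turn (round-robin), each worker sleeps through its own tasks one after another, and the batch ends when the busiest worker finishes. The sketch below (hypothetical names RoundRobinEstimate and ExpectedElapsedMs) computes that estimate. It ignores messaging overhead and Thread.Sleep timer granularity, so real measurements should typically come out somewhat higher.

```csharp
using System;

public static class RoundRobinEstimate
{
    // Hypothetical helper: estimated batch time in msec when task i goes to
    // worker i % workerCount and each worker runs its tasks sequentially.
    // Overhead and sleep inaccuracy are not modeled.
    public static int ExpectedElapsedMs(int[] workloads, int workerCount)
    {
        var perWorker = new int[workerCount];
        for (int i = 0; i < workloads.Length; i++)
            perWorker[i % workerCount] += workloads[i];

        // The batch is done when the most heavily loaded worker is done.
        int max = 0;
        foreach (int ms in perWorker)
            max = Math.Max(max, ms);
        return max;
    }
}
```

To compare with the measurements above, generate the same workloads the Ventilator sends (new Random(0) followed by 100 calls to rand.Next(0, 100)) and call ExpectedElapsedMs with 1, 2 and 3 workers. Because the distribution is round-robin rather than based on which worker is idle, one worker that happens to draw longer tasks can hold up the whole batch.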
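3: Summary
- The NetMQ version used is 3.3.3.1, and sockets are created by instantiating DealerSocket (NetMQ also provides dedicated PushSocket and PullSocket classes for this pattern).
- The Ventilator distributes work across the Workers in turn (round-robin), which balances the load between them.
- The Ventilator and Sink are the static parts (they bind), and the Workers are the dynamic parts (they connect). Starting more Workers should, in theory, finish the work faster.
- The Sink collects the results processed by the Workers.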