ZeroMQ是對Socket的封裝,通過組合多種類型的結點可以實現復雜的網絡通信模式。而且ZeroMQ設計簡單,可以有多種平台實現,對於跨平台項目是一個福音。
clrzmq是ZeroMQ的C#語言的實現。當我在使用clrzmq時,發現ZeroMQ的server端,即REP,在接收到消息后,回復消息,但是在回復消息之前不能再接收消息。用偽代碼表示就是
while(true) { byte[] receiveData = new byte[1024]; receive(receiveData); //do some work byte[] responseData = new byte[1024]; send(reponseData); }
既然ZeroMQ的名稱里含有MQ(Message Queue),就應該有隊列的功能啊?在ZeroMQ的官方手冊中介紹了router-dealer模式:
router可以作為路由器,起到緩存消息的作用,如果服務端空閑,會把消息通過dealer發送給服務端。
這篇文章使用C++實現了ZeroMQ消息隊列。
幸運的是,clrzmq對router-dealer模式進行了封裝,可以使用QueueDevice類實現相同的效果。
在我的例子中,我將router-dealer放在了服務端進程中,dealer和服務端的通信是縣城通信,交互圖如下:
tcp inproc inproc
connect ________________ connect
客戶端i -------------|router -------- dealer| -----------服務端
——————---------
queueDevice
客戶端代碼如下:
static void Main(string[] args) { string serverAddress = "tcp://localhost:5555"; // ZMQ Context and client socket using (ZmqContext context = ZmqContext.Create()) using (ZmqSocket client = context.CreateSocket(SocketType.REQ)) { client.Connect(serverAddress); string request = "Hello"; while(true)//for (int requestNum = 0; requestNum < 10; requestNum++) { string again = Console.ReadLine(); Console.WriteLine("Sending request..."); client.Send(again + request, Encoding.Unicode); string reply = client.Receive(Encoding.Unicode); Console.WriteLine("Received reply {0}: ", reply); } } }
服務端代碼:
class Program { static ZmqContext context = ZmqContext.Create(); static ManualResetEvent _deviceReady = new ManualResetEvent(false); //static ManualResetEvent _receiverReady = new ManualResetEvent(false); static void Main(string[] args) { startRouterDealer(); // ZMQ Context, server socket _deviceReady.WaitOne(); using (ZmqSocket server = context.CreateSocket(SocketType.REP)) { //server.Bind("inproc://backend"); server.Connect("inproc://backend"); while (true) { // Wait for next request from client string message = server.Receive(Encoding.Unicode); Console.WriteLine("Received request: {0}", message); //ThreadPool.QueueUserWorkItem(new WaitCallback(procedeRequest), server); // Do Some 'work' Thread.Sleep(5000); // Send reply back to client server.Send(message, Encoding.Unicode); } } } private static void startRouterDealer() { ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), null); //ThreadPool.QueueUserWorkItem(new WaitCallback(startRouterDealerThread), null); } private static void startQueueDeviceThread(object state) { //Thread.Sleep(2000); using (QueueDevice queue = new QueueDevice(context, "tcp://*:5555", "inproc://backend", DeviceMode.Threaded)) { queue.Initialize(); _deviceReady.Set(); queue.Start(); while(true) { Thread.Sleep(1000); } } }
}
ZeroMQ的手冊中介紹說,router-dealer必須先啟動,服務端再啟動,因此ManualResetEvent 的作用是協調QueueDevice和服務端的啟動順序。
