使用ZeroMQ(clrzmq)實現異步通信


ZeroMQ是對Socket的封裝,通過組合多種類型的結點可以實現復雜的網絡通信模式。而且ZeroMQ設計簡單,可以有多種平台實現,對於跨平台項目是一個福音。

clrzmq是ZeroMQ的C#語言的實現。當我在使用clrzmq時,發現ZeroMQ的server端,即REP,在接收到消息后,回復消息,但是在回復消息之前不能再接收消息。用偽代碼表示就是

while(true)
{
    byte[] receiveData = new byte[1024];
    receive(receiveData);
    
    //do some work
    byte[] responseData = new byte[1024];
    send(reponseData);
}

既然ZeroMQ的名稱里含有MQ(Message Queue),就應該有隊列的功能啊?在ZeroMQ的官方手冊中介紹了router-dealer模式:

router可以作為路由器,起到緩存消息的作用,如果服務端空閑,會把消息通過dealer發送給服務端。

這篇文章使用C++實現了ZeroMQ消息隊列。

幸運的是,clrzmq對router-dealer模式進行了封裝,可以使用QueueDevice類實現相同的效果。

在我的例子中,我將router-dealer放在了服務端進程中,dealer和服務端的通信是縣城通信,交互圖如下:

                  tcp                     inproc                 inproc       
              connect     ________________      connect
客戶端i  -------------|router -------- dealer| -----------服務端
                                 ——————---------
                                       queueDevice
客戶端代碼如下:
static void Main(string[] args)
{
            string serverAddress = "tcp://localhost:5555";
            // ZMQ Context and client socket
            using (ZmqContext context = ZmqContext.Create())
            using (ZmqSocket client = context.CreateSocket(SocketType.REQ))
            {
                client.Connect(serverAddress);

                string request = "Hello";
                while(true)//for (int requestNum = 0; requestNum < 10; requestNum++)
                {
                    string again = Console.ReadLine();

                    Console.WriteLine("Sending request...");
                    client.Send(again + request, Encoding.Unicode);

                    string reply = client.Receive(Encoding.Unicode);
                    Console.WriteLine("Received reply {0}: ", reply);
                }



            }
 }
服務端代碼:
   class Program
    {
        static ZmqContext context = ZmqContext.Create();
        static ManualResetEvent _deviceReady = new ManualResetEvent(false);
        //static ManualResetEvent _receiverReady = new ManualResetEvent(false);

        static void Main(string[] args)
        {
            startRouterDealer();
            // ZMQ Context, server socket
            _deviceReady.WaitOne();

            using (ZmqSocket server = context.CreateSocket(SocketType.REP))
            {
                //server.Bind("inproc://backend");
                server.Connect("inproc://backend");

                while (true)
                {

                    // Wait for next request from client
                    string message = server.Receive(Encoding.Unicode);
                    Console.WriteLine("Received request: {0}", message);

                    //ThreadPool.QueueUserWorkItem(new WaitCallback(procedeRequest), server);
                    // Do Some 'work'
                    Thread.Sleep(5000);

                    // Send reply back to client
                    server.Send(message, Encoding.Unicode);
                }
            }
        }

        private static void startRouterDealer()
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), null);
            //ThreadPool.QueueUserWorkItem(new WaitCallback(startRouterDealerThread), null);
        }
        private static void startQueueDeviceThread(object state)
        {
            //Thread.Sleep(2000);
            using (QueueDevice queue = new QueueDevice(context,
                "tcp://*:5555",
                "inproc://backend",
                DeviceMode.Threaded))
            {
                queue.Initialize();
                _deviceReady.Set();
                queue.Start();
                while(true)
                {
                    Thread.Sleep(1000);
                }
            }
        }
}

ZeroMQ的手冊中介紹說,router-dealer必須先啟動,服務端再啟動,因此ManualResetEvent 的作用是協調QueueDevice和服務端的啟動順序。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM