工業物聯網或系統集成中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析


1.[連載]《C#通訊(串口和網絡)框架的設計與實現》

2.[開源]C#跨平台物聯網通訊框架ServerSuperIO(SSIO)介紹

2.應用SuperIO(SIO)和開源跨平台物聯網框架ServerSuperIO(SSIO)構建系統的整體方案

3.C#工業物聯網和集成系統解決方案的技術路線(數據源、數據采集、數據上傳與接收、ActiveMQ、Mongodb、WebApi、手機App)

5.ServerSuperIO開源地址:https://github.com/wxzz/ServerSuperIO

 

目       錄

工業物聯網或系統集成中應用消息隊列(ActiveMQ)的場景全面分析... 1

前言... 1

第一章           終端/交互場景... 3

1.1           終端設備... 3

1.2           通訊機制... 3

第二章           ActvieMQ應用場景... 4

2.1           發布/訂閱(Publish/Subscribe)... 4

2.2           生產者/消費者(Producer/Consumer)... 7

2.3           請求/應答(Request/Response)... 10

第三章           假定場景分析... 16

3.1           通訊層... 16

3.2           數據業務層... 16

3.3           綜述... 16

 

前言

     互聯網技術已經發展的很成熟了,各種開源的代碼、框架和解決方案等。鑒於互聯網技術的通用性,勢必向其他領域延展。不管是工業4.0,還是互聯網+  工業,互聯網技術向工業領域傳導也是必然的。

     所以,對於工業方面的應用場景的技術儲備和技術線路調研也是日常工作很重要的一部分,為公司的橫向和縱向發展提供技術平台和保障,當然也取決於領導的視野。

第一章     終端/交互場景

    任何技術都是為業務服務,而業務是有特定的應用場景。離開了實現環境去談技術是沒有實際意義的,解決實際問題而又能保證相當長時間內的穩定性是我們努力實現的目標。同時要從多個角度來考慮問題,以及做出平衡。

1.1    終端設備

(1)    終端種類:嵌入式硬件/傳感器、PC機(監測站、大型監控設備等)、手機終端等。

(2)    交互方式:單向交互,數據上傳,可能服務端會有返回確認信息,證明數據已經收到了;雙向交互,服務端不僅僅會返回確認信息,同時還要主動下發給指定終端命令信息,例如:控制硬件設備機械動作命令、修改硬件設備參數命令、以及補傳相關數據信息命令等等。

(3)    設備管理:這里指的設備管理是說設備的狀態,包括兩個方面:設備IO狀態和設備通訊狀態。設備IO狀態包括:IO打開和IO關閉。設備通訊狀態包括:通訊中斷、通訊干擾和通訊正常。為了判斷故障,這里的邏輯關系是:IO打開的時候不一定代表通訊正常;IO關閉不一定代表通訊中斷;通訊中斷不一定代表IO關閉;通訊干擾不一定代表IO打開。

(4)    數據完整性:允許數據缺失,一般在原來數據基礎上的增量數據是可以允許丟失的;不允許數據缺失,一般脈沖數據是不允許數據丟失的。

1.2    通訊機制

(1)主動請求數據:服務器端主動下發命令給終端,讓誰上傳數據、上傳什么數據都由服務器端決定。

(2)被動接收數據:服務器端被動接收終端上傳的數據,根據數據信息進行數據處理,以及返回確認信息。

第二章     ActvieMQ應用場景

     消息隊列比較多,本文以ActiveMQ為例進行介紹,全部代碼實現C#為主,主要考慮到常見的應用模式。事例代碼下載:http://pan.baidu.com/s/1qXZ1sU4

2.1    發布/訂閱(Publish/Subscribe)

     一個信息發布者在某一個主題上發布消息,所有訂閱該主題的訂閱都會收到相同的消息,這種模式是一對多的關系,如下圖:

發布端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Topic"));

                        string text = Console.ReadLine();
                        while (text!="exit")
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = text;
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
        }

 訂閱端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.ClientId = "testing listener1";
                    connection.Start();

                    using (ISession session = connection.CreateSession())
                    {
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Topic"), "testing listener1", null, false);
                        consumer.Listener += new MessageListener(consumer_Listener);
                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

2.2    生產者/消費者(Producer/Consumer)

    生產者生產了一塊香皂,消費者購買了該塊香皂,使用完了,就在這個世界上消息了,生產者和消費者之間的關系存在一種偶然性,這是一對一的關系,如下圖:

 

生產端代碼:

 static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("Queue"));

                        string text = Console.ReadLine();
                        while (text != "exit")
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = text;
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
        }

 消費端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    //connection.ClientId = "testing listener2";
                    connection.Start(); 
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("Queue"));
                        consumer.Listener += new MessageListener(consumer_Listener);
                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

 

2.3    請求/應答(Request/Response)

請求-應答的通信方式應用很普遍,客戶端向服務端上傳實時數據或參數,服務端處理完之后,要返回確認信息,這種交互關系如下圖:

 

客戶端代碼:

static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
            try
            {
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.Start();
                    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        IDestination destination =  new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("client.messages");

                        IMessageProducer producer = session.CreateProducer(destination);
                        producer.DeliveryMode=MsgDeliveryMode.NonPersistent;

                        IDestination tempDest = session.CreateTemporaryQueue();
                        IMessageConsumer responseConsumer = session.CreateConsumer(tempDest);
                        responseConsumer.Listener += new MessageListener(consumer_Listener);
                       
                        string text = Console.ReadLine();
                        while (text != "exit")
                        {
                            ITextMessage msg = session.CreateTextMessage();
                            msg.Text = text;
                            msg.NMSReplyTo = tempDest;
                            msg.NMSCorrelationID = DateTime.Now.ToString("yyyyMMddHHmmss");
                            producer.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }

                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

 服務端代碼:

 private static ISession session;

        private static IMessageProducer replyProducer;
        static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
            try
            {
                    IConnection connection = factory.CreateConnection();
                    connection.Start();
                    session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

                    IDestination adminQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("client.messages");
                    replyProducer = session.CreateProducer();
                    replyProducer.DeliveryMode=MsgDeliveryMode.NonPersistent;

                    IMessageConsumer consumer = session.CreateConsumer(adminQueue);
                    consumer.Listener += new MessageListener(consumer_Listener);
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage response = session.CreateTextMessage();
                if (message is ITextMessage) {
                    ITextMessage txtMsg = (ITextMessage)message;
                    string messageText = txtMsg.Text;
                    response.Text = messageText;

                    Console.WriteLine("Receive:" + messageText);
                }

                response.NMSCorrelationID=message.NMSCorrelationID;

                replyProducer.Send(message.NMSReplyTo, response);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);

            }
        }

第三章     假定場景分析

     我們以系統建設過程中的構架來分析消息隊列在兩個層面的問題,通訊層和數據業務層。

3.1    通訊層

     通訊層是否可以用消息隊列(ActiveMQ)?這個問題取決於兩方面:1、如果終端設備有嵌入式硬件,甚至還是C51開發的,那么在系統集成和物聯的過程中,就涉及到兼容性的問題。顯然和消息隊列進行對接是一件頭痛的事,用C51寫一個對接的驅動不是不可能,但是要評估工作量和穩定性。2、服務端與指定某個終端雙向交互頻繁的情況,特別是服務端實時發送設備校准命令的情況,這種情況消息隊列是不如通訊框架的。

3.2    數據業務層

     服務端接收到數據后,完全可以使用消息隊列的生產者和消費者模式處理數據,對系統的業務進行解耦。

     下發命令也可以通過消息隊列,這樣可以統一控制端的接口,再由通訊框架下發到指定的終端。

3.3    綜述

     綜合考慮,建議在通訊層使用通訊框架,對於設備的IO狀態和通訊狀態能夠及時反應,通訊效率也是能夠得到保障的;對於數據業務層,建議不要放在通訊框架內部進行處理,可以使用消息隊列,配合通訊框架使用。

    整體架構圖如下:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

文章得到了群友支持:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM