C#代碼實現阿里雲消息服務MNS消息監聽


十年河東,十年河西,莫欺少年窮

學無止境,精益求精

using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Core.Http;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Iot.Model.V20180120;
using Aliyun.MNS;
using Iot.Common;
using Iot.Dal.Base;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Iot.Dal
{
    /// <summary>
    /// Publishes cloud-to-device messages through the Alibaba Cloud IoT "Pub" API.
    /// Intended to be called from the UI side to reply to a device.
    /// </summary>
    /// <remarks>
    /// Every overload publishes to the topic <c>/{ProductKey}/{DeviceNum}/user/get</c> with QoS 0
    /// and a Base64-encoded payload. SDK exceptions are written to the Aliyun log through
    /// <c>WriteLog</c> and then rethrown with their original stack trace.
    /// </remarks>
    public class IotSendMessage
    {
        /// <summary>Region id handed to <c>DefaultProfile.GetProfile</c>.</summary>
        private const string RegionId = "cn-shanghai";

        /// <summary>Endpoint used by the <c>CommonRequest</c> based overloads.</summary>
        private const string IotDomain = "iot.cn-shanghai.aliyuncs.com";

        /// <summary>API version used by the <c>CommonRequest</c> based overloads.</summary>
        private const string IotApiVersion = "2018-01-20";

        /// <summary>
        /// Publishes <paramref name="Json"/> to the device named by the JSON "id" field,
        /// using the typed <c>PubRequest</c> API. The response is discarded.
        /// </summary>
        /// <param name="Json">Command JSON; its "id" value is used as the device number.</param>
        /// <exception cref="ServerException">Rethrown after logging.</exception>
        /// <exception cref="ClientException">Rethrown after logging.</exception>
        public static void SendMessage(string Json)
        {
            // Delegating keeps a single flat try/catch, so a failure is logged exactly once.
            // NOTE(review): a nested ServerException/ClientException catch pair logs twice if
            // ServerException derives from ClientException in the SDK - confirm against the SDK.
            SendMessage(JsonKeyHelper.GetJsonValue(Json, "id"), Json);
        }

        /// <summary>
        /// Publishes <paramref name="json"/> to the given device using the typed <c>PubRequest</c> API.
        /// </summary>
        /// <param name="DeviceNum">Device number placed in the topic path.</param>
        /// <param name="json">Message body; sent Base64-encoded.</param>
        /// <returns>The response returned by <c>GetAcsResponse</c>.</returns>
        /// <exception cref="ServerException">Rethrown after logging.</exception>
        /// <exception cref="ClientException">Rethrown after logging.</exception>
        public static PubResponse SendMessage(string DeviceNum, string json)
        {
            try
            {
                DefaultAcsClient client = CreateClient();
                PubRequest request = new PubRequest();
                request.ProductKey = IotParm.WA_ProductKey;
                request.TopicFullName = BuildTopic(DeviceNum);
                request.MessageContent = EncodePayload(json);
                request.Qos = 0;
                return client.GetAcsResponse(request);
            }
            catch (ServerException ex)
            {
                WriteLog(json, ex, "_ServerException");
                throw; // bare rethrow keeps the original stack trace
            }
            catch (ClientException ex)
            {
                WriteLog(json, ex, "_ClientException");
                throw;
            }
        }

        /// <summary>
        /// Latest version (enterprise instance): publishes <paramref name="json"/> to the device
        /// named by the JSON "id" field using a generic <c>CommonRequest</c>.
        /// </summary>
        /// <param name="json">Command JSON; its "id" value is used as the device number.</param>
        /// <exception cref="ServerException">Rethrown after logging.</exception>
        /// <exception cref="ClientException">Rethrown after logging.</exception>
        public static void AliyunSendMessage(string json)
        {
            AliyunSendMessage(JsonKeyHelper.GetJsonValue(json, "id"), json);
        }

        /// <summary>
        /// Publishes <paramref name="json"/> to the given device using a generic <c>CommonRequest</c>.
        /// <c>IotInstanceId</c> is added only when <c>IotParm.IotInstanceId</c> is non-empty.
        /// The serialized response is written to the log on success.
        /// </summary>
        /// <param name="DeviceNum">Device number placed in the topic path.</param>
        /// <param name="json">Message body; sent Base64-encoded.</param>
        /// <exception cref="ServerException">Rethrown after logging.</exception>
        /// <exception cref="ClientException">Rethrown after logging.</exception>
        public static void AliyunSendMessage(string DeviceNum, string json)
        {
            DefaultAcsClient client = CreateClient();
            CommonRequest request = new CommonRequest();
            request.Method = MethodType.POST;
            request.Domain = IotDomain;
            request.Version = IotApiVersion;
            request.Action = "Pub";
            request.AddQueryParameters("TopicFullName", BuildTopic(DeviceNum));
            request.AddQueryParameters("MessageContent", EncodePayload(json));
            request.AddQueryParameters("ProductKey", IotParm.WA_ProductKey);
            if (!string.IsNullOrEmpty(IotParm.IotInstanceId))
            {
                request.AddQueryParameters("IotInstanceId", IotParm.IotInstanceId);
            }
            request.AddQueryParameters("Qos", "0");

            try
            {
                CommonResponse response = client.GetCommonResponse(request);
                // The log helper only accepts an Exception, so the serialized response is
                // carried in Exception.Source to record the successful call.
                WriteLog(json, new Exception() { Source = JsonConvert.SerializeObject(response) });
            }
            catch (ServerException e)
            {
                WriteLog(json, e, "_ServerException");
                throw;
            }
            catch (ClientException e)
            {
                WriteLog(json, e, "_ClientException");
                throw;
            }
        }

        /// <summary>Creates an SDK client from the credentials held in <c>IotParm</c>.</summary>
        private static DefaultAcsClient CreateClient()
        {
            IClientProfile profile = DefaultProfile.GetProfile(RegionId, IotParm._accessKeyId, IotParm._secretAccessKey);
            return new DefaultAcsClient(profile);
        }

        /// <summary>Builds the topic <c>/{ProductKey}/{deviceNum}/user/get</c>.</summary>
        private static string BuildTopic(string deviceNum)
        {
            return "/" + IotParm.WA_ProductKey + "/" + deviceNum + "/user/get";
        }

        /// <summary>Base64-encodes the message body.</summary>
        private static string EncodePayload(string json)
        {
            // NOTE(review): Encoding.Default depends on the runtime/platform; UTF-8 may be the
            // intended encoding - confirm what the devices expect before changing it.
            byte[] payload = Encoding.Default.GetBytes(json);
            return Convert.ToBase64String(payload);
        }

        /// <summary>
        /// Writes one Aliyun log entry. The "cmd", "tid" and "id" values are read from
        /// <paramref name="json"/>; the entry's method name is <c>cmd + "_AliYun" + suffix</c>
        /// and its content is the JSON-serialized <paramref name="ex"/>.
        /// </summary>
        /// <param name="json">The message that was being sent.</param>
        /// <param name="ex">Exception (or response carrier) to serialize into the log.</param>
        /// <param name="suffix">Optional tag appended to the logged method name.</param>
        private static void WriteLog(string json, Exception ex, string suffix = "")
        {
            var methodValue = JsonKeyHelper.GetJsonValue(json, "cmd");
            var tid = JsonKeyHelper.GetJsonValue(json, "tid");
            var deviceNum = JsonKeyHelper.GetJsonValue(json, "id");
            MongoDbLogService.CreateAliyunLogs(deviceNum, tid, methodValue + "_AliYun" + suffix, JsonConvert.SerializeObject(ex), 0);
        }

    }
}

 

近幾天一直都在看阿里雲的IOT雲服務及消息隊列MNS,一頭霧水好幾天了,直到今天,總算有點收穫了,記錄下來,方便以後查閱。

首先借用阿里雲的一張圖來說明:設備是如何通過雲服務平台和企業服務器‘通話的’

針對此圖,作如下說明:

1、物聯網平台作為中間組件,主要是通過消息隊列MNS來實現設備和企業服務器對話的,具體可描述為:

1.1、設備發送指令至物聯網平台的MNS隊列,MNS隊列將設備指令收錄,需要說明的是:設備發送指令是通過嵌入式開發人員開發的,例如C語言

1.2、企業通過C#、JAVA、PHP等高級語言開發人員開發監聽程序,當監聽到MNS隊列中的設備指令時,獲取指令,做相關業務處理,並發送新的設備指令至MNS隊列。【例如發送快遞櫃關門的指令】

1.3、企業發送的指令被MNS收錄,設備同樣通過監聽程序獲取企業服務器發送的關門指令,收到關門指令的設備執行相關指令,完成自動關門操作。

以上便是設備與企業服務器之間的對話過程

下面列出C#的監聽MNS代碼【需要MNS C# SDK的支持】。注意:消息是經過Base64編碼(EncodeBase64)的,接收消息時要解碼,發送消息時要編碼。

異步監聽:

using System;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.MNS;
using Aliyun.MNS.Model;
using IotCommon;
using IotDtos.MongodbDtos;
using IotService.Device;
using IotService.MongoDb;

namespace IotListener
{
    /// <summary>
    /// Asynchronous MNS listener: receives device messages from the queue, dispatches them to
    /// <see cref="DeviceResponse"/>, logs them, and deletes them from the queue.
    /// </summary>
    class Program
    {
        /// <summary>Pause before retrying after connecting or re-arming a receive fails.</summary>
        private const int RetryDelayMilliseconds = 1000;

        private static MongoLogService _logService;
        // Receipt handle of the most recently handled message. Kept because it is public; the
        // delete call itself uses a local so concurrent callbacks cannot mix up handles.
        public static string _receiptHandle;
        public static DeviceResponseService service = new DeviceResponseService();
        public static Queue nativeQueue;

        /// <summary>
        /// Connects to the queue once, starts <c>IotParm._receiveTimes</c> receive chains and then
        /// blocks forever. Each chain is re-armed from <see cref="ListenerCallback"/>, so the
        /// number of outstanding receive requests stays bounded.
        /// </summary>
        static void Main(string[] args)
        {
            LogstoreDatabaseSettings st = new LogstoreDatabaseSettings() { LogsCollectionName = "LogsForDg_" + DateTime.Now.ToString("yyyyMMdd") };
            _logService = new MongoLogService(st);

            // Create the client and queue a single time; retry with a delay until it succeeds.
            while (true)
            {
                try
                {
                    IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                    nativeQueue = client.GetNativeQueue(IotParm._queueName);
                    break;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                    Thread.Sleep(RetryDelayMilliseconds);
                }
            }

            for (int i = 0; i < IotParm._receiveTimes; i++)
            {
                BeginReceive();
            }

            // The callbacks keep the listener going; the main thread only has to stay alive.
            Thread.Sleep(Timeout.Infinite);
        }

        /// <summary>
        /// Issues one asynchronous receive (ReceiveMessageRequest constructed with 1, as before).
        /// Retries with a delay if the request cannot be started, so a receive chain is never lost.
        /// </summary>
        private static void BeginReceive()
        {
            while (true)
            {
                try
                {
                    nativeQueue.BeginReceiveMessage(new ReceiveMessageRequest(1), ListenerCallback, null);
                    return;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                    Thread.Sleep(RetryDelayMilliseconds);
                }
            }
        }

        /// <summary>
        /// Receive callback: decodes the Base64 body, dispatches on the JSON "method" value,
        /// logs the message when a method is present, deletes the message, and finally starts
        /// the next receive for this chain.
        /// </summary>
        /// <param name="ar">Async result passed to <c>EndReceiveMessage</c>.</param>
        public static void ListenerCallback(IAsyncResult ar)
        {
            try
            {
                Message message = nativeQueue.EndReceiveMessage(ar).Message;
                string Json = Base64Helper.DecodeBase64(message.Body);
                Console.WriteLine("Message: {0}", Json);
                Console.WriteLine("----------------------------------------------------\n");
                var methodValue = JsonKeyHelper.GetJsonValue(Json, "method");
                DeviceResponse(methodValue, Json);
                if (!string.IsNullOrEmpty(methodValue))
                {
                    _logService.Create(new LogsForDgModel { CreateTime = DateTime.Now, data = Json, methodNo = methodValue });
                }

                // Delete through a local: another callback may overwrite the shared static
                // field between the assignment and the delete call.
                string receiptHandle = message.ReceiptHandle;
                _receiptHandle = receiptHandle;
                nativeQueue.DeleteMessage(receiptHandle);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Receive message failed, exception info: " + ex.Message);
            }
            finally
            {
                BeginReceive();
            }
        }

        /// <summary>
        /// Routes a device upload to the matching handler on <see cref="service"/>.
        /// Unknown (or null) method names are ignored.
        /// </summary>
        /// <param name="method">Value of the JSON "method" field.</param>
        /// <param name="message">Decoded message JSON.</param>
        public static void DeviceResponse(string method, string message)
        {
            switch (method)
            {
                case "doorClosedReport": service.doorClosedReportResponse(message); break;
                case "doorOpenReport": service.doorOpenReportResponse(message); break;
                case "deviceStartReportToCloud": service.deviceStartReportToCloudResponse(message); break;
                case "qryDeviceConfig": service.qryDeviceConfigResponse(message); break;
                case "devicePingToCloud": service.devicePingToCloudResponse(message); break;
                case "deviceFatalReport": service.deviceFatalReportResponse(message); break;
                case "deviceVersionReport": service.deviceVersionReportResponse(message); break;
                case "deviceFirmwareData": service.deviceFirmwareDataResponse(message); break;
                case "deviceLocationReport": service.deviceLocationReportResponse(message); break;
            }
        }
    }
}
View Code

同步監聽:

using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MnsListener
{
    /// <summary>
    /// Synchronous MNS listener sample: polls the queue, prints each message (raw and decoded)
    /// and deletes it.
    /// </summary>
    class Program
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
        private const string _stsToken = null;

        private const string _queueName = "Sub";
        private const int _receiveTimes = 1;
        private const int _receiveInterval = 2;
        private static string _receiptHandle;

        #endregion


        /// <summary>
        /// Polls the queue forever. The client and queue are created on first use and then
        /// reused; any exception is printed and the loop continues.
        /// </summary>
        static void Main(string[] args)
        {
            Queue nativeQueue = null;
            while (true)
            {
                try
                {
                    if (nativeQueue == null)
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                        nativeQueue = client.GetNativeQueue(_queueName);
                    }

                    for (int i = 0; i < _receiveTimes; i++)
                    {
                        var receiveMessageResponse = nativeQueue.ReceiveMessage(3);
                        Console.WriteLine("Receive message successfully, status code: {0}", receiveMessageResponse.HttpStatusCode);
                        Console.WriteLine("----------------------------------------------------");
                        Message message = receiveMessageResponse.Message;
                        string decodedBody = DecodeBase64(message.Body);
                        Console.WriteLine("MessageId: {0}", message.Id);
                        Console.WriteLine("ReceiptHandle: {0}", message.ReceiptHandle);
                        Console.WriteLine("MessageBody: {0}", message.Body);
                        // The decoded text used to be computed and then dropped; show it.
                        Console.WriteLine("DecodedBody: {0}", decodedBody);
                        Console.WriteLine("MessageBodyMD5: {0}", message.BodyMD5);
                        Console.WriteLine("EnqueueTime: {0}", message.EnqueueTime);
                        Console.WriteLine("NextVisibleTime: {0}", message.NextVisibleTime);
                        Console.WriteLine("FirstDequeueTime: {0}", message.FirstDequeueTime);
                        Console.WriteLine("DequeueCount: {0}", message.DequeueCount);
                        Console.WriteLine("Priority: {0}", message.Priority);
                        Console.WriteLine("----------------------------------------------------\n");

                        _receiptHandle = message.ReceiptHandle;
                        nativeQueue.DeleteMessage(_receiptHandle);

                        Thread.Sleep(_receiveInterval);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
            }

        }

        /// <summary>Encodes text as Base64.</summary>
        /// <param name="code">Text to encode.</param>
        /// <param name="code_type">Name of the text encoding used to obtain the bytes.</param>
        /// <returns>The Base64 representation of <paramref name="code"/>.</returns>
        public static string EncodeBase64(string code, string code_type= "utf-8")
        {
            // Convert.ToBase64String cannot fail for a non-null array, so no fallback is needed.
            byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
            return Convert.ToBase64String(bytes);
        }

        /// <summary>
        /// Decodes Base64 text. Returns <paramref name="code"/> unchanged when it is not valid
        /// Base64 or when the bytes cannot be decoded with <paramref name="code_type"/>.
        /// </summary>
        /// <param name="code">Base64 text to decode.</param>
        /// <param name="code_type">Name of the text encoding of the decoded bytes.</param>
        /// <returns>The decoded text, or the input itself as a fallback.</returns>
        public static string DecodeBase64(string code, string code_type = "utf-8")
        {
            byte[] bytes;
            try
            {
                bytes = Convert.FromBase64String(code);
            }
            catch (FormatException)
            {
                // Not Base64: hand the text back as-is, which is what the fallback is for.
                return code;
            }

            try
            {
                return Encoding.GetEncoding(code_type).GetString(bytes);
            }
            catch (Exception)
            {
                return code;
            }
        }
    }
}
View Code

發送消息:

using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MnsSendMsg
{
    /// <summary>
    /// MNS send sample: Base64-encodes one message and sends it to the queue.
    /// </summary>
    class Program
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
        private const string _stsToken = null;

        private const string _queueName = "Sub";
        #endregion

        /// <summary>
        /// Sends a single Base64-encoded message with a delay of 2 seconds and prints the outcome.
        /// </summary>
        static void Main(string[] args)
        {
            try
            {
                IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                // 1. Get the queue instance.
                var nativeQueue = client.GetNativeQueue(_queueName);
                var sendMessageRequest = new SendMessageRequest(EncodeBase64("阿里雲<MessageBody>計算"));
                sendMessageRequest.DelaySeconds = 2;
                var sendMessageResponse = nativeQueue.SendMessage(sendMessageRequest);
                Console.WriteLine("Send message successfully,{0}",
                    sendMessageResponse.ToString());
                Thread.Sleep(2000);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Send message failed, exception info: " + ex.Message);
            }
        }

        /// <summary>Encodes text as Base64.</summary>
        /// <param name="code">Text to encode.</param>
        /// <param name="code_type">Name of the text encoding used to obtain the bytes.</param>
        /// <returns>The Base64 representation of <paramref name="code"/>.</returns>
        public static string EncodeBase64(string code, string code_type = "utf-8")
        {
            // Convert.ToBase64String cannot fail for a non-null array, so no fallback is needed.
            byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
            return Convert.ToBase64String(bytes);
        }

        /// <summary>
        /// Decodes Base64 text. Returns <paramref name="code"/> unchanged when it is not valid
        /// Base64 or when the bytes cannot be decoded with <paramref name="code_type"/>.
        /// </summary>
        /// <param name="code">Base64 text to decode.</param>
        /// <param name="code_type">Name of the text encoding of the decoded bytes.</param>
        /// <returns>The decoded text, or the input itself as a fallback.</returns>
        public static string DecodeBase64(string code, string code_type = "utf-8")
        {
            byte[] bytes;
            try
            {
                bytes = Convert.FromBase64String(code);
            }
            catch (FormatException)
            {
                // Not Base64: hand the text back as-is, which is what the fallback is for.
                return code;
            }

            try
            {
                return Encoding.GetEncoding(code_type).GetString(bytes);
            }
            catch (Exception)
            {
                return code;
            }
        }
    }
}
View Code

關於MNS C# SDK下載,可以去阿里雲:https://help.aliyun.com/document_detail/32447.html?spm=a2c4g.11186623.6.633.61395f64IfHTRo

關於MNS隊列,主題,主題訂閱相關知識:https://help.aliyun.com/document_detail/34445.html?spm=a2c4g.11186623.6.542.699f38c6RO3nDS

關於阿里雲AMQP隊列接入,可以查詢:https://help.aliyun.com/document_detail/149716.html?spm=a2c4g.11186623.6.621.2cda31b4kS1zXR

關於阿里雲物聯網平台,請查閱:https://help.aliyun.com/document_detail/125800.html?spm=a2c4g.11186623.6.542.7b0241c8o5r6PT

最后:阿里雲物聯網平台

高級異步偵聽

using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Iot.Model.V20180120;
using Aliyun.MNS;
using Aliyun.MNS.Model;
using Iot.Common;
using Iot.Dal.WA_Device;
using Iot.Factory;
using Iot.Model.WA_Device;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WaIotListener
{
    /// <summary>
    /// Advanced asynchronous listener: receives device uploads from the MNS queue, answers the
    /// device, persists the upload, optionally stores it for API callers, and re-arms itself
    /// after every message.
    /// </summary>
    class Program
    {
        /// <summary>Pause before retrying when the next receive cannot be started.</summary>
        private const int RetryDelayMilliseconds = 1000;

        static IClientProfile clientProfile = DefaultProfile.GetProfile("cn-shanghai", IotParm._accessKeyId, IotParm._secretAccessKey);
        static string _receiptHandle;
        static Queue nativeQueue;
        static string Key = CommEnum.DeviceTypeEnm.WA.ToString();
        // '_'-separated list of "cmd" values whose messages are stored through WA_Door.
        static string Methods = ConfigurationManager.AppSettings["Methods"];
        static ReceiveMessageRequest request = new ReceiveMessageRequest(1);
        static List<string> lst = new List<string>();

        /// <summary>
        /// Parses the configured method list, connects to the queue, starts the first receive
        /// and then waits for a key press.
        /// </summary>
        static void Main(string[] args)
        {
            try
            {
                // A missing "Methods" setting yields an empty list instead of a null reference;
                // empty entries are dropped so a message without "cmd" never matches.
                lst = (Methods ?? string.Empty)
                    .Split(new[] { '_' }, StringSplitOptions.RemoveEmptyEntries)
                    .ToList();
                IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                nativeQueue = client.GetNativeQueue(IotParm.WA_queueName);

                // Started directly (not via Task.Run) so a start-up failure reaches the catch below.
                nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Receive message failed, exception info: " + ex.Message);
            }
            Console.ReadKey();
        }

        /// <summary>
        /// Receive callback. The queue body is Base64; the decoded JSON carries a "payload"
        /// value that is Base64 again and decodes to the device JSON with a "cmd" field.
        /// The message is deleted after dispatch and the next receive is always started.
        /// </summary>
        /// <param name="ar">Async result passed to <c>EndReceiveMessage</c>.</param>
        public static void ListenerCallback(IAsyncResult ar)
        {
            try
            {
                Message message = nativeQueue.EndReceiveMessage(ar).Message;

                string envelope = Base64Helper.DecodeBase64(message.Body);
                var payload = JsonKeyHelper.GetJsonValue(envelope, "payload");
                string msg = Base64Helper.DecodeBase64(payload);
                var methodValue = JsonKeyHelper.GetJsonValue(msg, "cmd");
                if (!string.IsNullOrEmpty(methodValue))
                {
                    Console.WriteLine(msg);
                    RunInBackground(delegate { DeviceResponse(methodValue, msg); });
                    RunInBackground(delegate { DeviceToDb(methodValue, msg); });
                }
                if (lst.Contains(methodValue))
                {
                    WA_Door.ReceiveDeviceResponse(methodValue, msg);
                }

                string receiptHandle = message.ReceiptHandle;
                _receiptHandle = receiptHandle;
                nativeQueue.DeleteMessage(receiptHandle);
            }
            catch (Exception ex)
            {
                // This message text is what the code has always matched for an empty queue.
                if ((ex.Message ?? string.Empty).Trim() == "Message not exist.")
                {
                    Console.WriteLine("Receive message failed");
                }
                else
                {
                    LogHelper.WriteLog("系統異常:", ex);
                }
            }
            finally
            {
                BeginReceive();
            }
        }

        /// <summary>
        /// Starts the next asynchronous receive, retrying with a delay on failure so that an
        /// exception here cannot end the listener.
        /// </summary>
        private static void BeginReceive()
        {
            while (true)
            {
                try
                {
                    nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
                    return;
                }
                catch (Exception ex)
                {
                    LogHelper.WriteLog("系統異常:", ex);
                    Thread.Sleep(RetryDelayMilliseconds);
                }
            }
        }

        /// <summary>
        /// Runs <paramref name="work"/> on the thread pool and logs any exception it throws,
        /// so failures in fire-and-forget work are not lost.
        /// </summary>
        private static void RunInBackground(Action work)
        {
            Task.Run(delegate
            {
                try
                {
                    work();
                }
                catch (Exception ex)
                {
                    LogHelper.WriteLog("系統異常:", ex);
                }
            });
        }

        /// <summary>
        /// Builds the reply for a device upload and, when the handler returns a non-empty
        /// result, publishes it back to the device.
        /// </summary>
        /// <param name="method">Value of the device JSON "cmd" field.</param>
        /// <param name="message">Decoded device JSON.</param>
        public static void DeviceResponse(string method, string message)
        {
            string result = string.Empty;
            var DeviceResponseBll = DeviceResponseFactory.GetTarget(Key);
            switch (method)
            {
                case "close": result = DeviceResponseBll.doorClosedReportResponse(message); break; // reply to door-closed report
                case "login": result = DeviceResponseBll.deviceStartReportToCloudResponse(message); break; // reply to power-on report
                case "alm": result = DeviceResponseBll.deviceFatalReportResponse(message); break; // reply to slot alarm report
                case "allst": result = DeviceResponseBll.deviceVersionReportResponse(message); break; // heartbeat: reply to slot status report
            }
            if (!string.IsNullOrEmpty(result))
            {
                SendMessage(result);
            }
        }

        /// <summary>Persists a device upload through the handler selected by <c>Key</c>.</summary>
        /// <param name="method">Value of the device JSON "cmd" field.</param>
        /// <param name="message">Decoded device JSON.</param>
        public static void DeviceToDb(string method, string message)
        {
            var DeviceResponseBll = DeviceResponseToDBFactory.GetTarget(Key);
            switch (method)
            {
                case "open": DeviceResponseBll.doorOpenToDb(message); break;
                case "close": DeviceResponseBll.doorClosedToDb(message); break; // door-closed report
                case "login": DeviceResponseBll.deviceStartToDb(message); break; // power-on report
                case "allst": DeviceResponseBll.devicePingToCloudToDb(message); break; // slot status report / heartbeat
                case "alm": DeviceResponseBll.deviceFatalToDb(message); break; // slot alarm report
            }
        }

        /// <summary>
        /// Publishes <paramref name="Json"/> to the device named by its "id" field
        /// (topic /{ProductKey}/{id}/user/get, QoS 0, Base64 payload). SDK errors are printed
        /// to the console and not rethrown.
        /// </summary>
        /// <param name="Json">Reply JSON to send to the device.</param>
        public static void SendMessage(string Json)
        {
            try
            {
                var DeviceNum = JsonKeyHelper.GetJsonValue(Json, "id");
                DefaultAcsClient client = new DefaultAcsClient(clientProfile);
                // Named pubRequest so it does not shadow the static receive request field.
                PubRequest pubRequest = new PubRequest();
                pubRequest.ProductKey = IotParm.WA_ProductKey;
                pubRequest.TopicFullName = "/" + IotParm.WA_ProductKey + "/" + DeviceNum + "/user/get";
                byte[] payload = Encoding.Default.GetBytes(Json);
                pubRequest.MessageContent = Convert.ToBase64String(payload);
                pubRequest.Qos = 0;

                PubResponse response = client.GetAcsResponse(pubRequest);
                Console.WriteLine("publish message result: " + response.Success);
                Console.WriteLine(response.ErrorMessage);
            }
            catch (ServerException ex)
            {
                Console.WriteLine(ex.ErrorCode);
                Console.WriteLine(ex.ErrorMessage);
            }
            catch (ClientException ex)
            {
                Console.WriteLine(ex.ErrorCode);
                Console.WriteLine(ex.ErrorMessage);
            }
        }
    }
}
View Code

發送消息及局部指令處理【用於返回Api結果】

using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Core.Http;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Iot.Model.V20180120;
using Aliyun.MNS;
using Iot.Common;
using Iot.Dal.Pagination;
using Iot.Model;
using Iot.Model.Pagination;
using Iot.Model.WA_Device;
using MongoDB.Driver;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Iot.Dal.WA_Device
{
    /// <summary>
    /// Stores device responses in MongoDB and lets API code poll for the response that
    /// matches a command it has just sent.
    /// </summary>
    public class WA_Door
    {
        /// <summary>
        /// Default wait time, in seconds, used by <see cref="GetDeviceResponse"/>.
        /// </summary>
        public static int Tim = 30;

        /// <summary>Wait time, in seconds, when all slot statuses are requested (dr == 0).</summary>
        private const int AllPortsTimeoutSeconds = 45;

        /// <summary>Number of response messages merged when all slot statuses are requested.</summary>
        private const int AllPortsMessageCount = 4;

        /// <summary>How far back, in minutes, <see cref="QueryDeviceResponse"/> searches.</summary>
        private const int QueryWindowMinutes = 10;

        /// <summary>
        /// Queries the stored device responses of the last ten minutes, newest first, with paging.
        /// </summary>
        /// <param name="deviceNum">Device number filter; skipped when null or empty.</param>
        /// <param name="tid">Transaction id filter; skipped when null or empty.</param>
        /// <param name="method">
        /// Not used by the query. NOTE(review): callers probably expect a filter on Method - confirm.
        /// </param>
        /// <param name="Pagination">Paging model passed by reference to the pager.</param>
        /// <returns>The matching page of messages, or null when the query throws (the error is logged).</returns>
        public static List<MsgModel> QueryDeviceResponse(string deviceNum,string tid,string method, ref BasePaginationModel Pagination)
        {
            try
            {
                List<MsgModel> resList = new List<MsgModel>();
                string DbName = "LogstoreDb" + "_" + DateTime.Now.ToString("yyyyMMdd");
                LogstoreDatabaseSettings settings = new LogstoreDatabaseSettings() { LogsCollectionName = "WA_" + CommEnum.MongoDbnameEnm.DeviceResponse.ToString(), DatabaseName = DbName };
                var client = new MongoClient(settings.ConnectionString);
                var database = client.GetDatabase(settings.DatabaseName);

                var Mongo = database.GetCollection<WA_DeviceResponseTodb>(settings.LogsCollectionName);
                List<FilterDefinition<WA_DeviceResponseTodb>> SearchList = new List<FilterDefinition<WA_DeviceResponseTodb>>();
                var builder = Builders<WA_DeviceResponseTodb>.Filter;
                if (!string.IsNullOrEmpty(deviceNum))
                {
                    SearchList.Add(builder.Eq("DeviceNum", deviceNum));
                }
                if (!string.IsNullOrEmpty(tid))
                {
                    SearchList.Add(builder.Eq("tid", tid));
                }

                // Time window in real UTC. Subtracting a fixed 8 hours from local time is only
                // correct on a UTC+8 machine.
                DateTime endTime = DateTime.UtcNow;
                DateTime startTime = endTime.AddMinutes(-QueryWindowMinutes);
                SearchList.Add(builder.Gte("CreateTime", startTime));
                SearchList.Add(builder.Lte("CreateTime", endTime));

                var MongoR = Mongo.Find(Builders<WA_DeviceResponseTodb>.Filter.And(SearchList)).ToList();
                // The device filter is already part of the Mongo query when a device number is
                // given; repeating it in memory returned nothing for an empty device number.
                var mongorResult = MongoR.AsQueryable<WA_DeviceResponseTodb>().OrderByDescending(item => item.CreateTime);

                var result = MongoPaginationService.BasePager<WA_DeviceResponseTodb>(mongorResult, ref Pagination);
                foreach (var item in result.ToList())
                {
                    resList.Add(new MsgModel { DeviceNum = item.DeviceNum, tid = item.tid, message = item.message, Method = item.Method });
                }
                return resList;
            }
            catch(Exception ex)
            {
                LogHelper.WriteLog(ex.ToString());
            }
            return null;
        }

        /// <summary>
        /// Local listener: polls the stored responses every 500 ms until the response for the
        /// given command arrives or the wait time elapses.
        /// </summary>
        /// <param name="method">Command name the response must carry.</param>
        /// <param name="deviceNum">Device number.</param>
        /// <param name="tid">Transaction id of the command.</param>
        /// <param name="dr">
        /// Slot-status queries only: 0 waits (up to 45 s) for four responses and merges them into
        /// one; any other value waits (up to <see cref="Tim"/> seconds) for a single response.
        /// </param>
        /// <returns>The response JSON, or an empty string when nothing arrived in time.</returns>
        public  static string GetDeviceResponse(string method, string deviceNum,long tid,int dr=1)
        {
            bool queryAllPorts = dr == 0;
            BasePaginationModel pagination = new BasePaginationModel()
            {
                PageNumber = 1,
                PageSize = (method == "drsts" && queryAllPorts) ? AllPortsMessageCount : 1
            };
            string TxId = tid.ToString();
            string result = string.Empty;
            Stopwatch sw = Stopwatch.StartNew();
            while (true)
            {
                int timeoutSeconds = queryAllPorts ? AllPortsTimeoutSeconds : Tim;
                if (sw.Elapsed.TotalSeconds > timeoutSeconds)
                {
                    break;
                }

                try
                {
                    Thread.Sleep(500); // yield the CPU between polls
                    List<MsgModel> DeviceResponseList = QueryDeviceResponse(deviceNum, TxId, method, ref pagination);
                    if (DeviceResponseList == null || DeviceResponseList.Count == 0)
                    {
                        continue;
                    }

                    var MsgList = DeviceResponseList.Where(A => A.tid == TxId && A.Method == method && A.DeviceNum == deviceNum).ToList();
                    if (!queryAllPorts)
                    {
                        if (MsgList.Count == 1)
                        {
                            result = MsgList[0].message;
                            break;
                        }
                        continue;
                    }

                    // Special case: slot status query with door number 0 merges four responses.
                    if (MsgList.Count == AllPortsMessageCount)
                    {
                        string merged = MergePortStatus(MsgList, tid);
                        if (merged != null)
                        {
                            result = merged;
                            break; // leave the polling loop as soon as the merged result is ready
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.WriteLog(ex.ToString());
                }
            }
            sw.Stop();
            return result;
        }

        /// <summary>
        /// Merges the first slot entry of every message into one response. The envelope fields
        /// (cmd, rst, id) come from the first message that supplies a cmd; tid is the caller's.
        /// Works on fresh collections each call, so a failed attempt leaves nothing behind.
        /// </summary>
        /// <returns>The merged JSON, or null when any message lacks slot data.</returns>
        private static string MergePortStatus(List<MsgModel> messages, long tid)
        {
            var merged = new WA_Base<List<WA_doorportstatus>>();
            var ports = new List<WA_doorportstatus>();
            foreach (var item in messages)
            {
                var parsed = JsonConvert.DeserializeObject<WA_Base<List<WA_doorportstatus>>>(item.message);
                if (parsed == null)
                {
                    continue;
                }
                if (string.IsNullOrEmpty(merged.cmd))
                {
                    merged.cmd = parsed.cmd;
                    merged.rst = parsed.rst;
                    merged.id = parsed.id;
                    merged.tid = tid;
                }
                // Check before indexing: dat may be missing or empty.
                if (parsed.dat != null && parsed.dat.Count > 0)
                {
                    ports.Add(parsed.dat[0]);
                }
            }

            if (ports.Count != messages.Count)
            {
                return null;
            }
            merged.dat = ports;
            return JsonConvert.SerializeObject(merged);
        }


        /// <summary>
        /// Stores a device response so that <see cref="GetDeviceResponse"/> can find it.
        /// The "tid" and "id" values are read from the response JSON.
        /// </summary>
        /// <param name="Method">Command name of the response.</param>
        /// <param name="result">Response JSON received from the device.</param>
        public static void ReceiveDeviceResponse(string Method,string result)
        {
            var Txid = JsonKeyHelper.GetJsonValue(result, "tid");
            var DeviceNum = JsonKeyHelper.GetJsonValue(result, "id");

            LogstoreDatabaseSettings st = new LogstoreDatabaseSettings() { LogsCollectionName = "WA_" + CommEnum.MongoDbnameEnm.DeviceResponse.ToString() };

            var TbModel = new WA_DeviceResponseTodb
            {
                CreateTime = DateTime.Now,
                DeviceNum = DeviceNum,
                message = result,
                Method = Method,
                tid = Txid,
            };
            new MongoLogService<WA_DeviceResponseTodb>(st).Create(TbModel);
        }
    }

    /// <summary>
    /// One stored device response as returned by <c>WA_Door.QueryDeviceResponse</c>.
    /// </summary>
    public class MsgModel
    {
        /// <summary>Transaction id; compared against the command's tid as a string.</summary>
        public string tid { get; set; }
        /// <summary>Response JSON as stored for the device message.</summary>
        public string message { get; set; }
        /// <summary>Command name of the response.</summary>
        public string Method { get; set; }
        /// <summary>Device number the response belongs to.</summary>
        public string DeviceNum { get; set; }
    }

    /// <summary>
    /// Helper for sending commands from the cloud to a device.
    /// </summary>
    public class WA_DeviceHelper
    {
        #region Downlink commands: cloud to device, subscribed by the device
        static IClientProfile clientProfile = DefaultProfile.GetProfile("cn-shanghai", IotParm._accessKeyId, IotParm._secretAccessKey);

        /// <summary>
        /// Publishes <paramref name="json"/> to the topic /{ProductKey}/{DeviceNum}/user/get with
        /// QoS 0 and a Base64-encoded payload. The outgoing data and any SDK error are logged.
        /// </summary>
        /// <param name="DeviceNum">Device number placed in the topic path.</param>
        /// <param name="json">Message body to send.</param>
        /// <returns>The response returned by <c>GetAcsResponse</c>.</returns>
        /// <exception cref="ServerException">Rethrown after logging.</exception>
        /// <exception cref="ClientException">Rethrown after logging.</exception>
        public static PubResponse SendMsg(string DeviceNum, string json)
        {
            LogHelper.WriteLog("發送數據:" + json);

            DefaultAcsClient client = new Aliyun.Acs.Core.DefaultAcsClient(clientProfile);
            PubRequest request = new PubRequest();
            request.ProductKey = IotParm.WA_ProductKey;
            request.TopicFullName = "/" + IotParm.WA_ProductKey + "/" + DeviceNum + "/user/get";
            byte[] payload = Encoding.Default.GetBytes(json);
            request.MessageContent = Convert.ToBase64String(payload);
            request.Qos = 0;
            try
            {
                return client.GetAcsResponse(request);
            }
            catch (ServerException ex)
            {
                LogHelper.WriteLog("服務端異常:" + ex.ToString());
                throw; // bare rethrow keeps the original stack trace
            }
            catch (ClientException ex)
            {
                LogHelper.WriteLog("客戶端異常:" + ex.ToString());
                throw;
            }
        }
        #endregion


    }
}
View Code

@天才卧龍的博客

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM