Webapi實現websocket實時通訊


  應用場景:前端頁面發起一個websocket請求與后端進行實時通訊。后端監聽某端口獲取數據,將監聽到的數據加工處理,通過websocket發送到前端。

  這里只提供后台的處理方案僅供參考。
  1.后端監聽某端口,獲取數據並數據處理。可以在Global中單獨開啟一個后台線程用來監聽數據。數據處理交給datawatcher的單例對象來處理。由於是監控端口的工作,一般采用獨立線程在項目啟動的時候就進行監聽,因此可以將代碼放在Application_start中。
  2.datawatcher對象,它需要有個委托隊列,允許外部進行注冊和刪除。原理同事件類似,但是采用委托隊列並將其暴露主要是為了容錯,因為可能會用到刪除委托隊列中的委托。嚴謹的事務邏輯應該用事件(事件有點久了,忘了事件怎么寫了)
  3.webapi建立websocket。

  以下為代碼部分:

      protected void Application_Start()
        {
     //webapi相關配置
            AreaRegistration.RegisterAllAreas();
            GlobalConfiguration.Configure(WebApiConfig.Register);
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            BundleConfig.RegisterBundles(BundleTable.Bundles);
           //todo 初始化datawatcher 監聽實時數據 
            //偽造實時數據
            Thread bgthread = new Thread(Start);
            bgthread.IsBackground = true;
            bgthread.Start();

        }
    //除紅色部分均為模擬實時數據,正常數據應是通過監聽獲取
        private void Start()
        {
            Random r = new Random();
            var datawatcher = DataWatcher.GetInit();
            while (true)
            {
                dynamic dyobj = new ExpandoObject();
                dyobj.stationid = "01";
                dyobj.value = new { at = DateTime.Now.ToTimeStampMS(), value = (r.Next(0, 1024)) }.ToJson();
                datawatcher.Updata(dyobj, CallBackType.Record);
                Thread.Sleep(1000);
            }
        }
//使用單例是為了確保項目中使用的是同一個對象,防止注冊的callback丟失
 public class DataWatcher
    {
        private static DataWatcher _dataWatcher;
        static readonly object locker = new object();

        private DataWatcher()
        {
            CallBackAlarms = new List<DynamicDelegate>();
            CallBackRecords = new List<DynamicDelegate>();
            CallBackSpes = new List<DynamicDelegate>();
        }

        public static DataWatcher GetInit()
        {
            if (_dataWatcher == null)
            {
                lock (locker)
                {
                    if (_dataWatcher == null)
                    {
                        _dataWatcher = new DataWatcher();
                    }
                }
            }
            return _dataWatcher;
        }

        public WebSocket WebSocket { get; set; }


        public  List<DynamicDelegate> CallBackSpes;

        public  List<DynamicDelegate> CallBackRecords;

        public  List<DynamicDelegate> CallBackAlarms;

        /// <summary>
        /// 更新數據
        /// </summary>
        /// <param name="dyobj"></param>
        /// <param name="type"></param>
        public void Updata(dynamic dyobj, CallBackType type)
        {
            switch (type)
            {
                case CallBackType.Spe:
                    if (CallBackSpes.Count > 0)
                    {
                        foreach (var callback in CallBackSpes)
                        {
                            callback.Invoke(dyobj, WebSocket);
                        }
                    }
                    //CallBackSpes?.Invoke(dyobj, WebSocket);
                    break;
                case CallBackType.Record:
                    if (CallBackRecords.Count > 0)
                    {
                        foreach (var callback in CallBackRecords)
                        {
                            callback.Invoke(dyobj, WebSocket);
                        }
                    }
                    //CallBackRecords?.Invoke(dyobj, WebSocket);
                    break;
                case CallBackType.Alarm:
                    if (CallBackAlarms.Count > 0)
                    {
                        foreach (var callback in CallBackAlarms)
                        {
                            callback.Invoke(dyobj, WebSocket);
                        }
                    }
                    //CallBackAlarms?.Invoke(dyobj, WebSocket);
                    break;
            }
        }


    }
    public enum CallBackType
    {
        Spe, Record, Alarm
    }

    public delegate void DynamicDelegate(dynamic dynamic, WebSocket ws);
   public class subscribeController : ApiController
    {
        private DataWatcher dataWatcher;
        public subscribeController()
        {
            dataWatcher = DataWatcher.GetInit();//獲取Datawatcher實例
            if (!dataWatcher.CallBackRecords.Contains(RealtimeDataCallBack))
            {
                dataWatcher.CallBackRecords.Add(RealtimeDataCallBack);
         //注冊委托,若無特殊的場景需求可以將委托列表封裝為事件event } } public HttpResponseMessage Get() {        //對原請求返回101變更協議狀態碼,開啟一個異步線程通過websocket進行數據傳輸 if (HttpContext.Current.IsWebSocketRequest) { HttpContext.Current.AcceptWebSocketRequest(CreateWS); } return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols); } private async Task CreateWS(AspNetWebSocketContext aspNetWebSocketContext) { var socket = aspNetWebSocketContext.WebSocket; dataWatcher.WebSocket = socket; //將上下文中的websocket注入到Datawatcher中 //防止后台線程將socket釋放 while (true) { if (socket.State != WebSocketState.Open) { break; } } } private void WriteJson(string res, WebSocket socket) { lock (this) { res = res.Replace("\\", "").Replace("\"{", "{").Replace("\"}", "}").Replace("\"[", "["); ArraySegment<byte> bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(res)); if (socket != null && socket.State == WebSocketState.Open) { socket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None); Thread.Sleep(1000); } } } private void RealtimeDataCallBack(dynamic dyobj, WebSocket ws) { var templist = new List<string>(); templist.Add(dyobj.value); var temp = new Dictionary<string, string>(); temp.Add(dyobj.stationid, new { doseRateNaI = templist.toListString() }.ToJson()); var res = new { datastreamUpdate = temp.ToJson() }.ToJson(); WriteJson(res, ws); } }

  Datawatcher的設計是作為一個中間人的角色,既要提供調用回調的方法,也要提供websocket 的注入接口。考慮到Datawatcher是單例,故采用屬性的方式為websocket提供外部訪問(此處的設計有待商榷)。在外部注入了websocket之后,在合適的位置進行回調。注意:數據的處理是在監聽時進行一次封裝處理,在進行回調的時候再進行一次封裝處理,如果類型是固定統一的可以只在監聽的時候進行封裝處理即可。同時應注意對回調方法中websocket的空判斷增強健壯性。
  以上代碼為提供思路的簡易demo,使用時需要根據自己場景需求相應修改。

------------------------------------------------------------------------------分隔符----------------------------------------------------------------------------

存疑1:webapi中的controller是否是單例?(當然Mvc等的控制器也是同問)

           如果是單例,Datawatcher的委托注冊可以在控制器的構造方法中注冊(前提是允許訪問公有的構造方法);如果不是單例,可以在post或者get中注冊委托,此時一定要判斷委托是否被注冊過,避免重復注冊。這是采用委托列表,而不用事件的一個原因。事件的內部委托鏈表是不允許外部直接訪問的,因此判斷是否注冊不是很方便。如果事件允許不判斷,直接注冊或注銷委托不會拋出異常,即如果已經注冊不會重復注冊,保證委托唯一性,如果已經注銷,再次注銷不會拋出異常,那么Datawatcher中的設計應該采用事件,確保封裝的安全性。(由於未對事件做深入了解,故此處存疑暫時保留)

釋疑1:經測試,controller不是單例(起碼webapi中不是)。測試方法,給控制器添加一個屬性和一個公有構造方法,構造方法給屬性賦一個初始值。寫一個訪問方法,每次給屬性加1,並返回。通過頁面進行訪問,查看api返回結果。或者打一個斷點在構造方法中,通過頁面訪問。

經測試,事件event中的委托鏈表在注銷未注冊的委托方法的時候不會拋出異常。注銷的時候,會在委托鏈表中查找對應的方法,如果找到則注銷掉,如果未找到也不會拋出異常。

參考資料:http://blog.csdn.net/snakorse/article/details/44208351

存疑2:websocket的async Task 為什么使用死循環?

釋疑2:websocket不是持續的,其中的代碼只執行一次,為了保持實時性,他的持續需要一個死循環。循環的作用只在於保持連接不斷開,真正的數據傳輸則是由Datawatcher來處理。每監聽到一條數據,會調用一次Datawatcher的update,這個時候如果websocket是打開的,數據則會被發送,如果此時socket為空或者關閉,則放棄發送或者插入緩存中等下次打開連接再發送,這部分的處理則需要根據業務需求來判斷。
強調:以上demo只提供思路,確認這種處理是可行的。關於存疑部分的思考,我會盡快驗證。

 ------------------------------------------------------------------------------最終代碼-------------------------------------------------------------------------------------------

    public class DataWatcher
    {
        private static DataWatcher _dataWatcher;
        static readonly object locker = new object();

        private DataWatcher()
        {
        }

        public static DataWatcher GetInit()
        {
            if (_dataWatcher == null)
            {
                lock (locker)
                {
                    if (_dataWatcher == null)
                    {
                        _dataWatcher = new DataWatcher();
                    }
                }
            }
            return _dataWatcher;
        }

        public WebSocket WebSocket { get; set; }
     //委托列表被事件替代
        public event DynamicDelegate CallbackRecoedEvent;
        public event DynamicDelegate CallbackAlarmEvent;
        public event DynamicDelegate CallbackSpeEvent;

        /// <summary>
        /// 更新譜圖
        /// </summary>
        /// <param name="dyobj"></param>
        /// <param name="type"></param>
        public void Update(dynamic dyobj, CallBackType type)
        {
            switch (type)
            {
                case CallBackType.Spe:
                  CallbackSpeEvent?.Invoke(dyobj,WebSocket);
                    break;
                case CallBackType.Record:
                   CallbackRecoedEvent?.Invoke(dyobj,WebSocket);
                    break;
                case CallBackType.Alarm:
                   CallbackAlarmEvent?.Invoke(dyobj,WebSocket);
                    break;
            }
        }       
    }
    public class subscribeController : ApiController
    {
        public HttpResponseMessage Get()
        {
            if (HttpContext.Current.IsWebSocketRequest)
            {
                HttpContext.Current.AcceptWebSocketRequest(CreateWS);
            }
            return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols);
        }

        private async Task CreateWS(AspNetWebSocketContext aspNetWebSocketContext)
        {
            var socket = aspNetWebSocketContext.WebSocket;
            var dataWatcher = DataWatcher.GetInit();
            dataWatcher.WebSocket = socket;
            //
            dataWatcher.CallbackRecoedEvent -= RealtimeDataCallBack;
            dataWatcher.CallbackRecoedEvent += RealtimeDataCallBack;
            //防止后台線程將socket釋放
            while (true)
            {
                if (socket.State != WebSocketState.Open)
                {
                    break;
                }
            }
        }

        private void WriteJson(string res, WebSocket socket)
        {
            lock (this)
            {
                res = res.Replace("\\", "").Replace("\"{", "{").Replace("\"}", "}").Replace("\"[", "[");
                ArraySegment<byte> bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(res));
                if (socket != null && socket.State == WebSocketState.Open)
                {
                    socket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
                    Thread.Sleep(1000);
                }
            }
        }

        private void RealtimeDataCallBack(dynamic dyobj, WebSocket ws)
        {
            var templist = new List<string>();
            templist.Add(dyobj.value);
            var temp = new Dictionary<string, string>();
            temp.Add(dyobj.stationid, new { doseRateNaI = templist.toListString() }.ToJson());
            var res = new { datastreamUpdate = temp.ToJson() }.ToJson();
            WriteJson(res, ws);
        }
    }

  由於確定了controller非單例,所以沒有必要在構造中為Datawatcher注冊事件,可以在其他合適的地方注冊。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM