應用場景:前端頁面發起一個websocket請求與后端進行實時通訊。后端監聽某端口獲取數據,將監聽到的數據加工處理,通過websocket發送到前端。
這里只提供后台的處理方案僅供參考。
1.后端監聽某端口,獲取數據並數據處理。可以在Global中單獨開啟一個后台線程用來監聽數據。數據處理交給datawatcher的單例對象來處理。由於是監控端口的工作,一般采用獨立線程在項目啟動的時候就進行監聽,因此可以將代碼放在Application_start中。
2.datawatcher對象,它需要有個委托隊列,允許外部進行注冊和刪除。原理同事件類似,但是采用委托隊列並將其暴露主要是為了容錯,因為可能會用到刪除委托隊列中的委托。嚴謹的事務邏輯應該用事件(事件有點久了,忘了事件怎么寫了)
3.webapi建立websocket。
以下為代碼部分:
protected void Application_Start() { //webapi相關配置 AreaRegistration.RegisterAllAreas(); GlobalConfiguration.Configure(WebApiConfig.Register); FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters); RouteConfig.RegisterRoutes(RouteTable.Routes); BundleConfig.RegisterBundles(BundleTable.Bundles); //todo 初始化datawatcher 監聽實時數據 //偽造實時數據 Thread bgthread = new Thread(Start); bgthread.IsBackground = true; bgthread.Start(); } //除紅色部分均為模擬實時數據,正常數據應是通過監聽獲取 private void Start() { Random r = new Random(); var datawatcher = DataWatcher.GetInit(); while (true) { dynamic dyobj = new ExpandoObject(); dyobj.stationid = "01"; dyobj.value = new { at = DateTime.Now.ToTimeStampMS(), value = (r.Next(0, 1024)) }.ToJson(); datawatcher.Updata(dyobj, CallBackType.Record); Thread.Sleep(1000); } }
//使用單例是為了確保項目中使用的是同一個對象,防止注冊的callback丟失 public class DataWatcher { private static DataWatcher _dataWatcher; static readonly object locker = new object(); private DataWatcher() { CallBackAlarms = new List<DynamicDelegate>(); CallBackRecords = new List<DynamicDelegate>(); CallBackSpes = new List<DynamicDelegate>(); } public static DataWatcher GetInit() { if (_dataWatcher == null) { lock (locker) { if (_dataWatcher == null) { _dataWatcher = new DataWatcher(); } } } return _dataWatcher; } public WebSocket WebSocket { get; set; } public List<DynamicDelegate> CallBackSpes; public List<DynamicDelegate> CallBackRecords; public List<DynamicDelegate> CallBackAlarms; /// <summary> /// 更新數據 /// </summary> /// <param name="dyobj"></param> /// <param name="type"></param> public void Updata(dynamic dyobj, CallBackType type) { switch (type) { case CallBackType.Spe: if (CallBackSpes.Count > 0) { foreach (var callback in CallBackSpes) { callback.Invoke(dyobj, WebSocket); } } //CallBackSpes?.Invoke(dyobj, WebSocket); break; case CallBackType.Record: if (CallBackRecords.Count > 0) { foreach (var callback in CallBackRecords) { callback.Invoke(dyobj, WebSocket); } } //CallBackRecords?.Invoke(dyobj, WebSocket); break; case CallBackType.Alarm: if (CallBackAlarms.Count > 0) { foreach (var callback in CallBackAlarms) { callback.Invoke(dyobj, WebSocket); } } //CallBackAlarms?.Invoke(dyobj, WebSocket); break; } } } public enum CallBackType { Spe, Record, Alarm } public delegate void DynamicDelegate(dynamic dynamic, WebSocket ws);
public class subscribeController : ApiController { private DataWatcher dataWatcher; public subscribeController() { dataWatcher = DataWatcher.GetInit();//獲取Datawatcher實例 if (!dataWatcher.CallBackRecords.Contains(RealtimeDataCallBack)) { dataWatcher.CallBackRecords.Add(RealtimeDataCallBack);
//注冊委托,若無特殊的場景需求可以將委托列表封裝為事件event } } public HttpResponseMessage Get() { //對原請求返回101變更協議狀態碼,開啟一個異步線程通過websocket進行數據傳輸 if (HttpContext.Current.IsWebSocketRequest) { HttpContext.Current.AcceptWebSocketRequest(CreateWS); } return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols); } private async Task CreateWS(AspNetWebSocketContext aspNetWebSocketContext) { var socket = aspNetWebSocketContext.WebSocket; dataWatcher.WebSocket = socket; //將上下文中的websocket注入到Datawatcher中 //防止后台線程將socket釋放 while (true) { if (socket.State != WebSocketState.Open) { break; } } } private void WriteJson(string res, WebSocket socket) { lock (this) { res = res.Replace("\\", "").Replace("\"{", "{").Replace("\"}", "}").Replace("\"[", "["); ArraySegment<byte> bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(res)); if (socket != null && socket.State == WebSocketState.Open) { socket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None); Thread.Sleep(1000); } } } private void RealtimeDataCallBack(dynamic dyobj, WebSocket ws) { var templist = new List<string>(); templist.Add(dyobj.value); var temp = new Dictionary<string, string>(); temp.Add(dyobj.stationid, new { doseRateNaI = templist.toListString() }.ToJson()); var res = new { datastreamUpdate = temp.ToJson() }.ToJson(); WriteJson(res, ws); } }
Datawatcher的設計是作為一個中間人的角色,既要提供調用回調的方法,也要提供websocket 的注入接口。考慮到Datawatcher是單例,故采用屬性的方式為websocket提供外部訪問(此處的設計有待商榷)。在外部注入了websocket之后,在合適的位置進行回調。注意:數據的處理是在監聽時進行一次封裝處理,在進行回調的時候再進行一次封裝處理,如果類型是固定統一的可以只在監聽的時候進行封裝處理即可。同時應注意對回調方法中websocket的空判斷增強健壯性。
以上代碼為提供思路的簡易demo,使用時需要根據自己場景需求相應修改。
------------------------------------------------------------------------------分隔符----------------------------------------------------------------------------
存疑1:webapi中的controller是否是單例?(當然Mvc等的控制器也是同問)
如果是單例,Datawatcher的委托注冊可以在控制器的構造方法中注冊(前提是允許訪問公有的構造方法);如果不是單例,可以在post或者get中注冊委托,此時一定要判斷委托是否被注冊過,避免重復注冊。這是采用委托列表,而不用事件的一個原因。事件的內部委托鏈表是不允許外部直接訪問的,因此判斷是否注冊不是很方便。如果事件允許不判斷,直接注冊或注銷委托不會拋出異常,即如果已經注冊不會重復注冊,保證委托唯一性,如果已經注銷,再次注銷不會拋出異常,那么Datawatcher中的設計應該采用事件,確保封裝的安全性。(由於未對事件做深入了解,故此處存疑暫時保留)
釋疑1:經測試,controller不是單例(起碼webapi中不是)。測試方法,給控制器添加一個屬性和一個公有構造方法,構造方法給屬性賦一個初始值。寫一個訪問方法,每次給屬性加1,並返回。通過頁面進行訪問,查看api返回結果。或者打一個斷點在構造方法中,通過頁面訪問。
經測試,事件event中的委托鏈表在注銷未注冊的委托方法的時候不會拋出異常。注銷的時候,會在委托鏈表中查找對應的方法,如果找到則注銷掉,如果未找到也不會拋出異常。
參考資料:http://blog.csdn.net/snakorse/article/details/44208351
存疑2:websocket的async Task 為什么使用死循環?
釋疑2:websocket不是持續的,其中的代碼只執行一次,為了保持實時性,他的持續需要一個死循環。循環的作用只在於保持連接不斷開,真正的數據傳輸則是由Datawatcher來處理。每監聽到一條數據,會調用一次Datawatcher的update,這個時候如果websocket是打開的,數據則會被發送,如果此時socket為空或者關閉,則放棄發送或者插入緩存中等下次打開連接再發送,這部分的處理則需要根據業務需求來判斷。
強調:以上demo只提供思路,確認這種處理是可行的。關於存疑部分的思考,我會盡快驗證。
------------------------------------------------------------------------------最終代碼-------------------------------------------------------------------------------------------
public class DataWatcher { private static DataWatcher _dataWatcher; static readonly object locker = new object(); private DataWatcher() { } public static DataWatcher GetInit() { if (_dataWatcher == null) { lock (locker) { if (_dataWatcher == null) { _dataWatcher = new DataWatcher(); } } } return _dataWatcher; } public WebSocket WebSocket { get; set; } //委托列表被事件替代 public event DynamicDelegate CallbackRecoedEvent; public event DynamicDelegate CallbackAlarmEvent; public event DynamicDelegate CallbackSpeEvent; /// <summary> /// 更新譜圖 /// </summary> /// <param name="dyobj"></param> /// <param name="type"></param> public void Update(dynamic dyobj, CallBackType type) { switch (type) { case CallBackType.Spe: CallbackSpeEvent?.Invoke(dyobj,WebSocket); break; case CallBackType.Record: CallbackRecoedEvent?.Invoke(dyobj,WebSocket); break; case CallBackType.Alarm: CallbackAlarmEvent?.Invoke(dyobj,WebSocket); break; } } }
public class subscribeController : ApiController { public HttpResponseMessage Get() { if (HttpContext.Current.IsWebSocketRequest) { HttpContext.Current.AcceptWebSocketRequest(CreateWS); } return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols); } private async Task CreateWS(AspNetWebSocketContext aspNetWebSocketContext) { var socket = aspNetWebSocketContext.WebSocket; var dataWatcher = DataWatcher.GetInit(); dataWatcher.WebSocket = socket; // dataWatcher.CallbackRecoedEvent -= RealtimeDataCallBack; dataWatcher.CallbackRecoedEvent += RealtimeDataCallBack; //防止后台線程將socket釋放 while (true) { if (socket.State != WebSocketState.Open) { break; } } } private void WriteJson(string res, WebSocket socket) { lock (this) { res = res.Replace("\\", "").Replace("\"{", "{").Replace("\"}", "}").Replace("\"[", "["); ArraySegment<byte> bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(res)); if (socket != null && socket.State == WebSocketState.Open) { socket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None); Thread.Sleep(1000); } } } private void RealtimeDataCallBack(dynamic dyobj, WebSocket ws) { var templist = new List<string>(); templist.Add(dyobj.value); var temp = new Dictionary<string, string>(); temp.Add(dyobj.stationid, new { doseRateNaI = templist.toListString() }.ToJson()); var res = new { datastreamUpdate = temp.ToJson() }.ToJson(); WriteJson(res, ws); } }
由於確定了controller非單例,所以沒有必要在構造中為Datawatcher注冊事件,可以在其他合適的地方注冊。
