最近項目中用到了webSocket服務,由后台實時向所有的前端推送消息,前端暫時是不可以發消息給后端的,數據的來源是由具體的設備數據收集器收集起來,然后通過socket推送給后端,后端收到數據后,再將這些數據推送給前端。
聽起來業務邏輯有點復雜。其實單獨的實現socket或websocket都比較簡單,但是二者之間的數據傳輸問題,困擾了我很久。也想過用redis做一個消息隊列,將socket接收到的數據處理后丟進去,然后再用websocket從redis里取出數據,再推送給前端。
但是。問題來了,這么配置的話,一個簡單的功能,要另外再加一個服務出來,配置起來好麻煩的感覺。再說了,目前的業務邏輯是,數據不做任何處理就直接扔出去了,干嘛要一層一層的來設計這些東西呢?雖然它有很好的模式和很高的擴展性,可我就是懶的去寫多余的代碼來配置這些東西。so,本着能懶就懶的原則,我整出來一套自己適合的方案來做這個事情。
思路:socket推送給后端的數據是實時的,有則推送,沒有就一邊呆着,等消息發過來。所以呢,我干嘛不弄個http接口來接收呢,本來數據就不多,老半天才會推一條出來,有時候一天都不會有幾條數據,所以,搞一個socket還不如直接提供一個HTTP接口來接收數據來的划算,關鍵是代碼寫起來簡單啊。所以就有了這個:
@RequestMapping(value = "/socket", method = {RequestMethod.POST, RequestMethod.GET}) public void webSocket(HttpServletRequest request) { Map map = request.getParameterMap(); ... }
這個東西沒啥可說的,不用測試都知道沒問題。
好了,數據是接收到了,怎么發送給前端呢?我的想法是,把推送前端的代碼直接寫到上面的代碼體里面,這樣就能接到一個推送就直接廣播給前端,接不到數據就不推送,多好啊。
想象中的代碼應該是這樣的:
@RequestMapping(value = "/socket", method = {RequestMethod.POST, RequestMethod.GET}) public void webSocket(HttpServletRequest request) { Map map = request.getParameterMap(); ... // sendMessageToFront(message); }
如果想這樣寫,要么使用標簽注解,要么自定義一個方法,繼承websocket來實現功能。but how?
<坑里的生活就不播了,直接寫出坑后的成果吧>
標簽注解的方式或許可以實現,但是,這樣以來,就有三個URI提供給前端了,一個用來握手,一個用來發送消息,一個用來接收消息。好吧,前端也以懶為天,能少寫一個字母絕不多加半個符號。so,我的這種方案直接被否決了,所以得另尋出路。
然后就是寫個方法繼承websocket來實現這個功能了。代碼是這樣的:
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Resource private AliceWebSocketHandler webSocketHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(webSocketHandler, "/webSocket").setAllowedOrigins("*") .addInterceptors(new AliceHandShakeInterceptor()); registry.addHandler(webSocketHandler, "/webSockJs").setAllowedOrigins("*") .addInterceptors(new AliceHandShakeInterceptor()).withSockJS(); } }
@Component public class AliceHandShakeInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { super.afterHandshake(request, response, wsHandler, exception); } }
@Component public class AliceWebSocketHandler extends TextWebSocketHandler { private static final Logger LOGGER = LoggerFactory.getLogger(AliceWebSocketHandler.class); private static Map<String, WebSocketSession> SESSION_MAP = Maps.newConcurrentMap(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { LOGGER.debug("[{} : {}] has be connected...", session.getUri(), session.getId()); SESSION_MAP.put(session.getId(), session); } @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { LOGGER.debug("[{} : {}]", session.getUri(), session.getId()); SESSION_MAP.remove(session.getId()); } @Override public boolean supportsPartialMessages() { return false; } /** * 群發消息 */ public void broadcast(final TextMessage message) throws IOException { for (Map.Entry<String, WebSocketSession> entry : SESSION_MAP.entrySet()) { if (entry.getValue().isOpen()) { new Thread(() -> { try { if (entry.getValue().isOpen()) { entry.getValue().sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } }).start(); } } } }
完事,方法體中,就直接調用broadcast方法就行了,推送消息服務完成。