物聯網架構成長之路(58)-用戶與應用服務器之WebSocket通信


一、前言

  之前的物聯網篇,有介紹過用戶手機APP或在H5端,可以作為mqtt的客戶端,通過tcp:1883或者Websocket:8083,連接到EMQ Broker上,訂閱設備的主題,從而收到設備上報的數據。但是,在處理工業物聯網的時候,會出現,物聯網平台會對接客戶自己的應用。設備上報與下發都是通過物聯網平台與客戶的應用服務器進行通信。客戶應用服務器轉發物聯網平台,設備上報的數據到客戶應用下自己的用戶終端(手機)。這個時候,就需要建立起客戶應用服務器與用戶的通道了。傳統上,可以使用http輪詢。也可以使用mqtt類似的進行通信。但是這里就介紹通過SpringBoot 簡單搭建一個WebSocket服務。建立起客戶應用服務器與用戶的通信。

二、簡單流程

  物聯網平台只做設備接入、設備管理、客戶接入、客戶管理。客戶自己的應用與終端客戶,則又客戶自行開發。從而進行解耦,這個流程方式與功能划分,與阿里的物聯網平台類似。后續我們也會基於物聯網平台開發一些標准化的應用。

三、后端代碼

  3.1 pom.xml

1 <dependency>
2     <groupId>org.springframework.boot</groupId>
3     <artifactId>spring-boot-starter-websocket</artifactId>
4 </dependency>

  3.2 config配置

  WebSocketConfig.java

 1 package com.wunaozai.demo.websocket.config;
 2 
 3 import org.springframework.context.annotation.Bean;
 4 import org.springframework.context.annotation.Configuration;
 5 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 6 
 7 @Configuration
 8 public class WebSocketConfig {
 9 
10     /**
11      * 自動注入Websocket Bean
12      * @return
13      */
14     @Bean
15     public ServerEndpointExporter serverEndpointExporter() {
16         return new ServerEndpointExporter();
17     }
18 }

  GetHttpHeaderConfig.java

 1 package com.wunaozai.demo.websocket.config;
 2 
 3 import java.util.List;
 4 import java.util.Map;
 5 
 6 import javax.websocket.HandshakeResponse;
 7 import javax.websocket.server.HandshakeRequest;
 8 import javax.websocket.server.ServerEndpointConfig;
 9 import javax.websocket.server.ServerEndpointConfig.Configurator;
10 
11 import org.springframework.context.annotation.Bean;
12 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
13 
14 /**
15  * 將請求中的Header設置到屬性中
16  * @author wunaozai
17  * @Date 2020-10-27
18  */
19 public class GetHttpHeaderConfigurator extends Configurator {
20 
21     @Override
22     public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
23         System.out.println("modifyHandshake 協議升級101");
24         Map<String, List<String>> headers = request.getHeaders();
25         config.getUserProperties().put("headers", headers);
26         super.modifyHandshake(config, request, response);
27     }
28     @Bean
29     public ServerEndpointExporter serverEndpointExporter() {
30         return new ServerEndpointExporter();
31     }
32 }

  3.3 service 服務層

  WebSocketService.java

 1 package com.wunaozai.demo.websocket.service;
 2 
 3 import java.io.IOException;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.concurrent.ConcurrentHashMap;
 7 
 8 import javax.websocket.EndpointConfig;
 9 import javax.websocket.OnClose;
10 import javax.websocket.OnMessage;
11 import javax.websocket.OnOpen;
12 import javax.websocket.Session;
13 import javax.websocket.server.PathParam;
14 import javax.websocket.server.ServerEndpoint;
15 
16 import org.springframework.stereotype.Component;
17 
18 import com.google.gson.Gson;
19 import com.wunaozai.demo.websocket.config.GetHttpHeaderConfigurator;
20 
21 /**
22  * 服務層
23  * 注意: 這里的ServerEndpoint不會進入攔截器
24  * @author wunaozai
25  * @Date 2020-10-27
26  */
27 @ServerEndpoint(value="/websocket/{token}", configurator=GetHttpHeaderConfigurator.class)
28 @Component
29 public class WebSocketService {
30 
31     //用來存放每個客戶端對應的WebSocketServer對象
32     private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
33 
34     /**
35      * 成功建立連接
36      * @param session
37      * @param token
38      */
39     @OnOpen
40     public void onOpen(Session session, @PathParam(value="token") String token, EndpointConfig config) {
41         try {
42             Map<String, List<String>> headers = 
43                     (Map<String, List<String>>) config.getUserProperties().get("headers");
44             System.out.println(new Gson().toJson(headers)); //這里可以通過Headers或者token進行認證
45             if(!checkToken(token)) {
46                 //校驗失敗,禁止建立連接
47                 session.close();    
48             }
49             System.out.println("建立連接: " + token);
50             sessionPools.put(token, session);
51         } catch (Exception e) {
52             e.printStackTrace();
53         }
54     }
55     
56     @OnClose
57     public void onClose(@PathParam(value="token") String token) {
58         sessionPools.remove(token);
59         System.out.println("斷開連接: " + token);
60     }
61     
62     @OnMessage
63     public void onMessage(@PathParam(value="token")String token, String message) throws IOException {
64         System.out.println("收到客戶端發來信息: " + message);
65     }
66     
67     //發送廣播信息
68     public void sendBroadCast(String message) throws IOException {
69         //這里從第三方服務或調用該接口向所有客戶端進行廣播信息
70         for(Session session: sessionPools.values()) {
71             sendMessage(session, "{\"msg\":\""+message+"\"}");
72         }
73     }
74     //指定單個發送數據
75     public void sendMessage(Session session, String message) throws IOException {
76         if(session != null && session.isOpen()) {
77             synchronized (session) {
78                 session.getBasicRemote().sendText(message);
79             }
80         }
81     }
82     
83     public String getToken() {
84         //這里隨機生成Token,並緩存到Redis,設置10分鍾過期策略
85         return "access_token_random_qcb0a6S";
86     }
87     public boolean checkToken(String token) {
88         if("access_token_random_qcb0a6S".equals(token)) {
89             return true;
90         }
91         return false;
92     }
93 }

  3.4 controller 控制器

  WebSocketController.java

 1 package com.wunaozai.demo.websocket.controller;
 2 
 3 import java.io.IOException;
 4 
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.stereotype.Controller;
 7 import org.springframework.web.bind.annotation.RequestMapping;
 8 import org.springframework.web.bind.annotation.ResponseBody;
 9 
10 import com.wunaozai.demo.websocket.service.WebSocketService;
11 
12 @Controller
13 @RequestMapping(value="/websocket")
14 public class WebSocketController {
15 
16     @Autowired
17     private WebSocketService websocketService;
18     
19     @RequestMapping(value="index")
20     public String index() {
21         return "websocket/index";
22     }
23     
24     @ResponseBody
25     @RequestMapping(value="send")
26     public String sendBroadCast(String msg) throws IOException {
27         websocketService.sendBroadCast(msg);
28         return "ok.";
29     }
30     @ResponseBody
31     @RequestMapping(value="token")
32     public String getToken() {
33         //這里可以判斷請求是否合法,查詢數據庫
34         //如果在攔截器已經做處理的,就不需要進行判斷
35         return websocketService.getToken();
36     }
37     
38 }

  3.5 目錄結構

  

 

四、前端代碼

 1 <!DOCTYPE html>
 2 <html lang="en">
 3   <head>
 4     <meta charset="utf-8">
 5     <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
 6     <meta name="description" content="">
 7     <meta name="author" content="">
 8     <title>Websocket 例子</title>
 9   </head>
10   <body>
11   <button id="btn-token">獲取Token</button>
12   <input id="token" placeholder="請填寫Token"/>
13   <br>
14   <button id="btn-connect">連接WebSocket</button>
15   <input id="message" value="Hello" placeholder="請填寫要發送的信息"  />
16   <button id="btn-send">發送信息</button>
17   </body>
18   
19   <script type="text/javascript" src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
20   <script type="text/javascript">
21     jQuery(function(){
22         initEvent();
23     });
24     
25     function initEvent(){
26         $("#btn-token").bind('click', function(){
27             $.post('/websocket/token', function(ret){
28                 console.log(ret);
29                 $("#token").val(ret);
30             });
31         });
32         $("#btn-connect").bind('click', function(){
33             openSocket();
34         });
35         $("#btn-send").bind('click', function(){
36             sendMessage();
37         });
38     }
39     var socket;
40     function openSocket() {
41         if(typeof(WebSocket) == "undefined") {
42             console.log("您的瀏覽器不支持WebSocket");
43         }else{
44             console.log("您的瀏覽器支持WebSocket");
45             //實現化WebSocket對象,指定要連接的服務器地址與端口  建立連接
46             var token = $("#token").val();
47             var socketUrl="ws://127.0.0.1:8080/websocket/"+token;
48             console.log(socketUrl);
49             if(socket!=null){
50                 socket.close();
51                 socket=null;
52             }
53             socket = new WebSocket(socketUrl);
54             //打開事件
55             socket.onopen = function() {
56                 console.log("websocket已打開");
57                 //socket.send("這是來自客戶端的消息" + location.href + new Date());
58             };
59             //獲得消息事件
60             socket.onmessage = function(msg) {
61                 var serverMsg = "收到服務端信息:" + msg.data;
62                 console.log(serverMsg);
63                 //發現消息進入    開始處理前端觸發邏輯
64             };
65             //關閉事件
66             socket.onclose = function() {
67                 console.log("websocket已關閉");
68             };
69             //發生了錯誤事件
70             socket.onerror = function() {
71                 console.log("websocket發生了錯誤");
72             }
73         }
74     }
75     function sendMessage() {
76         if(typeof(WebSocket) == "undefined") {
77             console.log("您的瀏覽器不支持WebSocket");
78         }else {
79             // console.log("您的瀏覽器支持WebSocket");
80             var message = $("#message").val();
81             var msg = '{"contentText":"'+message+'"}';
82             console.log(msg);
83             socket.send(msg);
84         }
85     }
86   </script>
87 </html>

 

五、運行效果

1. 訪問頁面,然后請求獲取Token,獲取到【access_token_random_qcb0a6S】,然后拼接EndPoint,ws://127.0.0.1:8080/websocket/access_token_random_qcb0a6S

2. 服務器執行協議升級握手協議101,並將請求里面的Headers設置到當前請求的Session中

3. 完成協議升級。由Http協議轉WebSocket協議。進入WebSocketService的OnOpen方法。

4. OnOpen方法,判斷當前ws連接的token或者headers里面是否包含認證信息(比如jwt或oauth、自定義token等方式)

5. 完成連接建立,將ws連接session放到session池里面,以供下一步廣播信息給ws客戶端

6. ws客戶發送Hello到服務器,服務器可以做數據處理應答等業務操作

7. 通過請求 http://127.0.0.1:8080/websocket/send?msg=Hi 模擬【物聯網平台】通過rpc或者amqp等遠程調用協議,調用發送設備上報的信息到ws客戶端

8.1 假如廣播設備上報信息給所有客戶端,那么將從session池中遍歷所有ws客戶端,然后依次發送數據。

8.2 假如是單個發送數據。那么需要根據設備的ID信息,查詢設備與用戶的綁定信息,再轉發數據給ws客戶端(用戶)

9. 客戶端(手機APP、H5等)通過彈框等方式,提醒用戶

 

參考資料:

          https://www.cnblogs.com/JohanChan/p/12522001.html

 

本文地址:https://www.cnblogs.com/wunaozai/p/13889216.html
本系列目錄: https://www.cnblogs.com/wunaozai/p/8067577.html
個人主頁:https://www.wunaozai.com/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM