Spring Boot in practice: real-time messaging with WebSocket (server and client)


1. Background


 

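Our company integrates with an alarm information platform. This required a service that acts as a WebSocket client to receive the messages the platform pushes, respond in real time, and then process the messages and store them in the database.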

 

2. Design


 

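The original plan was a single service that both received and processed the messages. Based on past experience, however, and to avoid losing messages while a service is being redeployed or restarted, the work was split across two services: 1. collect, the receiving service, and 2. deal-send, the processing and storage service. The collect service acts as a WebSocket client that receives the incoming messages and, at the same time, as a WebSocket server that passes them on to deal-send. The deal-send service acts as a WebSocket client that receives the messages from collect, processes them, and stores them in the database.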
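The relay role of collect can be sketched in plain Java. This is only an illustration under stated assumptions, not the article's code: `Relay`, `onUpstreamMessage`, `onDownstreamConnected` and `onDownstreamDisconnected` are hypothetical names, and the bounded buffer that holds messages while deal-send is disconnected is an addition for illustration (the implementation shown later in the article triggers a reconnect instead of buffering).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical sketch of the collect service's relay role; not the article's code. */
final class Relay {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int capacity;
    private Consumer<String> downstream; // null while deal-send is disconnected

    Relay(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    /** Called when a message arrives from the upstream platform. */
    synchronized void onUpstreamMessage(String message) {
        if (downstream != null) {
            downstream.accept(message); // deal-send is connected: forward right away
            return;
        }
        if (pending.size() == capacity) {
            pending.removeFirst(); // buffer full: drop the oldest message
        }
        pending.addLast(message);
    }

    /** Called when deal-send (re)connects; flushes buffered messages in arrival order. */
    synchronized void onDownstreamConnected(Consumer<String> sink) {
        downstream = sink;
        while (!pending.isEmpty()) {
            sink.accept(pending.removeFirst());
        }
    }

    /** Called when deal-send disconnects. */
    synchronized void onDownstreamDisconnected() {
        downstream = null;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        Relay relay = new Relay(100);
        relay.onUpstreamMessage("alarm-1");               // buffered: nobody is connected yet
        relay.onDownstreamConnected(System.out::println); // flushes the buffered message
        relay.onUpstreamMessage("alarm-2");               // forwarded immediately
    }
}
```

A bounded buffer trades completeness for memory safety: once it is full, the oldest messages are dropped, so the capacity would have to be chosen to cover the longest expected restart of deal-send.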

 

3. Code examples


 

  • Main dependencies of the collect service

pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- WebSocket client -->
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.4.0</version>
        </dependency>

        <!-- Retries via Spring RetryTemplate -->
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
            <version>1.2.4.RELEASE</version>
        </dependency>

 

  • Main dependencies of the deal-send service

pom.xml

        <!-- WebSocket client -->
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.4.0</version>
        </dependency>

 

  • collect configuration class

WebSocketConfig.java

package com.newland.auto.pilot.collect.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Enables WebSocket support.
 *
 * @author wk
 * @date 2020/6/10
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

 

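  • WebSocket client of the collect service. The alarm information platform occasionally loses its network connection, which drops the long-lived WebSocket connection, so the onClose method uses the retry support from spring-retry to reconnect whenever the connection is closed. The onMessage method forwards to the deal-send service: each message received from the alarm information platform is sent on to deal-send.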

MyWebSocketClient.java

package com.newland.auto.pilot.collect.config;

import com.newland.auto.pilot.collect.ApplicationRunnerImpl;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.net.URI;
import java.util.Map;

/**
 * @author wk
 * @date 2020/5/29
 */
@Slf4j
public class MyWebSocketClient extends WebSocketClient {

    public MyWebSocketClient(URI serverUri, Map<String, String> headers) {
        super(serverUri, headers);
    }

    @Override
    public void onOpen(ServerHandshake handshake) {
        log.info("------ MyWebSocket onOpen ------");
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("------ MyWebSocket onClose ------{}", reason);
        log.info("Connection lost, reconnecting!");

        try {
            // RetryTemplate is spring-retry's implementation of the RetryOperations interface
            RetryTemplate template = new RetryTemplate();

            // Retry policy: keep retrying forever.
            // No back-off policy is set, so a failed attempt is retried immediately.
            AlwaysRetryPolicy policy = new AlwaysRetryPolicy();
            template.setRetryPolicy(policy);

            String result = template.execute(
                    new RetryCallback<String, Exception>() {
                        @Override
                        public String doWithRetry(RetryContext context) throws Exception {
                            log.info("------------------------Retrying connection after close--------------------------");
                            // Call the method that opens the websocket connection
                            ApplicationRunnerImpl.connectWebsocket();

                            return "------------------------Reconnected successfully--------------------------";
                        }
                    },
                    // Runs when retries are exhausted without success, for any follow-up handling.
                    // With AlwaysRetryPolicy this callback is not expected to run in practice.
                    new RecoveryCallback<String>() {
                        @Override
                        public String recover(RetryContext context) throws Exception {
                            return "------------------------Retries exhausted, operation not completed!!--------------------------";
                        }
                    }
            );
            log.info(result);
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
    }

    @Override
    public void onError(Exception ex) {
        // Pass the exception as the last argument so the stack trace is logged
        log.error("------ MyWebSocket onError ------", ex);
    }

    @Override
    public void onMessage(String message) {
        log.info("-------- Received data from server: {}--------", message);
        try {
            // Forward the message to the deal-send client connected to our websocket server
            WebSocketServer.sendInfo(message, "deal-send");
        } catch (Exception e) {
            log.error("Failed to send message to the deal-send client", e);
        }
    }
}
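The "retry until the reconnect succeeds" idea in onClose can also be sketched without any library. The block below is an illustrative stand-in, not the article's implementation: `ReconnectLoop`, `delayMillis` and `retryUntilSuccess` are hypothetical names, and the capped exponential back-off is an addition. In spring-retry itself, a pause between attempts would typically be configured by passing a `FixedBackOffPolicy` or `ExponentialBackOffPolicy` to `RetryTemplate.setBackOffPolicy`.

```java
import java.util.concurrent.Callable;

/** Plain-Java sketch of "retry until success" with a capped exponential back-off. */
final class ReconnectLoop {

    /** Delay before retrying after the given attempt (1-based): initial, 2x, 4x, ... capped at maxMillis. */
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = initialMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Calls connect until it returns true, sleeping between failed attempts; returns the attempt count. */
    static int retryUntilSuccess(Callable<Boolean> connect, long initialMillis, long maxMillis) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                if (connect.call()) {
                    return attempt;
                }
                Thread.sleep(delayMillis(attempt, initialMillis, maxMillis));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while reconnecting", e);
            } catch (Exception e) {
                // Any other exception counts as a failed attempt; pause and try again
                try {
                    Thread.sleep(delayMillis(attempt, initialMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while reconnecting", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connect that fails twice and succeeds on the third call
        int attempts = retryUntilSuccess(() -> ++calls[0] == 3, 10, 100);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```

Pausing between attempts keeps a long outage on the platform side from turning into a tight loop of connection attempts and log lines.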

 

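  • WebSocket server of the collect service, which the deal-send service connects to. The sendInfo method checks whether the user is online: if the deal-send user is online, the message is sent; if not, collect remotely calls the reconnect endpoint of the deal-send service. In other words, when the WebSocket client in deal-send has dropped its connection, collect actively calls this endpoint so that deal-send reconnects its WebSocket.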

WebSocketServer.java

package com.newland.auto.pilot.collect.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author wk
 * @date 2020/6/10
 */
@Slf4j
@ServerEndpoint("/server/ws/{userId}")
@Component
public class WebSocketServer {

    /**
     * Thread-safe count of the current online connections. Static because, by default,
     * the container creates one endpoint instance per connection.
     */
    private static final LongAdder onlineCounter = new LongAdder();
    /**
     * Thread-safe map from userId to the WebSocketServer instance of that client.
     */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * Session of the connection to one client; used to send data to that client.
     */
    private Session session;
    /**
     * The userId received from the path.
     */
    private String userId = "";
    /**
     * URI of the reconnect endpoint on deal-send.
     */
    static String reConnectWebsocketUri = "http://localhost:9042/newland/reConnectWebsocket";

    /**
     * Called when a connection has been established.
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        log.info("onopen,this.session:{}", session);
        // put() returns the previous mapping; only a new user raises the online count
        if (webSocketMap.put(userId, this) == null) {
            onlineCounter.increment();
        }

        log.info("User connected: {}, current online count: {}", userId, onlineCounter.sum());

        try {
            sendMessage("Connected");
        } catch (IOException e) {
            log.error("User: {}, network error!", userId, e);
        }
    }

    /**
     * Called when the connection is closed.
     */
    @OnClose
    public void onClose() {
        // Remove the entry only if it still points at this instance,
        // so a stale close cannot evict a newer connection of the same user
        if (webSocketMap.remove(userId, this)) {
            onlineCounter.decrement();
        }
        log.info("User disconnected: {}, current online count: {}", userId, onlineCounter.sum());
    }

    /**
     * Called when a message is received from the client.
     *
     * @param message the message sent by the client
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("Message from user: {}, payload: {}", userId, message);
    }

    /**
     * Called when an error occurs on the connection.
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("Error for user: {}, cause: {}", this.userId, error.getMessage(), error);
    }

    /**
     * Pushes a message from the server to this client.
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * Sends a message to the given user, or asks deal-send to reconnect if the user is offline.
     */
    public static void sendInfo(String message, String userId) throws IOException {
        log.info("Sending message to: {}", userId);
        WebSocketServer target = StringUtils.isNotBlank(userId) ? webSocketMap.get(userId) : null;
        if (target != null) {
            target.sendMessage(message);
        } else {
            log.error("User: {} is offline!", userId);
            log.info("---------Calling the remote websocket reconnect endpoint----------");
            // HttpsClientRequestFactory is a project class that is not shown in this article
            RestTemplate restTemplate = new RestTemplate(new HttpsClientRequestFactory());
            ResponseEntity<String> response = restTemplate.getForEntity(reConnectWebsocketUri, String.class);

            log.info("Request completed, response body: {}", response.getBody());
            // Check whether the response status is 200
            if (response.getStatusCodeValue() == 200) {
                log.info("Response status is 200!");
                // Read the response body
                String content = response.getBody();
                log.info(content);
            } else {
                log.info("Response status is {}!", response.getStatusCodeValue());
            }
        }
    }

}
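The bookkeeping in onOpen and onClose comes down to keeping one live connection per userId. The sketch below isolates that idea using only `ConcurrentHashMap`; `ConnectionRegistry` and its methods are hypothetical names, and deriving the online count from the map size instead of a separate counter is a simplification assumed here, not something the article does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry sketch: at most one live connection per user id. */
final class ConnectionRegistry<C> {
    private final ConcurrentMap<String, C> connections = new ConcurrentHashMap<>();

    /** Registers a connection, replacing any previous one; returns the replaced connection or null. */
    C register(String userId, C connection) {
        return connections.put(userId, connection);
    }

    /** Removes the entry only if it still maps to this connection, so a stale close cannot evict a newer one. */
    boolean unregister(String userId, C connection) {
        return connections.remove(userId, connection);
    }

    /** Returns the current connection of the user, or null if the user is offline. */
    C lookup(String userId) {
        return connections.get(userId);
    }

    /** The number of online users is simply the number of entries. */
    int onlineCount() {
        return connections.size();
    }

    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<>();
        registry.register("deal-send", "session-1");
        registry.register("deal-send", "session-2"); // the same user reconnects
        System.out.println(registry.onlineCount());  // still one user online
        System.out.println(registry.unregister("deal-send", "session-1")); // stale close is ignored
        System.out.println(registry.lookup("deal-send"));
    }
}
```

Using the two-argument `remove(key, value)` matters when a client reconnects before the old connection's close event arrives: the late close then leaves the new entry in place.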

 

  • WebSocket client connection of the deal-send service

WebSocketConfig.java

package com.newland.auto.pilot.deal.send.config;

import com.alibaba.fastjson.JSON;
import com.newland.auto.pilot.deal.send.alarmDeal.AlarmConvert;
import com.newland.auto.pilot.deal.send.jms.FmSendJMSMgm;
import com.newland.privatefm.alarmSend.AlarmMsg;
import com.newland.privatefm.alarmSend.SendManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.util.List;

/**
 * Configures the websocket client.
 *
 * @author wk
 * @date 2020/6/12
 */
@Slf4j
// @Configuration (not @Component) so that calling webSocketClient() returns the singleton bean
@Configuration
public class WebSocketConfig {
    @Autowired
    private AlarmConvert alarmConvert;

    @Bean
    public WebSocketClient webSocketClient() {
        try {
            WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://localhost:9041" + "/server/ws/deal-send"), new Draft_6455()) {
                @Override
                public void onOpen(ServerHandshake handshakedata) {
                    // Note: this runs on every (re)connect
                    log.info("------ MyWebSocket onOpen ------");
                    log.info("FmSendJMSMgm Thread Begin ...");
                    FmSendJMSMgm sendJMSMgm = new FmSendJMSMgm();
                    if (sendJMSMgm.init()) {
                        sendJMSMgm.start();
                        log.info("FmSendJMSMgm Thread End\n");
                    } else {
                        log.error("FmSendJMSMgm Thread init failed, Program Exit!!!\n");
                        // A non-zero exit status signals failure
                        System.exit(1);
                    }
                }

                @Override
                public void onMessage(String message) {
                    log.info("-------- Received data from server: {}--------", message);

                    if (StringUtils.isNotBlank(message)) {
                        try {
                            // Parse the received payload
                            List<AlarmMsg> alarmMsgList = alarmConvert.alarmMsgConvert(message);
                            alarmMsgList.parallelStream().forEach(alarmMsg -> {
                                // Send the data via JMS
                                log.info("Sending data via JMS: {}", JSON.toJSONString(alarmMsg));
                                SendManager.sendManager.putData(alarmMsg);
                                log.info("Data sent via JMS successfully!");
                            });
                        } catch (Exception e) {
                            log.error(e.getMessage(), e);
                        }
                    }
                }

                @Override
                public void onClose(int code, String reason, boolean remote) {
                    log.info("------ MyWebSocket onClose ------{}", reason);
                }

                @Override
                public void onError(Exception ex) {
                    // Pass the exception as the last argument so the stack trace is logged
                    log.error("------ MyWebSocket onError ------", ex);
                }
            };
            webSocketClient.connect();
            return webSocketClient;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            // Fail fast instead of registering a null bean
            throw new IllegalStateException("Failed to create the websocket client", e);
        }
    }

}
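In the client above, onOpen creates and starts a new FmSendJMSMgm each time the connection opens, which includes every reconnect. If that worker is meant to be started only once (an assumption; the article does not say), a small guard along the following lines could be wrapped around the start-up code. `StartOnce` and `runOnce` are hypothetical names introduced for this sketch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper that runs an initialisation step at most once. */
final class StartOnce {
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Runs the task only on the first call; returns true if this call ran it. */
    boolean runOnce(Runnable task) {
        // compareAndSet lets exactly one caller flip the flag, even under concurrency
        if (started.compareAndSet(false, true)) {
            task.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StartOnce guard = new StartOnce();
        // Simulates onOpen being called for the first connect and for a reconnect
        guard.runOnce(() -> System.out.println("worker started"));
        guard.runOnce(() -> System.out.println("worker started"));
    }
}
```

The guard would be held in a field of the configuration class so that it outlives individual connections.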

 

  • Remote reconnect endpoint of the deal-send service

ReConnectWsController.java

package com.newland.auto.pilot.deal.send.controller;

import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Endpoint that reconnects the websocket client.
 *
 * @author wk
 * @date 2020/9/4
 */
@Slf4j
@RestController
@RequestMapping(value = "/newland")
public class ReConnectWsController {
    // Inject the singleton client bean rather than calling the @Bean factory method directly
    @Autowired
    private WebSocketClient webSocketClient;

    @GetMapping("/reConnectWebsocket")
    public String reConnectWebsocket() {
        String returnMsg = "---------Websocket reconnect endpoint called successfully-----------";

        log.info("---------Calling the websocket reconnect endpoint----------");
        webSocketClient.reconnect();
        log.info(returnMsg);
        return returnMsg;
    }
}
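Because sendInfo on the collect side calls this endpoint for every message that arrives while deal-send is offline, a burst of alarms could produce a burst of reconnect requests. One possible way to limit that, offered as an assumption and not as part of the original design, is to let only one request through per interval. `ReconnectThrottle` and `tryAcquire` are hypothetical names, and the clock is injected so the behaviour can be checked without waiting.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical guard that lets at most one reconnect request through per interval. */
final class ReconnectThrottle {
    private static final long NEVER = Long.MIN_VALUE;

    private final long minIntervalMillis;
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private final AtomicLong lastAllowed = new AtomicLong(NEVER);

    ReconnectThrottle(long minIntervalMillis, LongSupplier clock) {
        this.minIntervalMillis = minIntervalMillis;
        this.clock = clock;
    }

    /** Returns true if the caller may trigger a reconnect now; false if one was triggered too recently. */
    boolean tryAcquire() {
        long now = clock.getAsLong();
        long last = lastAllowed.get();
        boolean due = last == NEVER || now - last >= minIntervalMillis;
        // compareAndSet ensures only one of several concurrent callers wins the slot
        return due && lastAllowed.compareAndSet(last, now);
    }

    public static void main(String[] args) {
        ReconnectThrottle throttle = new ReconnectThrottle(5_000, System::currentTimeMillis);
        System.out.println(throttle.tryAcquire()); // first request is allowed
        System.out.println(throttle.tryAcquire()); // an immediate second request is rejected
    }
}
```

The caller would invoke the reconnect endpoint only when `tryAcquire()` returns true and skip the HTTP call otherwise.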

 

  • Reconnect log of the collect service's WebSocket client
[2020-09-05 11:25:18] 126481 [WebSocketTimer] INFO -c.n.a.p.c.config.MyWebSocketClient[36] - ------ MyWebSocket onClose ------The connection was closed because the other endpoint did not respond with a pong in time. For more information check: https://github.com/TooTallNate/Java-WebSocket/wiki/Lost-connection-detection 
[2020-09-05 11:25:18] 126485 [WebSocketTimer] INFO -c.n.a.p.c.config.MyWebSocketClient[37] - Connection lost, reconnecting!
[2020-09-05 11:25:18] 126501 [WebSocketTimer] INFO -c.n.a.p.c.config.MyWebSocketClient[54] - ------------------------Retrying connection after close--------------------------
[2020-09-05 11:25:18] 126505 [WebSocketTimer] INFO -c.n.a.p.c.collect.TokenCollect[31] - Starting request!
[2020-09-05 11:25:18] 126506 [WebSocketTimer] INFO -c.n.a.p.c.collect.TokenCollect[39] - Calling restTemplate.exchange!!
[2020-09-05 11:25:18] 126705 [WebSocketTimer] INFO -c.n.a.p.c.collect.TokenCollect[41] - Request completed, response body: {"accessSession":"x-xxxxxxxxxxx","roaRand":"0xxxxxxxxxx","expires":1800,"additionalInfo":null}
[2020-09-05 11:25:18] 126706 [WebSocketTimer] INFO -c.n.a.p.c.collect.TokenCollect[44] - Response status is 200!
[2020-09-05 11:25:18] 126706 [WebSocketTimer] INFO -c.n.a.p.c.collect.TokenCollect[47] - {"accessSession":"x-xxxxxxxxxxx","roaRand":"0xxxxxxxxx","expires":1800,"additionalInfo":null}
[2020-09-05 11:25:18] 126706 [WebSocketTimer] INFO -c.n.a.p.c.collect.TokenCollect[50] - accessToken:x-xxxxxuxxxxxx
[2020-09-05 11:25:18] 126711 [WebSocketTimer] INFO -c.n.a.p.c.collect.SubscribeCollect[46] - Starting request!
[2020-09-05 11:25:18] 126917 [WebSocketTimer] INFO -c.n.a.p.c.collect.SubscribeCollect[49] - Request completed, response body: {"output":{"subscription-result":"ok","identifier":"9xxxxxxxx","url":"/restconf/streams/ws/v1/identifier/34xxxxx"}}
[2020-09-05 11:25:18] 126917 [WebSocketTimer] INFO -c.n.a.p.c.collect.SubscribeCollect[52] - Response status is 200!
[2020-09-05 11:25:18] 126917 [WebSocketTimer] INFO -c.n.a.p.c.collect.SubscribeCollect[55] - {"output":{"subscription-result":"ok","identifier":"95xxxxxxx","url":"/restconf/streams/ws/v1/identifier/34xxxxxxx"}}
[2020-09-05 11:25:18] 126918 [WebSocketTimer] INFO -c.n.a.p.c.collect.SubscribeCollect[59] - url:/restconf/streams/ws/v1/identifier/34xxxxxxx
[2020-09-05 11:25:18] 126918 [WebSocketTimer] INFO -c.n.a.p.c.ApplicationRunnerImpl[58] - protocol:TLSv1.2
[2020-09-05 11:25:18] 126919 [WebSocketTimer] INFO -c.n.a.p.c.config.MyWebSocketClient[69] - ------------------------Reconnected successfully--------------------------
[2020-09-05 11:25:19] 127069 [WebSocketConnectReadThread-41] INFO -c.n.a.p.c.config.MyWebSocketClient[30] - ------ MyWebSocket onOpen ------
[2020-09-05 11:25:25] 133139 [WebSocketConnectReadThread-41] INFO -c.n.a.p.c.config.MyWebSocketClient[84] - -------- Received data from server: {"csn":111111,"name":"測試name","priority":"medium","occur-time":"2020-09-05T03:23:22Z","clear-time":"2020-09-05T03:26:31Z","status":"status1","category":"Power environment","domain":"domain1","source-objects":[{"id":"","type":"type1","name":"test","extend-id":"123456","device-type":"34"}],"detail":"detail1","message-type":"1","root-event-csns":[{"csn":"111","type":"1"}]}--------
[2020-09-05 11:25:25] 133139 [WebSocketConnectReadThread-41] INFO -c.n.a.p.c.config.WebSocketServer[136] - Sending message to: deal-send

 

  • Log of collect remotely calling the WebSocket reconnect endpoint
[2020-09-05 11:23:33] 21128 [WebSocketConnectReadThread-33] INFO -c.n.a.p.c.config.WebSocketServer[136] - Sending message to: deal-send
[2020-09-05 11:23:33] 21137 [WebSocketConnectReadThread-33] ERROR-c.n.a.p.c.config.WebSocketServer[140] - User: deal-send is offline!
[2020-09-05 11:23:33] 21138 [WebSocketConnectReadThread-33] INFO -c.n.a.p.c.config.WebSocketServer[141] - ---------Calling the remote websocket reconnect endpoint----------
[2020-09-05 11:23:33] 21573 [http-nio-9041-exec-1] INFO -o.a.c.c.C.[Tomcat].[localhost].[/][173] - Initializing Spring DispatcherServlet 'dispatcherServlet' 
[2020-09-05 11:23:33] 21574 [http-nio-9041-exec-1] INFO -o.s.web.servlet.DispatcherServlet[524] - Initializing Servlet 'dispatcherServlet' 
[2020-09-05 11:23:33] 21588 [http-nio-9041-exec-1] INFO -o.s.web.servlet.DispatcherServlet[546] - Completed initialization in 14 ms 
[2020-09-05 11:23:33] 21649 [http-nio-9041-exec-1] INFO -c.n.a.p.c.config.WebSocketServer[49] - onopen,this.session:org.apache.tomcat.websocket.WsSession@32f5b990 
[2020-09-05 11:23:33] 21650 [http-nio-9041-exec-1] INFO -c.n.a.p.c.config.WebSocketServer[62] - User connected: deal-send, current online count: 1

 

  • Log of the deal-send service when its WebSocket reconnect endpoint is called remotely
[2020-09-05 11:23:08] 87821 [WebSocketConnectReadThread-19] INFO -c.n.a.p.d.s.config.WebSocketConfig[74] - ------ MyWebSocket onClose ------ 
[2020-09-05 11:23:33] 112532 [http-nio-9042-exec-1] INFO -o.a.c.c.C.[Tomcat].[localhost].[/][173] - Initializing Spring DispatcherServlet 'dispatcherServlet' 
[2020-09-05 11:23:33] 112533 [http-nio-9042-exec-1] INFO -o.s.web.servlet.DispatcherServlet[524] - Initializing Servlet 'dispatcherServlet' 
[2020-09-05 11:23:33] 112556 [http-nio-9042-exec-1] INFO -o.s.web.servlet.DispatcherServlet[546] - Completed initialization in 23 ms 
[2020-09-05 11:23:33] 112630 [http-nio-9042-exec-1] INFO -c.n.a.p.d.s.c.ReConnectWsController[28] - ---------Calling the websocket reconnect endpoint----------
[2020-09-05 11:23:33] 112828 [WebSocketConnectReadThread-43] INFO -c.n.a.p.d.s.config.WebSocketConfig[38] - ------ MyWebSocket onOpen ------ 

 

