一、短鏈接與長連接
1、短連接
客戶端和服務器每進行一次通訊,就建立一次連接,通訊結束就中斷連接。

HTTP是一個簡單的請求-響應協議,它通常運行在TCP之上。HTTP/1.0使用的TCP默認是短連接。
2、長連接
是指在建立連接后可以連續多次發送數據,直到雙方斷開連接。

HTTP從1.1版本起,底層的TCP使用的長連接。
3、短鏈接與長連接的區別1)、通訊流程
短連接:創建連接 -> 傳輸數據 -> 關閉連接
長連接:創建連接 -> 傳輸數據 -> 保持連接 -> 傳輸數據 -> …… -> 關閉連接
2)、適用場景
短連接:並發量大,數據交互不頻繁情況。
長連接:數據交互頻繁,點對點的通訊。
3)、通訊方式
| 說明 | |
|---|---|
| 短連接 | 我跟你發信息,必須等到你回復我或者等了一會等不下去了,就結束通訊了 |
| 長連接 |
二、websocket協議(全雙工,即允許服務器向客戶端發送數據)
項目需求:
目前用戶搶單操作我們已經完成,無論是非熱點商品還是熱點商品搶單,搶單完成后,我們應該要通知用戶搶單狀態,非熱點商品可以直接響應搶單結果,但熱點商品目前還沒有實現通知響應,通知用戶搶單狀態用戶可以通過定時向后台發出請求查詢實現,但這種短連接方式效率低,會和服務器進行多次通信,這塊我們可以使用長連接websocket實現。
1、
(1)、WebSocket 是 HTML5 開始提供的一種在單個 TCP 連接上進行全雙工通訊的協議。
單工就就像是汽車的單行道,是在只允許甲方向乙方傳送信息,而乙方不能向甲方傳送 。
單工:信息只能單向傳送。
http1.0:單工。因為是短連接,客戶端發起請求之后,服務端處理完請求並收到客戶端的響應后即斷開連接。
半雙工:信息能雙向傳送但不能同時雙向傳送。
http1.1:半雙工。默認開啟長連接keep-alive,開啟一個連接可發送多個請求。
全雙工:信息能夠同時雙向傳送。
http2.0:全雙工,允許服務端主動向客戶端發送數據。
(2)、
(4)、服務器向客戶端發送數據的功能是websocket協議的典型使用場景。

瀏覽器通過 JavaScript 向服務器發出建立 WebSocket 連接的請求,連接建立以后,客戶端和服務器端就可以通過 TCP 連接直接交換數據。
當你獲取 Web Socket 連接后,你可以通過 send() 方法來向服務器發送數據,並通過 onmessage 事件來接收服務器返回的數據。
2、
以下 API 用於創建 WebSocket 對象。
var Socket = new WebSocket(url, [protocol] );
WebSocket屬性
| 屬性 | 描述 |
|---|---|
| socket.readyState | 只讀屬性 readyState 表示連接狀態,可以是以下值:0 - 表示連接尚未建立。1 - 表示連接已建立,可以進行通信。2 - 表示連接正在進行關閉。3 - 表示連接已經關閉或者連接不能打開。 |
| socket.bufferedAmount | 只讀屬性 bufferedAmount 已被 send() 放入正在隊列中等待傳輸,但是還沒有發出的 UTF-8 文本字節數。 |
WebSocket 事件
以下是 WebSocket 對象的相關事件。假定我們使用了以上代碼創建了 Socket 對象:
| 事件處理程序 | 描述 | |
|---|---|---|
| open | Socket.onopen | 連接建立時觸發 |
| message | Socket.onmessage | 客戶端接收服務端數據時觸發 |
| error | Socket.onerror | 通信發生錯誤時觸發 |
| close | Socket.onclose |
WebSocket 方法
| 描述 | |
|---|---|
| Socket.send() | 使用連接發送數據 |
| Socket.close() |
三、WebSocket實例
1、客戶端
WebSocket 協議本質上是一個基於 TCP 的協議。為了建立一個 WebSocket 連接,客戶端瀏覽器首先要向服務器發起一個 HTTP 請求,這個請求和通常的 HTTP 請求不同,包含了一些附加頭信息,其中附加頭信息“Upgrade: WebSocket”表明這是一個申請協議升級的 HTTP 請求,服務器端解析這些附加的頭信息然后產生應答信息返回給客戶端,客戶端和服務器端的 WebSocket 連接就建立起來了,雙方就可以通過這個連接通道自由的傳遞信息,並且這個連接會持續存在直到客戶端或者服務器端的某一方主動的關閉連接。
四、創建一個springboot工程
1、添加依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
2、創建啟動類
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class); } }
3、WebSocketConfig開啟WebSocket的支持
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; //開啟WebSocket的支持,並把該類注入到spring容器中 @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
ServerEndPointExporter在springboot內置容器(嵌入式容器)中運行時,必須上下文提供ServerEndpointExporter,但是在tomcat容器中運行時,掃描工作會交給容器處理,不需要bean注入。故在部署時需要注釋掉@Bean注解,開發時不能注釋。
4、WebSocketServer
(1)、因為WebSocket是類似客戶端服務端的形式(采用ws協議),那么這里的WebSocketServer其實就相當於一個ws協議的Controller。
(2)、直接@ServerEndpoint("/imserver/{userId}") 、@Component啟用即可,然后在里面實現@OnOpen開啟連接,@onClose關閉連接,@onMessage接收消息等方法。
(3)、新建一個ConcurrentHashMap webSocketMap 用於接收當前userId的WebSocket,方便IM之間對userId進行推送消息。單機版實現到這里就可以。
(4)、集群版(多個ws節點)還需要借助mysql或者redis等進行處理,改造對應的sendMessage方法即可。
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/wsServer/{userId}") @Component @Slf4j public class WebSocketServer { /** * 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 */ private static int onlineCount = 0; /** * concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 與某個客戶端的連接會話,需要通過它來給客戶端發送數據 */ private Session session; /** * 接收userId */ private String userId = ""; /** * 連接建立成功調用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); // 連接建立成功后,將userId作為key,將Session作為value存入Map中 //加入set中 addOnlineCount(); //在線數加1 } log.info("用戶連接:" + userId + ",當前在線人數為:" + getOnlineCount()); try { sendMessage("連接成功"); // 服務器主動推送數據到客戶端 } catch (IOException e) { log.error("用戶:" + userId + ",網絡異常!!!!!!"); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); //從set中刪除 subOnlineCount(); } log.info("用戶退出:" + userId + ",當前在線人數為:" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); // 傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //否則不在這個服務器上,發送到mysql或者redis } } catch (Exception e) { e.printStackTrace(); } } } /** * 出現錯誤 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用戶錯誤:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 發送自定義消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("發送消息到:" + userId + ",報文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用戶" + userId + ",不在線!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
websocketMap的值如下:

兩個客戶端都注冊后如下:

發現session和userId都作為值 存入了webSocketMap中
5、編寫Controller推送新消息
import com.zwh.WebSocketServer; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.ModelAndView; import java.io.IOException; @RestController public class DemoController { @GetMapping("index") public ResponseEntity<String> index(){ return ResponseEntity.ok("請求成功"); } @GetMapping("page") public ModelAndView page(){ return new ModelAndView("websocket"); } @RequestMapping("/push/{toUserId}") public ResponseEntity<String> pushToWeb(@RequestParam String message, @PathVariable String toUserId) throws IOException { WebSocketServer.sendInfo(message,toUserId); return ResponseEntity.ok("MSG SEND SUCCESS:" + message); } }
效果如下:

測試之前修改部分代碼如下:
@OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); // 傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //否則不在這個服務器上,發送到mysql或者redis } } catch (Exception e) { e.printStackTrace(); } } }
我們使用websocket在線測試工具進行測試:http://www.websocket-test.com/,頁面如下:

我們打開兩個頁面,當作兩個客戶端。
客戶端2:

點擊連接按鈕即可建立連接。
客戶端3

點擊連接按鈕即可建立連接。
客戶端2給客戶端3發送消息:{"fromUserId":"2","message":"你好,我是2","toUserId":"3"}

點擊發送,客戶端3接到消息如下:

客戶端3給客戶端2發送消息:{"fromUserId":"3","message":"你好,我是3","toUserId":"2"}

客戶端2接到消息如下:

五、創建一個VUE工程
效果如下:

1、編寫App.vue
<template> <div id="app"> <router-view/> </div> </template> <script> export default { name: 'App', data() { return { socket: null } }, mounted() { // this.initWs() }, methods: { //初始化 initWs() { if (typeof (WebSocket) === "undefined") { alert("您的瀏覽器不支持socket") } else { // 實例化socket 111是固定的用戶id,正式環境直接獲取當前登錄用戶id this.socket = new WebSocket(this.global.wsUrl + '111') this.global.setWs(this.socket) // 監聽socket連接 this.socket.onopen = () => { console.log("socket連接成功") } // 監聽socket錯誤信息 this.socket.onerror = () => { console.error("連接錯誤") } //監聽socket消息 this.socket.onmessage = (msg) => { // console.log(msg) } // 監聽socket關閉信息 this.socket.onclose = (e) => { console.error("socket已經關閉") console.error(e) } } }, }, } </script>
1、編寫WebSocket.vue
<template>
<div>
<button @click="send">發消息</button>
</div>
</template>
<script>
export default {
data () {
return {
path:"ws://localhost:8080/wsServer/2", // 2為userId
socket:""
}
},
mounted () {
// 初始化
this.init()
},
methods: {
init: function () {
if(typeof(WebSocket) === "undefined"){
alert("您的瀏覽器不支持socket")
}else{
// 實例化socket
this.socket = new WebSocket(this.path)
// 監聽socket連接
this.socket.onopen = this.open
// 監聽socket錯誤信息
this.socket.onerror = this.error
// 監聽socket消息
this.socket.onmessage = this.getMessage
}
},
open: function () {
console.log("socket連接成功")
},
error: function () {
console.log("連接錯誤")
},
// 客戶端接收服務端數據
getMessage: function (msg) {
console.log(msg.data)
},
send: function () {
const params = {"toUserId": "3","message": "你好,我是2"}
// 使用連接發送數據
this.socket.send(JSON.stringify(params))
},
// 關閉連接
close: function () {
console.log("socket已經關閉")
}
},
destroyed () {
// 銷毀監聽
this.socket.onclose = this.close
}
}
</script>
<style>
</style>
2、編寫WebSocket1.vue
<template>
<div>
<button @click="send">發消息1</button>
</div>
</template>
<script>
export default {
data () {
return {
path:"ws://localhost:8080/wsServer/3", // 3為userId
socket:""
}
},
mounted () {
// 初始化
this.init()
},
methods: {
init: function () {
if(typeof(WebSocket) === "undefined"){
alert("您的瀏覽器不支持socket")
}else{
// 實例化socket
this.socket = new WebSocket(this.path)
// 監聽socket連接
this.socket.onopen = this.open
// 監聽socket錯誤信息
this.socket.onerror = this.error
// 監聽socket消息
this.socket.onmessage = this.getMessage
}
},
open: function () {
console.log("socket連接成功")
},
error: function () {
console.log("連接錯誤")
},
// 客戶端接收服務端數據
getMessage: function (msg) {
console.log(msg.data)
},
send: function () {
const params = {"toUserId": "2","message": "你好,我是3"}
// 使用連接發送數據
this.socket.send(JSON.stringify(params))
},
// 關閉連接
close: function () {
console.log("socket已經關閉")
}
},
destroyed () {
// 銷毀監聽
this.socket.onclose = this.close
}
}
</script>
<style>
</style>
3、啟動項目

1)、點擊WebSocket按鈕時,就會建立連接,userId為2的websocket對象就會存入Map中。

2)、點擊WebSocket1按鈕時,就會建立連接,userId為2的websocket對象就會存入Map中。
3)、點擊“發消息”按鈕,用戶2就會給用戶3發送一條消息,
{"fromUserId":"2","message":"你好,我是2","toUserId":"3"}
4)、點擊“發消息1”按鈕,用戶3就會給用戶2發送一條消息
{"fromUserId":"3","message":"你好,我是3","toUserId":"2"}
頁面控制台打印如下:

4、發送消息
用戶2:

用戶3:

瀏覽器訪問:http://localhost:8080/push/2?message=hello,即給用戶2發送消息

此時,用戶2的控制台如下:

再打開一個頁面,訪問:http://localhost:8080/push/3?message=hello,how%20are%20you

此時3的控制台如下:

六、通過rocketmq實現分布式WebSocket

問題:socket客戶端有很多,socket客戶端會和socket服務端建立連接,如果只有一個服務端的話,實現功能沒問題,一旦客戶端比較多的話,需要跟服務端建立很多連接,服務端就會扛不住,此時服務端要做一個集群,每一個集群節點(JVM)中都有一個MAP,MAP中保存了客戶端與服務端建立連接的session,如果A客戶端的在節點1中,B客戶端的節點在節點2中,當A要發消息給B,發現B的Session和A的session並不在同一個JVM中,故A沒有辦法發送消息給B。
我們提出一個解決方案:A在給B發送消息的時候,可以先在當前節點查找一下B的session是否在該節點,如果在的話直接發送消息,如果不在,可發送消息到消息系統RocketMQ,由於所有節點(包括節點2)訂閱了消息系統的消息,就能接收到消息,發現節點2中有B用戶的Session,就拿到B用戶的session,從而給B用戶發送消息。
1、添加依賴
<!--RocketMQ相關依賴-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>
2、添加SpringBoot配置
server.port=8888
spring.rocketmq.nameServer=114.xxx.xxx.xxx:9876 spring.rocketmq.producer.group=haoke-im-websocket-group
3、編寫業務邏輯
判斷用戶是否在線,如果在線就發消息,如果不在線或者不在當前的jvm中,發送消息到RocketMQ
先注入RocketMQTemplate
@Autowired private RocketMQTemplate rocketMQTemplate;
業務邏輯
@OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //用戶不在線,或者不在當前的jvm中,發送消息到RocketMQ // topic:tags 設置主題和標簽 rocketMQTemplate.convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.getString("message")); } } catch (Exception e) { e.printStackTrace(); } } }
發送的時候需要添加一個tag,便於消費者對消息進行篩選,通過topic:tags 設置主題和標簽。
測試:我們給未在線的用戶發消息,

此時控制台報錯:
java.lang.NullPointerException at com.zwh.WebSocketServer.onMessage(WebSocketServer.java:105) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80) at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:395) at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:119) at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:495) at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:294) at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133) at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:82) at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:171) at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:151) at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:148) at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:54) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:836) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)
發現RocketMQTemplate為null。
原因:spring管理的都是單例(singleton),和 websocket (多對象)相沖突。
項目啟動時初始化,會初始化 websocket (非用戶連接的),spring 同時會為其注入 service,該對象的 service 不是 null,被成功注入。但是,由於 spring 默認管理的是單例,所以只會注入一次 service。當新用戶進入聊天時,系統又會創建一個新的 websocket 對象,這時矛盾出現了:spring 管理的都是單例,不會給第二個 websocket 對象注入 service,所以導致只要是用戶連接創建的 websocket 對象,都不能再注入了。
像 controller 里面有 service, service 里面有 dao。因為 controller,service ,dao 都有是單例,所以注入時不會報 null。但是 websocket 不是單例,所以使用spring注入一次后,后面的對象就不會再注入了,會報null。
解決辦法如下:
第一步:在WebSocketServer中,使用set方法傳入上下文
private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext applicationContext) { WebSocketServer.applicationContext = applicationContext; }
第二步:在啟動類中傳入上下文
//解決springboot和websocket之間使用@autowired注入為空問題 ConfigurableApplicationContext applicationContext = SpringApplication.run(ZaiApplication.class, args); //這里將Spring Application注入到websocket類中定義的Application中。 WebSocketServer.setApplicationContext(applicationContext);
如下所示:
@SpringBootApplication public class Application { public static void main(String[] args) { // SpringApplication.run(Application.class); //解決springboot和websocket之間使用@autowired注入為空問題 ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class, args); //這里將Spring Application注入到websocket類中定義的Application中。 WebSocketServer.setApplicationContext(applicationContext); } }
第三步:在使用的地方通過上下文去獲取服務
@OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //用戶不在線,或者不在當前的jvm中,發送消息到RocketMQ // topic:tags 設置主題和標簽 applicationContext.getBean(RocketMQTemplate.class).convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.toJSONString()); } } catch (Exception e) { e.printStackTrace(); } } }
此時管理控制台如下:

點擊Message Detail

編寫消費者
我們以WebSocketServer類為消費者,實現RocketMQListener<String>,實現onMessage方法
@OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //用戶不在線,或者不在當前的jvm中,發送消息到RocketMQ // topic:tags 設置主題和標簽 applicationContext.getBean(RocketMQTemplate.class).convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.toJSONString()); } } catch (Exception e) { e.printStackTrace(); } } }
添加注解:
@ServerEndpoint("/wsServer/{userId}") @Component @Slf4j @RocketMQMessageListener(topic = "haoke-im-send-message-topic", consumerGroup = "haoke-im-consumer", messageModel = MessageModel.BROADCASTING, selectorExpression = "SEND_MSG") public class WebSocketServer implements RocketMQListener<String> {
注意:使用廣播模式,因為每個節點都需要接收到消息。
完整代碼如下:
package com.zwh; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import org.apache.rocketmq.spring.core.RocketMQTemplate; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/wsServer/{userId}") @Component @Slf4j @RocketMQMessageListener(topic = "haoke-im-send-message-topic", consumerGroup = "haoke-im-consumer", messageModel = MessageModel.BROADCASTING, selectorExpression = "SEND_MSG") public class WebSocketServer implements RocketMQListener<String> { /** * 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 */ private static int onlineCount = 0; /** * concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 與某個客戶端的連接會話,需要通過它來給客戶端發送數據 */ private Session session; /** * 接收userId */ private String userId = ""; // @Autowired // private RocketMQTemplate rocketMQTemplate; /* * 提供一個spring context上下文(解決方案) */ private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext applicationContext) { WebSocketServer.applicationContext = applicationContext; } /** * 連接建立成功調用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); //加入set中 addOnlineCount(); //在線數加1 } log.info("用戶連接:" + userId + ",當前在線人數為:" + getOnlineCount()); try { sendMessage("連接成功"); } catch (IOException e) { log.error("用戶:" + userId + ",網絡異常!!!!!!"); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); //從set中刪除 subOnlineCount(); } log.info("用戶退出:" + userId + ",當前在線人數為:" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //用戶不在線,或者不在當前的jvm中,發送消息到RocketMQ // topic:tags 設置主題和標簽 applicationContext.getBean(RocketMQTemplate.class).convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.toJSONString()); } } catch (Exception e) { e.printStackTrace(); } } } /** * 出現錯誤 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用戶錯誤:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 發送自定義消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("發送消息到:" + userId + ",報文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用戶" + userId + ",不在線!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } @Override public void onMessage(String message) { if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } } catch (Exception e) { e.printStackTrace(); } } } }
測試:
我們來部署多個tomcat,通過一個服務多個端口運行,參考:https://www.cnblogs.com/zwh0910/p/16473949.html
點擊“edit Configurations”,勾選“Allow parallel run”

修改端口為8889,再次啟動。
server.port=8889 spring.rocketmq.nameServer=114.xxx.xxx.xxx:9876 spring.rocketmq.producer.group=my-group
使用websocket在線工具,
用戶2連接8888端口的websocket

用戶3連接8889端口的websoket

用戶2發送消息:{"fromUserId":"2","message":"你好,我是2","toUserId":"3"}

用戶3接收到消息

用戶3發消息:{"fromUserId":"3","message":"你好,我是3333","toUserId":"2"}

用戶2接收到消息:

七、通過rocketmq實現分布式WebSocket
1)導入redis依賴
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2)創建redis消息實體,
import lombok.Data; import lombok.experimental.Accessors; import java.util.List; /** * redis發布訂閱的消息實體 */ @Data @Accessors(chain = true) public class RedisMessage { //消息類型,1全部廣播,2個人信息 private Integer category; //消息 private String message; //要發送的用戶組 private List<String> userList; }
方便消息的封裝。
3)創建業務處理類,監聽redis消息發布
主要用於監聽消息的發布,收到消息時進行相關業務的處理。
import com.alibaba.fastjson.JSON; import com.zxh.common.util.CollectionUtil; import com.zxh.model.RedisMessage; import com.zxh.server.WebSocketServer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * redis消息訂閱-業務處理 */ @Component @Slf4j public class RedisMessageListener implements MessageListener { //重寫onMessage,處理相關發布訂閱的業務 @SneakyThrows @Override public void onMessage(Message message, byte[] bytes) { String body = new String(message.getBody(), "UTF-8"); RedisMessage redisMessage = JSON.parseObject(body, RedisMessage.class); if (redisMessage != null) { Integer category = redisMessage.getCategory(); //個人信息 if (category == 2) { //根據用戶id消息 if (CollectionUtil.isNotEmpty(redisMessage.getUserList())) { redisMessage.getUserList().stream().forEach(userId -> { try { WebSocketServer.sendInfo(redisMessage.getMessage(),userId); } catch (IOException e) { e.printStackTrace(); } }); } else { log.warn("無用戶信息,發送信息失敗"); } } else if (category == 1) { } } } }
4)配置redis發布訂閱
import com.zxh.common.SystemConst; import com.zxh.common.listener.RedisMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * redis發布訂閱配置 */ @Configuration @EnableCaching public class RedisPubSubConfig { Logger logger = LoggerFactory.getLogger(this.getClass()); /** * 配置 交換機消息,添加多個 messageListener參數,配置不同的交換機 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel:test1")); return container; } /** * 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法 * * @param listener 業務處理類 * @return */ @Bean MessageListenerAdapter listenerAdapter(RedisMessageListener listener) { logger.info("redis消息監聽器加載成功--------->>>>>>"); // onMessage 就是方法名,基於反射調用 return new MessageListenerAdapter(listener, "onMessage"); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
5)調用redis的發布功能
修改TestService的sendMessage的方法,把使用WebSocket發送信息改為把消息發布到redis中。
@Service @Slf4j public class TestService { ..... @Autowired private StringRedisTemplate stringRedisTemplate; private void sendMessage(Integer msg) { List<String> userList = Arrays.asList("1111");//使用redis的發布訂閱發送消息 RedisMessage redisMessage = new RedisMessage().setCategory(2); redisMessage.setMessage(msg.toString()).setUserList(userList); stringRedisTemplate.convertAndSend("channel:test1", JSON.toJSONString(redisMessage)); } }
redis發布后,監聽器監聽到有消息時,使用WebSocket進行消息推送。每台服務器都會推送,只有服務連接成功的一台服務器才能通知到前台成功
redis實現發布和訂閱的功能也可以參考:https://www.cnblogs.com/zwh0910/p/17370198.html
