一、短链接与长连接
1、短连接
客户端和服务器每进行一次通讯,就建立一次连接,通讯结束就中断连接。
HTTP是一个简单的请求-响应协议,它通常运行在TCP之上。HTTP/1.0使用的TCP默认是短连接。
2、长连接
是指在建立连接后可以连续多次发送数据,直到双方断开连接。
HTTP从1.1版本起,底层的TCP使用的长连接。
3、短链接与长连接的区别1)、通讯流程
短连接:创建连接 -> 传输数据 -> 关闭连接
长连接:创建连接 -> 传输数据 -> 保持连接 -> 传输数据 -> …… -> 关闭连接
2)、适用场景
短连接:并发量大,数据交互不频繁情况。
长连接:数据交互频繁,点对点的通讯。
3)、通讯方式
说明 | |
---|---|
短连接 | 我跟你发信息,必须等到你回复我或者等了一会等不下去了,就结束通讯了 |
长连接 |
二、websocket协议(全双工,即允许服务器向客户端发送数据)
项目需求:
目前用户抢单操作我们已经完成,无论是非热点商品还是热点商品抢单,抢单完成后,我们应该要通知用户抢单状态,非热点商品可以直接响应抢单结果,但热点商品目前还没有实现通知响应,通知用户抢单状态用户可以通过定时向后台发出请求查询实现,但这种短连接方式效率低,会和服务器进行多次通信,这块我们可以使用长连接websocket实现。
1、
(1)、WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。
单工就就像是汽车的单行道,是在只允许甲方向乙方传送信息,而乙方不能向甲方传送 。
单工:信息只能单向传送。
http1.0
:单工。因为是短连接,客户端发起请求之后,服务端处理完请求并收到客户端的响应后即断开连接。
半双工:信息能双向传送但不能同时双向传送。
http1.1
:半双工。默认开启长连接keep-alive
,开启一个连接可发送多个请求。
全双工:信息能够同时双向传送。
http2.0
:全双工,允许服务端主动向客户端发送数据。
(2)、
(4)、服务器向客户端发送数据的功能是websocket协议的典型使用场景。
浏览器通过 JavaScript 向服务器发出建立 WebSocket 连接的请求,连接建立以后,客户端和服务器端就可以通过 TCP 连接直接交换数据。
当你获取 Web Socket 连接后,你可以通过 send() 方法来向服务器发送数据,并通过 onmessage 事件来接收服务器返回的数据。
2、
以下 API 用于创建 WebSocket 对象。
var Socket = new WebSocket(url, [protocol] );
WebSocket属性
属性 | 描述 |
---|---|
socket.readyState | 只读属性 readyState 表示连接状态,可以是以下值:0 - 表示连接尚未建立。1 - 表示连接已建立,可以进行通信。2 - 表示连接正在进行关闭。3 - 表示连接已经关闭或者连接不能打开。 |
socket.bufferedAmount | 只读属性 bufferedAmount 已被 send() 放入正在队列中等待传输,但是还没有发出的 UTF-8 文本字节数。 |
WebSocket 事件
以下是 WebSocket 对象的相关事件。假定我们使用了以上代码创建了 Socket 对象:
事件处理程序 | 描述 | |
---|---|---|
open | Socket.onopen | 连接建立时触发 |
message | Socket.onmessage | 客户端接收服务端数据时触发 |
error | Socket.onerror | 通信发生错误时触发 |
close | Socket.onclose |
WebSocket 方法
描述 | |
---|---|
Socket.send() | 使用连接发送数据 |
Socket.close() |
三、WebSocket实例
1、客户端
WebSocket 协议本质上是一个基于 TCP 的协议。为了建立一个 WebSocket 连接,客户端浏览器首先要向服务器发起一个 HTTP 请求,这个请求和通常的 HTTP 请求不同,包含了一些附加头信息,其中附加头信息“Upgrade: WebSocket”表明这是一个申请协议升级的 HTTP 请求,服务器端解析这些附加的头信息然后产生应答信息返回给客户端,客户端和服务器端的 WebSocket 连接就建立起来了,双方就可以通过这个连接通道自由的传递信息,并且这个连接会持续存在直到客户端或者服务器端的某一方主动的关闭连接。
四、创建一个springboot工程
1、添加依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> </dependencies>
2、创建启动类
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class); } }
3、WebSocketConfig开启WebSocket的支持
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; //开启WebSocket的支持,并把该类注入到spring容器中 @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
ServerEndPointExporter在springboot内置容器(嵌入式容器)中运行时,必须上下文提供ServerEndpointExporter,但是在tomcat容器中运行时,扫描工作会交给容器处理,不需要bean注入。故在部署时需要注释掉@Bean注解,开发时不能注释。
4、WebSocketServer
(1)、因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller。
(2)、直接@ServerEndpoint("/imserver/{userId}")
、@Component
启用即可,然后在里面实现@OnOpen
开启连接,@onClose
关闭连接,@onMessage
接收消息等方法。
(3)、新建一个ConcurrentHashMap
webSocketMap 用于接收当前userId的WebSocket,方便IM之间对userId进行推送消息。单机版
实现到这里就可以。
(4)、集群版
(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage
方法即可。
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/wsServer/{userId}") @Component @Slf4j public class WebSocketServer { /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 */ private static int onlineCount = 0; /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收userId */ private String userId = ""; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); // 连接建立成功后,将userId作为key,将Session作为value存入Map中 //加入set中 addOnlineCount(); //在线数加1 } log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount()); try { sendMessage("连接成功"); // 服务器主动推送数据到客户端 } catch (IOException e) { log.error("用户:" + userId + ",网络异常!!!!!!"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + userId + ",报文:" + message); //可以群发消息 //消息保存到数据库、redis if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); // 传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); } else { log.error("请求的userId:" + toUserId + "不在该服务器上"); //否则不在这个服务器上,发送到mysql或者redis } } catch (Exception e) { e.printStackTrace(); } } } /** * 出现错误 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("发送消息到:" + userId + ",报文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用户" + userId + ",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
websocketMap的值如下:
两个客户端都注册后如下:
发现session和userId都作为值 存入了webSocketMap中
5、编写Controller推送新消息
import com.zwh.WebSocketServer; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.ModelAndView; import java.io.IOException; @RestController public class DemoController { @GetMapping("index") public ResponseEntity<String> index(){ return ResponseEntity.ok("请求成功"); } @GetMapping("page") public ModelAndView page(){ return new ModelAndView("websocket"); } @RequestMapping("/push/{toUserId}") public ResponseEntity<String> pushToWeb(@RequestParam String message, @PathVariable String toUserId) throws IOException { WebSocketServer.sendInfo(message,toUserId); return ResponseEntity.ok("MSG SEND SUCCESS:" + message); } }
效果如下:
测试之前修改部分代码如下:
@OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + userId + ",报文:" + message); //可以群发消息 //消息保存到数据库、redis if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); // 传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("请求的userId:" + toUserId + "不在该服务器上"); //否则不在这个服务器上,发送到mysql或者redis } } catch (Exception e) { e.printStackTrace(); } } }
我们使用websocket在线测试工具进行测试:http://www.websocket-test.com/,页面如下:
我们打开两个页面,当作两个客户端。
客户端2:
点击连接按钮即可建立连接。
客户端3
点击连接按钮即可建立连接。
客户端2给客户端3发送消息:{"fromUserId":"2","message":"你好,我是2","toUserId":"3"}
点击发送,客户端3接到消息如下:
客户端3给客户端2发送消息:{"fromUserId":"3","message":"你好,我是3","toUserId":"2"}
客户端2接到消息如下:
五、创建一个VUE工程
效果如下:
1、编写App.vue
<template> <div id="app"> <router-view/> </div> </template> <script> export default { name: 'App', data() { return { socket: null } }, mounted() { // this.initWs() }, methods: { //初始化 initWs() { if (typeof (WebSocket) === "undefined") { alert("您的浏览器不支持socket") } else { // 实例化socket 111是固定的用户id,正式环境直接获取当前登录用户id this.socket = new WebSocket(this.global.wsUrl + '111') this.global.setWs(this.socket) // 监听socket连接 this.socket.onopen = () => { console.log("socket连接成功") } // 监听socket错误信息 this.socket.onerror = () => { console.error("连接错误") } //监听socket消息 this.socket.onmessage = (msg) => { // console.log(msg) } // 监听socket关闭信息 this.socket.onclose = (e) => { console.error("socket已经关闭") console.error(e) } } }, }, } </script>
1、编写WebSocket.vue
<template> <div> <button @click="send">发消息</button> </div> </template> <script> export default { data () { return { path:"ws://localhost:8080/wsServer/2", // 2为userId socket:"" } }, mounted () { // 初始化 this.init() }, methods: { init: function () { if(typeof(WebSocket) === "undefined"){ alert("您的浏览器不支持socket") }else{ // 实例化socket this.socket = new WebSocket(this.path) // 监听socket连接 this.socket.onopen = this.open // 监听socket错误信息 this.socket.onerror = this.error // 监听socket消息 this.socket.onmessage = this.getMessage } }, open: function () { console.log("socket连接成功") }, error: function () { console.log("连接错误") }, // 客户端接收服务端数据 getMessage: function (msg) { console.log(msg.data) }, send: function () { const params = {"toUserId": "3","message": "你好,我是2"} // 使用连接发送数据 this.socket.send(JSON.stringify(params)) }, // 关闭连接 close: function () { console.log("socket已经关闭") } }, destroyed () { // 销毁监听 this.socket.onclose = this.close } } </script> <style> </style>
2、编写WebSocket1.vue
<template> <div> <button @click="send">发消息1</button> </div> </template> <script> export default { data () { return { path:"ws://localhost:8080/wsServer/3", // 3为userId socket:"" } }, mounted () { // 初始化 this.init() }, methods: { init: function () { if(typeof(WebSocket) === "undefined"){ alert("您的浏览器不支持socket") }else{ // 实例化socket this.socket = new WebSocket(this.path) // 监听socket连接 this.socket.onopen = this.open // 监听socket错误信息 this.socket.onerror = this.error // 监听socket消息 this.socket.onmessage = this.getMessage } }, open: function () { console.log("socket连接成功") }, error: function () { console.log("连接错误") }, // 客户端接收服务端数据 getMessage: function (msg) { console.log(msg.data) }, send: function () { const params = {"toUserId": "2","message": "你好,我是3"} // 使用连接发送数据 this.socket.send(JSON.stringify(params)) }, // 关闭连接 close: function () { console.log("socket已经关闭") } }, destroyed () { // 销毁监听 this.socket.onclose = this.close } } </script> <style> </style>
3、启动项目
1)、点击WebSocket按钮时,就会建立连接,userId为2的websocket对象就会存入Map中。
2)、点击WebSocket1按钮时,就会建立连接,userId为2的websocket对象就会存入Map中。
3)、点击“发消息”按钮,用户2就会给用户3发送一条消息,
{"fromUserId":"2","message":"你好,我是2","toUserId":"3"}
4)、点击“发消息1”按钮,用户3就会给用户2发送一条消息
{"fromUserId":"3","message":"你好,我是3","toUserId":"2"}
页面控制台打印如下:
4、发送消息
用户2:
用户3:
浏览器访问:http://localhost:8080/push/2?message=hello,即给用户2发送消息
此时,用户2的控制台如下:
再打开一个页面,访问:http://localhost:8080/push/3?message=hello,how%20are%20you
此时3的控制台如下:
六、通过rocketmq实现分布式WebSocket
问题:socket客户端有很多,socket客户端会和socket服务端建立连接,如果只有一个服务端的话,实现功能没问题,一旦客户端比较多的话,需要跟服务端建立很多连接,服务端就会扛不住,此时服务端要做一个集群,每一个集群节点(JVM)中都有一个MAP,MAP中保存了客户端与服务端建立连接的session,如果A客户端的在节点1中,B客户端的节点在节点2中,当A要发消息给B,发现B的Session和A的session并不在同一个JVM中,故A没有办法发送消息给B。
我们提出一个解决方案:A在给B发送消息的时候,可以先在当前节点查找一下B的session是否在该节点,如果在的话直接发送消息,如果不在,可发送消息到消息系统RocketMQ,由于所有节点(包括节点2)订阅了消息系统的消息,就能接收到消息,发现节点2中有B用户的Session,就拿到B用户的session,从而给B用户发送消息。
1、添加依赖
<!--RocketMQ相关依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency>
2、添加SpringBoot配置
server.port=8888
spring.rocketmq.nameServer=114.xxx.xxx.xxx:9876 spring.rocketmq.producer.group=haoke-im-websocket-group
3、编写业务逻辑
判断用户是否在线,如果在线就发消息,如果不在线或者不在当前的jvm中,发送消息到RocketMQ
先注入RocketMQTemplate
@Autowired private RocketMQTemplate rocketMQTemplate;
业务逻辑
@OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + userId + ",报文:" + message); //可以群发消息 //消息保存到数据库、redis if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("请求的userId:" + toUserId + "不在该服务器上"); //用户不在线,或者不在当前的jvm中,发送消息到RocketMQ // topic:tags 设置主题和标签 rocketMQTemplate.convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.getString("message")); } } catch (Exception e) { e.printStackTrace(); } } }
发送的时候需要添加一个tag,便于消费者对消息进行筛选,通过topic:tags 设置主题和标签。
测试:我们给未在线的用户发消息,
此时控制台报错:
java.lang.NullPointerException at com.zwh.WebSocketServer.onMessage(WebSocketServer.java:105) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80) at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:395) at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:119) at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:495) at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:294) at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133) at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:82) at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:171) at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:151) at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:148) at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:54) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:836) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)
发现RocketMQTemplate为null。
原因:spring管理的都是单例(singleton),和 websocket (多对象)相冲突。
项目启动时初始化,会初始化 websocket (非用户连接的),spring 同时会为其注入 service,该对象的 service 不是 null,被成功注入。但是,由于 spring 默认管理的是单例,所以只会注入一次 service。当新用户进入聊天时,系统又会创建一个新的 websocket 对象,这时矛盾出现了:spring 管理的都是单例,不会给第二个 websocket 对象注入 service,所以导致只要是用户连接创建的 websocket 对象,都不能再注入了。
像 controller 里面有 service, service 里面有 dao。因为 controller,service ,dao 都有是单例,所以注入时不会报 null。但是 websocket 不是单例,所以使用spring注入一次后,后面的对象就不会再注入了,会报null。
解决办法如下:
第一步:在WebSocketServer中,使用set方法传入上下文
private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext applicationContext) { WebSocketServer.applicationContext = applicationContext; }
第二步:在启动类中传入上下文
//解决springboot和websocket之间使用@autowired注入为空问题 ConfigurableApplicationContext applicationContext = SpringApplication.run(ZaiApplication.class, args); //这里将Spring Application注入到websocket类中定义的Application中。 WebSocketServer.setApplicationContext(applicationContext);
如下所示:
@SpringBootApplication public class Application { public static void main(String[] args) { // SpringApplication.run(Application.class); //解决springboot和websocket之间使用@autowired注入为空问题 ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class, args); //这里将Spring Application注入到websocket类中定义的Application中。 WebSocketServer.setApplicationContext(applicationContext); } }
第三步:在使用的地方通过上下文去获取服务
@OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + userId + ",报文:" + message); //可以群发消息 //消息保存到数据库、redis if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("请求的userId:" + toUserId + "不在该服务器上"); //用户不在线,或者不在当前的jvm中,发送消息到RocketMQ // topic:tags 设置主题和标签 applicationContext.getBean(RocketMQTemplate.class).convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.toJSONString()); } } catch (Exception e) { e.printStackTrace(); } } }
此时管理控制台如下:
点击Message Detail
编写消费者
我们以WebSocketServer类为消费者,实现RocketMQListener<String>,实现onMessage方法
@OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + userId + ",报文:" + message); //可以群发消息 //消息保存到数据库、redis if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("请求的userId:" + toUserId + "不在该服务器上"); //用户不在线,或者不在当前的jvm中,发送消息到RocketMQ // topic:tags 设置主题和标签 applicationContext.getBean(RocketMQTemplate.class).convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.toJSONString()); } } catch (Exception e) { e.printStackTrace(); } } }
添加注解:
@ServerEndpoint("/wsServer/{userId}") @Component @Slf4j @RocketMQMessageListener(topic = "haoke-im-send-message-topic", consumerGroup = "haoke-im-consumer", messageModel = MessageModel.BROADCASTING, selectorExpression = "SEND_MSG") public class WebSocketServer implements RocketMQListener<String> {
注意:使用广播模式,因为每个节点都需要接收到消息。
完整代码如下:
package com.zwh; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import org.apache.rocketmq.spring.core.RocketMQTemplate; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/wsServer/{userId}") @Component @Slf4j @RocketMQMessageListener(topic = "haoke-im-send-message-topic", consumerGroup = "haoke-im-consumer", messageModel = MessageModel.BROADCASTING, selectorExpression = "SEND_MSG") public class WebSocketServer implements RocketMQListener<String> { /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 */ private static int onlineCount = 0; /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收userId */ private String userId = ""; // @Autowired // private RocketMQTemplate rocketMQTemplate; /* * 提供一个spring context上下文(解决方案) */ private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext applicationContext) { WebSocketServer.applicationContext = applicationContext; } /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); //加入set中 addOnlineCount(); //在线数加1 } log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount()); try { sendMessage("连接成功"); } catch (IOException e) { log.error("用户:" + userId + ",网络异常!!!!!!"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); //从set中删除 subOnlineCount(); } log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("用户消息:" + userId + ",报文:" + message); //可以群发消息 //消息保存到数据库、redis if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } else { log.error("请求的userId:" + toUserId + "不在该服务器上"); //用户不在线,或者不在当前的jvm中,发送消息到RocketMQ // topic:tags 设置主题和标签 applicationContext.getBean(RocketMQTemplate.class).convertAndSend("haoke-im-send-message-topic:SEND_MSG",jsonObject.toJSONString()); } } catch (Exception e) { e.printStackTrace(); } } } /** * 出现错误 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("发送消息到:" + userId + ",报文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用户" + userId + ",不在线!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } @Override public void onMessage(String message) { if (StringUtils.isNotBlank(message)) { try { //解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); //追加发送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //传送给对应toUserId用户的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); webSocketMap.get(toUserId).sendMessage(jsonObject.getString("message")); } } catch (Exception e) { e.printStackTrace(); } } } }
测试:
我们来部署多个tomcat,通过一个服务多个端口运行,参考:https://www.cnblogs.com/zwh0910/p/16473949.html
点击“edit Configurations”,勾选“Allow parallel run”
修改端口为8889,再次启动。
server.port=8889 spring.rocketmq.nameServer=114.xxx.xxx.xxx:9876 spring.rocketmq.producer.group=my-group
使用websocket在线工具,
用户2连接8888端口的websocket
用户3连接8889端口的websoket
用户2发送消息:{"fromUserId":"2","message":"你好,我是2","toUserId":"3"}
用户3接收到消息
用户3发消息:{"fromUserId":"3","message":"你好,我是3333","toUserId":"2"}
用户2接收到消息:
七、通过rocketmq实现分布式WebSocket
1)导入redis依赖
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2)创建redis消息实体,
import lombok.Data; import lombok.experimental.Accessors; import java.util.List; /** * redis发布订阅的消息实体 */ @Data @Accessors(chain = true) public class RedisMessage { //消息类型,1全部广播,2个人信息 private Integer category; //消息 private String message; //要发送的用户组 private List<String> userList; }
方便消息的封装。
3)创建业务处理类,监听redis消息发布
主要用于监听消息的发布,收到消息时进行相关业务的处理。
import com.alibaba.fastjson.JSON; import com.zxh.common.util.CollectionUtil; import com.zxh.model.RedisMessage; import com.zxh.server.WebSocketServer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * redis消息订阅-业务处理 */ @Component @Slf4j public class RedisMessageListener implements MessageListener { //重写onMessage,处理相关发布订阅的业务 @SneakyThrows @Override public void onMessage(Message message, byte[] bytes) { String body = new String(message.getBody(), "UTF-8"); RedisMessage redisMessage = JSON.parseObject(body, RedisMessage.class); if (redisMessage != null) { Integer category = redisMessage.getCategory(); //个人信息 if (category == 2) { //根据用户id消息 if (CollectionUtil.isNotEmpty(redisMessage.getUserList())) { redisMessage.getUserList().stream().forEach(userId -> { try { WebSocketServer.sendInfo(redisMessage.getMessage(),userId); } catch (IOException e) { e.printStackTrace(); } }); } else { log.warn("无用户信息,发送信息失败"); } } else if (category == 1) { } } } }
4)配置redis发布订阅
import com.zxh.common.SystemConst; import com.zxh.common.listener.RedisMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * redis发布订阅配置 */ @Configuration @EnableCaching public class RedisPubSubConfig { Logger logger = LoggerFactory.getLogger(this.getClass()); /** * 配置 交换机消息,添加多个 messageListener参数,配置不同的交换机 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel:test1")); return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * * @param listener 业务处理类 * @return */ @Bean MessageListenerAdapter listenerAdapter(RedisMessageListener listener) { logger.info("redis消息监听器加载成功--------->>>>>>"); // onMessage 就是方法名,基于反射调用 return new MessageListenerAdapter(listener, "onMessage"); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
5)调用redis的发布功能
修改TestService的sendMessage的方法,把使用WebSocket发送信息改为把消息发布到redis中。
@Service @Slf4j public class TestService { ..... @Autowired private StringRedisTemplate stringRedisTemplate; private void sendMessage(Integer msg) { List<String> userList = Arrays.asList("1111");//使用redis的发布订阅发送消息 RedisMessage redisMessage = new RedisMessage().setCategory(2); redisMessage.setMessage(msg.toString()).setUserList(userList); stringRedisTemplate.convertAndSend("channel:test1", JSON.toJSONString(redisMessage)); } }
redis发布后,监听器监听到有消息时,使用WebSocket进行消息推送。每台服务器都会推送,只有服务连接成功的一台服务器才能通知到前台成功
redis实现发布和订阅的功能也可以参考:https://www.cnblogs.com/zwh0910/p/17370198.html