SpringBoot+WebSocket+RabbitMQ整合實現消息實時推送
注意:在Spring Boot項目中使用@EnableWebMvc會關閉Spring MVC的自動配置,可能影響項目運行(例如訪問不到靜態資源文件)。整合WebSocket也可參考:
https://www.cnblogs.com/yu-si/articles/15075737.html
WebSocket+RabbitMQ基於Spring boot實現
1.配置
maven導包
SpringBoot 與webSocket的關聯
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
SpringBoot與Rabbit MQ的關聯
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置application.yml文件
# RabbitMQ connection settings read by spring-boot-starter-amqp.
spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
    # virtual host
    virtual-host: /
    port: 5672
編寫RabbitMQ服務端
配置服務端的config
/**
 * Declares the RabbitMQ topology used by the sender side: one queue, one topic
 * exchange, and the binding that connects them.
 */
@Configuration
public class SendConfig {

    /** Name of the queue and of the bean that exposes it. */
    private static final String QUEUE_NAME = "queue";

    /** Name of the topic exchange and of the bean that exposes it. */
    private static final String EXCHANGE_NAME = "exchange";

    /** Routing key under which the queue is bound to the exchange. */
    private static final String ROUTING_KEY = "a";

    /**
     * Creates the queue named {@code "queue"}.
     *
     * @return the queue definition
     */
    @Bean(QUEUE_NAME)
    public Queue queueMsg() {
        final Queue messageQueue = new Queue(QUEUE_NAME);
        return messageQueue;
    }

    /**
     * Creates the topic exchange named {@code "exchange"}.
     *
     * @return the exchange definition
     */
    @Bean(EXCHANGE_NAME)
    public TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
        return topicExchange;
    }

    /**
     * Binds the queue to the exchange with routing key {@code "a"}.
     *
     * @param queue    the bean registered as {@code "queue"}
     * @param exchange the bean registered as {@code "exchange"}
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeAndQueuemsg(@Qualifier(QUEUE_NAME) Queue queue,
                                       @Qualifier(EXCHANGE_NAME) TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_KEY);
    }
}
編寫服務端的Controller
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes an incoming text message to RabbitMQ.
 */
@RestController
public class Controller {

    /** Exchange the message is published to. */
    private static final String TARGET_EXCHANGE = "exchange";

    /** Routing key used when publishing. */
    private static final String TARGET_ROUTING_KEY = "a";

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Handles {@code /send} by converting {@code msg} and sending it to the
     * exchange {@code "exchange"} with routing key {@code "a"}.
     *
     * @param msg the text to publish
     */
    @RequestMapping("/send")
    public void SennerMsg(String msg) {
        final String payload = msg;
        amqpTemplate.convertAndSend(TARGET_EXCHANGE, TARGET_ROUTING_KEY, payload);
    }
}
到這裡,啟動MQ後運行程序並調用/send接口,便可以看到隊列中有一條新的消息
或者用命令查看
編寫MQ的客戶端(與WebSocket結合)
配置Websocket—做請求攔截
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
 * Handshake interceptor that only lets a WebSocket handshake proceed when the
 * request carries a {@code jspCode} parameter, and copies that value into the
 * WebSocket session attributes.
 */
public class Hank implements HandshakeInterceptor {

    /**
     * Request parameter name and session attribute key. It must be spelt
     * exactly {@code "jspCode"} because MyWebSocketHandler reads the attribute
     * under that key; the previous {@code "jspCoe"} spelling made that lookup
     * return null.
     */
    private static final String JSP_CODE = "jspCode";

    /**
     * Copies the {@code jspCode} request parameter into the handshake attributes.
     *
     * @param request             the current handshake request
     * @param serverHttpResponse  the current handshake response (unused)
     * @param webSocketHandler    the target handler (unused)
     * @param map                 attributes that are handed over to the WebSocket session
     * @return {@code true} to continue the handshake, {@code false} when the
     *         request is not servlet based or has no {@code jspCode} parameter
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        // Reject instead of failing with a ClassCastException on a non-servlet request.
        if (!(request instanceof ServletServerHttpRequest)) {
            return false;
        }
        String jspCode = ((ServletServerHttpRequest) request).getServletRequest().getParameter(JSP_CODE);
        if (jspCode == null) {
            return false;
        }
        map.put(JSP_CODE, jspCode);
        return true;
    }

    /** Nothing to do once the handshake has completed. */
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
    }
}
配置Websocket的Handler
/**
 * WebSocket handler that keeps a registry of connected sessions keyed by the
 * {@code jspCode} session attribute and pushes text messages to the sessions
 * whose key contains a given type string.
 *
 * <p>The registry is a concurrent map because sessions are registered and
 * removed on WebSocket container threads while {@link #sendMsgToJsp} iterates
 * it from the caller's thread.
 */
@Component
public class MyWebSocketHandler implements WebSocketHandler {

    /** Session attribute that holds the client's code (set during the handshake). */
    private static final String JSP_CODE_ATTRIBUTE = "jspCode";

    /** Connected sessions by jspCode; a later connection with the same code replaces the earlier one. */
    private static final java.util.concurrent.ConcurrentMap<String, WebSocketSession> userMap =
            new java.util.concurrent.ConcurrentHashMap<String, WebSocketSession>();

    /**
     * Registers the new session under its {@code jspCode}. A session without
     * that attribute is closed instead of being stored, because a null key
     * cannot be matched by {@link #sendMsgToJsp}.
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        String jspCode = (String) webSocketSession.getAttributes().get(JSP_CODE_ATTRIBUTE);
        if (jspCode == null) {
            webSocketSession.close(CloseStatus.POLICY_VIOLATION);
            return;
        }
        System.out.println(jspCode);
        userMap.put(jspCode, webSocketSession);
    }

    /** Incoming client messages are ignored. */
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
    }

    /** Drops the failed session from the registry and closes it if it is still open. */
    @Override
    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
        unregister(webSocketSession);
        if (webSocketSession.isOpen()) {
            webSocketSession.close(CloseStatus.SERVER_ERROR);
        }
    }

    /** Drops the closed session from the registry so the map does not grow without bound. */
    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        unregister(webSocketSession);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Sends {@code message} to every open session whose key contains {@code type}.
     * Each send runs on its own thread, as before, so the caller is not blocked
     * by a slow client. Sessions found closed are removed from the registry.
     *
     * @param message the text message to push
     * @param type    substring that a session's {@code jspCode} must contain
     * @throws Exception declared for interface compatibility with existing callers
     */
    public void sendMsgToJsp(final TextMessage message, String type) throws Exception {
        for (Map.Entry<String, WebSocketSession> entry : userMap.entrySet()) {
            final WebSocketSession session = entry.getValue();
            if (!session.isOpen()) {
                // Stale entry: remove it only if it still maps to this very session.
                userMap.remove(entry.getKey(), session);
                continue;
            }
            if (!entry.getKey().contains(type)) {
                continue;
            }
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Serialise sends per session so two sender threads
                        // never write to the same session at the same time.
                        synchronized (session) {
                            if (session.isOpen()) {
                                session.sendMessage(message);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    /** Removes {@code session} from the registry unless a newer session already replaced it. */
    private void unregister(WebSocketSession session) {
        Object jspCode = session.getAttributes().get(JSP_CODE_ATTRIBUTE);
        if (jspCode instanceof String) {
            userMap.remove(jspCode, session);
        }
    }
}
配置WebSocket的config
/**
 * Registers {@code MyWebSocketHandler} at {@code /wsMy} and only accepts
 * handshakes that carry a {@code jspCode} request parameter.
 *
 * <p>{@code @EnableWebMvc} is intentionally absent: in a Spring Boot
 * application it switches off Boot's Spring MVC auto-configuration, which is
 * what serves static resources by default.
 */
@Component
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    @Resource
    private MyWebSocketHandler handler;

    /**
     * Maps the handler to {@code /wsMy} with an interceptor that copies the
     * {@code jspCode} request parameter into the WebSocket session attributes.
     *
     * @param registry the registry handlers are added to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(handler, "/wsMy").addInterceptors(new HandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
                // Reject instead of failing with a ClassCastException on a non-servlet request.
                if (!(request instanceof ServletServerHttpRequest)) {
                    return false;
                }
                String jspCode = ((ServletServerHttpRequest) request).getServletRequest().getParameter("jspCode");
                if (jspCode == null) {
                    return false;
                }
                // The handler looks the session up under this same key.
                map.put("jspCode", jspCode);
                return true;
            }

            @Override
            public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
            }
        });
    }
}
配置客戶端的controller(二者關聯的部分)
/** WebSocket handler used to forward queue messages to connected pages. */
@Autowired
public MyWebSocketHandler handler;

/**
 * Listens on the queue {@code "queue"} and forwards each message body as a
 * text message to the WebSocket sessions selected by the type {@code "A"}.
 *
 * @param msg the message body received from the queue
 * @throws Exception if the handler fails to dispatch the message
 */
@RabbitListener(queues = "queue")
public void Recive(String msg) throws Exception {
    final TextMessage outbound = new TextMessage(msg);
    handler.sendMsgToJsp(outbound, "A");
}
到此便可以啟動測試了
另附上前端頁面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
</body>
<script type="text/javascript">
// 1. Create the WebSocket client.
// Full endpoint URL, including the jspCode query parameter that the server-side
// handshake interceptor requires. This is the URL the page was actually
// connecting to; it is now passed through instead of being hard-coded three times.
var wsServerUrl = 'ws://localhost:80/wsMy?jspCode=AA';
var limitConnect = 3;      // remaining reconnect attempts
var timeConnect = 0;       // reconnect attempts made so far
var websocket = null;      // current connection (kept global, as before)
var lockReconnect = false; // true while a reconnect is already scheduled
var heartbeatTimer = null; // id of the single heartbeat interval, or null
webSocketInit(wsServerUrl);

/**
 * Opens the connection to wsServerUrl and wires up logging, reconnect and
 * heartbeat handling.
 *
 * @param {string} wsServerUrl WebSocket endpoint URL to connect to
 */
function webSocketInit(wsServerUrl) {
    // Pick whichever WebSocket implementation the browser offers.
    if ('WebSocket' in window) {
        websocket = new WebSocket(wsServerUrl);
    } else if ('MozWebSocket' in window) {
        websocket = new MozWebSocket(wsServerUrl);
    } else {
        // NOTE(review): this page does not load the SockJS library, and SockJS
        // normally expects an http(s) URL - confirm before relying on this branch.
        websocket = new SockJS(wsServerUrl);
    }
    var socket = websocket;

    // On open
    socket.onopen = function (event) {
        console.log("已連接TCP服務器");
        startHeartbeat();
    };
    // On message
    socket.onmessage = function (event) {
        console.log("收到一條消息" + event.data);
        alert(event.data);
    };
    socket.onerror = function (event) {
        console.log("服務器報錯:");
        reconnect();
    };
    socket.onclose = function (event) {
        console.log('服務器已經斷開');
        stopHeartbeat();
        reconnect();
    };

    // Reconnect. The in-memory lock stops onerror and onclose from scheduling
    // two retries for the same failure; unlike localStorage it is neither
    // shared with other tabs nor left behind if the page is closed mid-retry.
    function reconnect() {
        if (limitConnect > 0) {
            if (!lockReconnect) {
                lockReconnect = true;
                limitConnect--;
                timeConnect++;
                console.log("第" + timeConnect + "次重連");
                // Retry after a short delay, with the same URL.
                setTimeout(function () {
                    webSocketInit(wsServerUrl);
                    lockReconnect = false;
                }, 2000);
            }
        } else {
            console.log("TCP連接已超時");
        }
    }

    // Heartbeat: only one interval is ever active, so reconnects do not stack timers.
    function startHeartbeat() {
        stopHeartbeat();
        heartbeatTimer = setInterval(function () {
            // readyState 1 === OPEN; sending on a socket that is not open throws.
            if (socket.readyState === 1) {
                socket.send('');
            }
        }, 1000 * 100);
    }

    function stopHeartbeat() {
        if (heartbeatTimer !== null) {
            clearInterval(heartbeatTimer);
            heartbeatTimer = null;
        }
    }
}
</script>
</html>
原文鏈接:https://blog.csdn.net/lilin0800/article/details/80884950