Spring websocket+Stomp+SockJS 實時通信詳解
一、三者之間的關系
Http連接為一次請求(request)一次響應(response),必須為同步調用方式。WebSocket 協議提供了通過一個套接字實現全雙工通信的功能。一次連接以后,會建立tcp連接,后續客戶端與服務器交互為全雙工方式的交互方式,客戶端可以發送消息到服務端,服務端也可將消息發送給客戶端。
SockJS 是 WebSocket 技術的一種模擬。為了應對許多瀏覽器不支持WebSocket協議的問題,設計了備選SockJs。開啟並使用SockJS后,它會優先選用Websocket協議作為傳輸協議,如果瀏覽器不支持Websocket協議,則會在其他方案中,選擇一個較好的協議進行通訊。
-服務端使用:
registry.addEndpoint("/endpointChat").withSockJS();
-客戶端使用:
//加載sockjs
<script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
var url = '/chat';
var sock = new SockJS(url);
//SockJS 所處理的 URL是“http://“或“https://“,而不是“ws://“or “wss://“
//.....
STOMP 中文為: 面向消息的簡單文本協議。websocket定義了兩種傳輸信息類型: 文本信息和二進制信息。類型雖然被確定,但是他們的傳輸體是沒有規定的。所以,需要用一種簡單的文本傳輸類型來規定傳輸內容,它可以作為通訊中的文本傳輸協議,即交互中的高級協議來定義交互信息。
STOMP本身可以支持流類型的網絡傳輸協議: websocket協議和tcp協議。
Stomp還提供了一個stomp.js,用於瀏覽器客戶端使用STOMP消息協議傳輸的js庫。
STOMP的優點如下:
(1)不需要自建一套自定義的消息格式
(2)現有stomp.js客戶端(瀏覽器中使用)可以直接使用
(3)能路由信息到指定消息地點
(4)可以直接使用成熟的STOMP代理進行廣播 如:RabbitMQ, ActiveMQ
二、配置WebsocketStompConfig
1、共享session
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
/**
* @EnableWebSocketMessageBroker 注解表明: 這個配置類不僅配置了 WebSocket,還配置了基於代理的 STOMP消息;
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
/**
* 復寫了 registerStompEndpoints() 方法:添加一個服務端點,來接收客戶端的連接。將 "/endpointChat" 路徑注冊為 STOMP 端點。
* 這個路徑與發送和接收消息的目的路徑有所不同, 這是一個端點,客戶端在訂閱或發布消息到目的地址前,要連接該端點,
* 即用戶發送請求 :url="/127.0.0.1:8080/endpointChat" 與 STOMP server 進行連接,之后再轉發到訂閱url;
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//添加一個/endpointChat端點,客戶端就可以通過這個端點來進行連接;withSockJS作用是添加SockJS支持
registry.addEndpoint("/endpointChat").withSockJS();
}
/**
* 復寫了 configureMessageBroker() 方法:
* 配置了一個 簡單的消息代理,通俗一點講就是設置消息連接請求的各種規范信息。
* 發送應用程序的消息將會帶有 “/app” 前綴。
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//定義了一個(或多個)客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息
registry.enableSimpleBroker("/queue", "/topic");
//定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴
//registry.setApplicationDestinationPrefixes("/app");
// 點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/
//registry.setUserDestinationPrefix("/user/");
}
}
注意:
此配置是基於SpringBoot+Shiro的框架,Shiro維護了所有的session,在用戶登錄的時候就通過
SimpleAuthenticationInfo info = new SimpleAuthenticationInfo(user, password, getName());
將用戶信息注冊成為principal。當客戶端連接endpointChat成功時,stomp會取java.security.Principal的默認實現類(在我的系統中為shiro的principal)信息注冊成為username,然后返回給客戶端。這個username對於點對點發送消息十分重要,通過服務端和客戶端維護相同的username(此username就是一個唯一的字符串)來達到精准推送消息的目的。
2、自定義匹配規則
如果采用其他架構,沒有實現principal,這就需要自己實現自定義的username規則,必須要通過實現自己的principal類來完成,參考代碼如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/endpointChat").setHandshakeHandler(new DefaultHandshakeHandler(){
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
//key就是服務器和客戶端保持一致的標記,一般可以用賬戶名稱,或者是用戶ID。
return new MyPrincipal("test");
}
})
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//定義了一個(或多個)客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息
registry.enableSimpleBroker("/queue", "/topic");
//定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴
//registry.setApplicationDestinationPrefixes("/app");
// 點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/
//registry.setUserDestinationPrefix("/user/");
}
/**
* 自定義的Principal
*/
class MyPrincipal implements Principal{
private String key;
public MyPrincipal(String key) {
this.key = key;
}
@Override
public String getName() {
return key;
}
}
}
然后服務端給客戶端發送消息:
SimpMessagingTemplate.convertAndSendToUser(“test”,"/queue/notifications", "新消息");
客戶端訂閱服務器發送的消息(控制板打印消息如圖1):
stomp.subscribe("/user/queue/notifications", handleFunction);
注意:此處為什么不是“/user/test/queue/notifications”,稍候再講。
圖1
3、連接時驗證登錄權限
一般在連接服務器時,需要驗證此連接的安全性,驗證用戶是否登錄,如果沒有登錄,不能連接服務器,訂閱消息。
/**
* 連接時驗證用戶是否登錄
* @author LEITAO
* @date 2018年4月18日 上午10:10:37
*/
public class SessionAuthHandshakeInterceptor implements HandshakeInterceptor{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
UserDO user = ShiroUtils.getUser();
if (user == null) {
logger.error("websocket權限拒絕:用戶未登錄");
return false;
}
//attributes.put("user", user);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
}
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//添加一個/endpointChat端點,客戶端就可以通過這個端點來進行連接;withSockJS作用是添加SockJS支持
registry.addEndpoint("/endpointChat")
//添加連接登錄驗證
.addInterceptors(new SessionAuthHandshakeInterceptor())
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//定義了一個(或多個)客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息
registry.enableSimpleBroker("/queue", "/topic");
//定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴
//registry.setApplicationDestinationPrefixes("/app");
// 點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/
//registry.setUserDestinationPrefix("/user/");
}
}
三、@MessageMapping、@SendTo、@SendToUser注解
@MessageMapping注解和@RequestMapping注解功能類似,只不過@RequestMapping表明此方法是Stomp客戶端向服務端send消息的目標地址。
使用方式如下:
@Controller
public class WebSocketController {
@Autowired
public SimpMessagingTemplate template;
@MessageMapping("/hello")
@SendTo("/topic/hello")
public Greeting greeting(Greeting message) throws Exception {
return message;
}
@MessageMapping("/message")
@SendToUser("/message")
public UserMessage userMessage(UserMessage userMessage) throws Exception {
return userMessage;
}
}
第一個方法,表示服務端可以接收客戶端通過向地址“/hello”發送過來的消息。@SendTo表示此方法會向訂閱”/topic/hello”的用戶廣播message消息。@SendTo("/topic/hello")注解等同於使用
SimpMessagingTemplate.convertAndSend("/topic/hello", new Response("你好" ));
客戶端通過
stomp.subscribe("/topic/hello", handleFunction);
方法訂閱的地方都能收到消息。
第二個方法道理相同,只是注意這里用的是@SendToUser,這就是發送給單一客戶端的標志。本例中,客戶端接收一對一消息的主題應該是“/user/message” ,”/user/”是固定的搭配,服務器會自動識別。
@SendToUser("/message") 等同於使用
SimpMessagingTemplate.convertAndSendToUser(Key,"/message", "新消息");
客戶端通過
stomp.subscribe("/user/message", handleFunction);
方法訂閱的並且注冊時返回的username=Key時才能收到消息。
注意:相關的注解還有很多,此處不一一描述。
四、點對點發送流程
一對多廣播消息流程比較簡單,此處不做描述。
點對點發送功能區別不僅僅在使用@SendToUser或者是convertAndSendToUser方法。最重要的區別,在於底層的實現邏輯上面。
當我在剛剛學習的時候遇到了一個問題,客戶端通過
stomp.subscribe("/user/queue/notifications", handleFunction);
訂閱的地址,居然能收到后台使用
SimpMessagingTemplate.convertAndSendToUser(user.toString,"/queue/notifications", "新消息");
發布的點對點消息。
通過簡單的研究代碼,發現convertAndSendToUser底層通過方法:
@Override
public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers,
MessagePostProcessor postProcessor) throws MessagingException {
Assert.notNull(user, "User must not be null");
user = StringUtils.replace(user, "/", "%2F");
super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
將"/queue/notifications"轉換成了"/user/UserDO{userId=1,accountType=0, username='admin',name='超級管理員',...}/queue/notifications"。而前端按照網上的說法客戶端應該通過訂閱相同的地址"/user/UserDO{userId=1,accountType=0, username='admin',name='超級管理員',...}/queue/notifications"才能夠接受消息才對,這一點讓我百思不得其解。
注意以下開始為重點:
系統啟動通過
stomp.subscribe("/user/queue/notifications", handleFunction);
訂閱的時候,會調用org.springframework.messaging.simp.user.DefaultUserDestinationResolver的resolveDestination方法,將連接服務器返回給前端的username傳回給resolveDestination方法,然后獲取此用戶的sessionID,此ID是連接服務器時,為每個用戶生成的唯一ID,通過返回給前端的username來獲取(我的系統此時username=UserDO{userId=1,accountType=0, username='admin',name='超級管理員',...},是用戶user實體的toString()字符串)。然后最后將"/user/queue/notifications"地址轉換為"/queue/notifications-userefna60v1",其中”-user”是固定的搭配,”efna60v1”就是用戶的sessionID。
服務器通過方法
SimpMessagingTemplate.convertAndSendToUser(user.toString(),"/queue/notifications", user.getName()+"新消息");
發送消息時,首先的確是先將"/queue/notifications"轉換為"/user/UserDO{userId=1,accountType=0, username='admin',name='超級管理員',...}/queue/notifications",但是緊接着也會調用resolveDestination方法將剛剛的地址轉換為"/queue/notifications-userefna60v1"。具體過程是通過將原地址字符串進行分解,得到”UserDO{userId=1,accountType=0, username='admin',name='超級管理員',...}”(此信息就是剛剛注冊時返回給前端的username),然后再通過此信息獲取到用戶注冊時生成的sessionID,最后將地址轉化為"/queue/notifications-userefna60v1"並廣播消息,因為訂閱此地址的客戶端只有一個,因此實現了點對點通信功能。
此過程涉及到的其他方法如下:
private ParseResult parse(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String sourceDestination = SimpMessageHeaderAccessor.getDestination(headers);
if (sourceDestination == null || !checkDestination(sourceDestination, this.prefix)) {
return null;
}
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
switch (messageType) {
case SUBSCRIBE:
case UNSUBSCRIBE:
return parseSubscriptionMessage(message, sourceDestination);
case MESSAGE:
return parseMessage(headers, sourceDestination);
default:
return null;
}
}
private ParseResult parseMessage(MessageHeaders headers, String sourceDestination) {
int prefixEnd = this.prefix.length();
int userEnd = sourceDestination.indexOf('/', prefixEnd);
Assert.isTrue(userEnd > 0, "Expected destination pattern \"/user/{userId}/**\"");
String actualDestination = sourceDestination.substring(userEnd);
String subscribeDestination = this.prefix.substring(0, prefixEnd - 1) + actualDestination;
String userName = sourceDestination.substring(prefixEnd, userEnd);
userName = StringUtils.replace(userName, "%2F", "/");
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
Set<String> sessionIds;
if (userName.equals(sessionId)) {
userName = null;
sessionIds = Collections.singleton(sessionId);
}
else {
sessionIds = getSessionIdsByUser(userName, sessionId);
}
if (!this.keepLeadingSlash) {
actualDestination = actualDestination.substring(1);
}
return new ParseResult(sourceDestination, actualDestination, subscribeDestination,
sessionIds, userName);
}
//通過設置的userName來查詢sessionId
private Set<String> getSessionIdsByUser(String userName, String sessionId) {
Set<String> sessionIds;
SimpUser user = this.userRegistry.getUser(userName);
if (user != null) {
if (user.getSession(sessionId) != null) {
sessionIds = Collections.singleton(sessionId);
}
else {
Set<SimpSession> sessions = user.getSessions();
sessionIds = new HashSet<String>(sessions.size());
for (SimpSession session : sessions) {
sessionIds.add(session.getId());
}
}
}
else {
sessionIds = Collections.emptySet();
}
return sessionIds;
}
五、Stomp客戶端API
1、發起連接
client.connect(headers, connectCallback, errorCallback);
其中headers表示客戶端的認證信息:
var headers = {
login: 'mylogin',
passcode: 'mypasscode',
// additional header
'client-id': 'my-client-id'
};
若無需認證,直接使用空對象 “{}” 即可;
(1)connectCallback 表示連接成功時(服務器響應 CONNECTED 幀)的回調方法;
(2)errorCallback 表示連接失敗時(服務器響應 ERROR 幀)的回調方法,非必須;
實例代碼:
// 建立連接對象(還未發起連接)
var socket=new SockJS("/endpointChat");
// 獲取 STOMP 子協議的客戶端對象
var stompClient = Stomp.over(socket);
// 向服務器發起websocket連接並發送CONNECT幀
stompClient.connect( {},
function connectCallback (frame) {
// 連接成功時(服務器響應 CONNECTED 幀)的回調方法
console.log('已連接【' + frame + '】');
//訂閱一個消息
stompClient.subscribe('/topic/getResponse',
function (response) {
showResponse(response.body);
});
},
function errorCallBack (error) {
// 連接失敗時(服務器響應 ERROR 幀)的回調方法
console.log('連接失敗【' + error + '】');
} );
2、斷開連接
若要從客戶端主動斷開連接,可調用 disconnect() 方法:
client.disconnect(
function () {
alert("斷開連接");
});
3、發送信息
連接成功后,客戶端可使用 send() 方法向服務器發送信息:
client.send(destination url, headers, body);
其中:
(1)destination url 為服務器 controller中 @MessageMapping 中匹配的URL,字符串,必須參數;
(2)headers 為發送信息的header,JavaScript 對象,可選參數;
(3)body 為發送信息的 body,字符串,可選參數;
實例代碼:
client.send("/queue/test", {priority: 9}, "Hello, STOMP");
client.send("/queue/test", {}, "Hello, STOMP");
4、訂閱、接收消息
STOMP 客戶端要想接收來自服務器推送的消息,必須先訂閱相應的URL,即發送一個 SUBSCRIBE 幀,然后才能不斷接收來自服務器的推送消息。
訂閱和接收消息通過 subscribe() 方法實現:
subscribe(destination url, callback, headers)
其中
(1)destination url 為服務器 @SendTo 匹配的 URL,字符串;
(2)callback 為每次收到服務器推送的消息時的回調方法,該方法包含參數 message;
(3)headers 為附加的headers,JavaScript 對象;該方法返回一個包含了id屬性的 JavaScript 對象,可作為 unsubscribe() 方法的參數;默認情況下,如果沒有在headers額外添加,這個庫會默認構建一個獨一無二的ID。在傳遞headers這個參數時,可以使用你自己id。
參考代碼:
var headers = {
ack: 'client',
//這個客戶端指定了它會確認接收的信息,只接收符合這個selector : location = 'Europe'的消息。
'selector': "location = 'Europe'",
//id:’myid’
};
var callback = function(message) {
if (message.body) {
alert("got message with body " +JSON.parse( message.body)) }
else{
alert("got empty message");
} });
var subscription = client.subscribe("/queue/test", callback, headers);
如果想讓客戶端訂閱多個目的地,你可以在接收所有信息的時候調用相同的回調函數:
onmessage = function(message) {
// called every time the client receives a message
}
var sub1 = client.subscribe("queue/test", onmessage);
var sub2 = client.subscribe("queue/another", onmessage)
5、取消訂閱
var subscription = client.subscribe(...);
subscription.unsubscribe();
6、事務支持
可以在將消息的發送和確認接收放在一個事務中。
客戶端調用自身的begin()方法就可以開始啟動事務了,begin()有一個可選的參數transaction,一個唯一的可標識事務的字符串。如果沒有傳遞這個參數,那么庫會自動構建一個。
這個方法會返回一個object。這個對象有一個id屬性對應這個事務的ID,還有兩個方法:
commit()提交事務
abort()中止事務
在一個事務中,客戶端可以在發送/接受消息時指定transaction id來設置transaction。
// start the transaction
var tx = client.begin();
// send the message in a transaction
client.send("/queue/test", {transaction: tx.id}, "message in a transaction");
// commit the transaction to effectively send the message
tx.commit();
如果你在調用send()方法發送消息的時候忘記添加transction header,那么這不會稱為事務的一部分,這個消息會直接發送,不會等到事務完成后才發送。
var txid = "unique_transaction_identifier";
// start the transaction
var tx = client.begin();
// oops! send the message outside the transaction
client.send("/queue/test", {}, "I thought I was in a transaction!");
tx.abort(); // Too late! the message has been sent
7、消息確認ack
默認情況,在消息發送給客戶端之前,服務端會自動確認(acknowledged)。
客戶端可以選擇通過訂閱一個目的地時設置一個ack header為client或client-individual來處理消息確認。
在下面這個例子,客戶端必須調用message.ack()來通知客戶端它已經接收了消息。
var subscription = client.subscribe("/queue/test",
function(message) {
// do something with the message
...
// and acknowledge it
message.ack();
},
{ack: 'client'}
);
ack()接受headers參數用來附加確認消息。例如,將消息作為事務(transaction)的一部分,當要求接收消息時其實代理(broker)已經將ACK STOMP frame處理了。
var tx = client.begin();
message.ack({ transaction: tx.id, receipt: 'my-receipt' });
tx.commit();
nack()也可以用來通知STOMP 1.1.brokers(代理):客戶端不能消費這個消息。與ack()方法的參數相同。
8、debug調試
有一些測試代碼能有助於你知道庫發送或接收的是什么,從而來調試程序。
客戶端可以將其debug屬性設置為一個函數,傳遞一個字符串參數去觀察庫所有的debug語句。
client.debug = function(str) {
// append the debug log to a #debug div somewhere in the page using JQuery:
$("#debug").append(str + "\n");
};
默認情況,debug消息會被記錄在在瀏覽器的控制台。
9、心跳機制
如果STOMP broker(代理)接收STOMP 1.1版本的幀,heart-beating是默認啟用的。heart-beating也就是頻率,incoming是接收頻率,outgoing是發送頻率。
通過改變incoming和outgoing可以更改客戶端的heart-beating(默認為10000ms):
client.heartbeat.outgoing = 20000;
// client will send heartbeats every 20000ms
client.heartbeat.incoming = 0;
// client does not want to receive heartbeats
// from the server
heart-beating是利用window.setInterval()去規律地發送heart-beats或者檢查服務端的heart-beats。
————————————————
版權聲明:本文為CSDN博主「雷小濤的摸爬滾打」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/leixiaotao_java/article/details/79982309