起因
- 想處理后端向前端發送消息的情況,然后就了解到了原生
websocket
和stomp
協議方式來處理的幾種方式,最終選擇了stomp
來,但很多參考資料都不全,導致費了很多時間,所以這里不說基礎的內容了,只記錄一些疑惑的點。
相關前綴和注解
在后台的websocket
配置中,我們看到有/app
、/queue
、/topic
、/user
這些前綴:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");//注意此處有設置
registry.setUserDestinationPrefix("/user");
}
同時在controller中又有@MessageMapping
、@SubscribeMapping
、@SendTo
、@SendToUser
等注解,這些前綴和這些注解是由一定的關系的,這邊理一下:
- 首先前端
stompjs
有兩種方式向后端交互,一種是發送消息send
,一種是訂閱subscribe
,它們在都會帶一個目的地址/app/hello
- 如果地址前綴是
/app
,那么此消息會關聯到@MessageMapping
(send
命令會到這個注解)、@SubscribeMapping
(subscribe
命令會到這個注解)中,如果沒有/app
,則不會映射到任何注解上去,例如:
當前端發送://接收前端send命令發送的 @MessageMapping("/hello") //@SendTo("/topic/hello2") public String hello(@Payload String message) { return "123"; } //接收前端subscribe命令發送的 @SubscribeMapping("/subscribe") public String subscribe() { return "456"; } //接收前端send命令,但是單對單返回 @MessageMapping("/test") @SendToUser("/queue/test") public String user(Principal principal, @Payload String message) { log.debug(principal.getName()); log.debug(message); //可以手動發送,同樣有queue //simpMessagingTemplate.convertAndSendToUser("admin","/queue/test","111"); return "111"; }
send("/app/hello",...)
才會走到上方第一個中,而返回的這個123
,並不是直接返回,而是默認將123
轉到/topic/hello
這個訂閱中去(自動在前面加上/topic
),當然可以用@SendTo("/topic/hello2")
中將123
轉到/topic/hello2
這個訂閱中;當前端發送subscribe("/app/subscribe",{接收直接返回的內容}
,會走到第二個中,而456
就不經過轉發了,直接會返回,當然也可以增加@SendTo("/topic/hello2")
注解來不直接返回,而是轉到其它訂閱中。 - 如果地址前綴是
/topic
,這個沒什么說的,一般用於訂閱消息,后台群發。 - 如果地址前綴是
/user
,這個和一對一消息有關,而且會和queue
有關聯,前端必須同時增加queue
,類似subscribe("/user/queue/test",...)
,后端的@SendToUser("/queue/test")
同樣要加queue
才能正確的發送到前端訂閱的地址。
token鑒權相關
權限相關一般是增加攔截器,網上查到的資料一般有兩種方式:
- 實現
HandshakeInterceptor
接口在beforeHandshake
方法中來處理,這種方式缺點是無法獲取header
中的值,只能獲取url
中的參數,如果token
用jwt
等很長的,用這種方式實現並不友好。 - 實現
ChannelInterceptor
接口在preSend
方法中來處理,這種方式可以獲取header
中的值,而且還可以設置用戶信息等,詳細見下方攔截器代碼
vue端相關注意點
vue
端用websocket
的好處是單頁應用,不會頻繁的斷開和重連,所以相關代碼放到App.vue
中- 由於要鑒權,所以需要登錄后再連接,這里用的方法是
watch
監聽token
,如果token
從無到有,說明剛登錄,觸發websocket
連接。 - 前端引入包
npm instll sockjs-client
和npm install stompjs
,具體代碼見下方。
相關代碼
- 后台配置
@Configuration @EnableWebSocketMessageBroker @Slf4j public class WebsocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private AuthChannelInterceptor authChannelInterceptor; @Bean public WebSocketInterceptor getWebSocketInterceptor() { return new WebSocketInterceptor(); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws")//請求地址:http://ip:port/ws .addInterceptors(getWebSocketInterceptor())//攔截器方式1,暫不用 .setAllowedOrigins("*")//跨域 .withSockJS();//開啟socketJs } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/queue", "/topic"); registry.setApplicationDestinationPrefixes("/app"); registry.setUserDestinationPrefix("/user"); } /** * 攔截器方式2 * * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(authChannelInterceptor); } }
- 攔截器
@Component @Order(Ordered.HIGHEST_PRECEDENCE + 99) public class AuthChannelInterceptor implements ChannelInterceptor { /** * 連接前監聽 * * @param message * @param channel * @return */ @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); //1、判斷是否首次連接 if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) { //2、判斷token List<String> nativeHeader = accessor.getNativeHeader("Authorization"); if (nativeHeader != null && !nativeHeader.isEmpty()) { String token = nativeHeader.get(0); if (StringUtils.isNotBlank(token)) { //todo,通過token獲取用戶信息,下方用loginUser來代替 if (loginUser != null) { //如果存在用戶信息,將用戶名賦值,后期發送時,可以指定用戶名即可發送到對應用戶 Principal principal = new Principal() { @Override public String getName() { return loginUser.getUsername(); } }; accessor.setUser(principal); return message; } } } return null; } //不是首次連接,已經登陸成功 return message; } }
- 前端代碼,放在App.vue中:
import Stomp from 'stompjs' import SockJS from 'sockjs-client' import {mapGetters} from "vuex"; export default { name: 'App', data() { return { stompClient: null, } }, computed: { ...mapGetters(["token"]) }, created() { //只有登錄后才連接 if (this.token) { this.initWebsocket(); } }, destroyed() { this.closeWebsocket() }, watch: { token(val, oldVal) { //如果一開始沒有,現在有了,說明剛登錄,連接websocket if (!oldVal && val) { this.initWebsocket(); } //如果原先由,現在沒有了,說明退出登錄,斷開websocket if (oldVal && !val) { this.closeWebsocket(); } } }, methods: { initWebsocket() { let socket = new SockJS('http://localhost:8060/ws'); this.stompClient = Stomp.over(socket); this.stompClient.connect( {"Authorization": this.token},//傳遞token (frame) => { //測試topic this.stompClient.subscribe("/topic/subscribe", (res) => { console.log("訂閱消息1:"); console.log(res); }); //測試 @SubscribeMapping this.stompClient.subscribe("/app/subscribe", (res) => { console.log("訂閱消息2:"); console.log(res); }); //測試單對單 this.stompClient.subscribe("/user/queue/test", (res) => { console.log("訂閱消息3:"); console.log(res.body); }); //測試發送 this.stompClient.send("/app/test", {}, JSON.stringify({"user": "user"})) }, (err) => { console.log("錯誤:"); console.log(err); //10s后重新連接一次 setTimeout(() => { this.initWebsocket(); }, 10000) } ); this.stompClient.heartbeat.outgoing = 20000; //若使用STOMP 1.1 版本,默認開啟了心跳檢測機制(默認值都是10000ms) this.stompClient.heartbeat.incoming = 0; //客戶端不從服務端接收心跳包 }, closeWebsocket() { if (this.stompClient !== null) { this.stompClient.disconnect(() => { console.log("關閉連接") }) } } } }
參考
- Spring消息之STOMP,寫的挺詳細的,還有源碼
- Spring官方文檔
關於stompjs
的補充
- 如果直接用
npm i stompjs
,安裝的是這個stomp-websocket,版本是2.3.3
,七八年前的版本了,雖然還可以正常用。上方演示也是用的這個。 - 最新的版本應該是用這個
npm i @stomp/stompjs
,對應的是stompjs,當前版本已經是6.x
多了,一些用法有改動,類似發送不用send
而是publish
,官方推薦用這個。 - 新版本的前端代碼,放在App.vue中,后端沒有變化,具體文檔可參考Using STOMP with SockJS:
import {Client} from '@stomp/stompjs'; import SockJS from 'sockjs-client' import {mapGetters} from "vuex"; export default { name: 'App', data() { return { stompClient: null, } }, computed: { ...mapGetters(["name", "token"]) }, created() { //只有登錄后才連接 if (this.token) { this.initWebsocket(); } }, destroyed() { this.closeWebsocket() }, watch: { token(val, oldVal) { //如果一開始沒有,現在有了,說明剛登錄,連接websocket if (!oldVal && val) { this.initWebsocket(); } //如果原先由,現在沒有了,說明退出登錄,斷開websocket if (oldVal && !val) { this.closeWebsocket(); } } }, methods: { initWebsocket() { this.stompClient = new Client({ brokerURL: '',//可以不賦值,因為后面用SockJS來代替 connectHeaders: {"Authorization": this.token}, debug: function (str) { console.log(str); }, reconnectDelay: 10000,//重連時間 heartbeatIncoming: 4000, heartbeatOutgoing: 4000, }); //用SockJS代替brokenURL this.stompClient.webSocketFactory = function () { return new SockJS('/ws'); }; //連接 this.stompClient.onConnect = (frame) => { this.stompClient.subscribe("/topic/hello", (res) => { console.log('2:'); console.log(res); }); this.stompClient.subscribe("/app/subscribe", (res) => { console.log('3:'); console.log(res); }); //新版不用send而是publish this.stompClient.publish({ destination: '/app/hello', body: "123" }) }; //錯誤 this.stompClient.onStompError = function (frame) { console.log('Broker reported error: ' + frame.headers['message']); console.log('Additional details: ' + frame.body); //這里不需要重連了,新版自帶重連 }; //啟動 this.stompClient.activate(); }, closeWebsocket() { if (this.stompClient !== null) { this.stompClient.deactivate() } } } }
- 新版本是不推薦用SockJS的,理由是現在大多數瀏覽器除了舊的IE,其它的都支持,所以如果不用的話,前端直接用
brokerURL
而不需要用webSocketFactory
來配置了,后端配置項需要修改,參考這個回答:@Override public void registerStompEndpoints(StompEndpointRegistry registry) { //允許原生的websocket registry.addEndpoint("/ws")//請求地址:ws://ip:port/ws .setAllowedOrigins("*");//跨域 //允許sockJS registry.addEndpoint("/ws")//請求地址:http://ip:port/ws .setAllowedOrigins("*")//跨域 .withSockJS();//開啟sockJs }