起因
- 想處理后端向前端發送消息的情況,然后就了解到了原生
websocket和stomp協議方式來處理的幾種方式,最終選擇了stomp來,但很多參考資料都不全,導致費了很多時間,所以這里不說基礎的內容了,只記錄一些疑惑的點。
相關前綴和注解
在后台的websocket配置中,我們看到有/app、/queue、/topic、/user這些前綴:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");//注意此處有設置
registry.setUserDestinationPrefix("/user");
}
同時在controller中又有@MessageMapping、@SubscribeMapping、@SendTo、@SendToUser等注解,這些前綴和這些注解是由一定的關系的,這邊理一下:
- 首先前端
stompjs有兩種方式向后端交互,一種是發送消息send,一種是訂閱subscribe,它們在都會帶一個目的地址/app/hello - 如果地址前綴是
/app,那么此消息會關聯到@MessageMapping(send命令會到這個注解)、@SubscribeMapping(subscribe命令會到這個注解)中,如果沒有/app,則不會映射到任何注解上去,例如:
當前端發送://接收前端send命令發送的 @MessageMapping("/hello") //@SendTo("/topic/hello2") public String hello(@Payload String message) { return "123"; } //接收前端subscribe命令發送的 @SubscribeMapping("/subscribe") public String subscribe() { return "456"; } //接收前端send命令,但是單對單返回 @MessageMapping("/test") @SendToUser("/queue/test") public String user(Principal principal, @Payload String message) { log.debug(principal.getName()); log.debug(message); //可以手動發送,同樣有queue //simpMessagingTemplate.convertAndSendToUser("admin","/queue/test","111"); return "111"; }send("/app/hello",...)才會走到上方第一個中,而返回的這個123,並不是直接返回,而是默認將123轉到/topic/hello這個訂閱中去(自動在前面加上/topic),當然可以用@SendTo("/topic/hello2")中將123轉到/topic/hello2這個訂閱中;當前端發送subscribe("/app/subscribe",{接收直接返回的內容},會走到第二個中,而456就不經過轉發了,直接會返回,當然也可以增加@SendTo("/topic/hello2")注解來不直接返回,而是轉到其它訂閱中。 - 如果地址前綴是
/topic,這個沒什么說的,一般用於訂閱消息,后台群發。 - 如果地址前綴是
/user,這個和一對一消息有關,而且會和queue有關聯,前端必須同時增加queue,類似subscribe("/user/queue/test",...),后端的@SendToUser("/queue/test")同樣要加queue才能正確的發送到前端訂閱的地址。
token鑒權相關
權限相關一般是增加攔截器,網上查到的資料一般有兩種方式:
- 實現
HandshakeInterceptor接口在beforeHandshake方法中來處理,這種方式缺點是無法獲取header中的值,只能獲取url中的參數,如果token用jwt等很長的,用這種方式實現並不友好。 - 實現
ChannelInterceptor接口在preSend方法中來處理,這種方式可以獲取header中的值,而且還可以設置用戶信息等,詳細見下方攔截器代碼
vue端相關注意點
vue端用websocket的好處是單頁應用,不會頻繁的斷開和重連,所以相關代碼放到App.vue中- 由於要鑒權,所以需要登錄后再連接,這里用的方法是
watch監聽token,如果token從無到有,說明剛登錄,觸發websocket連接。 - 前端引入包
npm instll sockjs-client和npm install stompjs,具體代碼見下方。
相關代碼
- 后台配置
@Configuration @EnableWebSocketMessageBroker @Slf4j public class WebsocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private AuthChannelInterceptor authChannelInterceptor; @Bean public WebSocketInterceptor getWebSocketInterceptor() { return new WebSocketInterceptor(); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws")//請求地址:http://ip:port/ws .addInterceptors(getWebSocketInterceptor())//攔截器方式1,暫不用 .setAllowedOrigins("*")//跨域 .withSockJS();//開啟socketJs } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/queue", "/topic"); registry.setApplicationDestinationPrefixes("/app"); registry.setUserDestinationPrefix("/user"); } /** * 攔截器方式2 * * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(authChannelInterceptor); } } - 攔截器
@Component @Order(Ordered.HIGHEST_PRECEDENCE + 99) public class AuthChannelInterceptor implements ChannelInterceptor { /** * 連接前監聽 * * @param message * @param channel * @return */ @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); //1、判斷是否首次連接 if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) { //2、判斷token List<String> nativeHeader = accessor.getNativeHeader("Authorization"); if (nativeHeader != null && !nativeHeader.isEmpty()) { String token = nativeHeader.get(0); if (StringUtils.isNotBlank(token)) { //todo,通過token獲取用戶信息,下方用loginUser來代替 if (loginUser != null) { //如果存在用戶信息,將用戶名賦值,后期發送時,可以指定用戶名即可發送到對應用戶 Principal principal = new Principal() { @Override public String getName() { return loginUser.getUsername(); } }; accessor.setUser(principal); return message; } } } return null; } //不是首次連接,已經登陸成功 return message; } } - 前端代碼,放在App.vue中:
import Stomp from 'stompjs' import SockJS from 'sockjs-client' import {mapGetters} from "vuex"; export default { name: 'App', data() { return { stompClient: null, } }, computed: { ...mapGetters(["token"]) }, created() { //只有登錄后才連接 if (this.token) { this.initWebsocket(); } }, destroyed() { this.closeWebsocket() }, watch: { token(val, oldVal) { //如果一開始沒有,現在有了,說明剛登錄,連接websocket if (!oldVal && val) { this.initWebsocket(); } //如果原先由,現在沒有了,說明退出登錄,斷開websocket if (oldVal && !val) { this.closeWebsocket(); } } }, methods: { initWebsocket() { let socket = new SockJS('http://localhost:8060/ws'); this.stompClient = Stomp.over(socket); this.stompClient.connect( {"Authorization": this.token},//傳遞token (frame) => { //測試topic this.stompClient.subscribe("/topic/subscribe", (res) => { console.log("訂閱消息1:"); console.log(res); }); //測試 @SubscribeMapping this.stompClient.subscribe("/app/subscribe", (res) => { console.log("訂閱消息2:"); console.log(res); }); //測試單對單 this.stompClient.subscribe("/user/queue/test", (res) => { console.log("訂閱消息3:"); console.log(res.body); }); //測試發送 this.stompClient.send("/app/test", {}, JSON.stringify({"user": "user"})) }, (err) => { console.log("錯誤:"); console.log(err); //10s后重新連接一次 setTimeout(() => { this.initWebsocket(); }, 10000) } ); this.stompClient.heartbeat.outgoing = 20000; //若使用STOMP 1.1 版本,默認開啟了心跳檢測機制(默認值都是10000ms) this.stompClient.heartbeat.incoming = 0; //客戶端不從服務端接收心跳包 }, closeWebsocket() { if (this.stompClient !== null) { this.stompClient.disconnect(() => { console.log("關閉連接") }) } } } }
參考
- Spring消息之STOMP,寫的挺詳細的,還有源碼
- Spring官方文檔
關於stompjs的補充
- 如果直接用
npm i stompjs,安裝的是這個stomp-websocket,版本是2.3.3,七八年前的版本了,雖然還可以正常用。上方演示也是用的這個。 - 最新的版本應該是用這個
npm i @stomp/stompjs,對應的是stompjs,當前版本已經是6.x多了,一些用法有改動,類似發送不用send而是publish,官方推薦用這個。 - 新版本的前端代碼,放在App.vue中,后端沒有變化,具體文檔可參考Using STOMP with SockJS:
import {Client} from '@stomp/stompjs'; import SockJS from 'sockjs-client' import {mapGetters} from "vuex"; export default { name: 'App', data() { return { stompClient: null, } }, computed: { ...mapGetters(["name", "token"]) }, created() { //只有登錄后才連接 if (this.token) { this.initWebsocket(); } }, destroyed() { this.closeWebsocket() }, watch: { token(val, oldVal) { //如果一開始沒有,現在有了,說明剛登錄,連接websocket if (!oldVal && val) { this.initWebsocket(); } //如果原先由,現在沒有了,說明退出登錄,斷開websocket if (oldVal && !val) { this.closeWebsocket(); } } }, methods: { initWebsocket() { this.stompClient = new Client({ brokerURL: '',//可以不賦值,因為后面用SockJS來代替 connectHeaders: {"Authorization": this.token}, debug: function (str) { console.log(str); }, reconnectDelay: 10000,//重連時間 heartbeatIncoming: 4000, heartbeatOutgoing: 4000, }); //用SockJS代替brokenURL this.stompClient.webSocketFactory = function () { return new SockJS('/ws'); }; //連接 this.stompClient.onConnect = (frame) => { this.stompClient.subscribe("/topic/hello", (res) => { console.log('2:'); console.log(res); }); this.stompClient.subscribe("/app/subscribe", (res) => { console.log('3:'); console.log(res); }); //新版不用send而是publish this.stompClient.publish({ destination: '/app/hello', body: "123" }) }; //錯誤 this.stompClient.onStompError = function (frame) { console.log('Broker reported error: ' + frame.headers['message']); console.log('Additional details: ' + frame.body); //這里不需要重連了,新版自帶重連 }; //啟動 this.stompClient.activate(); }, closeWebsocket() { if (this.stompClient !== null) { this.stompClient.deactivate() } } } } - 新版本是不推薦用SockJS的,理由是現在大多數瀏覽器除了舊的IE,其它的都支持,所以如果不用的話,前端直接用
brokerURL而不需要用webSocketFactory來配置了,后端配置項需要修改,參考這個回答:@Override public void registerStompEndpoints(StompEndpointRegistry registry) { //允許原生的websocket registry.addEndpoint("/ws")//請求地址:ws://ip:port/ws .setAllowedOrigins("*");//跨域 //允許sockJS registry.addEndpoint("/ws")//請求地址:http://ip:port/ws .setAllowedOrigins("*")//跨域 .withSockJS();//開啟sockJs }
