一、STOMP 簡介
直接使用WebSocket(或SockJS)就很類似於使用TCP套接字來編寫Web應用。因為沒有高層級的線路協議(wire protocol),因此就需要我們定義應用之間所發送消息的語義,還需要確保連接的兩端都能遵循這些語義。
就像HTTP在TCP套接字之上添加了請求-響應模型層一樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。
與HTTP請求和響應類似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,如下就是發送數據的一個STOMP幀:
>>> SEND transaction:tx-0 destination:/app/marco content-length:20 {"message":"Marco!"}
在這個例子中,STOMP命令是send,表明會發送一些內容。緊接着是三個頭信息:一個表示消息的的事務機制,一個用來表示消息要發送到哪里的目的地,另外一個則包含了負載的大小。然后,緊接着是一個空行,STOMP幀的最后是負載內容。
二、服務端實現
1、啟用STOMP功能
STOMP 的消息根據前綴的不同分為三種。如下,以 /app 開頭的消息都會被路由到帶有@MessageMapping 或 @SubscribeMapping 注解的方法中;以/topic 或 /queue 開頭的消息都會發送到STOMP代理中,根據你所選擇的STOMP代理不同,目的地的可選前綴也會有所限制;以/user開頭的消息會將消息重路由到某個用戶獨有的目的地上。

@Configuration @EnableWebSocketMessageBroker @PropertySource("classpath:resources.properties") public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer { @Value("${rabbitmq.host}") private String host; @Value("${rabbitmq.port}") private Integer port; @Value("${rabbitmq.userName}") private String userName; @Value("${rabbitmq.password}") private String password; /** * 將 "/stomp" 注冊為一個 STOMP 端點。這個路徑與之前發送和接收消息的目的地路徑有所 * 不同。這是一個端點,客戶端在訂閱或發布消息到目的地路徑前,要連接到該端點。 * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/stomp").withSockJS(); } /** * 如果不重載它的話,將會自動配置一個簡單的內存消息代理,用它來處理以"/topic"為前綴的消息 * * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //基於內存的STOMP消息代理 registry.enableSimpleBroker("/queue", "/topic"); //基於RabbitMQ 的STOMP消息代理 /* registry.enableStompBrokerRelay("/queue", "/topic") .setRelayHost(host) .setRelayPort(port) .setClientLogin(userName) .setClientPasscode(password);*/ registry.setApplicationDestinationPrefixes("/app", "/foo"); registry.setUserDestinationPrefix("/user"); } }
2、處理來自客戶端的STOMP消息
服務端處理客戶端發來的STOMP消息,主要用的是 @MessageMapping 注解。如下:
@MessageMapping("/marco") @SendTo("/topic/marco") public Shout stompHandle(Shout shout){ LOGGER.info("接收到消息:" + shout.getMessage()); Shout s = new Shout(); s.setMessage("Polo!"); return s; }
2.1、@MessageMapping 指定目的地是“/app/marco”(“/app”前綴是隱含的,因為我們將其配置為應用的目的地前綴)。
2.2、方法接收一個Shout參數,因為Spring的某一個消息轉換器會將STOMP消息的負載轉換為Shout對象。Spring 4.0提供了幾個消息轉換器,作為其消息API的一部分:
2.3、尤其注意,這個處理器方法有一個返回值,這個返回值並不是返回給客戶端的,而是轉發給消息代理的,如果客戶端想要這個返回值的話,只能從消息代理訂閱。@SendTo 注解重寫了消息代理的目的地,如果不指定@SendTo,幀所發往的目的地會與觸發處理器方法的目的地相同,只不過會添加上“/topic”前綴。
2.4、如果客戶端就是想要服務端直接返回消息呢?聽起來不就是HTTP做的事情!即使這樣,STOMP 仍然為這種一次性的響應提供了支持,用的是@SubscribeMapping注解,與HTTP不同的是,這種請求-響應模式是異步的...
@SubscribeMapping("/getShout") public Shout getShout(){ Shout shout = new Shout(); shout.setMessage("Hello STOMP"); return shout; }
3、發送消息到客戶端
3.1 在處理消息之后發送消息
正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 注解可以處理客戶端發送過來的消息,並選擇方法是否有返回值。
如果 @MessageMapping 注解的控制器方法有返回值的話,返回值會被發送到消息代理,只不過會添加上"/topic"前綴。可以使用@SendTo 重寫消息目的地;
如果 @SubscribeMapping 注解的控制器方法有返回值的話,返回值會直接發送到客戶端,不經過代理。如果加上@SendTo 注解的話,則要經過消息代理。
3.2 在應用的任意地方發送消息
spring-websocket 定義了一個 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以實現自由的向任意目的地發送消息,並且訂閱此目的地的所有用戶都能收到消息。
@Autowired private SimpMessageSendingOperations simpMessageSendingOperations; /** * 廣播消息,不指定用戶,所有訂閱此的用戶都能收到消息 * @param shout */ @MessageMapping("/broadcastShout") public void broadcast(Shout shout) { simpMessageSendingOperations.convertAndSend("/topic/shouts", shout); }
3.3 為指定用戶發送消息
3.2介紹了如何廣播消息,訂閱目的地的所有用戶都能收到消息。如果消息只想發送給特定的用戶呢?spring-websocket 介紹了兩種方式來實現這種功能,一種是 基於@SendToUser注解和Principal參數,一種是SimpMessageSendingOperations 接口的convertAndSendToUser方法。
- 基於@SendToUser注解和Principal參數
@SendToUser 表示要將消息發送給指定的用戶,會自動在消息目的地前補上"/user"前綴。如下,最后消息會被發布在 /user/queue/notifications-username。但是問題來了,這個username是怎么來的呢?就是通過 principal 參數來獲得的。那么,principal 參數又是怎么來的呢?需要在spring-websocket 的配置類中重寫 configureClientInboundChannel 方法,添加上用戶的認證。

/** * 1、設置攔截器 * 2、首次連接的時候,獲取其Header信息,利用Header里面的信息進行權限認證 * 3、通過認證的用戶,使用 accessor.setUser(user); 方法,將登陸信息綁定在該 StompHeaderAccessor 上,在Controller方法上可以獲取 StompHeaderAccessor 的相關信息 * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); //1、判斷是否首次連接 if (StompCommand.CONNECT.equals(accessor.getCommand())){ //2、判斷用戶名和密碼 String username = accessor.getNativeHeader("username").get(0); String password = accessor.getNativeHeader("password").get(0); if ("admin".equals(username) && "admin".equals(password)){ Principal principal = new Principal() { @Override public String getName() { return userName; } }; accessor.setUser(principal); return message; }else { return null; } } //不是首次連接,已經登陸成功 return message; } }); }
@MessageMapping("/shout") @SendToUser("/queue/notifications") public Shout userStomp(Principal principal, Shout shout) { String name = principal.getName(); String message = shout.getMessage(); LOGGER.info("認證的名字是:{},收到的消息是:{}", name, message); return shout; }
- convertAndSendToUser方法
除了convertAndSend()以外,SimpMessageSendingOperations 還提供了convertAndSendToUser()方法。按照名字就可以判斷出來,convertAndSendToUser()方法能夠讓我們給特定用戶發送消息。
@MessageMapping("/singleShout") public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) { String message = shout.getMessage(); LOGGER.info("接收到消息:" + message); Principal user = stompHeaderAccessor.getUser(); simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout); }
如上,這里雖然我還是用了認證的信息得到用戶名。但是,其實大可不必這樣,因為 convertAndSendToUser 方法可以指定要發送給哪個用戶。也就是說,完全可以把用戶名的當作一個參數傳遞給控制器方法,從而繞過身份認證!convertAndSendToUser 方法最終會把消息發送到 /user/sername/queue/shouts 目的地上。
4、處理消息異常
在處理消息的時候,有可能會出錯並拋出異常。因為STOMP消息異步的特點,發送者可能永遠也不會知道出現了錯誤。@MessageExceptionHandler標注的方法能夠處理消息方法中所拋出的異常。我們可以把錯誤發送給用戶特定的目的地上,然后用戶從該目的地上訂閱消息,從而用戶就能知道自己出現了什么錯誤啦...
@MessageExceptionHandler(Exception.class) @SendToUser("/queue/errors") public Exception handleExceptions(Exception t){ t.printStackTrace(); return t; }
三、客戶端實現
1、JavaScript 依賴
STOMP 依賴 sockjs.js 和 stomp.min.js。stomp.min.js的下載鏈接:http://www.bootcdn.cn/stomp.js/
<script type="text/javascript" src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.js"></script> <script type="text/javascript" src="/js/stomp.min.js"></script>
2、JavaScript 客戶端實現

/*STOMP*/ var url = 'http://localhost:8080/stomp'; var sock = new SockJS(url); var stomp = Stomp.over(sock); var strJson = JSON.stringify({'message': 'Marco!'}); //默認的和STOMP端點連接 /*stomp.connect("guest", "guest", function (franme) { });*/ var headers={ username:'admin', password:'admin' }; stomp.connect(headers, function (frame) { //發送消息 //第二個參數是一個頭信息的Map,它會包含在STOMP的幀中 //事務支持 var tx = stomp.begin(); stomp.send("/app/marco", {transaction: tx.id}, strJson); tx.commit(); //訂閱服務端消息 subscribe(destination url, callback[, headers]) stomp.subscribe("/topic/marco", function (message) { var content = message.body; var obj = JSON.parse(content); console.log("訂閱的服務端消息:" + obj.message); }, {}); stomp.subscribe("/app/getShout", function (message) { var content = message.body; var obj = JSON.parse(content); console.log("訂閱的服務端直接返回的消息:" + obj.message); }, {}); /*以下是針對特定用戶的訂閱*/ var adminJSON = JSON.stringify({'message': 'ADMIN'}); /*第一種*/ stomp.send("/app/singleShout", {}, adminJSON); stomp.subscribe("/user/queue/shouts",function (message) { var content = message.body; var obj = JSON.parse(content); console.log("admin用戶特定的消息1:" + obj.message); }); /*第二種*/ stomp.send("/app/shout", {}, adminJSON); stomp.subscribe("/user/queue/notifications",function (message) { var content = message.body; var obj = JSON.parse(content); console.log("admin用戶特定的消息2:" + obj.message); }); /*訂閱異常消息*/ stomp.subscribe("/user/queue/errors", function (message) { console.log(message.body); }); //若使用STOMP 1.1 版本,默認開啟了心跳檢測機制(默認值都是10000ms) stomp.heartbeat.outgoing = 20000; stomp.heartbeat.incoming = 0; //客戶端不從服務端接收心跳包 });