前言:
兩年前做過spring+activemq+stomp的ws推送,那個做起來很簡單,但現在公司用的mq中間件是rabbitmq,因此需要通過rabbitmq去做ws通信。仔細搜了搜百度/谷歌,網上通過spring boot+rabbitmq+stomp的教程文章倒是一搜一大把,可惜目前的項目是非spring boot的,沒法套用
。只好自己去搗鼓。搞了幾個小時,終於弄出來了,特此與大家分享下。
RabbitMQ:
怎么安裝就不是本篇討論的話題了,自己百度/谷歌之。rabbitmq默認自帶了stomp插件,但是需要自己啟用。命令為:
rabbitmq-plugins enable rabbitmq_stomp
來來來,給個文檔地址參考參考,http://www.rabbitmq.com/stomp.html。默認用guest用戶去連接,密碼也是guest。
這里有個問題,看rabbitmq配置文件,stomp協議端口默認是61613,但是用ws協議連接卻始終連接不上,所以只能用web stomp端口,端口號是15674,這個跟activemq有所區別。(P.S. 此處最好有大神來解惑,或者告知如何用61613來連
)
Javascript:
前端代碼擼起來最方便,關鍵是調試也容易,因此先來。
var stompClient = null;
var headers = {
login: 'guest',
passcode: 'guest'
};
function wsConnect(url) {
var ws = new SockJS(url);
stompClient = Stomp.over(ws);
//var ws = new WebSocket(url);
//stompClient = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
stompClient.heartbeat.outgoing = 0;
stompClient.heartbeat.incoming = 0;
stompClient.connect(headers, function (frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/test', function (sms) {
var obj = JSON.parse(sms.body)
var count = obj.totalCount;
console.log("count: " + count);
});
});
}
然后就連接唄。
$(function(){
var url = "http://host:15674/stomp";
wsConnect(url);
});
擼完准備測試,當然是選擇chrome嘍,頁面加載后,打開console控制台,可以看到web socket連上了,前端大功告成。
Java:
定義一個StompService類專門用來發送stomp消息。注意:rabbitmq 3.7以后stomp插件不再支持sockjs,因此寫法會有變化。
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
/**
* stomp服務 rabbitmq做中間件
* @author Selwyn
* @version $Id: WebSocketConfig.java, v 0.1 9/7/2018 9:59 AM Selwyn Exp $
*/
@Component
public class StompService {
private static final String URL_TEMPLATE = "http://%s:%s/stomp";
@Value("${rabbit.host}")
private String host;
//@Value("${rabbit.stomp.port}")
private Integer port = 15674;
/**
* 連接用戶名
*/
//@Value("${rabbit.stomp.login}")
private String login = "guest";
/**
* 連接密碼
*/
//@Value("${rabbit.stomp.passCode}")
private String passCode = "guest";
private String url;
@PostConstruct
public void init()
{
url = String.format(URL_TEMPLATE, host, port);
}
/**
* 發送stomp消息
* @param dest 目的地 比如/topic/test
* @param toSend 要發送的信息
* @param <T>
*/
public <T> void connectAndSend(String dest, T toSend)
{
WebSocketClient client = new StandardWebSocketClient();
List<Transport> transports = new ArrayList<>(1);
transports.add(new WebSocketTransport( client) );
//rabbitmq 3.7以后就別這么寫了。直接new WebSocketStompClient(client)就行
WebSocketClient transport = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(transport);
//StompSessionHandlerAdapter默認的payload類型是String, 因此MessageConverter必須是StringMessageConverter
stompClient.setMessageConverter(new StringMessageConverter());
final CustomStompSessionHandler sessionHandler =
new CustomStompSessionHandler(dest, toSend);
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.setSecWebSocketProtocol("13");
//連接用戶名/密碼也是必須的,否則連不上
StompHeaders sHeaders = new StompHeaders();
sHeaders.add("login", this.login);
sHeaders.add("passcode", this.passCode);
//開始連接,回調連接上后發送stomp消息
stompClient.connect(url, headers, sHeaders, sessionHandler);
//要同步得到發送結果的話,用CountDownLatch來做或者connect結果的future對象做get
}
}
然后編寫回調handler類。
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
/**
* 自定義stomp session 回調handler
* @author Selwyn
* @version $Id: CustomStompSessionHandler.java, v 0.1 9/7/2018 3:43 PM Selwyn Exp $
*/
@Slf4j
public class CustomStompSessionHandler extends StompSessionHandlerAdapter {
/**
* 要發送的對象,將會json化傳輸出去
*/
private Object toSend;
/**
* 目的地,一般是topic地址
*/
private String dest;
public CustomStompSessionHandler(String dest, Object toSend) {
this.toSend = toSend;
this.dest = dest;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
super.handleFrame(headers, payload);
}
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
super.afterConnected(session, connectedHeaders);
String msg = JSON.toJSONString(toSend);
try{
session.send(dest, msg);
}catch(Exception e)
{
log.error("failed to send stomp msg({}) to destination {}", msg, dest);
}finally {
//做完了關閉唄
session.disconnect();
}
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
super.handleException(session, command, headers, payload, exception);
log.error("stomp error: {}", exception);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
super.handleTransportError(session, exception);
log.error("stomp transport error: {}", exception);
}
public void setToSend(Object toSend) {
this.toSend = toSend;
}
public void setDest(String dest) {
this.dest = dest;
}
}
再自己寫個controller或者寫個單元測試方法,這里就不給出代碼了,擼完后啟動服務,就可以測試消息推送了,實踐證明,真香!
結尾:
其實整個過程還沒完,需要考慮到連接中斷等情況,客戶端和服務后台都需要做好重連機制。通過sockjs這種方式連接是沒有心跳機制的,這個比activemq帶的stomp插件要low。個人建議,如果能用spring boot的話盡量用那種方式去實現stomp消息推送。
