1. websocket+rabbitmq實戰
1.1. 前言
接到的需求是后台定向給指定web登錄用戶推送消息,且可能同一賬號會登錄多個客戶端都要接收到消息
1.2. 遇坑
- 基於springboot環境搭建的websocket+rabbitmq,搭建完成后發現websocket每隔一段時間會斷開,看網上有人因為nginx的連接超時機制斷開,而我這似乎是因為長連接空閑時間太長而斷開
- 經過測試,如果一直保持每隔段時間發送消息,那么連接不會斷開,所以我采用了斷開重連機制,分三種情況
- 服務器正常,客戶端正常且空閑時間不超過1分鍾,則情況正常,超過一分鍾會斷線,前端發起請求重連
- 服務器正常,客戶端關閉或注銷,服務器正常收到通知,去除對應客戶端session
- 服務器異常,客戶端正常,客戶端發現連不上服務器會嘗試重連3次,3次都連不上放棄重連
- rabbitmq定向推送,按需求需要一台機器對應一批用戶,所以定制化需要服務啟動的時候定向訂閱該ip對應的隊列名,簡單說就是動態隊列名的設定,所以又復雜了點,不能直接在注解寫死。同時因為使用的apollo配置中心,同一集群應該相同的配置,所以也不能通過提取配置的方式設定值,為了這個點設置apollo的集群方式有點小題大做,所以采用動態讀取數據庫對應的ip取出對應的隊列名。
- 部署線上tomcat的話,不需要加上一塊代碼
/**
* 使用tomcat啟動無需配置
*/
//@Configuration
//@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
1.3. 正式代碼
1.3.1. rabbimq部分
- application.properties配置
spring.rabbitmq.addresses = i.tzxylao.com:5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = 123456
spring.rabbitmq.virtual-host = /
spring.rabbitmq.connection-timeout = 15000
- 交換機和隊列配置
/**
* @author laoliangliang
* @date 2019/3/29 11:41
*/
@Configuration
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class RabbitmqConfig {
final public static String EXCHANGENAME = "websocketExchange";
/**
* 創建交換器
*/
@Bean
FanoutExchange exchange() {
return new FanoutExchange(EXCHANGENAME);
}
@Bean
public Queue queue(){
return new Queue(orderQueueName());
}
@Bean
Binding bindingExchangeMessage(Queue queue,FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(OrderReceiver orderReceiver, @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
// 監聽隊列的名稱
container.setQueueNames(orderQueueName());
container.setExposeListenerChannel(true);
// 設置每個消費者獲取的最大消息數量
container.setPrefetchCount(100);
// 消費者的個數
container.setConcurrentConsumers(1);
// 設置確認模式為自動確認
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener(orderReceiver);
return container;
}
/**
* 在這里寫獲取訂單隊列名的具體過程
* @return
*/
public String orderQueueName(){
return "orderChannel";
}
}
- 消息監聽類
/**
* @author laoliangliang
* @date 2019/3/29 11:38
*/
@Component
@Slf4j
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class OrderReceiver implements ChannelAwareMessageListener {
@Autowired
private MyWebSocket myWebSocket;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
log.info("接收到消息:" + new String(body));
try {
myWebSocket.sendMessage(new String(body));
} catch (IOException e) {
log.error("send rabbitmq message error", e);
}
}
}
1.3.2. websocket部分
- 配置服務端點
@Configuration
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
- 核心代碼
/**
* @author laoliangliang
* @date 2019/3/28 14:40
*/
public abstract class AbstractWebSocket {
protected static Map<String, CopyOnWriteArraySet<Session>> sessionStore = new HashMap<>();
public void sendMessage(String message) throws IOException {
List<String> userCodes = beforeSendMessage();
for (String userCode : userCodes) {
CopyOnWriteArraySet<Session> sessions = sessionStore.get(userCode);
//阻塞式的(同步的)
if (sessions !=null && sessions.size() != 0) {
for (Session s : sessions) {
if (s != null) {
s.getBasicRemote().sendText(message);
}
}
}
}
}
/**
* 刪選給誰發消息
* @return
*/
protected abstract List<String> beforeSendMessage();
protected void clearSession(Session session) {
Collection<CopyOnWriteArraySet<Session>> values = sessionStore.values();
for (CopyOnWriteArraySet<Session> sessions : values) {
for (Session session1 : sessions) {
if (session.equals(session1)) {
sessions.remove(session);
}
}
}
}
}
@ServerEndpoint(value = "/websocket")
@Component
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class MyWebSocket extends AbstractWebSocket {
private static Logger log = LogManager.getLogger(MyWebSocket.class);
@Autowired
private AmqpTemplate amqpTemplate;
@PostConstruct
public void init() {
/*ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new Runnable() {
int i = 0;
@Override
public void run() {
amqpTemplate.convertAndSend(RabbitFanout.EXCHANGENAME, "",("msg num : " + i).getBytes());
i++;
}
}, 50, 1, TimeUnit.SECONDS);*/
}
/**
* 連接建立成功調用的方法
*
* @param session 可選的參數。session為與某個客戶端的連接會話,需要通過它來給客戶端發送數據
*/
@OnOpen
public void onOpen(Session session) throws TimeoutException {
log.info("websocket connect");
//10M
session.setMaxTextMessageBufferSize(10485760);
}
/**
* 連接關閉調用的方法
*/
@OnClose
public void onClose(Session session) {
clearSession(session);
}
/**
* 收到客戶端消息后調用的方法
*
* @param message 客戶端發送過來的消息
* @param session 可選的參數
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("from client request:" + message);
CopyOnWriteArraySet<Session> sessions = sessionStore.get(message);
if (sessions == null) {
sessions = new CopyOnWriteArraySet<>();
}
sessions.add(session);
sessionStore.put(message, sessions);
}
/**
* 發生錯誤時調用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
clearSession(session);
}
/**
* 這里返回需要給哪些用戶發送消息
* @return
*/
@Override
protected List<String> beforeSendMessage() {
//TODO 給哪些用戶發送消息
return Lists.newArrayList("6");
}
}
1.3.3. 前端代碼
var websocket = null;
var reconnectCount = 0;
function connectSocket(){
var data = basicConfig();
if(data.websocketEnable !== "true"){
return;
}
//判斷當前瀏覽器是否支持WebSocket
if ('WebSocket' in window) {
if(data.localIp && data.localIp !== "" && data.serverPort && data.serverPort !== ""){
websocket = new WebSocket("ws://"+data.localIp+":"+data.serverPort+data.serverContextPath+"/websocket");
}else{
return;
}
}else {
alert('當前瀏覽器 不支持WebSocket')
}
//連接發生錯誤的回調方法
websocket.onerror = function () {
console.log("連接發生錯誤");
};
//連接成功建立的回調方法
websocket.onopen = function () {
reconnectCount = 0;
console.log("連接成功");
};
//接收到消息的回調方法,此處添加處理接收消息方法,當前是將接收到的信息顯示在網頁上
websocket.onmessage = function (event) {
console.log("receive message:" + event.data);
};
//連接關閉的回調方法
websocket.onclose = function () {
console.log("連接關閉,如需登錄請刷新頁面。");
if(reconnectCount === 3) {
reconnectCount = 0;
return;
}
connectSocket();
basicConfig();
reconnectCount++;
};
//添加事件監聽
websocket.addEventListener('open', function () {
websocket.send(data.userCode);
});
//監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。
window.onbeforeunload = function () {
console.log("closeWebSocket");
};
}
connectSocket();
function basicConfig(){
var result = {};
$.ajax({
type: "post",
async: false,
url: "${request.contextPath}/basicConfig",
data: {},
success: function (data) {
result = data;
}
});
return result;
}
1.3.4. 后端提供接口
@ApolloConfig
private Config config;
@RequestMapping(value = {"/basicConfig"})
@ResponseBody
public Map<String, Object> getUserCode(HttpSession session) {
Map<String, Object> map = new HashMap<>(2);
map.put("userCode",String.valueOf(session.getAttribute("userCode")));
String websocketEnable = config.getProperty("websocket.enabled", "false");
String serverContextPath = config.getProperty("server.context-path", "");
map.put("websocketEnable", websocketEnable);
map.put("serverContextPath", serverContextPath);
String localIp = config.getProperty("local.ip", "");
String serverPort = config.getProperty("server.port", "80");
map.put("localIp", localIp);
map.put("serverPort", serverPort);
return map;
}