RabbitMQ踩坑記


 

之前我們給我們的系統加了一個使用SpringAOP+RabbitMQ+WebSocket進行實時消息通知功能(https://www.cnblogs.com/little-sheep/p/9934887.html)。在測試環境下沒有問題,但上到生產環境后部分用戶反映出現了丟消息的情況,針對這個問題我們進行了排查,發現,原本我們的系統是單機的,但用戶在之前做了調整,在內外網服務器分別部署了系統,兩個服務器都公用一個RabbitMQ。那么問題就來了。

之前生產者向Exchange生產消息,消費者從queue消費消息使用的是direct模式,通過routing-key保證生產的消息只有指定的消費者可消費。但當兩台服務器共用一個MQ時即有了兩個消費者連接同一個queue,此時rabbitmq並不是一個消息兩個消費者都能消費,而是采用默認的輪詢發送方式,A服務器收到消息1、3、5 。。。而B服務器收到2、4、6 。。。

這就出現了用戶感覺的丟消息現象。所以我們考慮將通知改為廣播形式即fanout。

       RabbitMQ將消息中間件的實現分成了Exchange+Queue的形式,Exchange和Queue使用Binding;生產者向Exchange生產消息,消息根據指定的binding進入Queue,消費者從Queue取消息。Fanout模式如圖:

一個消費者會對應一個queue,那么多個消費者要有多個queue。

修改后代碼如下:

RabbitMQConfig.java

//聲明exchange 
Connection connection = factory.createConnection();
Channel channel = connection.createChannel(false);
//生產環境中有多個server,每個server都是一個消費者,對同一個消息都要進行處理。選用廣播模式
channel.exchangeDeclare("exchange.websocket.msg", BuiltinExchangeType.FANOUT);

RabbitMessageQueue.java

@Override
public void send(WebSocketMsgEntity entity) {
    logger.warn("::product msg to MQ-websocket_msg_queue!");
    //若沒有指定exchange,則使用默認名為“”的exchange,binding名與queue名相同
    rabbitTemplate.convertAndSend("exchange.websocket.msg","", entity);
}

RabbitMQListener.java

@Component
public class RabbitMQListener {
    private static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
    @Autowired
    private RabbitMQService mqService;
    /**
     * WebSocket推送監聽器
     * @param socketEntity
     * @param deliveryTag
     * @param channel
     */
    @RabbitListener(bindings ={@QueueBinding(value = @Queue(exclusive = "true"), exchange = @Exchange(value = "exchange.websocket.msg", type = ExchangeTypes.FANOUT))})
    public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        logger.warn("::consume msg from MQ-websocket_msg_queue!");
        mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel);
    }

}

原本的客戶端監聽是監聽Queue,而現在改成監聽Binding。並不顯式的指定一個Queue,而是將Queue設置成exclusive = true,這樣每個消費者在監聽Binding時都會默認創建一個Queue與指定的Exchange綁定,在消費者斷開連接后Queue自動刪除。若有兩個消費者,則創建兩個Queue,他們綁定的Exchange相同,當生產者有消息時會向兩個Queue各插入一條,那么兩個系統的用戶就都能收到通知啦!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM