RabbitMQ實現即時通訊居然如此簡單!


有時候我們的項目中會用到即時通訊功能,比如電商系統中的客服聊天功能,還有在支付過程中,當用戶支付成功后,第三方支付服務會回調我們的回調接口,此時我們需要通知前端支付成功。最近發現RabbitMQ可以很方便的實現即時通訊功能,如果你沒有特殊的業務需求,甚至可以不寫后端代碼,今天給大家講講如何使用RabbitMQ來實現即時通訊!

MQTT協議

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的輕量級通訊協議,該協議構建於TCP/IP協議上。MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。

ThirdPartyImage_ae54bd97.png

MQTT相關概念

  • Publisher(發布者):消息的發出者,負責發送消息。
  • Subscriber(訂閱者):消息的訂閱者,負責接收並處理消息。
  • Broker(代理):消息代理,位於消息發布者和訂閱者之間,各類支持MQTT協議的消息中間件都可以充當。
  • Topic(主題):可以理解為消息隊列中的路由,訂閱者訂閱了主題之后,就可以收到發送到該主題的消息。
  • Payload(負載);可以理解為發送消息的內容。
  • QoS(消息質量):全稱Quality of Service,即消息的發送質量,主要有QoS 0、QoS 1、QoS 2三個等級,下面分別介紹下: QoS 0(Almost Once):至多一次,只發送一次,會發生消息丟失或重復; QoS 1(Atleast Once):至少一次,確保消息到達,但消息重復可能會發生; QoS 2(Exactly Once):只有一次,確保消息只到達一次。

RabbitMQ啟用MQTT功能

RabbitMQ啟用MQTT功能,需要先安裝然RabbitMQ然后再啟用MQTT插件。

  • 首先我們需要安裝並啟動RabbitMQ,對RabbitMQ不了解的朋友可以參考《花了3天總結的RabbitMQ實用技巧,有點東西!》;
  • 接下來就是啟用RabbitMQ的MQTT插件了,默認是不啟用的,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_mqtt
  • 開啟成功后,查看管理控制台,我們可以發現MQTT服務運行在1883端口上了。

ThirdPartyImage_af39b4a1.png

MQTT客戶端

我們可以使用MQTT客戶端來測試MQTT的即時通訊功能,這里使用的是MQTTBox這個客戶端工具。

ThirdPartyImage_8e80c3e9.png

  • 點擊Create MQTT Client按鈕來創建一個MQTT客戶端;

ThirdPartyImage_d91430ae.png

  • 接下來對MQTT客戶端進行配置,主要是配置好協議端口、連接用戶名密碼和QoS即可;

ThirdPartyImage_3f0d8691.png

  • 再配置一個訂閱者,訂閱者訂閱testTopicA這個主題,我們會向這個主題發送消息;

ThirdPartyImage_55cad197.png

  • 發布者向主題中發布消息,訂閱者可以實時接收到。

ThirdPartyImage_15d5058b.png

前端直接實現即時通訊

既然MQTTBox客戶端可以直接通過RabbitMQ實現即時通訊,那我們是不是直接使用前端技術也可以實現即時通訊?答案是肯定的!下面我們將通過html+javascript實現一個簡單的聊天功能,真正不寫一行后端代碼實現即時通訊!

  • 由於RabbitMQ與Web端交互底層使用的是WebSocket,所以我們需要開啟RabbitMQ的MQTT WEB支持,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 開啟成功后,查看管理控制台,我們可以發現MQTT的WEB服務運行在15675端口上了;

ThirdPartyImage_b4e2b561.png

ThirdPartyImage_7a1a1d42.png

  • 實現的功能非常簡單,一個單聊功能,需要注意的是配置好MQTT服務的訪問地址為:ws://localhost:15675/ws
Title

目標Topic:發送消息:發送 清空

``

ThirdPartyImage_f92103e3.png

在SpringBoot中使用

沒有特殊業務需求的時候,前端可以直接和RabbitMQ對接實現即時通訊。但是有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了,接下來我們來講講如何在SpringBoot應用中使用MQTT。

  • 首先我們需要在pom.xml中添加MQTT相關依賴;
org.springframework.integration    spring-integration-mqtt
  • 在application.yml中添加MQTT相關配置,主要是訪問地址、用戶名密碼、默認主題信息;
rabbitmq:  mqtt:    url: tcp://localhost:1883    username: guest    password: guest    defaultTopic: testTopic
  • 編寫一個Java配置類從配置文件中讀取配置便於使用;
/** * MQTT相關配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig {    /**     * RabbitMQ連接用戶名     */    private String username;    /**     * RabbitMQ連接密碼     */    private String password;    /**     * RabbitMQ的MQTT默認topic     */    private String defaultTopic;    /**     * RabbitMQ的MQTT連接地址     */    private String url;}
  • 添加MQTT消息訂閱者相關配置,使用@ServiceActivator注解聲明一個服務激活器,通過MessageHandler來處理訂閱消息;
/** * MQTT消息訂閱者相關配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",                        mqttConfig.getDefaultTopic());        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //設置消息質量:0->至多一次;1->至少一次;2->只有一次        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message> message) throws MessagingException {                //處理訂閱消息                log.info("handleMessage : {}",message.getPayload());            }        };    }}
  • 添加MQTT消息發布者相關配置;
/** * MQTT消息發布者相關配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { mqttConfig.getUrl()});        options.setUserName(mqttConfig.getUsername());        options.setPassword(mqttConfig.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());        return messageHandler;    }    @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }}
  • 添加MQTT網關,用於向主題中發送消息;
/** * MQTT網關,通過接口將數據傳遞到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {    /**     * 發送消息到默認topic     */    void sendToMqtt(String payload);    /**     * 發送消息到指定topic     */    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);    /**     * 發送消息到指定topic並設置QOS     */    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
  • 添加MQTT測試接口,使用MQTT網關向特定主題中發送消息;
/** * MQTT測試接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT測試接口")@RestController@RequestMapping("/mqtt")public class MqttController {    @Autowired    private MqttGateway mqttGateway;    @PostMapping("/sendToDefaultTopic")    @ApiOperation("向默認主題發送消息")    public CommonResult sendToDefaultTopic(String payload) {        mqttGateway.sendToMqtt(payload);        return CommonResult.success(null);    }    @PostMapping("/sendToTopic")    @ApiOperation("向指定主題發送消息")    public CommonResult sendToTopic(String payload, String topic) {        mqttGateway.sendToMqtt(payload, topic);        return CommonResult.success(null);    }}
  • 調用接口向主題中發送消息進行測試;

ThirdPartyImage_cb4af000.png

  • 后台成功接收到消息並進行打印。
2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息

總結

消息中間件應用越來越廣泛,不僅可以實現可靠的異步通信,還可以實現即時通訊,掌握一個消息中間件還是很有必要的。如果沒有特殊業務需求,客戶端或者前端直接使用MQTT對接消息中間件即可實現即時通訊,有特殊需求的時候也可以使用SpringBoot集成MQTT的方式來實現,總之消息中間件是實現即時通訊的一個好選擇!

關注公眾號:java寶典
a


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM