有時候我們的項目中會用到即時通訊功能,比如電商系統中的客服聊天功能,還有在支付過程中,當用戶支付成功后,第三方支付服務會回調我們的回調接口,此時我們需要通知前端支付成功。最近發現RabbitMQ可以很方便的實現即時通訊功能,如果你沒有特殊的業務需求,甚至可以不寫后端代碼,今天給大家講講如何使用RabbitMQ來實現即時通訊!
MQTT協議
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的輕量級通訊協議,該協議構建於TCP/IP協議上。MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。
MQTT相關概念
- Publisher(發布者):消息的發出者,負責發送消息。
- Subscriber(訂閱者):消息的訂閱者,負責接收並處理消息。
- Broker(代理):消息代理,位於消息發布者和訂閱者之間,各類支持MQTT協議的消息中間件都可以充當。
- Topic(主題):可以理解為消息隊列中的路由,訂閱者訂閱了主題之后,就可以收到發送到該主題的消息。
- Payload(負載);可以理解為發送消息的內容。
- QoS(消息質量):全稱Quality of Service,即消息的發送質量,主要有QoS 0、QoS 1、QoS 2三個等級,下面分別介紹下: QoS 0(Almost Once):至多一次,只發送一次,會發生消息丟失或重復; QoS 1(Atleast Once):至少一次,確保消息到達,但消息重復可能會發生; QoS 2(Exactly Once):只有一次,確保消息只到達一次。
RabbitMQ啟用MQTT功能
RabbitMQ啟用MQTT功能,需要先安裝然RabbitMQ然后再啟用MQTT插件。
- 首先我們需要安裝並啟動RabbitMQ,對RabbitMQ不了解的朋友可以參考《花了3天總結的RabbitMQ實用技巧,有點東西!》;
- 接下來就是啟用RabbitMQ的MQTT插件了,默認是不啟用的,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_mqtt
- 開啟成功后,查看管理控制台,我們可以發現MQTT服務運行在1883端口上了。
MQTT客戶端
我們可以使用MQTT客戶端來測試MQTT的即時通訊功能,這里使用的是MQTTBox這個客戶端工具。
- 首先下載並安裝好MQTTBox,下載地址:http://workswithweb.com/mqttbox.html
- 點擊Create MQTT Client按鈕來創建一個MQTT客戶端;
- 接下來對MQTT客戶端進行配置,主要是配置好協議端口、連接用戶名密碼和QoS即可;
- 再配置一個訂閱者,訂閱者訂閱testTopicA這個主題,我們會向這個主題發送消息;
- 發布者向主題中發布消息,訂閱者可以實時接收到。
前端直接實現即時通訊
既然MQTTBox客戶端可以直接通過RabbitMQ實現即時通訊,那我們是不是直接使用前端技術也可以實現即時通訊?答案是肯定的!下面我們將通過html+javascript實現一個簡單的聊天功能,真正不寫一行后端代碼實現即時通訊!
- 由於RabbitMQ與Web端交互底層使用的是WebSocket,所以我們需要開啟RabbitMQ的MQTT WEB支持,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
- 開啟成功后,查看管理控制台,我們可以發現MQTT的WEB服務運行在15675端口上了;
- WEB端與MQTT服務進行通訊需要使用一個叫MQTT.js的庫,項目地址:https://github.com/mqttjs/MQTT.js
- 實現的功能非常簡單,一個單聊功能,需要注意的是配置好MQTT服務的訪問地址為:ws://localhost:15675/ws
Title
目標Topic:發送消息:發送 清空
``
- 接下來我們訂閱不同的主題開啟兩個頁面測試下功能(頁面放在了SpringBoot應用的resource目錄下了,需要先啟動應用再訪問): 第一個訂閱主題testTopicA,訪問地址:http://localhost:8088/page/index?topic=testTopicA 第二個訂閱主題testTopicB,訪問地址:http://localhost:8088/page/index?topic=testTopicB
- 之后互相發送消息,讓我們來看看效果吧!
在SpringBoot中使用
沒有特殊業務需求的時候,前端可以直接和RabbitMQ對接實現即時通訊。但是有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了,接下來我們來講講如何在SpringBoot應用中使用MQTT。
- 首先我們需要在pom.xml中添加MQTT相關依賴;
org.springframework.integration spring-integration-mqtt
- 在application.yml中添加MQTT相關配置,主要是訪問地址、用戶名密碼、默認主題信息;
rabbitmq: mqtt: url: tcp://localhost:1883 username: guest password: guest defaultTopic: testTopic
- 編寫一個Java配置類從配置文件中讀取配置便於使用;
/** * MQTT相關配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig { /** * RabbitMQ連接用戶名 */ private String username; /** * RabbitMQ連接密碼 */ private String password; /** * RabbitMQ的MQTT默認topic */ private String defaultTopic; /** * RabbitMQ的MQTT連接地址 */ private String url;}
- 添加MQTT消息訂閱者相關配置,使用@ServiceActivator注解聲明一個服務激活器,通過MessageHandler來處理訂閱消息;
/** * MQTT消息訂閱者相關配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //設置消息質量:0->至多一次;1->至少一次;2->只有一次 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { //處理訂閱消息 log.info("handleMessage : {}",message.getPayload()); } }; }}
- 添加MQTT消息發布者相關配置;
/** * MQTT消息發布者相關配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttConfig.getUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisherClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }}
- 添加MQTT網關,用於向主題中發送消息;
/** * MQTT網關,通過接口將數據傳遞到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway { /** * 發送消息到默認topic */ void sendToMqtt(String payload); /** * 發送消息到指定topic */ void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); /** * 發送消息到指定topic並設置QOS */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
- 添加MQTT測試接口,使用MQTT網關向特定主題中發送消息;
/** * MQTT測試接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT測試接口")@RestController@RequestMapping("/mqtt")public class MqttController { @Autowired private MqttGateway mqttGateway; @PostMapping("/sendToDefaultTopic") @ApiOperation("向默認主題發送消息") public CommonResult sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload); return CommonResult.success(null); } @PostMapping("/sendToTopic") @ApiOperation("向指定主題發送消息") public CommonResult sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic); return CommonResult.success(null); }}
- 調用接口向主題中發送消息進行測試;
- 后台成功接收到消息並進行打印。
2020-09-17 14:29:01.689 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的消息2020-09-17 14:29:06.101 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的消息2020-09-17 14:29:07.384 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的消息
總結
消息中間件應用越來越廣泛,不僅可以實現可靠的異步通信,還可以實現即時通訊,掌握一個消息中間件還是很有必要的。如果沒有特殊業務需求,客戶端或者前端直接使用MQTT對接消息中間件即可實現即時通訊,有特殊需求的時候也可以使用SpringBoot集成MQTT的方式來實現,總之消息中間件是實現即時通訊的一個好選擇!
關注公眾號:java寶典