安裝
說到mqtt,首先肯定要安裝了,安裝什么的地址:http://activemq.apache.org/ap...
我本地是Windows的環境,所以裝的是Windows版本,這里是第一個注意的地方,因為后面使用的時候windows和linux的有一些不同
下載完成之后就是解壓安裝了,這里解壓完成之后進入bin目錄下,自己用cmd或者直接進去在此處打開命令窗口也行,然后運行apollo.cmd 創建一個服務實例我的實例名稱是mybroker所以命令是 apollo.cmd create mybroker,這個名稱自己可以隨便指定
創建完實例后發現bin 目錄下多了一個文件夾,這個文件夾就是你實例名稱,進入文件夾運行
.apollo-broker.cmd run 命令

這樣就啟動成功了
啟動成功可以去http://localhost:61680/console/index.html看看,登錄賬號和密碼在mybrokeretcusers.properties文件中找到輸入就可以進去了

頁面上有連接信息和訂閱主題的一些對應信息,有興趣的自己看下,后面也會講到的
使用
安裝成功接下來就是使用了,首先創建一個maven工程,引入配置
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
由於我們后面處理訂閱消息的消費者打印的日志是用了slf4j為了方便也引入了lombok的配置 :
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
引入完成以后就可以開始准備開始使用mqtt了
這里為了方便維護和配置我把一些配置參數放在了properties文件里面:
#MQTT配置信息
spring.mqtt.username=admin
spring.mqtt.password=password
spring.mqtt.url=tcp://localhost:61613
spring.mqtt.client.id=clientId
spring.mqtt.server.id=serverId
spring.mqtt.default.topic=topic
這里我遇到了一個坑,專門注釋了,就是訂閱端訂閱消息的id 和 發布端發布消息的id 一定不能一樣,這樣會導致mqtt識別到兩個一樣的id,消息一發就斷開連接了,訂閱端總是收不到消息,這個問題我找了好長時間都不知道問題出在哪,剛接觸的很容易搞錯,第二個問題就是mqtt的服務器連接地址,在Windows和linux下tcp的端口是不一樣的,在啟動的apollo的日志中可以看出來

監聽的tcp端口是61613,看別人很多的demo上都是1883,如果一直連不上,原因可能是因為這個
接下來就是spring.mqtt.default.topic
配置了,這個是mqtt訂閱和推送的消息主題,既然你想發消息那么訂閱消息的主題和發布消息的主題一致才能收到消息,和rabbitmq一樣
然后就是客戶端
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
這里有點問題,如果你是復制我的代碼的話MessageHandler 這個類是沒有的需要自己手動導包,看了源碼發現這里需要的是一個消息處理的handler需要是org.springframework.messaging.MessageHandler的實現,直接導入這個包就行了
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {
void sendToMqtt(String data);
void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
這個是消息發送接口,需要發送消息的時候直接調用就行了,提供了幾個重載方法payload或者data是發送消息的內容
topic是消息發送的主題,這里可以自己靈活定義,也可以使用默認的主題,就是配置文件的主題,qos是mqtt 對消息處理的幾種機制分為0,1,2 其中0表示的是訂閱者沒收到消息不會再次發送,消息會丟失,1表示的是會嘗試重試,一直到接收到消息,但這種情況可能導致訂閱者收到多次重復消息,2相比多了一次去重的動作,確保訂閱者收到的消息有一次
當然,這三種模式下的性能肯定也不一樣,qos=0是最好的,2是最差的 ,有興趣的可以去詳細了解我在這不多贅述
上面就完成了消息的發送,可以去http://localhost:61680/console/index.html看看消息的記錄,這里我寫了一個接口調用sendToMqtt方法發送一條消息

會看到收到有兩個主題,我的是因為我訂閱了兩個主題所以上面顯示的是兩個,我的剛才發布消息的主題是too所以打開會看到too有消息送達過來

如果你還沒寫訂閱方的話consumers是沒有的,現在顯示我發了7條消息,證明發送成功了
接下來就是訂閱方,為了方便我就直接寫在啟動類上了,沒有用到所有的配置
@SpringBootApplication
@EnableAutoConfiguration
public class MytestApplication {
public static void main(String[] args) {
SpringApplication.run(MytestApplication.class, args);
}
@Value("${spring.mqtt.server.id}")
private String serverId;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://localhost:61613");
factory.setUserName("admin");
factory.setPassword("password");
return factory;
}
// consumer 訂閱者監聽消息
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT")
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,
mqttClientFactory(), "too");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
}
這里訂閱的主題可以指定,我訂閱的是剛才發的too主題,還有訂閱方的id 別和發送方的id 一樣
重新啟動項目,發送消息,會發現控制台已經打印出消息
代表訂閱方已經成功收到消息,同時

也顯示消息訂閱方和記錄,至此一個完整的消息發送和訂閱完成,比較簡單,但是一不留神很容易出現問題,希望能幫助到新入門的人