簡介
Eclipse Paho Java Client (opens new window)是用 Java 編寫的 MQTT 客戶端庫(MQTT Java Client),可用於 JVM 或其他 Java 兼容平台(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API。
emqx官方文檔中有相關介紹:https://docs.emqx.cn/enterprise/v4.3/development/java.html#通過-maven-安裝-paho-java
paho客戶端對象初始化
先導入paho依賴:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
配置MqttProperties.java:
@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties {
private String brokerUrl;
private String clientId;
private String username;
private String password;
}
application.yml
mqtt:
broker-url: tcp://192.168.40.128:1883
client-id: emq-client
username: admin
password: public
EmqClient.java
@Component
public class EmqClient {
private static final Logger logger = LoggerFactory.getLogger(EmqClient.class);
@Autowired
private MqttProperties mqttProperties;
private IMqttClient mqttClient;
@PostConstruct
public void init(){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),
mqttProperties.getClientId(),
memoryPersistence);
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 創建失敗");
}
}
}
編寫客戶端一些方法
創建Qos枚舉類:
public enum QosEnum {
Qos0(0),Qos1(1),Qos2(2);
private final int value;
QosEnum(int value) {
this.value = value;
}
public int value(){
return this.value;
}
}
@Autowired
private MqttCallback mqttCallback;
public void connect(String username, String password){
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);//自動重連
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(true);//臨時會話
mqttClient.setCallback(mqttCallback);//設置方法回調
try {
mqttClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 連接失敗");
}
}
@PreDestroy
public void disConnect(){
try {
mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 斷開連接失敗");
}
}
public void reConnect(){
try {
mqttClient.reconnect();
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 重新連接失敗");
}
}
/**
* 發布消息
* @author wen.jie
* @date 2021/7/27 16:11
*/
public void publish(String topic, String msg, QosEnum qosEnum, boolean retained){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(msg.getBytes());
mqttMessage.setQos(qosEnum.value());
mqttMessage.setRetained(retained);
try {
mqttClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
logger.error("發布消息失敗");
}
}
/**
* 添加訂閱
* @author wen.jie
* @date 2021/7/27 16:18
*/
public void subscribe(String topicFilters, QosEnum qosEnum){
try {
mqttClient.subscribe(topicFilters, qosEnum.value());
} catch (MqttException e) {
e.printStackTrace();
logger.error("添加訂閱失敗");
}
}
編寫MqttCallback回調類
MessageCallback.java
@Component
public class MessageCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MessageCallback.class);
/**
* 丟失了對服務端連接后的回調
* @author wen.jie
* @date 2021/7/27 16:27
*/
@Override
public void connectionLost(Throwable cause) {
//丟失對服務端的連接后觸發該方法回調,此處可以做一些特殊處理,比如重連
logger.info("丟失了對服務端連接");
}
/**
* 訂閱到消息后的回調
* 該方法由mqtt客戶端同步調用,在此方法未正確返回之前,不會發送ack確認消息到broker
* 一旦該方法向外拋出了異常客戶端將異常關閉,當再次連接時;所有QoS1,QoS2且客戶端未進行ack確認的消息都將由broker服務器再次發送到客戶端
* @author wen.jie
* @date 2021/7/27 16:28
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("訂閱到了消息;topic={},messageid={},qos={},msg={}",
topic,
message.getId(),
message.getQos(),
new String(message.getPayload()));
}
/**
* 消息發布完成且收到ack確認后的回調
* QoS0:消息被網絡發出后觸發一次
* QoS1:當收到broker的PUBACK消息后觸發
* QoS2:當收到broker的PUBCOMP消息后觸發
* @author wen.jie
* @date 2021/7/27 16:34
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
int messageId = token.getMessageId();
String[] topics = token.getTopics();
logger.info("消息發送完成,messageId={},topics={}",messageId,topics);
}
}
主啟動類
@SpringBootApplication
public class EmqDemoApplication {
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties mqttProperties;
@PostConstruct
public void init(){
emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
emqClient.subscribe("topictest/#", QosEnum.Qos2);
new Thread(()->{
while (true){
emqClient.publish("topictest/123", "hello world", QosEnum.Qos2, true);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) {
SpringApplication.run(EmqDemoApplication.class, args);
}
}
測試
啟動主啟動類,觀察效果
每五秒鍾收發一次消息,測試成功