MQTT:java客户端库Paho


简介

Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。

Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。

emqx官方文档中有相关介绍:https://docs.emqx.cn/enterprise/v4.3/development/java.html#通过-maven-安装-paho-java

paho客户端对象初始化

先导入paho依赖:

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

配置MqttProperties.java:

@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties {

    private String brokerUrl;

    private String clientId;

    private String username;

    private String password;
}

application.yml

mqtt:
  broker-url: tcp://192.168.40.128:1883
  client-id: emq-client
  username: admin
  password: public

EmqClient.java

@Component
public class EmqClient {

    private static final Logger logger = LoggerFactory.getLogger(EmqClient.class);

    @Autowired
    private MqttProperties mqttProperties;

    private IMqttClient mqttClient;

    @PostConstruct
    public void init(){

        MemoryPersistence memoryPersistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),
                    mqttProperties.getClientId(),
                    memoryPersistence);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 创建失败");
        }
    }
}

编写客户端一些方法

创建Qos枚举类:

public enum QosEnum {
    Qos0(0),Qos1(1),Qos2(2);

    private final int value;

    QosEnum(int value) {
        this.value = value;
    }

    public int value(){
        return this.value;
    }
}
    @Autowired
    private MqttCallback mqttCallback;

    public void connect(String username, String password){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);//自动重连
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);//临时会话

        mqttClient.setCallback(mqttCallback);//设置方法回调
        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 连接失败");
        }
    }

    @PreDestroy
    public void disConnect(){
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 断开连接失败");
        }
    }

    public void reConnect(){
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 重新连接失败");
        }
    }

    /**
     * 发布消息
     * @author wen.jie
     * @date 2021/7/27 16:11
     */
    public void publish(String topic, String msg, QosEnum qosEnum, boolean retained){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qosEnum.value());
        mqttMessage.setRetained(retained);

        try {
            mqttClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("发布消息失败");
        }
    }

    /**
     * 添加订阅
     * @author wen.jie
     * @date 2021/7/27 16:18
     */
    public void subscribe(String topicFilters, QosEnum qosEnum){
        try {
            mqttClient.subscribe(topicFilters, qosEnum.value());
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("添加订阅失败");
        }
    }

编写MqttCallback回调类

MessageCallback.java

@Component
public class MessageCallback implements MqttCallback {

    private static final Logger logger = LoggerFactory.getLogger(MessageCallback.class);

    /**
     * 丢失了对服务端连接后的回调
     * @author wen.jie
     * @date 2021/7/27 16:27
     */
    @Override
    public void connectionLost(Throwable cause) {
        //丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连
        logger.info("丢失了对服务端连接");
    }

    /**
     * 订阅到消息后的回调
     * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
     * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的消息都将由broker服务器再次发送到客户端
     * @author wen.jie
     * @date 2021/7/27 16:28
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        logger.info("订阅到了消息;topic={},messageid={},qos={},msg={}",
                topic,
                message.getId(),
                message.getQos(),
                new String(message.getPayload()));
    }

    /**
     * 消息发布完成且收到ack确认后的回调
     * QoS0:消息被网络发出后触发一次
     * QoS1:当收到broker的PUBACK消息后触发
     * QoS2:当收到broker的PUBCOMP消息后触发
     * @author wen.jie
     * @date 2021/7/27 16:34
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        logger.info("消息发送完成,messageId={},topics={}",messageId,topics);
    }
}

主启动类

@SpringBootApplication
public class EmqDemoApplication {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttProperties mqttProperties;

    @PostConstruct
    public void init(){
        emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
        emqClient.subscribe("topictest/#", QosEnum.Qos2);
        new Thread(()->{
            while (true){
                emqClient.publish("topictest/123", "hello world", QosEnum.Qos2, true);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }).start();
    }

    public static void main(String[] args) {
        SpringApplication.run(EmqDemoApplication.class, args);
    }

}

测试

启动主启动类,观察效果

image-20210727164223278

每五秒钟收发一次消息,测试成功


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM