1.maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2.application.properties配置
#MQTT配置信息
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
zxwl.mqtt.host=tcp://127.0.0.1:1883
#MQTT-连接服务器默认客户端ID前缀(MqttConfiguration.setClientid 会在其后追加 System.currentTimeMillis())
zxwl.mqtt.clientid=mqtt_subscribe_
#MQTT-用户名
zxwl.mqtt.username=admin
#MQTT-密码
zxwl.mqtt.password=admin
#MQTT-默认的消息推送主题,实际可在调用接口时指定
zxwl.mqtt.topic=testtopic
#连接超时(单位:秒,该值会传给 MqttConnectOptions.setConnectionTimeout)
zxwl.mqtt.timeout=1000
3.MqttConfiguration
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import com.yunfan.mqtt.MqttPushClient; @Component @Configuration @ConfigurationProperties(MqttConfiguration.PREFIX) public class MqttConfiguration { @Autowired private MqttPushClient mqttPushClient; public static final String PREFIX = "zxwl.mqtt"; private String host; private String clientid; private String username; private String password; private String topic; private int timeout; private int keepalive; public String getClientid() { return clientid; } public void setClientid(String clientid) { this.clientid = clientid+System.currentTimeMillis(); } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getTimeout() { return timeout; } public void setTimeout(int timeout) { this.timeout = timeout; } public int getKeepalive() { return keepalive; } public void setKeepalive(int keepalive) { this.keepalive = keepalive; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } @Bean public MqttPushClient getMqttPushClient(){
mqttPushClient.getParameters(host, clientid, username, password, timeout,keepalive);
mqttPushClient.connect();
return mqttPushClient; } }
4.MqttPushClient
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MqttPushClient { private static Logger log = LoggerFactory.getLogger(MqttPushClient.class); @Autowired private PushCallback pushCallback; private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MqttPushClient.client = client; } private String host; private String clientid; private String username; private String password; private int timeout; private int keepalive; public void getParameters(String host,String clientid,String username,String password,int timeout,int keepalive) { this.host = host; this.clientid = clientid; this.username = username; this.password = password; this.timeout = timeout; this.keepalive = keepalive; } /** * 设置mqtt连接参数 * * @param username * @param password * @param timeout * @param keepalive * @return */ public MqttConnectOptions setMqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); options.setCleanSession(false); return options; } public void connect(){ try { if (client == null) { client = new MqttClient(host, clientid, new MemoryPersistence()); client.setCallback(pushCallback); } MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(); if 
(!client.isConnected()) { client.connect(mqttConnectOptions); } else { client.disconnect(); client.connect(mqttConnectOptions); } } catch (Exception e) { e.printStackTrace(); } } /*public void connect(String host, String clientID, String username, String password, int timeout, int keepalive){ MqttClient client; try { client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); MqttPushClient.setClient(client); try { client.setCallback(pushCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } }*/ /** * 发布,默认qos为0,非持久化 * @param topic * @param pushMessage */ public static void publish(String topic,String pushMessage){ publish(0, false, topic, pushMessage); } /** * 发布 * @param qos * @param retained * @param topic * @param pushMessage */ public static void publish(int qos,boolean retained,String topic,String pushMessage){ MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if(null == mTopic){ log.error("topic not exist"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅某个主题,qos默认为0 * @param topic */ public static void subscribe(String topic){ subscribe(topic,0); } /** * 订阅某个主题 * @param topic * @param qos */ public static void subscribe(String topic,int qos){ try { MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
5.PushCallback
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class PushCallback implements MqttCallback { private static Logger log = LoggerFactory.getLogger(PushCallback.class); @Autowired private MqttPushClient mqttPushClient; @Override public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 log.error("连接断开,尝试重连.."); long reconnectTimes = 1; while (true) { try { if (mqttPushClient.getClient().isConnected()) { log.info("mqtt reconnect success end"); return; } log.info("mqtt reconnect times = {} try again...", reconnectTimes++); mqttPushClient.connect(); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(10000); } catch (InterruptedException e1) { //e1.printStackTrace(); } } } @Override public void deliveryComplete(IMqttDeliveryToken token) { log.info("deliveryComplete: " + token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + message.getQos()); log.info("接收消息内容 : " + new String(message.getPayload())); } }
6.默认加载的订阅主题
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Application runner that subscribes to the default topics once the
 * application has started.
 */
@Component
public class Subscribe implements ApplicationRunner {

    /** Topics subscribed to at startup, in subscription order. */
    private static final String[] DEFAULT_TOPICS = {"test01", "test02"};

    /**
     * Subscribes to every default topic through {@code MqttPushClient}.
     *
     * @param args application arguments (not used)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        for (String defaultTopic : DEFAULT_TOPICS) {
            MqttPushClient.subscribe(defaultTopic);
        }
    }
}