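ActiveMQ MQTT simple message push example
Introduction
Sample code that connects to ActiveMQ over MQTT and pushes messages.
Implementation details
Environment setup
Start ActiveMQ with Docker, then check that the MQTT listening port is correct. The second command below prints activemq.xml; the MQTT transportConnector entry in it should show port 1883: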
docker run -dit --name activemq -p 11616:61616 -p 8161:8161 -p 1883:1883 rmohr/activemq
docker exec -ti activemq cat /opt/activemq/conf/activemq.xml
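Reading the whole activemq.xml by eye is easy to get wrong, so the check can also be sketched in code. The example below is only an illustration: the class name MqttPortCheck and the sample XML line are assumptions modelled on the usual default connector entry, not output copied from this container.

```java
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MqttPortCheck {
    // Captures the uri attribute of the transportConnector named "mqtt".
    private static final Pattern MQTT_CONNECTOR =
            Pattern.compile("<transportConnector\\s+name=\"mqtt\"\\s+uri=\"([^\"]+)\"");

    /** Returns the port of the MQTT transport connector, or -1 if none is found. */
    static int mqttPort(String activemqXml) {
        Matcher m = MQTT_CONNECTOR.matcher(activemqXml);
        if (!m.find()) {
            return -1;
        }
        // The XML file escapes '&' as "&amp;"; undo that before parsing the URI.
        String uri = m.group(1).replace("&amp;", "&");
        return URI.create(uri).getPort();
    }

    public static void main(String[] args) {
        // Assumed sample line; paste the real output of the docker exec command instead.
        String sample = "<transportConnector name=\"mqtt\" "
                + "uri=\"mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/>";
        System.out.println("MQTT port: " + mqttPort(sample));
    }
}
```

If the method returns -1, the configuration has no connector named "mqtt" in the form this sketch expects, and the file should be inspected manually.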
Subscriber
Acts like a mobile client: it receives pushed messages and simply prints each one. The code is as follows:
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;

public class Listener {
    public static void main(String[] args) throws Exception {
        // Connection settings for the broker started above.
        MQTT mqtt = new MQTT();
        mqtt.setHost("localhost", 1883);
        mqtt.setUserName("admin");
        mqtt.setPassword("admin");

        final CallbackConnection connection = mqtt.callbackConnection();

        // Register the listener before connecting so no message is missed.
        connection.listener(new org.fusesource.mqtt.client.Listener() {
            @Override
            public void onConnected() {
            }

            @Override
            public void onDisconnected() {
            }

            @Override
            public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
                String message = body.utf8().toString();
                System.out.println("Receive message : " + message);
                // Acknowledge the message so the broker does not treat it as undelivered.
                ack.run();
            }

            @Override
            public void onFailure(Throwable throwable) {
                throwable.printStackTrace();
            }
        });

        connection.connect(new Callback<Void>() {
            @Override
            public void onSuccess(Void aVoid) {
                // Subscribe only after the connection is established.
                Topic[] topics = {new Topic("mqttTest", QoS.AT_LEAST_ONCE)};
                connection.subscribe(topics, new Callback<byte[]>() {
                    @Override
                    public void onSuccess(byte[] bytes) {
                        System.out.println("subscribe success");
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        System.out.println("subscribe failed");
                    }
                });
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("connect failed");
            }
        });

        // Block the main thread forever so the callbacks keep running.
        synchronized (Listener.class) {
            while (true) {
                Listener.class.wait();
            }
        }
    }
}
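The subscriber keeps its main thread alive by waiting on Listener.class forever, which means the process can only be stopped by killing it. A possible alternative, sketched here with the standard library only, is to block on a CountDownLatch that a callback or shutdown hook can release. The class KeepAlive and its method names are hypothetical and are not part of the MQTT client library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class KeepAlive {
    private final CountDownLatch stop = new CountDownLatch(1);

    /** Releases any thread blocked in awaitStop; safe to call from a callback thread. */
    void requestStop() {
        stop.countDown();
    }

    /** Waits up to the given time; returns true if a stop was requested. */
    boolean awaitStop(long millis) {
        try {
            return stop.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and report that no stop was observed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeepAlive keepAlive = new KeepAlive();
        // Stand-in for a connection callback (for example onDisconnected) requesting shutdown.
        new Thread(keepAlive::requestStop).start();
        boolean stopped = keepAlive.awaitStop(5000);
        System.out.println("stopped = " + stopped);
    }
}
```

In the subscriber, requestStop() could be called from onDisconnected or onFailure, and the main thread would call awaitStop in a loop instead of wait(), which lets the program exit cleanly once the connection is gone.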
Publisher
Publishes the messages. The code is roughly as follows:
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

public class Publisher {
    public static void main(String[] args) throws Exception {
        // Same broker settings as the subscriber.
        MQTT mqtt = new MQTT();
        mqtt.setHost("localhost", 1883);
        mqtt.setUserName("admin");
        mqtt.setPassword("admin");

        // The future-based API lets us wait for each operation to finish.
        FutureConnection connection = mqtt.futureConnection();
        connection.connect().await();
        System.out.println("connect");

        int messageAmount = 10;
        UTF8Buffer topic = new UTF8Buffer("mqttTest");
        // Send ten messages, counting down from 10 to 1.
        while (messageAmount > 0) {
            // retain = false: the broker does not keep the message for later subscribers.
            // await() is added here so each publish completes before the next one starts.
            connection.publish(topic, new AsciiBuffer("test message" + messageAmount), QoS.AT_LEAST_ONCE, false).await();
            System.out.println("send message " + messageAmount);
            messageAmount -= 1;
        }

        connection.disconnect().await();
        System.out.println("disconnect");
    }
}
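Running
Start the subscriber first, then the publisher. The messages are published with retain set to false, so a client that has not subscribed yet would miss them. The publisher console then shows the messages being sent, and the subscriber console shows them being received.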
Reference links
-
activemq example: