一、說明
MQTT是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當做傳感器和致動器(比如通過Twitter讓房屋聯網)的通信協議。
Apache Apollo是一個代理服務器,其是在ActiveMQ基礎上發展而來的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多種協議。
總結來說MQTT只是一種消息推送的協議目前(2016/1/13)為V3.1版本,而Apache Apollo是更具這種協議而開發的一款服務性的服務程序,被用來進行消息推送。同樣的服務還有Mosquitto,介紹地址:http://blog.csdn.net/xukai871105/article/details/39252653
二、原理
Apache Apollo說白了其實很簡單,就是在服務器端創建一個唯一訂閱號,發送者可以向這個訂閱號中發東西,然后接受者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的消息。以此來完成消息的推送。服務器其實是一個消息中轉站。
三、下載安裝Apollo(windows)
1.下載地址:http://activemq.apache.org/apollo/


2.安裝
apollo中間件其實是免安裝的,我們只需要下載apache-apollo-1.7.1-windows-distro.zip,然后解壓到某個文件夾就可以了。在這里我解壓到D:\apache-apollo-1.7.1。解壓開的路徑如下:

3.創建屬於自己的apollo域
下載下來的是官方的apollo,我們需要自己生成自己的apollo,這樣做的好處就是我們可以根據自己的需求修改一些配置文件,創建過程如下:
1 創建一個D:\myApollo文件夾。
2 進入命令輸入模式,進入到剛創建的文件下下:cd D:\myApollo。
3 因為接到的目錄關系所以可能有些改變,目錄為(解壓路徑\bin\apollo create myapollo) 例子:D:\apache-apollo-1.7.1\bin\apollo create myapollo
4 創建成功后,在D:\myApollo會有一個myapollo子文件夾,里面內容如下:

其中 bin為啟動目錄 etc為配置目錄。
4.啟動myapollo
1 cd D:\myApollo\myapollo\bin (命名模式下進入到自己生成的apollo下的bin目錄)
2 apollo-broker run (輸入啟動命令)
輸入后效果如下:

這里我們可以看到端口配置信息
5.訪問控制台
在瀏覽器輸入http://127.0.0.1:61680/,就是上面黑窗口最后一行,打開如下頁面

然后輸入默認用戶名/密碼:(admin/password),用戶名密碼可以在etc/users.properties中找到,點擊登陸,然后進入控制頁面,可以看到myapollo,當然目前里面是沒有連接,沒有消息的,因為我們還沒有建立連接,發送消息。

至此,我們的apollo中間件就可以正常使用了。
6.修改配置
在我們的配置目錄下(D:\myApollo\myapollo\etc)

apollo.xml 為網絡配置信息,其他的不用管最主要的
<connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="2000"/> connection_limit連接限制條數2000,就是說超過2000就GG了。可不可以修改等連接到了2000條的時候更改試試。
groups.properties 用於增加用戶
原本為: admins=admin
增加test用戶: admins=admin|test(中間用|分開)
users.properties 用於設置用戶的賬號密碼
用戶名=密碼
原本為: admin=password
增加test用戶: admin=password
test=test (下面新增一列,此處與groups.properties文件對應)
7.連接程序:
依賴包:mqtt-client-0.4.0.jar
包下載地址:https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/
代碼示例下載:http://pan.baidu.com/s/1kUmsVrT (gbk編碼)
在線代碼解析:
public class MyMqtt{
/**連接訪問者id,不能重復*/
private String clientId;
/**默認連接路徑,服務器所在ip*/
private String host="tcp://192.168.1.130:61613";
/**MQTT訪問默認用戶名*/
private String userName = "admin";
/**MQTT訪問默密碼*/
private String passWord = "password";
/**發送的訂閱號集合*/
private String[] publishTopics={"test"};
/**訂閱者的訂閱號集合*/
private String[] subTopics={"test"};
/**消息發送時的消息模式集合*/
private int[] publishQos={2};
/**消息訂閱式的消息模式集合*/
private int[] subQos={2};
/**單次MQTT連接*/
private MqttClient client;
/**連接時的一些額外設置*/
private MqttConnectOptions options;
/**發送消息是的個體topic*/
private MqttTopic topic;
/**消息傳遞hander*/
private Handler mHandler;
public MyMqtt(Context context,final Handler handler) {
clientId=Tool.getIdentifyNumber(context);
if(client==null || !client.isConnected()){
try {
//設置連接指定的額ip,連接人
client = new MqttClient(host, clientId, new MemoryPersistence());
//開始設置連接時的參數
options = new MqttConnectOptions();
//設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(userName);
//設置連接密碼
options.setPassword(passWord.toCharArray());
//設置超時連接
options.setConnectionTimeout(10);
//設置心跳間隔
options.setKeepAliveInterval(20);
//設置當連接斷開時發送的死亡預告,當這句被接受到時 證明本連接斷開
// options.setWill(publishTopic, (clientId+"destroy").getBytes(), 0, true);
//連接回調函數
client.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
// TODO Auto-generated method stub
Message msg = new Message();
msg.what=1;
msg.obj=message.toString();
handler.sendMessage(msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
}
});
client.connect(options);
client.subscribe(subTopics, subQos);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* 發送消息
* @param msg
*/
public void sendMessage(String msg){
if(client!=null && client.isConnected()){
System.out.println("MQTT發送消息了呀 "+msg);
try {
MqttMessage message=new MqttMessage();
//設置消息傳輸的類型 0,1,2可選
message.setQos(2);
//設置是否在服務器中保存消息體
message.setRetained(false);
//設置消息的內容
message.setPayload(msg.getBytes());
//循環發送,因為一次只能一個
for (String publishTopic: publishTopics) {
topic=client.getTopic(publishTopic);
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
}
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NullPointerException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
8. API查看地址
http://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/package-summary.html
