0、環境准備
0.1、linux
0.2、java
0.3、下載apollo二進制包,解壓
0.4、創建broker,名字為 userlog
{APOLLO_HOME}/bin/apollo create userlog
0.5 啟動apollo
cd {APOLLO_HOME}/userlog
bin/apollo-broker run
如果需要以服務形式啟動,執行下面命令
sudo ln -s "/opt/db/apache-apollo-1.7.1/test/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start
1、各協議開放的端口
INFO | Accepting connections at: tcp://0.0.0.0:61613
INFO | Accepting connections at: tls://0.0.0.0:61614
INFO | Accepting connections at: ws://0.0.0.0:61623/
INFO | Accepting connections at: wss://0.0.0.0:61624/
INFO | Administration interface available at: https://127.0.0.1:61681/
INFO | Administration interface available at: http://127.0.0.1:61680/
3、生產者
Client c = new Client("172.16.163.141", 61613, "admin", "password");
final HashMap headers = new HashMap();
for (int i = 0; i < 5; i++) {
c.send("/topic/productlogs", "M" + i);
System.out.println("Send:" + "M" + i);
Thread.sleep(20);
}
c.disconnect(headers);
4、消費者
Client c = new Client("172.16.163.141", 61613, "admin", "password");
final HashMap headers = new HashMap();
c.subscribe("/topic/productlogs", new Listener() {
@Override
public void message(Map headers, String body) {
System.out.println("receive:" + body);
}
}, headers);
4.1、發送消息
先啟動生產者,這時沒有消費者,消息會被丟掉。消息發送完后,toplic會被自動刪掉
注意:當不進行持久化的時候,在生產者啟動之后的消費者,不能收到生產者發送的歷史消息,只能從沒有丟掉的消息開始接收。
4.2、消費消息
啟動消費者,這時,如果有消息,將打印消息內容。
如果消費者啟動2份,每一個消費者,將會受到所有消息,如,生產者發5條消息,這5條消息,這些消費者每個都收到5條消息
4.3 消息分組與持久化
下面我們事先所有消費者均攤這5條消息
消費者中添加以下代碼
headers.put("persistent", "true");//persistent 的值為string,否則,會產生意想不到的效果(所有消費者都收不到任何消息)
headers.put("id", "dusb_1");//id 為消息分組的依據,每一組收到全量消息。id相同的消費者,均攤該組的所有消息。
//headers.put("credit", "50,0");
實驗1:
啟動2份消費者,這時,消息被均攤到這兩個消費者上,比如,第一個消費者收到3條,第二條收到2條,即消息均攤。
實驗2:
此時,若是,關閉所有消費者,重新啟動生產者,發送5條消息,
web頁面上durable subs 會有5條消息持久化,之后消費者head id為dsub_1的消費者,才會接受剛才發送的消息。
結論:當該id組的消費者都掛掉的時候,消息會進行持久化。下次該組消費者啟動時,繼續進行消息消費。
思考:這時候,如果啟動四個消費者,兩個消費者id為dusb_1,另外兩個消費者id為dusb_2,啟動生產者發送5條消息,這5條消息會怎么分發。
答案:
5條消息按照id進行廣播,即id 為dusb_1的2個消費者收到這5條消息。這5條消息在2個消費者之間進行均攤。
id 為dusb_2的2個消費者也收到這5條消息,5條消息在這兩個消費者之間進行均攤
再次思考1:接上面實驗,若是斷開2個分組內的所有消費者,用生產者,發送5條消息。此時直接重啟機子,消息該怎么持久化?
結果:dusb_1持久化5條消息,dsub_2也持久化5條消息。
再次思考2:若剛才沒有直接重啟機子,而是啟動id為dsub_2的消費者進行消費。當該組消息被消費完后,直接重啟機子。消息該怎么持久化。
結果:dusb_1持久化5條消息,dusb_2由於消息已經消費完,所以沒有消息被持久化。
重啟機子:用於模擬意外狀況。
結論:持久化的粒度是durable subs,也就是說persistent必須和id一起使用。說明該組消息已經被持久化
5、stomp消息確認Ack
消息確認方式:auto 默認自動確認。即:只要接收到消息,即認為消息消費成功,進行自動確認。
客戶端確認:client 確認當前消息時,順便確認之前沒有來得急確認的消息。比如確認消息還沒發送到服務端,就斷開了。
客戶端確認:client-individual 只確認當前消息
6、消息駁回NACK
ack模式為client-individual:
適用於駁回當前單個消息,
ack模式為client:適用於駁回
那些還沒有被ACK'ed
和NACK'ed
的消息
其他參數詳見:http://www.cnblogs.com/piaolingzxh/p/5450176.html
7、apollo web ui
入口: https://127.0.0.1:61681/
默認賬戶密碼:admin password
7.1 主要概念
queue:當生產者和消費者都斷開時,會自動刪除。格式:/queue/myQueueName
topic:當生產者和消費者都斷開時,會自動刪除。格式:/topic/myTopicName
durable subs:消息分組的依據。持久化的粒度
7.2 有圖有真相
查看durable subs,可以看到剛才實驗的兩個id,分別為dsub_1,dsub_2
點擊dsub_1 ,查看該id分組的持久化信息
生產者、消費者如下
7.3 實用技巧:定位生產者、消費者程序
上邊可以看到生產者和消費者的IP及端口,
下邊,我們通過IP及端口定位程序,
ssh到IP地址
lsof -i:59446 #列出占用該端口的進程PID信息,如4133
ps -ef | grep 4133
這樣就可以定位到主進程文件了。