apollo stomp client 代碼示例


0、環境准備

   0.1、linux

   0.2、java

   0.3、下載apollo二進制包,解壓

   0.4、創建broker,名字為 userlog

{APOLLO_HOME}/bin/apollo create userlog

  0.5 啟動apollo

  cd {APOLLO_HOME}/userlog

  bin/apollo-broker run

  如果需要以服務形式啟動,執行下面命令

  sudo ln -s "/opt/db/apache-apollo-1.7.1/test/bin/apollo-broker-service" /etc/init.d/

  /etc/init.d/apollo-broker-service start

 

1、各協議開放的端口

 

INFO  | Accepting connections at: tcp://0.0.0.0:61613

INFO  | Accepting connections at: tls://0.0.0.0:61614

INFO  | Accepting connections at: ws://0.0.0.0:61623/

INFO  | Accepting connections at: wss://0.0.0.0:61624/

INFO  | Administration interface available at: https://127.0.0.1:61681/

INFO  | Administration interface available at: http://127.0.0.1:61680/

 

3、生產者

Client c = new Client("172.16.163.141", 61613, "admin", "password");

final HashMap headers = new HashMap();

for (int i = 0; i < 5; i++) {

c.send("/topic/productlogs", "M" + i);

System.out.println("Send:" + "M" + i);

Thread.sleep(20);

}

c.disconnect(headers);

 

4、消費者

Client c = new Client("172.16.163.141", 61613, "admin", "password");

final HashMap headers = new HashMap();

c.subscribe("/topic/productlogs", new Listener() {

@Override

public void message(Map headers, String body) {

System.out.println("receive:" + body);

}

}, headers);

 

4.1、發送消息

    先啟動生產者,這時沒有消費者,消息會被丟掉。消息發送完后,toplic會被自動刪掉

注意:當不進行持久化的時候,在生產者啟動之后的消費者,不能收到生產者發送的歷史消息,只能從沒有丟掉的消息開始接收。

4.2、消費消息

   啟動消費者,這時,如果有消息,將打印消息內容。

如果消費者啟動2份,每一個消費者,將會受到所有消息,如,生產者發5條消息,這5條消息,這些消費者每個都收到5條消息

 

4.3 消息分組與持久化

下面我們事先所有消費者均攤這5條消息

消費者中添加以下代碼

headers.put("persistent", "true");//persistent 的值為string,否則,會產生意想不到的效果(所有消費者都收不到任何消息)

headers.put("id", "dusb_1");//id 為消息分組的依據,每一組收到全量消息。id相同的消費者,均攤該組的所有消息。

//headers.put("credit", "50,0");

實驗1:

啟動2份消費者,這時,消息被均攤到這兩個消費者上,比如,第一個消費者收到3條,第二條收到2條,即消息均攤。

實驗2:

此時,若是,關閉所有消費者,重新啟動生產者,發送5條消息,

web頁面上durable subs 會有5條消息持久化,之后消費者head id為dsub_1的消費者,才會接受剛才發送的消息。

結論:當該id組的消費者都掛掉的時候,消息會進行持久化。下次該組消費者啟動時,繼續進行消息消費。

思考:這時候,如果啟動四個消費者,兩個消費者id為dusb_1,另外兩個消費者id為dusb_2,啟動生產者發送5條消息,這5條消息會怎么分發。

答案:

5條消息按照id進行廣播,即id 為dusb_1的2個消費者收到這5條消息。這5條消息在2個消費者之間進行均攤。

id 為dusb_2的2個消費者也收到這5條消息,5條消息在這兩個消費者之間進行均攤

再次思考1:接上面實驗,若是斷開2個分組內的所有消費者,用生產者,發送5條消息。此時直接重啟機子,消息該怎么持久化? 

結果:dusb_1持久化5條消息,dsub_2也持久化5條消息。

再次思考2:若剛才沒有直接重啟機子,而是啟動id為dsub_2的消費者進行消費。當該組消息被消費完后,直接重啟機子。消息該怎么持久化。

結果:dusb_1持久化5條消息,dusb_2由於消息已經消費完,所以沒有消息被持久化。

重啟機子:用於模擬意外狀況。

 

結論:持久化的粒度是durable subs,也就是說persistent必須和id一起使用。說明該組消息已經被持久化 

 

5、stomp消息確認Ack

消息確認方式:auto 默認自動確認。即:只要接收到消息,即認為消息消費成功,進行自動確認。

客戶端確認:client 確認當前消息時,順便確認之前沒有來得急確認的消息。比如確認消息還沒發送到服務端,就斷開了。

客戶端確認:client-individual  只確認當前消息

6、消息駁回NACK

ack模式為client-individual:適用於駁回當前單個消息, 

ack模式為client:適用於駁回那些還沒有被ACK'edNACK'ed的消息

 

其他參數詳見:http://www.cnblogs.com/piaolingzxh/p/5450176.html

 7、apollo web ui

入口: https://127.0.0.1:61681/

默認賬戶密碼:admin password

7.1 主要概念

queue:當生產者和消費者都斷開時,會自動刪除。格式:/queue/myQueueName

topic:當生產者和消費者都斷開時,會自動刪除。格式:/topic/myTopicName

durable subs:消息分組的依據。持久化的粒度

7.2 有圖有真相

查看durable subs,可以看到剛才實驗的兩個id,分別為dsub_1,dsub_2

點擊dsub_1 ,查看該id分組的持久化信息

 

生產者、消費者如下

 

7.3 實用技巧:定位生產者、消費者程序

上邊可以看到生產者和消費者的IP及端口,

下邊,我們通過IP及端口定位程序,

ssh到IP地址

lsof -i:59446 #列出占用該端口的進程PID信息,如4133

ps -ef | grep 4133

這樣就可以定位到主進程文件了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM