pulsar號稱是下一代的消息系統,這二年風光無限,大有干掉kafka的勢頭,如果想快速體驗下,可以按以下步驟在本地搭建一個單機版本:(mac環境+jdk8)
一、 下載
wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.3.2/apache-pulsar-2.3.2-bin.tar.gz
目前最新版本是2.3.2
二、解壓
tar -zxvf apache-pulsar-2.3.2-bin.tar.gz
三、單機模式啟動
cd apache-pulsar-2.3.2 bin/pulsar standalone
四、 測試收發消息
pulsar自帶的client工具,可以直接測試收發消息。
收消息命令如下:
bin/pulsar-client consume my-topic -s "first-subscription"
表示從“my-topic”這個topic上消費消息,並且指定訂閱名稱為“first-subscription”
發消息命令如下:
bin/pulsar-client produce my-topic --messages "hello pulsar"
表示發送消息到my-topic這個topic上。
注:測試時,可以單獨開2個terminal,先在其中1個運行consumer,然后在另1個運行produce
附:如果希望寫java代碼實現收發消息,可參考https://github.com/yjmyzz/pulsar-sample
五、 Function測試
function是一個極有前途的功能,可以把一個topic中噴出的消息,實時接收並處理后,再把處理結果發到另一個topic,相當於輕量級的流式計算。
./examples目錄下有一個api-examples.jar包,里面自帶了一些Function示例。
5.1 部署Function
部署的過程,其實就是把帶處理邏輯的jar包,放到集群上,命令如下:
bin/pulsar-admin functions create \ --jar examples/api-examples.jar \ --className org.apache.pulsar.functions.api.examples.ExclamationFunction \ --inputs persistent://public/default/exclamation-input \ --output persistent://public/default/exclamation-output \ --name exclamation
大致是創建一個function,來源是examples/api-examples.jar這個文件,並指定了具體的類名(因為一個jar包中,可以寫多個function,必須指定具體的className), 然后這個function的入參是exclamation-input這個topic,處理完的結果,將輸出到exclamation-output,最后這個function在pulsar中的名字是exclamation - 注:如果上述命令執行失敗,可以嘗試把className,換成classname. (不同版本的pulsar這個參數的大小寫略有不同)
附:ExclamationFunction的java源碼如下,邏輯很簡單,只是在輸入參數后加一個!
package org.apache.pulsar.functions.api.examples; import java.util.function.Function; public class ExclamationFunction implements Function<String, String> { @Override public String apply(String input) { return String.format("%s!", input); } }
5.2 查看已部署的function列表
bin/pulsar-admin functions list \ --tenant public \ --namespace default
5.3 啟動消費者,查看實時處理結果
bin/pulsar-client consume persistent://public/default/exclamation-output \ --subscription-name my-subscription \ --num-messages 0
5.4 啟動生產者,產生實時處理所需的素材
bin/pulsar-client produce persistent://public/default/exclamation-input \ --num-produce 1 \ --messages "Hello world"
參考文章: