benthos 是一個stream 處理框架,streamsets 也是,但是兩者可以通過不同的工具進行集成起來
一般我們可以使用http 服務,消息中間件(kafka、rabbitmq 。。。)
使用docker-compose 運行
服務配置
- docker-compose 文件
version: "3"
services:
sets:
image: streamsets/datacollector
volumes:
- "./ms/data:/data"
- "./ms/logs:/logs"
- "./ms/tmp:/tmp"
ports:
- "8000:8000"
- "18630:18630"
redis:
image: redis
ports:
- "6379:6379"
benthos-in:
image: jeffail/benthos
environment:
INPUT_TYPE: "http_server"
INPUT_HTTP_SERVER_ADDRESS: "0.0.0.0:8080"
OUTPUT_TYPE: "amqp"
OUTPUT_AMQP_URL: "amqp://guest:guest@rabbitmq:5672/appdemo"
OUTPUT_AMQP_EXCHANGE: "benthos-exchange"
OUTPUT_AMQP_EXCHANGE_TYPE: "direct"
OUTPUT_AMQP_KEY: "benthos-key"
ports:
- "8080:8080"
rabbitmq:
image: rabbitmq:3.6.14-management
ports:
- "4369:4369"
- "5671:5671"
- "5672:5672"
- "25672:25672"
- "15672:15672"
- 啟動
docker-compose up -d
- 添加streamsets rabbitmq 包
- rabbitmq vhost 創建
說明
benthos 使用http service 與rabbitmq 集成的模式,streamsets 使用rabbitmq customer 與local fs 集成的模式
benthos 服務的啟動 curl http://localhost:8080/post -d "example message"
同時首選需要進行vhost 的創建,目前發現的一個bug的使用默認的就會有問題,需要單獨創建,可以使用管理界面
http://docker-host:15672
streamsets 配置
- pipeline flow
- rabbitmq 配置
- local fs
- benthos 配置說明
參考docker-compose 文件
environment:
INPUT_TYPE: "http_server"
INPUT_HTTP_SERVER_ADDRESS: "0.0.0.0:8080"
OUTPUT_TYPE: "amqp"
OUTPUT_AMQP_URL: "amqp://guest:guest@rabbitmq:5672/appdemo"
OUTPUT_AMQP_EXCHANGE: "benthos-exchange"
OUTPUT_AMQP_EXCHANGE_TYPE: "direct"
OUTPUT_AMQP_KEY: "benthos-key"
啟動pipeline
效果
- 訪問(發送消息)
curl http://localhost:8080/post -d "rongfengliang@qq.com"
- 效果
配置問題說明
目前發現streamsets rabbit 配置必須包含一個vhost 不能使用默認的。
參考資料
https://github.com/rongfengliang/streamsets-demos
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/RabbitMQ.html#concept_dyg_lq1_h5
https://github.com/Jeffail/benthos