Databus系統是微博DIP團隊開源的分布式日志傳輸系統。它是一個分布式、高可用的,用於采集和移動大量日志數據的服務。它基於流式數據的簡單而靈活的架構,具備健壯性和容錯性,具有故障轉移與恢復機制。它采用簡單的可擴展的數據投遞模型,允許用戶自定義擴展傳輸組件。
主要特性
- All-In-One 所有的日志傳輸通道整合到一個系統,避免針對每種業務相應地定制一套日志傳輸組件,這樣隨着業務的增多,運維壓力會劇增。
- 熱加載 在JVM無需重啟的情況下,可以添加、更新、刪除指定的日志傳輸通道,且不會影響到其他傳輸通道的正常工作。
- 容錯性 對於Databus分布式系統,若出現少量傳輸節點異常崩潰,那么異常崩潰節點的數據流量會切至其他節點,不影響整個系統的正常運行。
系統架構
Databus系統可對接多種數據源和數據目的地,將數據源的日志同步到數據目的地。常用的數據源有:Kafka、本地文件、ScribeClient等,常用的數據目的地有:Kafka、HDFS等。
Databus系統的核心處理模塊包含四部分:Source、Converter、Sink、Store。Source模塊負責收集數據源的日志,Converter模塊負責對日志轉換,如:重命名Topic名稱、對消息體的ETL和過濾,Sink模塊負責把日志同步到數據目的地,Store模塊負責把寫入數據目的地失敗的日志暫存起來,根據策略進行后續的處理。
Databus系統的監控報警模塊主要包含:數據量統計、靈活的Exporter插件、異常報警。數據量統計用於統計Source端的讀取量和Sink端的寫入量,便於全鏈路的數據對賬。系統暴露了Exporter接口,用戶只需針對特定的存儲系統實現相應的Exporter,即可把監控信息采集過去,配置圖表后做直觀的展示。另外若日志寫入數據目的地失敗,可通過配置策略發送報警。
數據流模型
Databus系統的數據流模型設計為一個Source對應一個Sink,一個Source和與其對應的Sink組成一個Pipeline管道,各個Pipeline相互獨立、互不影響。通過這種Pipeline模型,用戶新增、刪除、變更某個Pipeline,不會影響到其他Pipeline的數據傳輸,且使用熱部署的方式不需要重啟進程。做到盡可能少的中斷數據流,保障日志傳輸的實時性。
安裝部署
編譯
git clone https://github.com/weibodip/databus.git
cd databus
mvn clean package -DskipTests
初始化環境
mkdir -p /data0/workspace
mv ../databus /data0/workspace
mkdir /var/log/databus/
添加配置
可以在 /data0/workspace/databus/pipelines 目錄下添加多個配置文件,每個配置文件抽象為一個 pipeline,各個 pipeline 的日志傳輸互相獨立,互不干擾。這里以讀取本地文件的日志記錄,並寫入 kafka topic 的 pipeline 配置為例。
vim /data0/workspace/databus/pipelines/file-to-kafka-example.properties
pipeline.name=file-to-kafka-example
pipeline.source=com.weibo.dip.databus.source.FileSource
pipeline.converter=com.weibo.dip.databus.converter.TopicNameConverter
pipeline.store=com.weibo.dip.databus.store.DefaultStore
pipeline.sink=com.weibo.dip.databus.sink.KafkaSinkV010
#source
source.file.directory=/data0/log/databus/test/
source.file.include.pattern=^.*\\.test\\.log$
source.file.category=test
source.file.delete.after.read=true
source.file.retention.second=7200
#converter
topic.mappings=test:test
#sink
sink.kafka.bootstrap.servers=hostname1:9092,hostname2:9092,hostname3:9092
sink.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
sink.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
啟停操作
系統默認的JDK路徑:/usr/local/jdk1.8.0_144,可根據情況修改 bin/databus-server.sh 的 JAVA_HOME。
# 啟動
/data0/workspace/databus/bin/databus-server.sh start
# 查看運行狀態
/data0/workspace/databus/bin/databus-server.sh status
# 查看日志
tailf /var/log/databus/server.log
# 停止
/data0/workspace/databus/bin/databus-server.sh stop
與 Flume 對比
Flume 的模型抽象上有 Channel 的概念,這樣便於多路復用數據流,其常見的場景:
- 一個 source 復制到多個 channel
- 制定規則將一個 source 拆分到多個 channel
Flume 的多路復用數據流,增加了數據處理的靈活性,但是常用的 Channel 也存在一些問題:
- FileChannel 會降低數據寫入和讀取速度。
- MemoryChannel 增加對服務器內存的占用,數據傳輸通道過多時甚至會導致進程的OOM。
- KafkaChannel 浪費一部分的帶寬資源;且引入額外組件,會導致傳輸鏈路變長,降低服務穩定性。
考慮到 Channel 在目前的實現上存在一些問題,去掉 Channel 在一些不需要多路復用數據流的場景下,數據傳輸表現效果會更好。Databus 的設計理念在於去掉 Channel,其相比 Flume 的優勢在於:
- 模型抽象簡單,方便理解,一個 source 對應一個 sink。
- 配置項簡單,對於數十行的 Flume 配置,Databus 可能只需十幾行即可搞定。
- 數據傳輸延遲低,去掉 Channel 組件,縮短了數據鏈路,尤其對於非內存的 Channel,降低數據延遲的效果更明顯。
Flume | Databus | |
---|---|---|
模型抽象 | source-channel-sink | source-sink |
配置 | 繁多冗長 | 簡潔 |
靈活性 | 一個source對應多個sink | 一個source對應一個sink |
數據傳輸延遲 | 較高 | 較低 |
結語
項目實現了很多常用的Source 和 Sink,並對每個Source 和 Sink 的特性、適用場景,以及配置參數進行了說明,方便用戶快速上手。詳細內容可查閱項目的GitHub地址:https://github.com/weibodip/databus
Databus系統在微博業務的日常使用場景中,已經承接了各種Source 和Sink 的數據傳輸業務。在大數據和高並發場景的檢驗下,系統曾暴露出一些問題,而這些問題已經得到修復,目前系統已穩定運行多年。不過在程序的世界里,Bug是無法避免的,在使用過程中如有遇到問題,歡迎提 Issue,我們會盡快修復~