項目內的大佬推薦了一種流式處理框架benthos,該工具使用go語言編寫,使用起來比較方便,只需要編寫yaml配置文件即可完成對數據的處理;
啟動就不說了,推薦大家看下官網,講的比較詳細。
該框架有三大組件:
- input 輸入
- pipeline 流水線處理
- output 輸出
該框架包含多種輸入源,多種輸出方式,數據處理提供了一些映射方法,並提供了一種自定義的語言Bloblang,功能比較強大。
官方一個簡單的例子:
input:
type: stdin
pipeline:
processors:
- bloblang: root = content().uppercase()
output:
type: stdout
上面的例子控制台輸入信息,處理的方式是將輸入字符變成大寫,最后輸出至控制台;
benthos感覺可以當一個etl工具使用。
input 組件:
每一個yaml文件都必須有的一個組件,這是程序的入口,從input 觸發整個數據的流動。
我們常用的input組件有file和kafka。
# Common config fields, showing default values
input:
file:
paths: [ ./data/*.csv ]
codec: csv
delete_on_finish:false
file
Metadata元數據(-path),元數據在流轉過程中是“不可見的”,但是可以獲取到。file的元數據保存的是文件的路徑path。
這個元數據會附加到每一條消息上。
Fields:屬性
paths:表明文件的路徑
codec:表明文件的格式,支持的格式有csv、line、gzip、tar等。
max_buffer:讀取文件的大小,默認1000000(不記得單位了)
delete_on_finish:數據處理完成后是否刪除原文件。
當目錄下有文件時就會觸發程序的流轉。
generate
可以當一個觸發器使用,沒有數據源的時候,即沒有輸入,那么正常來說程序就如法繼續進行,但是有的場景就是不需要輸入的,這是可以使用generate作為input,觸發程序的運行。
# Config fields, showing default values
input:
label: ""
generate:
mapping: ""
interval: 1s
count: 0
Fields 屬性:
mapping 可以作為生成的數據。
interval 出發周期,可以使用 5s,1min 或者0,30 */2 * * * *的形式,注意時區,默認時UTC時區。
count 觸發的次數。
其他input 還包括:hdfs、socket、http、stdin等等
processor組件
processor主要就是做數據處理的部分,benthos已經自定義了很組件例如:catch、awk、branch、compress、depress、log、sql等等,
官方提供了30多種的處理組件,剛開始做這個項目的時候感覺一切被限制的死死的,寫代碼(寫配置文件)就必須得用官方的組件進行組合來實現我的功能,感覺代碼是組合出來的,特別別扭。
一方面來源於,確實對benthos的使用比較陌生,國內用的人比較少,參考資料比較少。后來用着用着熟悉了也還好吧。另外processor中有個組件叫做bloblang,這是官方自定義的一種映射語言,
稍微讓代碼靈活了一點,有點寫代碼的趕腳了。
組件比較多,建議大家去看官方文檔。
output 組件
輸出的方式
直接上示例吧:
file的
# Config fields, showing default values
output:
label: ""
file:
path: "/tmp/data.txt"
codec: lines
上述的例子中,最終輸出到文件中,方式是一行一行的寫入。output組件中也可以使用processor,表示輸出前進行一些處理。
github地址:https://github.com/Jeffail/benthos
官方網站:https://www.benthos.dev/