benthos的簡單使用


項目內的大佬推薦了一種流式處理框架benthos,該工具使用go語言編寫,使用起來比較方便,只需要編寫yaml配置文件即可完成對數據的處理;
啟動就不說了,推薦大家看下官網,講的比較詳細。

該框架有三大組件:

  • input 輸入
  • pipeline 流水線處理
  • output 輸出
    該框架包含多種輸入源,多種輸出方式,數據處理提供了一些映射方法,並提供了一種自定義的語言Bloblang,功能比較強大。

官方一個簡單的例子:

input:
  type: stdin

pipeline:
  processors:
    - bloblang: root = content().uppercase()

output:
  type: stdout

上面的例子控制台輸入信息,處理的方式是將輸入字符變成大寫,最后輸出至控制台;

benthos感覺可以當一個etl工具使用。

input 組件:

每一個yaml文件都必須有的一個組件,這是程序的入口,從input 觸發整個數據的流動。
我們常用的input組件有file和kafka。

# Common config fields, showing default values
input:
  file:
    paths: [ ./data/*.csv ]
    codec: csv
    delete_on_finish:false

file
Metadata元數據(-path),元數據在流轉過程中是“不可見的”,但是可以獲取到。file的元數據保存的是文件的路徑path。
這個元數據會附加到每一條消息上。
Fields:屬性
paths:表明文件的路徑
codec:表明文件的格式,支持的格式有csv、line、gzip、tar等。
max_buffer:讀取文件的大小,默認1000000(不記得單位了)
delete_on_finish:數據處理完成后是否刪除原文件。
當目錄下有文件時就會觸發程序的流轉。

generate
可以當一個觸發器使用,沒有數據源的時候,即沒有輸入,那么正常來說程序就如法繼續進行,但是有的場景就是不需要輸入的,這是可以使用generate作為input,觸發程序的運行。

# Config fields, showing default values
input:
  label: ""
  generate:
    mapping: ""
    interval: 1s
    count: 0

Fields 屬性:
mapping 可以作為生成的數據。
interval 出發周期,可以使用 5s,1min 或者0,30 */2 * * * *的形式,注意時區,默認時UTC時區。
count 觸發的次數。
其他input 還包括:hdfs、socket、http、stdin等等

processor組件

processor主要就是做數據處理的部分,benthos已經自定義了很組件例如:catch、awk、branch、compress、depress、log、sql等等,
官方提供了30多種的處理組件,剛開始做這個項目的時候感覺一切被限制的死死的,寫代碼(寫配置文件)就必須得用官方的組件進行組合來實現我的功能,感覺代碼是組合出來的,特別別扭。
一方面來源於,確實對benthos的使用比較陌生,國內用的人比較少,參考資料比較少。后來用着用着熟悉了也還好吧。另外processor中有個組件叫做bloblang,這是官方自定義的一種映射語言,
稍微讓代碼靈活了一點,有點寫代碼的趕腳了。
組件比較多,建議大家去看官方文檔。

output 組件

輸出的方式
直接上示例吧:
file的

# Config fields, showing default values
output:
  label: ""
  file:
    path: "/tmp/data.txt"
    codec: lines

上述的例子中,最終輸出到文件中,方式是一行一行的寫入。output組件中也可以使用processor,表示輸出前進行一些處理。

github地址:https://github.com/Jeffail/benthos
官方網站:https://www.benthos.dev/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM