一個 logstash 實例中可以同時進行多個獨立數據流程的處理工作,如下圖所示。
而在這之前用戶只能通過在單機運行多個 logstash 實例或者在配置文件中增加大量 if-else 條件判斷語句來解決。要使用 multiple pipeline 也很簡單,只需要將不同的 pipeline 在 config/pipeline.yml中定義好即可,如下所示:
- pipeline.id: apache pipeline.batch.size: 125 queue.type: persisted path.config: "/path/to/config/apache.cfg" - pipeline.id: nginx path.config: "/path/to/config/nginx.cfg"
其中 apache和nginx作為獨立的 pipeline 執行,而且配置也可以獨立設置,互不干擾。pipeline.yml的引入極大地簡化了 logstash 的配置管理工作,使得新手也可以很快完成復雜的 ETL 配置。
- `pipeline.workers`:設置啟動多少個線程執行 fliter 和 output;當 input 的內容出現堆積而 CPU 使用率還比較充足時,可以考慮增加該參數的大小;
- `pipeline.batch.size`:設置單個工作線程在執行過濾器和輸出之前收集的最大事件數,較大的批量大小通常更高效,但會增加內存開銷。輸出插件會將每個批處理作為一個輸出單元。;例如,ES 輸出會為收到的每個批次發出批量請求;調整 `pipeline.batch.size` 可調整發送到 ES 的批量請求(Bulk)的大小;
- `pipeline.batch.delay`:設置 Logstash 管道的延遲時間, 管道批處理延遲是 Logstash 在當前管道工作線程中接收事件后等待新消息的最長時間(以毫秒為單位);簡單來說,當 `pipeline.batch.size` 不滿足時,會等待 `pipeline.batch.delay` 設置的時間,超時后便開始執行 filter 和 output 操作。
在 6.3 版本中,Logstash 又增加了 Pipeline-to-Pipeline的管道機制(beta),即管道和管道之間可以連接在一起組成一個完成的數據處理流。熟悉 linux 的管道命令 |的同學應該可以很快明白這種模式的好處。這無疑使得 Logstash 的配置會更加靈活,今天我們就來了解下這種靈活自由的配置方式。
Pipeline-to-Pipeline配置
修改 config/pipeline.yml文件如下:
- pipeline.id: upstream config.string: input { stdin {} } output { pipeline { send_to => [test_output] } } - pipeline.id: downstream config.string: input { pipeline { address => test_output } } output{ stdout{}}
然后運行 logstash,其中 -r 表示配置文件有改動時自動重新加載,方便我們調試。
bin/logstash -r
在終端隨意輸入字符(比如aaa)后回車,會看到屏幕輸出了類似下面的內容,代表運行成功了。
{ "@timestamp" => 2018-12-06T14:43:50.310Z, "@version" => "1", "message" => "aaa", "host" => "rockybean-MacBook-Pro.local" }
我們再回頭看下這個配置,upstreamoutput 使用了名為 pipeline 的 plugin,然后 send_to的輸出對象test_output是在 downstream的 input pipeline plugin 中定義的。通過這個唯一的address(虛擬地址)就能夠把不同的 pipeline 連接在一起組成一個更長的pipeline來處理數據。類似下圖所示:
當數據由 upstream傳遞給 downstream時會進行一個復制操作,這也意味着在這兩個 pipeline 中的數據是完全獨立的,互不影響。有一點要注意的是:數據的復制會增加額外的性能開銷,比如會加大 JVM Heap 的使用。
2. 使用場景
2.1 Distributor Pattern 分發者模式
在一個 pipeline 處理輸入,然后根據不同的數據類型再分發到對應的 Pipeline 去處理。這種模式的好處在於統一輸入端口,隔離不同類型的處理配置文件,減少由於配置文件混合在一起帶來的維護成本。大家可以想一想如果不用這種Pipeline-to-Pipeline的方式,我們如果輕松做到一個端口處理多個來源的數據呢?
這種模式的參考配置如下所示:
# config/pipelines.yml - pipeline.id: beats-server config.string: | input { beats { port => 5044 } } output { if [type] == apache { pipeline { send_to => weblogs } } else if [type] == system { pipeline { send_to => syslog } } else { pipeline { send_to => fallback } } } - pipeline.id: weblog-processing config.string: | input { pipeline { address => weblogs } } filter { # Weblog filter statements here... } output { elasticsearch { hosts => [es_cluster_a_host] } } - pipeline.id: syslog-processing config.string: | input { pipeline { address => syslog } } filter { # Syslog filter statements here... } output { elasticsearch { hosts => [es_cluster_b_host] } } - pipeline.id: fallback-processing config.string: | input { pipeline { address => fallback } } output { elasticsearch { hosts => [es_cluster_b_host] } }
2.2 Output Isolator Pattern 輸出隔離模式
雖然 Logstash 的一個 pipeline 可以配置多個 output,但是這多個 output 會相依為命,一旦某一個 output 出問題,會導致另一個 output 也無法接收新數據。而通過這種模式可以完美解決這個問題。其運行方式如下圖所示:

通過輸出到兩個獨立的 pipeline,解除相互之間的影響,比如 http service 出問題的時候,es 依然可以正常接收數據,而且兩個 pipeline 可以配置獨立的隊列來保障數據的完備性,其配置如下所示:
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [es, http] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => es } } output { elasticsearch { } } - pipeline.id: buffered-http queue.type: persisted config.string: | input { pipeline { address => http } } output { http { } }
2.3 Forked Path Pattern 克隆路徑模式
這個模式類似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 來完成各自輸出的數據處理需求,這里就不展開講了,可以參考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 這個 pipeline 去掉了一些敏感數據:
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => ["internal-es", "partner-s3"] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => "internal-es" } } # Index the full event output { elasticsearch { } } - pipeline.id: partner queue.type: persisted config.string: | input { pipeline { address => "partner-s3" } } filter { # Remove the sensitive data mutate { remove_field => 'sensitive-data' } } output { s3 { } } # Output to partner's bucket
2.4 Collector Pattern 收集者模式
從名字可以看出,該模式是將所有 Pipeline 匯集於一處的處理模式,如下圖所示:

其配置參考如下:
# config/pipelines.yml - pipeline.id: beats config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [commonOut] } } - pipeline.id: kafka config.string: | input { kafka { ... } } output { pipeline { send_to => [commonOut] } } - pipeline.id: partner # This common pipeline enforces the same logic whether data comes from Kafka or Beats config.string: | input { pipeline { address => commonOut } } filter { # Always remove sensitive data from all input sources mutate { remove_field => 'sensitive-data' } } output { elasticsearch { } }
