logstash的pipeline


一個 logstash 實例中可以同時進行多個獨立數據流程的處理工作,如下圖所示。

而在這之前用戶只能通過在單機運行多個 logstash 實例或者在配置文件中增加大量 if-else 條件判斷語句來解決。要使用 multiple pipeline 也很簡單,只需要將不同的 pipeline 在 config/pipeline.yml中定義好即可,如下所示:

- pipeline.id: apache
  pipeline.batch.size: 125
  queue.type: persisted
  path.config: "/path/to/config/apache.cfg"
- pipeline.id: nginx
  path.config: "/path/to/config/nginx.cfg"

其中 apachenginx作為獨立的 pipeline 執行,而且配置也可以獨立設置,互不干擾。pipeline.yml的引入極大地簡化了 logstash 的配置管理工作,使得新手也可以很快完成復雜的 ETL 配置。

- `pipeline.workers`:設置啟動多少個線程執行 fliter 和 output;當 input 的內容出現堆積而 CPU 使用率還比較充足時,可以考慮增加該參數的大小;
- `pipeline.batch.size`:設置單個工作線程在執行過濾器和輸出之前收集的最大事件數,較大的批量大小通常更高效,但會增加內存開銷。輸出插件會將每個批處理作為一個輸出單元。;例如,ES 輸出會為收到的每個批次發出批量請求;調整 `pipeline.batch.size` 可調整發送到 ES 的批量請求(Bulk)的大小;
- `pipeline.batch.delay`:設置 Logstash 管道的延遲時間, 管道批處理延遲是 Logstash 在當前管道工作線程中接收事件后等待新消息的最長時間(以毫秒為單位);簡單來說,當 `pipeline.batch.size` 不滿足時,會等待 `pipeline.batch.delay` 設置的時間,超時后便開始執行 filter 和 output 操作。

在 6.3 版本中,Logstash 又增加了 Pipeline-to-Pipeline的管道機制(beta),即管道和管道之間可以連接在一起組成一個完成的數據處理流。熟悉 linux 的管道命令 |的同學應該可以很快明白這種模式的好處。這無疑使得 Logstash 的配置會更加靈活,今天我們就來了解下這種靈活自由的配置方式。

Pipeline-to-Pipeline配置

修改 config/pipeline.yml文件如下:

 - pipeline.id: upstream
   config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
 - pipeline.id: downstream
   config.string: input { pipeline { address => test_output } } output{ stdout{}}

然后運行 logstash,其中 -r 表示配置文件有改動時自動重新加載,方便我們調試。

bin/logstash -r

在終端隨意輸入字符(比如aaa)后回車,會看到屏幕輸出了類似下面的內容,代表運行成功了。

{
    "@timestamp" => 2018-12-06T14:43:50.310Z,
    "@version" => "1",
    "message" => "aaa",
    "host" => "rockybean-MacBook-Pro.local"
}

我們再回頭看下這個配置,upstreamoutput 使用了名為 pipeline 的 plugin,然后 send_to的輸出對象test_output是在 downstreaminput pipeline plugin 中定義的。通過這個唯一的address(虛擬地址)就能夠把不同的 pipeline 連接在一起組成一個更長的pipeline來處理數據。類似下圖所示:

 

當數據由 upstream傳遞給 downstream時會進行一個復制操作,這也意味着在這兩個 pipeline 中的數據是完全獨立的,互不影響。有一點要注意的是:數據的復制會增加額外的性能開銷,比如會加大 JVM Heap 的使用。

2. 使用場景

2.1 Distributor Pattern 分發者模式

在一個 pipeline 處理輸入,然后根據不同的數據類型再分發到對應的 Pipeline 去處理。這種模式的好處在於統一輸入端口,隔離不同類型的處理配置文件,減少由於配置文件混合在一起帶來的維護成本。大家可以想一想如果不用這種Pipeline-to-Pipeline的方式,我們如果輕松做到一個端口處理多個來源的數據呢?

這種模式的參考配置如下所示:

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
       # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }

2.2 Output Isolator Pattern 輸出隔離模式

雖然 Logstash 的一個 pipeline 可以配置多個 output,但是這多個 output 會相依為命,一旦某一個 output 出問題,會導致另一個 output 也無法接收新數據。而通過這種模式可以完美解決這個問題。其運行方式如下圖所示:

通過輸出到兩個獨立的 pipeline,解除相互之間的影響,比如 http service 出問題的時候,es 依然可以正常接收數據,而且兩個 pipeline 可以配置獨立的隊列來保障數據的完備性,其配置如下所示:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

2.3 Forked Path Pattern 克隆路徑模式

這個模式類似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 來完成各自輸出的數據處理需求,這里就不展開講了,可以參考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 這個 pipeline 去掉了一些敏感數據:

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

2.4 Collector Pattern 收集者模式

從名字可以看出,該模式是將所有 Pipeline 匯集於一處的處理模式,如下圖所示:

其配置參考如下:

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM