Logstash 是開源的服務器端數據處理管道,能夠同時 從多個來源采集數據、轉換數據,然后將數據發送到您最喜歡的 “存儲庫” 中。(我們的存儲庫當然是 Elasticsearch。)
作用:集中、轉換和存儲數據
官方網站:
https://www.elastic.co/cn/products/logstash
一個民間的中文Logstash最佳實踐:
https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
1.下載Logstash,版本為6.2.4,下載地址
https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
2.解壓到目錄
3.啟動Logstash進程,Hello World Demo
bin/logstash -e 'input { stdin { } } output { stdout {} }'
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
輸入:Hello World
輸出:
在這個Demo中,Hello World作為數據,在線程之間以 事件 的形式流傳。不要叫行,因為 logstash 可以處理多行事件。
Logstash 會給事件添加一些額外信息。最重要的就是 @timestamp,用來標記事件的發生時間。因為這個字段涉及到 Logstash 的內部流轉,所以必須是一個 joda 對象,如果你嘗試自己給一個字符串字段重命名為 @timestamp
的話,Logstash 會直接報錯。所以,請使用 filters/date 插件 來管理這個特殊字段。
此外,大多數時候,還可以見到另外幾個:
- host 標記事件發生在哪里。
- type 標記事件的唯一類型。
- tags 標記事件的某方面屬性。這是一個數組,一個事件可以有多個標簽。
4.語法
Logstash 設計了自己的 DSL —— 有點像 Puppet 的 DSL,或許因為都是用 Ruby 語言寫的吧 —— 包括有區域,注釋,數據類型(布爾值,字符串,數值,數組,哈希),條件判斷,字段引用等。
區段(section)
Logstash 用 {}
來定義區域。區域內可以包括插件區域定義,你可以在一個區域內定義多個插件。插件區域內則可以定義鍵值對設置。示例如下:
input { stdin {} syslog {} }
數據類型
Logstash 支持少量的數據值類型:
bool debug => true string host => "hostname" number port => 514 array match => ["datetime", "UNIX", "ISO8601"] hash options => { key1 => "value1", key2 => "value2" }
條件判斷(condition)
表達式支持下面這些操作符: equality, etc: ==, !=, <, >, <=, >= regexp: =~, !~ inclusion: in, not in boolean: and, or, nand, xor unary: !() 比如: if "_grokparsefailure" not in [tags] { } else if [status] !~ /^2\d\d/ and [url] == "/noc.gif" { } else { }
命令行參數:logstash
命令
參數: 執行 -e bin/logstash -e '' 文件 --config 或 -f bin/logstash -f agent.conf 測試 --configtest 或 -t 用來測試 Logstash 讀取到的配置文件語法是否能正常解析。 日志 --log 或 -l Logstash 默認輸出日志到標准錯誤。生產環境下你可以通過 bin/logstash -l logs/logstash.log 命令來統一存儲日志。
使用Logstash的Kafka插件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
啟動一個kafka作為輸入,並輸入1231212
~/software/apache/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
stdin.conf文件
input{ kafka{ bootstrap_servers => ["127.0.0.1:9092"] topics => [ 'test' ] } } output { stdout { codec => rubydebug } }
啟動logstash
bin/logstash -f stdin.conf
輸出
關於auto_offset_reset參數:
由於Kafka是消息隊列,消費過的就不會再消費
<i>可以在stdin.conf中設置auto_offset_reset="earliest",比如
input{ kafka{ bootstrap_servers => ["127.0.0.1:9092"] topics => [ 'test' ] auto_offset_reset => "earliest" } } output { stdout { codec => rubydebug } }
在kafka中依次輸入
1111 2222 3333
輸出為,注意這里timestamp的時間是1111 -> 2222 -> 3333,logstash會從頭開始消費沒有消費的消息
<ii>當auto_offset_reset="latest"
logstash會從進程啟動的時候開始消費消息,之前的消息會丟棄
在kafka中依次輸入
1111 2222 3333
輸出為
Kafka -> logstash -> Es的conf文件
input{ kafka{ bootstrap_servers => ["127.0.0.1:9092"] topics => [ 'topicB' ] auto_offset_reset => "earliest" consumer_threads => 1 codec => json } } output { elasticsearch{ hosts => ["127.0.0.1:9200"] index => "XXX" } }
Kafka -> logstash -> File的conf文件
參考
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-file.html
注意:如果是kafka輸入是line格式的,使用codec => line { format => "custom format: %{message}"}
關於codec的說明
https://www.elastic.co/guide/en/logstash/6.2/codec-plugins.html
如果kafka輸入是json格式的,使用codec => json
input{ kafka{ bootstrap_servers => ["127.0.0.1:9092"] topics => [ 'topicB' ] auto_offset_reset => "earliest" consumer_threads => 1 codec => json } } output { stdout { codec => rubydebug {} } file { path => "/home/lintong/桌面/logs/path/to/1.txt" #codec => line { format => "custom format: %{message}"} codec => json } }
使用Logstash的HDFS插件
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-webhdfs.html
配置文件
input{ kafka{ bootstrap_servers => ["127.0.0.1:9092"] topics => [ 'topicB' ] auto_offset_reset => "earliest" consumer_threads => 1 codec => json } } output { stdout { codec => rubydebug {} } webhdfs { host => "127.0.0.1" # (required) port => 50070 # (optional, default: 50070) path => "/user/lintong/xxx/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log" # (required) user => "lintong" # (required) codec => json } }
到 http://localhost:50070 下看文件內容