Logstash安裝和使用


Logstash 是開源的服務器端數據處理管道,能夠同時 從多個來源采集數據、轉換數據,然后將數據發送到您最喜歡的 “存儲庫” 中。(我們的存儲庫當然是 Elasticsearch。)

作用:集中、轉換和存儲數據

官方網站:

https://www.elastic.co/cn/products/logstash

一個民間的中文Logstash最佳實踐:

https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html

 

1.下載Logstash,版本為6.2.4,下載地址

https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz

2.解壓到目錄

3.啟動Logstash進程,Hello World Demo

bin/logstash -e 'input { stdin { } } output { stdout {} }'

bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

輸入:Hello World

輸出:

在這個Demo中,Hello World作為數據,在線程之間以 事件 的形式流傳。不要叫,因為 logstash 可以處理多行事件。

Logstash 會給事件添加一些額外信息。最重要的就是 @timestamp,用來標記事件的發生時間。因為這個字段涉及到 Logstash 的內部流轉,所以必須是一個 joda 對象,如果你嘗試自己給一個字符串字段重命名為 @timestamp 的話,Logstash 會直接報錯。所以,請使用 filters/date 插件 來管理這個特殊字段

此外,大多數時候,還可以見到另外幾個:

  1. host 標記事件發生在哪里。
  2. type 標記事件的唯一類型。
  3. tags 標記事件的某方面屬性。這是一個數組,一個事件可以有多個標簽。

4.語法

Logstash 設計了自己的 DSL —— 有點像 Puppet 的 DSL,或許因為都是用 Ruby 語言寫的吧 —— 包括有區域,注釋,數據類型(布爾值,字符串,數值,數組,哈希),條件判斷,字段引用等。

區段(section)

Logstash 用 {} 來定義區域。區域內可以包括插件區域定義,你可以在一個區域內定義多個插件。插件區域內則可以定義鍵值對設置。示例如下:

input {
    stdin {}
    syslog {}
}

數據類型

Logstash 支持少量的數據值類型:

bool   debug => true 
string  host => "hostname" 
number    port => 514
array    match => ["datetime", "UNIX", "ISO8601"]
hash
options => {
    key1 => "value1",
    key2 => "value2"
}

條件判斷(condition)

表達式支持下面這些操作符:
    equality, etc: ==, !=, <, >, <=, >=
    regexp: =~, !~
    inclusion: in, not in
    boolean: and, or, nand, xor
    unary: !()

比如:
if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ and [url] == "/noc.gif" {
} else {
}

命令行參數:logstash命令

參數:
執行    -e              bin/logstash -e ''
文件    --config 或 -f       bin/logstash -f agent.conf
測試    --configtest 或 -t     用來測試 Logstash 讀取到的配置文件語法是否能正常解析。
日志    --log 或 -l         Logstash 默認輸出日志到標准錯誤。生產環境下你可以通過 bin/logstash -l logs/logstash.log 命令來統一存儲日志。

 

使用Logstash的Kafka插件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

啟動一個kafka作為輸入,並輸入1231212

~/software/apache/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 stdin.conf文件

input{
        kafka{
                bootstrap_servers => ["127.0.0.1:9092"]
                topics => [ 'test' ]
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

啟動logstash

bin/logstash -f stdin.conf

輸出

關於auto_offset_reset參數

由於Kafka是消息隊列,消費過的就不會再消費

<i>可以在stdin.conf中設置auto_offset_reset="earliest",比如

input{
        kafka{
                bootstrap_servers => ["127.0.0.1:9092"]
                topics => [ 'test' ]
                auto_offset_reset => "earliest"
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

在kafka中依次輸入

1111
2222
3333

輸出為,注意這里timestamp的時間是1111 -> 2222 -> 3333,logstash會從頭開始消費沒有消費的消息

 

<ii>當auto_offset_reset="latest"

logstash會從進程啟動的時候開始消費消息,之前的消息會丟棄

在kafka中依次輸入

1111
2222
3333

輸出為

 

Kafka -> logstash -> Es的conf文件

input{
	kafka{
		bootstrap_servers => ["127.0.0.1:9092"]
		topics => [ 'topicB' ]
		auto_offset_reset => "earliest"
		consumer_threads => 1
		codec => json
	}
}
output {
	elasticsearch{
		hosts => ["127.0.0.1:9200"]
		index => "XXX"
	}
}

Kafka -> logstash -> File的conf文件

參考

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-file.html

注意:如果是kafka輸入是line格式的,使用codec => line { format => "custom format: %{message}"}

關於codec的說明

https://www.elastic.co/guide/en/logstash/6.2/codec-plugins.html

如果kafka輸入是json格式的,使用codec => json

input{
	kafka{
		bootstrap_servers => ["127.0.0.1:9092"]
		topics => [ 'topicB' ]
		auto_offset_reset => "earliest"
		consumer_threads => 1
		codec => json
	}
}
output {
	stdout {
		codec => rubydebug {}
    	}
	file {
		path => "/home/lintong/桌面/logs/path/to/1.txt"
		#codec => line { format => "custom format: %{message}"}		
		codec => json
	}
}

 

使用Logstash的HDFS插件

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-webhdfs.html

配置文件

input{
	kafka{
		bootstrap_servers => ["127.0.0.1:9092"]
		topics => [ 'topicB' ]
		auto_offset_reset => "earliest"
		consumer_threads => 1
		codec => json
	}
}
output {
	stdout {
		codec => rubydebug {}
    	}
	webhdfs {
		host => "127.0.0.1"                 # (required)
		port => 50070                       # (optional, default: 50070)
		path => "/user/lintong/xxx/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log"  # (required)
		user => "lintong"                       # (required)
		codec => json
	}
}

到 http://localhost:50070 下看文件內容

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM