1、Logstash是一個數據收集引擎,相當於是ETL工具。截圖來源慕課,尊重版本從你我做起。
Logstash分為三個階段,第一個階段Input是數據采集、第二個階段是Filter數據解析和轉換,第三個階段是Output數據輸出。
2、Logstash中的Pipeline概念。
1)、Pipeline是指Input-filter-output的三個階段處理流程。
2)、隊列管理。
3)、插件生命周期管理。
3、Logstash中的Logstash Event概念。
1)、內部流轉的數據表現形式,原始數據從Input進入之后,在內部流轉的時候不是原始的數據,而是Logstash Event數據。Logstash Event就是一個Java Object,對外暴漏去修改或者獲取內部字段的api
2)、原始數據在input被轉換為Event,在output event被轉換為目標格式數據。
3)、在配置文件中可以對Event中的屬性進行增刪改查。
關於數據流轉,數據由Input進入之后,從Output輸出。Codec(Code、Decode)將原始數據Data轉換為Logstash Event,當數據輸出的時候,Codec將Logstash Event轉換為目標數據源需要的類型Data。
4、Logstash的安裝,將包上傳到服務器進行解壓縮即可,如下所示:
1 tar -zxvf logstash-6.7.1.tar.gz -C /usr/local/soft/
Logstash的簡單案例,如下所示:
1 # 輸入,stdin是標准輸入,按照每一行切分數據 2 input { 3 stdin { 4 codec => line 5 } 6 } 7 8 # 過濾為空 9 filter {} 10 11 # 輸出,stdout標准輸出,將輸出轉換為json格式 12 output { 13 stdout { 14 codec => json 15 } 16 }
以每行進行切分數據,不過濾,然后輸出為json格式的輸入(Codec- Input Decoding)轉換流程,如下所示:
當輸入之后,要進行輸出(Codec- Output Encoding),如下所示:
演示效果,如下所示:
1 [elsearch@k8s-master logstash-6.7.1]$ echo "foo 2 > bar 3 > " | bin/logstash -f config/line-to-json.conf 4 Sending Logstash logs to /usr/local/soft/logstash-6.7.1/logs which is now configured via log4j2.properties 5 [2021-01-30T19:11:13,330][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/usr/local/soft/logstash-6.7.1/data/queue"} 6 [2021-01-30T19:11:13,357][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/local/soft/logstash-6.7.1/data/dead_letter_queue"} 7 [2021-01-30T19:11:14,189][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified 8 [2021-01-30T19:11:14,258][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.7.1"} 9 [2021-01-30T19:11:14,396][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>"e449e4d8-b26e-49fa-b12d-3d452b50aac4", :path=>"/usr/local/soft/logstash-6.7.1/data/uuid"} 10 [2021-01-30T19:11:24,816][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50} 11 [2021-01-30T19:11:24,943][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x6185f2c6 run>"} 12 [2021-01-30T19:11:25,080][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} 13 {"@version":"1","host":"k8s-master","message":"foo","@timestamp":"2021-01-30T11:11:25.063Z"}{"@version":"1","host":"k8s-master","message":"bar","@timestamp":"2021-01-30T11:11:25.102Z"}{"@version":"1","host":"k8s-master","message":"","@timestamp":"2021-01-30T11:11:25.102Z"}[2021-01-30T19:11:25,394][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x6185f2c6 run>"} 14 [2021-01-30T19:11:25,516][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} 15 [elsearch@k8s-master logstash-6.7.1]$
5、Logstash的架構簡介,如下所示:
Logstash的輸入可以有多個的,數據流入之后,經過Codec之后,數據流入Queue,而Queue負責將流入的數據分發到Pipeline(是指Input-filter-output的三個階段處理流程)中,而Batcher的作用是批量的從Queue中取數據,而Batcher有時間和數據量兩種觸發條件。
6、Logstash中Queue的分類。
1)、In Memory,無法處理進程Crash,機器宕機等情況,會導致數據丟失。
2)、Persistent Queue In Disk,可處理進程Crash等情況,保證數據不丟失。保證數據至少消費一次。充當緩沖區,可以替代Kafka等消息隊列的作用。
3)、Persistent Queue的基本配置,第一個參數queue.type:persisted,但是這個參數默認是memory。第二個參數是queue.max_bytes:4gb,隊列存儲最大數據量,默認是1gb。
7、Logstash中的線程,包含Input Thread輸入線程,Pipeline Workder Thread,每個Pipeline會在自己獨立的線程中的,對Logtash線程調優,一般調整的就是Pipeline的線程個數。
1)、pipeline.workers | -w,pipeline線程數,即filter_output的處理線程數,默認是cpu核數。
2)、pipeline.batch.size | -b,Batcher一次批量獲取的待處理文檔數,默認是125,可以根據輸出進行調整,越大會占用越多的heap空間,可以通過jvm.options調整。
3)、pipeline.batch.delay | -u,Batch等待的時長,單位為ms。
8、logstash的配置文件。
1)、logstash設置相關的配置文件,在config文件夾中。
a、logstash.yml,這個是logstash相關的配置,比如node.name(節點名稱)、path.data(持久化存儲數據的文件夾,默認是logstash home目錄下的data)、pipeline.works(設定pipeline的線程數,優化的常用項)、pipeline.batch.size/delay(設定批量處理數據的數目和延遲)、queue.type(設定隊列類型,默認是memory)、queue.max_bytes(設定隊列總容量,默認是1GB)、path.log(設定pipeline日志文件的目錄)、path.config(設定pipeline配置文件的目錄)等等,這其中的配置可以被命令行參數中的相關參數覆蓋。
b、jvm.options修改jvm的相關參數,比如修改heap size等等。
2)、pipeline配置文件,定義數據處理流程的文件,以.conf結尾。
9、logstash命令行配置項。
1)、--node.name,指定節點名稱。
2)、-f --path.config,pipeline路徑,可以是文件或者文件夾。
3)、--path.settings,logstash配置文件夾路徑,其中要包含logstash.yml。
4)、-e --config.string,指明pipeline內容,多用於測試使用。
5)、-w --pipeline.workers,設定pipeline的線程數,優化的常用項。
6)、-b --pipeline.batch.szie,設定批量處理數據的數目和延遲。
7)、--path.data,指定logstash的數據目錄。
8)、--debug,打開調試日志。
9)、-t config.test_and_exit,打開測試,檢查Logstash加載進來的pipeline是否有錯,有錯就報錯。
10、logstash配置方式建議。
1)、線上環境推薦采用配置文件的方式來設定Logstash的相關配置,這樣可以減少犯錯的機會,而且文件便於進行版本化管理。
2)、命令行形式多用來進行快速的配置測試、驗證、檢查等。
11、logstash多實例運行方式,在一台機器運行多個logstash的實例。
1 bin/logstash --path.settings instance1 2 bin/logstash --path.settings instance2
不同的instance實例中修改logstash.yml,自定義path.path,確保其不相同即可。
12、pipeline的配置,用於配置input、filter、output的插件,框架是input{}、filter{}、output{}。pipeline配置語法,主要有如下的數據類型,如下所示:
1 a、布爾類型Boolean,例如isFailed => true。 2 b、數值類型Number,例如port => 33。 3 c、字符串類型String,例如name => "hello world"。 4 d、數組Array或者List,例如,users => [{id => 1,name => bob},{id => 2, name => jane}]。 5 e、哈希類型hash,match => { 6 "field1" => "value1" 7 "field2" => "value2" 8 } 9 f、注釋,使用井號#。
在配置中可以引用Logstash Event的屬性字段,主要有如下兩種方式。
1)、第一種,是直接引用字段值Field Reference,使用[]中括號即可,嵌套字段寫多層[]中括號即可。
2)、第二種,是在字符串以sprintf方式引用,使用%{}來實現。
3)、支持條件判斷語法,從而擴展了配置的多樣性,語法格式if 表達式 else if 表達式。
1 表達式主要包含如下的操作符。 2 1)、比較運算符,==、!=、<、>、<=、>=。 3 2)、正則是否匹配,=~、!~。 4 3)、包含字符串或者數據,in、not in。 5 4)、布爾操作符,and、or、nand、xor、!。 6 5)、分組操作符,()。
13、Input Plugin插件,input插件指定數據輸入源,一個pipeline可以有多個Input插件,這里主要使用這三個input插件,stdin、file、kafka。
1)、Input插件stdin,是最簡單的輸入,從標准輸入讀取數據,沒有特殊配置,都是使用的通用配置,這些通用配置在所有的input輸入插件都可以使用,通用配置為。
a、codec類型為codec。Codec(Code、Decode)將原始數據Data轉換為Logstash Event,當數據輸出的時候,Codec將Logstash Event轉換為目標數據源需要的類型Data。
b、type類型為string,自定義事件的類型,可用於后續判斷。
c、tags類型為array,自定義該事件的tag,可用於后續判斷。
d、add_field類型為hash,為該事件添加字段。
案例,如下所示:
1 [elsearch@master logstash-6.7.0]$ cat config/input-stdin.conf 2 # 輸入, 3 input { 4 stdin { 5 codec => "plain" 6 tags => ["input stdin"] 7 type => "stdin" 8 add_field => {"key" => "value"} 9 } 10 } 11 12 # 過濾為空 13 filter {} 14 15 # 輸出,stdout標准輸出,將輸出轉換為json格式 16 output { 17 stdout { 18 codec => "rubydebug" 19 } 20 } 21 [elsearch@master logstash-6.7.0]$
運行效果,如下所示:
1 [elsearch@master logstash-6.7.0]$ echo "test" | ./bin/logstash -f config/input-stdin.conf 2 Sending Logstash logs to /usr/local/soft/logstash-6.7.0/logs which is now configured via log4j2.properties 3 [2021-02-07T22:52:14,813][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified 4 [2021-02-07T22:52:14,842][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.7.0"} 5 [2021-02-07T22:52:25,039][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50} 6 [2021-02-07T22:52:25,192][INFO ][logstash.inputs.stdin ] Automatically switching from plain to line codec {:plugin=>"stdin"} 7 [2021-02-07T22:52:25,277][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4755c349 run>"} 8 [2021-02-07T22:52:25,391][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} 9 /usr/local/soft/logstash-6.7.0/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated 10 { 11 "key" => "value", 12 "message" => "test", 13 "host" => "master", 14 "@timestamp" => 2021-02-07T14:52:25.350Z, 15 "@version" => "1", 16 "tags" => [ 17 [0] "input stdin" 18 ], 19 "type" => "stdin" 20 } 21 [2021-02-07T22:52:26,262][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} 22 [2021-02-07T22:52:26,275][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x4755c349 run>"} 23 [elsearch@master logstash-6.7.0]$
14、Input Plugin輸入插件file,從文件讀取數據,如常見的日志文件。文件讀取通常要解決幾個問題。
1)、文件內容如何只被讀取一次,即重啟logstash的時候,從上次讀取的位置繼續。解決方法是使用sincedb方案解決這個問題。
2)、如何即時讀取到文件的新內容。解決方法是定時檢查文件是否有更新。
3)、如何發現新文件並進行讀取?解決方法是可以定時檢查新文件。
4)、如果文檔發生了歸檔操作,是否影響當前的內容讀取?答案是不影響,被歸檔的文件內容可以繼續被讀取。
5)、基於Filewatch的ruby庫實現的。
15、Input Plugin輸入插件file,讀取文件規則的關鍵配置。
1)、path類型為數組,指明讀取的文件路徑,基於glob匹配語法。例如ppath => ["/var/log/**/*.log","var/log/message"]。
2)、exclue類型為數組排除不想監聽的文件規則,基於glob匹配語法。例如exclude => "*.gz"。
3)、sincedb_path類型為字符串,記錄sincedb文件路徑。
4)、start_position類型為字符串,beginning or end,是否從頭讀取文件。
5)、stat_interval類型為數值,單位秒,定時檢查文件是否有更新,默認是1秒。
6)、discver_interval類型為數值,單位秒,定時檢查是否有新文件待讀取,默認15秒。
7)、ignore_older類型為數值,單位秒,掃描文件列表時候,如果該文件上次更改時間超過設定的時長,則不做處理,但依然會監控是否有新內容,默認是關閉。
8)、close_ollder類型為數值,單位秒,如果監聽的文件在超過該設定時間內沒有新內容,會被關閉文件句柄,釋放資源,但依然會控制是否有新內容,默認3600秒,即一個小時。
16、Input Plugin輸入插件的glob匹配語法,主要包含如下幾種匹配符。
1)、*代表匹配任意字符,但是不匹配以.點開頭的隱藏文件,匹配這類文件的時候要使用.*來進行匹配。例如,"/var/log/*.log",代表匹配/var/log目錄下以.log結尾的文件。
2)、**代表遞歸匹配子目錄。例如,"/var/log/**/*.log",代表匹配/var/log所有子目錄下以.log結尾的文件。
3)、?代表匹配單一字符。
4)、[]代表匹配多個字符,比如[a-z]、[^a-z]。
5)、{}匹配多個單詞,比如{foo,bar,hello}。例如,"/var/log/{app1,app2,app3}/*.log",代表了匹配/var/log目錄下app1、app2、app3目錄中以.log結尾的文件。
6)、\轉義符號。
17、Input Plugin組件的Kafka,kafa是最流行的消息隊列,也是Elasticsearch架構中常用的,使用相對簡單。
1 input { 2 kafka { 3 zk_connect => "kafka:2181" 4 group_id => "logstash_group" 5 topic_id => "apache_logs_topic" 6 consumer_threads => 16 7 } 8 }
18、Codec Plugin插件,Codec Plugin作用於input和output plugin,負責將數據在原始與Logstash Event之間進行轉換,常見的codec有,如下所示:
1)、plain讀取原始內容。
2)、dots將內容簡化為點進行輸出。
3)、rubydebug將logstahs Events按照ruby格式輸出,方便調試。
4)、line處理帶有換行符的內容。
5)、json處理json格式的內容。
6)、multiline處理多行數據的內容。當一個Event的message由多行組成的時候,需要使用該codec,常見的情況是堆棧日志信息的處理。主要設置參數如下:
a、pattern設置行匹配的正則表達式,可以使用grok。
b、what previous|next,如果匹配成功,那么匹配行是歸屬上一個事件還是下一個事件。
c、negate true or flase是否對pattern的結果取反,默認是false。
1 [elsearch@master logstash-6.7.0]$ ./bin/logstash -e "input{stdin{codec => line}} output{stdout{codec => rubydebug}}" 2 [elsearch@master logstash-6.7.0]$ ./bin/logstash -e "input{stdin{codec => line}} output{stdout{codec => dots}}" 3 [elsearch@master logstash-6.7.0]$ ./bin/logstash -e "input{stdin{codec => json}} output{stdout{codec => rubydebug}}"
19、Filter Plugin組件,Filter是Logstash功能強大的主要原因,它可以對Logstash Event進行豐富的處理,比如數據解析,刪除字段,類型轉換等等,常見的有如下幾個:
1)、date日期解析。將日期字符串解析為日期類型,然后替換@timestamp字段或者指定的其他字段。
a、match,類型為數組,用於指定日期匹配的格式,可以一次指定多種日期格式,例如match => ["logdate", "MMM dd yyyy HH:mm:ss", "MMM d yyyy HH:mm:ss", "ISO8601"]。
b、target,類型為字符串,用於指定賦值的字段名稱,默認是@timestamp。
c、timezone,類型為字符串,用於指定時區。
2)、grok正則匹配解析。Grok語法如下所示:
a、%{SYNTAX : SEMANTIC},其中SYNTAX為grok pattern的名稱,SEMANTIC為賦值字段名稱。
b、%{NUMBER : duration}可以匹配數值類型,但是grok匹配出的內容都是字符串類型,可以通過在最后指定為int或者float來強制轉換類型,%{NUMBER : duration : float}等等。
c、Filter Plugin組件的grok調試建議,如果是正則表達式可以使用https://www.debuggex.com/、https://regexr.com/。
d、如果是grok調試,可以使用http://grokdebug.herokuapp.com/,進行調試操作。
3)、dissect分割符解析。Filter Plugin組件之dissect,基於分割符原理解析數據,解決grok解析的時候消耗過多cpu資源的問題。
a、dissect的應用有一定的局限性,主要適用於每行格式相似且分割符明確簡單的場景。
b、dissect語法比較簡單,有一系列字段field和分割符delimiter組成,%{}中括號之間是字段、%{}和%{}之間是分割符。
c、dissect可以自動處理空的匹配值。
d、dissect分割后的字段值都是字符串,可以使用convert_datatype屬性進行類型轉換。
e、語法規則,如果不寫名稱,即%{},表明忽略該值。+加號,代表該匹配值追加到ts字段下。/斜線后面的數字代表了拼接的次序。%{?}代表了忽略匹配值,但是賦予字段名,用於后續匹配用、%{&}代表了匹配值賦予key1的匹配值。
4)、mutate對字段作處理,比如重命名、刪除、替換等等。Filter Plugin組件之mutate,使用最頻繁的操作,可以對字段進行各種操作,比如重命名,刪除,替換,更新等操作,主要操作如下:
a、convert類型轉換。實現字段類型的轉換,類型為hash,僅支持轉換為integer、float、string和boolean。
b、gsub字符串替換。對字段內容進行替換,類型為數組,每3項為一個替換配置。
c、split/join/merge字符串切割、數組合並為字符串、數組合並為數組。split可以將字符串切割為數組,join將數組拼接為字符串,merge是將兩個數組合並為一個數組,字符串會被轉為1個元素的數組進行操作。
d、rename字段重命名。
e、update/replace字段內容更新或者替換。更新字段內容,區別在於Update只在字段存在的時候生效,而replace在字段不存在的時候會執行新增字段的操作。
f、remove_field刪除字段。
5)、json按照json解析字段內容到指定字段中。Filter Plugin組件之json,將字段內容為json格式的數據進行解析。其中source指定要解析的字段名,target指定解析后的存儲字段,默認和message同級別。
6)、geoip增強地理位置數據。Filter Plugin組件之geoip,常用的插件,根據ip地址提供對應的地域信息,比如經緯度、城市名等等,方便進行地理數據分析。
7)、ruby利用ruby代碼來動態修改logstash Event。Filter Plugin組件之ruby最靈活的插件,可以以ruby語言來隨心所欲的修改Logstash Event對象。
20、Filter Plugin組件之Output,負責將Logstash Event輸出,常見的插件如下,stdout、file、elasticsearch。
1)、輸出到標准輸出,多用於調試。
1 output{ 2 stdout { 3 codec => rubydebug 4 } 5 }
2)、輸出到文件,實現將分散在多地的文件統一到一處的需求,比如將所有web機器的web日志收集到1個文件中,從而方便查閱信息。默認輸出json格式的數據,通過format可以輸出原始格式。
1 output{ 2 file { 3 path => "/var/log/web.log" 4 codec => line { format => "%{message}"} 5 } 6 }
3)、輸出到elasticsearch,是最常用的插件,基於http協議實現。
1 output{ 2 elasticsearch{ 3 hosts => ["192.168.110.133:9200"],只寫data和client node地址。 4 index => "nginx-%{+YYYY.MM.dd}",注意,索引可以按照時間寫入。 5 template => "./nginx_template.json" 6 template_name => "nginx_template" 7 template_overwrite => true 8 } 9 }
21、Logstash的監控運維的api,logstash提供了豐富的api來查看logstash的當前狀態,如下所示:
1)、http://localhost:9600
2)、http://localhost:9600/_node
3)、http://localhost:9600/_node/stats
4)、http://localhost:9600/_node/hot_threads