Logstash由三個組件構造成,分別是input、filter以及output。我們可以吧Logstash三個組件的工作流理解為:input收集數據,filter處理數據,output輸出數據。至於怎么收集、去哪收集、怎么處理、處理什么、怎么發生以及發送到哪等等一些列的問題就是我們接下啦要討論的一個重點。
我們今天先討論input組件的功能和基本插件。前面我們意見介紹過了,input組件是Logstash的眼睛和鼻子,負責收集數據的,那么們就不得不思考兩個問題,第一個問題要清楚的就是,元數據在哪,當然,這就包含了元數據是什么類型,屬於什么業務;第二個問題要清楚怎么去拿到元數據。只要搞明白了這兩個問題,那么Logstash的input組件就算是弄明白了。
對於第一個問題,元數據的類型有很多,比如說你的元數據可以是日志、報表、可以是數據庫的內容等等。元數據是什么樣子的我們不需要關心,我們要關系的是元數據是什么類型的,只要你知道元數據是什么類型的,你才能給他分類,或者說給他一個type,這很重要,type對於你后面的工作處理是非常有幫助的。所以第一個問題的重心元數據在嗎,是什么,現在已經是清楚了。那么進行第二個問題。
第二個問題的核心是怎么拿到這些不同類型的原數據?這是一個真個input組件的核心內容了,我們分門別類的來看待這和解決個問題。
首先,我們肯定需要認同的,什么樣的數據源,就需要使用什么樣的方式去獲取數據。
我們列舉幾種:
1、文件類型:文件類型,顧名思義,文件數據源,我們可以使用input組件的file插件來獲取數據。file{}插件有很多的屬性參數,我們可以張開講解一下。具體內容在下面的代碼中展示:
input{ file{ #path屬性接受的參數是一個數組,其含義是標明需要讀取的文件位置 path => [‘pathA’,‘pathB’] #表示多就去path路徑下查看是夠有新的文件產生。默認是15秒檢查一次。 discover_interval => 15 #排除那些文件,也就是不去讀取那些文件 exclude => [‘fileName1’,‘fileNmae2’] #被監聽的文件多久沒更新后斷開連接不在監聽,默認是一個小時。 close_older => 3600 #在每次檢查文件列 表的時候, 如果一個文件的最后 修改時間 超過這個值, 就忽略這個文件。 默認一天。 ignore_older => 86400 #logstash 每隔多 久檢查一次被監聽文件狀態( 是否有更新) , 默認是 1 秒。 stat_interval => 1 #sincedb記錄數據上一次的讀取位置的一個index sincedb_path => ’$HOME/. sincedb‘ #logstash 從什么 位置開始讀取文件數據, 默認是結束位置 也可以設置為:beginning 從頭開始 start_position => ‘beginning’ #注意:這里需要提醒大家的是,如果你需要每次都從同開始讀取文件的話,關設置start_position => beginning是沒有用的,你可以選擇sincedb_path 定義為 /dev/null } }
2、數據庫類型:數據庫類型的數據源,就意味着我們需要去和數據庫打交道了是嗎?是的!那是必須的啊,不然怎么獲取數據呢。input組件如何獲取數據庫類的數據呢?沒錯,下面即將隆重登場的是input組件的JDBC插件jdbc{}。同樣的,jdbc{}有很多的屬性,我們在下面的代碼中作出說明;
input{ jdbc{ #jdbc sql server 驅動,各個數據庫都有對應的驅動,需自己下載 jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar" #jdbc class 不同數據庫有不同的 class 配置 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" #配置數據庫連接 ip 和端口,以及數據庫 jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db" #配置數據庫用戶名 jdbc_user => #配置數據庫密碼 jdbc_password => #上面這些都不重要,要是這些都看不懂的話,你的老板估計要考慮換人了。重要的是接下來的內容。 # 定時器 多久執行一次SQL,默認是一分鍾 # schedule => 分 時 天 月 年 # schedule => 22 表示每天22點執行一次 schedule => " *" #是否清除 last_run_metadata_path 的記錄,如果為真那么每次都相當於從頭開始查詢所有的數據庫記錄 clean_run => false #是否需要記錄某個column 的值,如果 record_last_run 為真,可以自定義我們需要表的字段名稱, #此時該參數就要為 true. 否則默認 track 的是 timestamp 的值. use_column_value => true #如果 use_column_value 為真,需配置此參數. 這個參數就是數據庫給出的一個字段名稱。當然該字段必須是遞增的,可以是 數據庫的數據時間這類的 tracking_column => create_time #是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中 record_last_run => true #們只需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :sql_last_value 取得就是該文件中的值 last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" #是否將字段名稱轉小寫。 #這里有個小的提示,如果你這前就處理過一次數據,並且在Kibana中有對應的搜索需求的話,還是改為true, #因為默認是true,並且Kibana是大小寫區分的。准確的說應該是ES大小寫區分 lowercase_column_names => false #你的SQL的位置,當然,你的SQL也可以直接寫在這里。 #statement => SELECT * FROM tabeName t WHERE t.creat_time > :sql_last_value
statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" #數據類型,標明你屬於那一方勢力。單了ES哪里好給你安排不同的山頭。 type => "my_info" } #注意:外載的SQL文件就是一個文本文件就可以了,還有需要注意的是,一個jdbc{}插件就只能處理一個SQL語句, #如果你有多個SQL需要處理的話,只能在重新建立一個jdbc{}插件。 }
好了,廢話不多說了,接着第三種情況:
input { beats { #接受數據端口 port => 5044 #數據類型 type => "logs" } #這個插件需要和filebeat進行配很這里不做多講,到時候結合起來一起介紹。 }
現在我們基本清楚的知道了input組件需要做的事情和如何去做,當然他還有很多的插件可以進行數據的收集,比如說TCP這類的,還有可以對數據進行encode,這些感興趣的朋友可以自己去查看,我說的只是我自己使用的。一般情況下我說的三種插件已經足夠了。
今天的ELK種的Logstash的input組件就到這。后面還會講述Logstash的另外另個組件filter和output。
注意:如果看到這樣的報錯信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 請執行命令:service logstash stop 然后在執行就可以了。
轉載自:https://yq.aliyun.com/articles/152043
logstash的output,查看https://yq.aliyun.com/articles/197785?spm=5176.8091938.0.0.pg7WLz