一、使用Logstash使用jdbc從MySQL讀取數據操作
1.1 安裝jdbc插件
jdbc默認已安裝,如果沒安裝使用logstash-plugin安裝即可(logstash-plugin在logstash的bin目錄下):
logstash-plugin install logstash-input-jdbc
1.2 下載mysql jdbc驅動
mysql最新版是8.0,但使用時報錯,這里仍以5.1版本演示
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.47.zip # 不推薦放/tmp目錄,但這里為了與用戶無關所以直接解壓到/tmp目錄 unzip mysql-connector-java-5.1.47.zip -d /tmp
1.3 配置logstash文件
配置如下項,如無意外mysql中數據即可同步到elasticsearch
input{ jdbc { # jdbc:mysql://數據庫ip:端口/數據庫名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 數據庫用戶名 jdbc_user => "root" # 數據庫密碼 jdbc_password => "root" # mysql java驅動文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驅動類名,這個一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次 schedule => "* * * * *" # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出 # 如果語句很多,可使用statement_file來指定sql語句文件 statement => "select * from exploit_records" } } filter { } output{ elasticsearch { hosts => ["localhost:9200"] } }
二、記錄重復問題
2.1 單日記錄重復問題
2.1.1 問題描述
elasticsearch以_id項作為主鍵,只要傳過來的數據中_id項的值尚未存在elasticsearch會接收該數據。
而_id項的值默認其實是數據到來時elasticsearch自己生成的然后添加上去的,所以在這種情況下_id是不可能重復的,即所有數據都會被來者不拒地接收。
logstash jdbc插件對數據庫的數據的操作是依照計划(schedule)執行sql語句(statement),所有查出來的數據就統統按output發過去。
在這種模式下,每執行一次計划(schedule)所有記錄就會被發往elasticsearch一次,而_id是由elasticsearch生成的不會重復,所以不同次計划(schedule)的同一條記錄會被elasticsearch反復收錄。
2.1.2 問題處理辦法
同一條記錄被反復收錄這種情況一般都不是我們想要的,而要消除這種情況其關鍵是要將_id變成不是由elasticsearch自動生成,更確切地講是要將_id變成人為可控地生成。
這種配置也不難,elasticsearch允計通過使用document_id來配置_id。
假設我們表的主鍵是edb_id,為了避免重復,我們要以表主鍵值賦值給document_id即可。此時1.3中配置修改如下:
input{ jdbc { # jdbc:mysql://數據庫ip:端口/數據庫名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 數據庫用戶名 jdbc_user => "root" # 數據庫密碼 jdbc_password => "root" # mysql java驅動文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驅動類名,這個一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次 schedule => "* * * * *" # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出 # 如果語句很多,可使用statement_file來指定sql語句文件 statement => "select * from exploit_records" } } filter { } output{ elasticsearch { hosts => ["localhost:9200"] # 相比1.3只多加了這一句 # 將表主鍵edb_id字段的值賦值給elasticsearch的主鍵 document_id => "%{edb_id}" } }
2.1.3 多路徑輸入問題
由於我們這里只查詢了一張表,所以2.2中的配置沒有什么問題。但只果此時要多輸入一張表這張表沒有edb_id字段,那么按document_id => "%{edb_id}"取到的edb_id會為空,這張表的數據會因_id項值都為空而被視為重復項不為elasticsearch所收錄。
對於這種情況,jdbc預留了type項,可將type項賦值為表名進行區分。
但如果表中原來就有type字段,那么給type項賦值將會覆蓋type字段的值,這是我們所不想看到的。此時就不適合使用type項了,只能轉而使用其他條件加以區分比如if [edb_id] != ""等等。
如果不是讀表而是讀其他東西,比如讀txt文件處理思路是一樣的,只要找到一個if條件讓txt文件的內容會轉到其相應output輸出即可。
input{ jdbc { # jdbc:mysql://數據庫ip:端口/數據庫名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 數據庫用戶名 jdbc_user => "root" # 數據庫密碼 jdbc_password => "root" # mysql java驅動文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驅動類名,這個一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次 schedule => "* * * * *" # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出 # 如果語句很多,可使用statement_file來指定sql語句文件 statement => "select * from exploit_records" # 將type項賦值為表名exploit_records type => "exploit_records" } # 多讀一張表cve_records jdbc { jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" jdbc_user => "root" jdbc_password => "root" jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "* * * * *" statement => "select * from cve_records" # 將type項賦值為表名cve_records type => "cve_records" } } filter { } output{ if [type] == "exploit_records"{ elasticsearch { hosts => ["localhost:9200"] document_id => "%{edb_id}" } } if [type] == "cve_records"{ elasticsearch { hosts => ["localhost:9200"] document_id => "%{cve}" } } }
2.2 多日記錄重復問題
2.2.1 問題描述
經過2.1的處理后在單日中已不會出現重復記錄,但由於elasticsearch中將不同日期收到的記錄存到不同index中,index不同即便是同id也不會被認為是同一條記錄所以記錄會被收復,有時這種不同日期的重復我們也是不希望有的。
其實更本質而言,並不是“elasticsearch中將不同日期收到的記錄存到不同index中”,elasticsearch只是接收記錄將記錄存放到記錄index字段所標識的index中。而logstash的elasticsearch插件默認生成的index字段值是"logstash-%{+YYYY.MM.dd}"
。
我們要避免不同日期記錄重復,其做法就是重寫index值將其設置成一個不隨日期變化的固定值。
2.2.2 配置示例
input{ jdbc { # jdbc:mysql://數據庫ip:端口/數據庫名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 數據庫用戶名 jdbc_user => "root" # 數據庫密碼 jdbc_password => "root" # mysql java驅動文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驅動類名,這個一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次 schedule => "* * * * *" # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出 # 如果語句很多,可使用statement_file來指定sql語句文件 statement => "select * from exploit_records" } } filter { } output{ elasticsearch { hosts => ["localhost:9200"] # 將記錄id設為表關鍵字,避免單日重復 document_id => "%{edb_id}" # 將index由默認的logstash-%{+YYYY.MM.dd}設置成固定值,避免多日重復 index => "exploit_records" } }
三、if-else語句寫法
在第二大節中我們多處用到if語句,這里再更具體說明一下其寫法。
官方說明見:https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals
if-else語句格式如下,和各語言的寫法是類似的:
if EXPRESSION { ... } else if EXPRESSION { ... } else { ... }
而表達示(EXPRESSION)進一步可細化成操作項、操作符、和操作值三種元素。
比如在我們前面的寫法中[type]就是操作項,==就是操作符,"exploit_records"就是操作值;操作項和操作值的位和其他語言一樣可以互換(本質就是一個比較)。
if [type] == "exploit_records"{ elasticsearch { hosts => ["localhost:9200"] document_id => "%{edb_id}" } }
操作項----所有項的獲取都和上邊type一樣使用中括號括起來的形式。
操作值----就是我們自己設定的操作項的比較標准。另外如果操作項是整型那操作值也應該是整型。
操作符----
一元操作符(unary):!
布爾操作符(boolean):and, or, nand, xor
相等操作符(equality): ==, !=, <, >, <=, >=
正則操作符(regexp): =~, !~ (checks a pattern on the right against a string value on the left,操作項在左邊正則操作值在右邊)
包含操作符(inclusion): in, not in
參考: