ELK+MySQL出現大量重復記錄問題處理


一、使用Logstash使用jdbc從MySQL讀取數據操作

1.1 安裝jdbc插件

jdbc默認已安裝,如果沒安裝使用logstash-plugin安裝即可(logstash-plugin在logstash的bin目錄下):

logstash-plugin install logstash-input-jdbc

 

1.2 下載mysql jdbc驅動

mysql最新版是8.0,但使用時報錯,這里仍以5.1版本演示

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.47.zip
# 不推薦放/tmp目錄,但這里為了與用戶無關所以直接解壓到/tmp目錄
unzip mysql-connector-java-5.1.47.zip -d /tmp

 

1.3 配置logstash文件

配置如下項,如無意外mysql中數據即可同步到elasticsearch

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root"
    # mysql java驅動文件位置
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    # 驅動類名,這個一般不需要改
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次
    schedule => "* * * * *"
    # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出
    # 如果語句很多,可使用statement_file來指定sql語句文件
    statement => "select * from exploit_records"
  }
}

filter {

}

output{
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

 

二、記錄重復問題

2.1 單日記錄重復問題

2.1.1 問題描述

elasticsearch以_id項作為主鍵,只要傳過來的數據中_id項的值尚未存在elasticsearch會接收該數據。

而_id項的值默認其實是數據到來時elasticsearch自己生成的然后添加上去的,所以在這種情況下_id是不可能重復的,即所有數據都會被來者不拒地接收。

logstash jdbc插件對數據庫的數據的操作是依照計划(schedule)執行sql語句(statement),所有查出來的數據就統統按output發過去。

在這種模式下,每執行一次計划(schedule)所有記錄就會被發往elasticsearch一次,而_id是由elasticsearch生成的不會重復,所以不同次計划(schedule)的同一條記錄會被elasticsearch反復收錄。

 

2.1.2 問題處理辦法

同一條記錄被反復收錄這種情況一般都不是我們想要的,而要消除這種情況其關鍵是要將_id變成不是由elasticsearch自動生成,更確切地講是要將_id變成人為可控地生成。

這種配置也不難,elasticsearch允計通過使用document_id來配置_id。

假設我們表的主鍵是edb_id,為了避免重復,我們要以表主鍵值賦值給document_id即可。此時1.3中配置修改如下:

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root"
    # mysql java驅動文件位置
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    # 驅動類名,這個一般不需要改
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次
    schedule => "* * * * *"
    # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出
    # 如果語句很多,可使用statement_file來指定sql語句文件
    statement => "select * from exploit_records"
  }
}

filter {

}

output{
  elasticsearch {
    hosts => ["localhost:9200"]
    # 相比1.3只多加了這一句
    # 將表主鍵edb_id字段的值賦值給elasticsearch的主鍵
    document_id => "%{edb_id}"
  }
}

 

2.1.3 多路徑輸入問題

由於我們這里只查詢了一張表,所以2.2中的配置沒有什么問題。但只果此時要多輸入一張表這張表沒有edb_id字段,那么按document_id => "%{edb_id}"取到的edb_id會為空,這張表的數據會因_id項值都為空而被視為重復項不為elasticsearch所收錄。

對於這種情況,jdbc預留了type項,可將type項賦值為表名進行區分。

但如果表中原來就有type字段,那么給type項賦值將會覆蓋type字段的值,這是我們所不想看到的。此時就不適合使用type項了,只能轉而使用其他條件加以區分比如if [edb_id] != ""等等。

如果不是讀表而是讀其他東西,比如讀txt文件處理思路是一樣的,只要找到一個if條件讓txt文件的內容會轉到其相應output輸出即可。

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root"
    # mysql java驅動文件位置
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    # 驅動類名,這個一般不需要改
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次
    schedule => "* * * * *"
    # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出
    # 如果語句很多,可使用statement_file來指定sql語句文件
    statement => "select * from exploit_records"
    # 將type項賦值為表名exploit_records
    type => "exploit_records"
  }
  # 多讀一張表cve_records
  jdbc {
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "* * * * *"
    statement => "select * from cve_records"
    # 將type項賦值為表名cve_records
    type => "cve_records"
  }
}

filter {

}

output{
  if [type] == "exploit_records"{
    elasticsearch {
      hosts => ["localhost:9200"]
      document_id => "%{edb_id}"
    }
  }
  if [type] == "cve_records"{
    elasticsearch {
      hosts => ["localhost:9200"]
      document_id => "%{cve}"
    }
  }
}

 

2.2 多日記錄重復問題

2.2.1 問題描述

經過2.1的處理后在單日中已不會出現重復記錄,但由於elasticsearch中將不同日期收到的記錄存到不同index中,index不同即便是同id也不會被認為是同一條記錄所以記錄會被收復,有時這種不同日期的重復我們也是不希望有的。

其實更本質而言,並不是“elasticsearch中將不同日期收到的記錄存到不同index中”,elasticsearch只是接收記錄將記錄存放到記錄index字段所標識的index中。而logstash的elasticsearch插件默認生成的index字段值是"logstash-%{+YYYY.MM.dd}"

我們要避免不同日期記錄重復,其做法就是重寫index值將其設置成一個不隨日期變化的固定值。

 

2.2.2 配置示例

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root"
    # mysql java驅動文件位置
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    # 驅動類名,這個一般不需要改
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次
    schedule => "* * * * *"
    # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這里只是簡單將表中所有記錄查出
    # 如果語句很多,可使用statement_file來指定sql語句文件
    statement => "select * from exploit_records"
  }
}

filter {

}

output{
  elasticsearch {
    hosts => ["localhost:9200"]
    # 將記錄id設為表關鍵字,避免單日重復
    document_id => "%{edb_id}"
    # 將index由默認的logstash-%{+YYYY.MM.dd}設置成固定值,避免多日重復
    index => "exploit_records"
  }
}

 

 

 

三、if-else語句寫法

在第二大節中我們多處用到if語句,這里再更具體說明一下其寫法。

官方說明見:https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals

if-else語句格式如下,和各語言的寫法是類似的:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

而表達示(EXPRESSION)進一步可細化成操作項、操作符、和操作值三種元素。

比如在我們前面的寫法中[type]就是操作項,==就是操作符,"exploit_records"就是操作值;操作項和操作值的位和其他語言一樣可以互換(本質就是一個比較)。

if [type] == "exploit_records"{
  elasticsearch {
    hosts => ["localhost:9200"]
    document_id => "%{edb_id}"
  }
}

操作項----所有項的獲取都和上邊type一樣使用中括號括起來的形式。

操作值----就是我們自己設定的操作項的比較標准。另外如果操作項是整型那操作值也應該是整型。

操作符----

一元操作符(unary):!
布爾操作符(boolean):and, or, nand, xor
相等操作符(equality): ==, !=, <, >, <=, >=
正則操作符(regexp): =~, !~ (checks a pattern on the right against a string value on the left,操作項在左邊正則操作值在右邊)
包含操作符(inclusion): in, not in

 

參考:

https://segmentfault.com/a/1190000014387486


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM