Elasticsearch 同步mysql數據


將mysqL 數據同步到elsearch中(同步而非導入,當數據變化時el數據跟着變化),有兩種情況,單表數據和聯表數據。

0.在bin目錄放jdbc驅動jar包

1.logstash 的配置尤為重要

參考:

https://blog.csdn.net/qq_16436555/article/details/9136071

 input { jdbc {
     #數據庫地址,用戶,密碼 jdbc_connection_string
=> "jdbc:mysql://ip:3306/test" jdbc_user => "root" jdbc_password => "root"
     #驅動位置配置
#此處的路徑最好是絕對路徑,行對路徑取決與允許命令的目錄 jdbc_driver_library => "sync-conf/mysql-connector-java-6.0.6.jar" jdbc_driver_class => "com.mysql.jdbc.Driver"
     #開啟分頁查詢
jdbc_paging_enabled => "true" jdbc_page_size => "50000"
 

      #決定增量同步的關鍵

      #是否清除 last_run_metadata_path 的記錄,如果為真那么每次都相當於從頭開始查詢所有的數據庫記錄

      clean_run => false
     #是否需要記錄某個column 的值,如果 record_last_run 為真,可以自定義我們需要表的字段名稱
   ,使用其它字段追蹤,而不是用時間,此時該參數就要為 true. 否則默認 track 的是 timestamp 的值.   use_column_value => true    #如果 use_column_value 為真,需配置此參數. 這個參數就是數據庫給出的一個字段名稱。當然該字段必須是遞增的,可以是 數據庫的數據時間這類的    tracking_column => create_time
#只有兩種選擇 numeric,timestamp
     tracking_column_type=>"timestamp"   #是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中    record_last_run => true #在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :sql_last_value 取得就是該文件中的值 last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" #是否將字段名稱轉小寫。 #這里有個小的提示,如果你這前就處理過一次數據,並且在Kibana中有對應的搜索需求的話,還是改為true, #因為默認是true,並且Kibana是大小寫區分的。准確的說應該是ES大小寫區分 lowercase_column_names => false #你的SQL的位置,當然,你的SQL也可以直接寫在這里。 #statement => SELECT * FROM tabeName t WHERE t.creat_time > :sql_last_value order by creat_time asc
   statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" #sql 文件執行路徑,最好絕對路徑。 
#注意:外載的SQL文件就是一個文本文件就可以了,還有需要注意的是,一個jdbc{}插件就只能處理一個SQL語句,
#如果你有多個SQL需要處理的話,只能在重新建立一個jdbc{}插件。 }
 # 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新 schedule => "* * * * *"
#用於區分輸入的數據源,可以在output中區分輸出到哪里 type => "user" } } output { #設置窗口日志輸出 stdout { codec => json_lines }
if[type] == "user"{
    elasticsearch {
        hosts => ["127.0.0.1:9200"] #注意index的值不支持大寫字母 index => "user" #document_type自行設置,不設置時,默認為doc #document_type => "" #此處的值來自查詢sql中的列名稱,根據需要自行配置 document_id => "%{id}" }

}
}

重點:

1.tracking_column 不配置是默認根據timestamp追蹤,tracking_column 配了相應的use_column_value => true

2.設置上次追蹤的位置記錄,就開啟record_last_run => true,

在增量同步時,根據tracking 的變化會更新el數據,所以在sql語句中要添加 :sql_last_value order by tracking_column asc 

這樣會往 last_run_metadata_path 文件里寫最新的racking_column值

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM