mysql2es全量更新方案


mysql到es的全量更新方案可以通過 logstash來實現

logstash可以做基於全量的定時更新,也可以做基於時間的定時更新。

logstash的使用方式如下(本人用的是logstash-7.6.1,不同版本在使用上應該有細微區別,區別百度可解決)

1.https://artifacts.elastic.co/downloads/logstash/logstash-7.6.1.zip 下載安裝包

2.上傳到linux 通過unzip 解壓到bin目錄下新建mysql目錄(存放配置文件,由於ES7.6.1默認不讓安裝root,我也一起放到新用戶es下,目錄權限也全部賦值了)

3.安裝logstash的JDBC插件(bin/logstash-plugin install logstash-integration-jdbc)

4.在mysql目錄中放入jdbc.conf(連接數據庫的配置)  jdbc.sql(編寫sql的配置,通過此語句查詢到mysql數據)  last_value_meta(基於時間點更新需要用到)  mysql-connector-java-5.1.35.jar(JDBC包)

三個文件的內容貼這里以供參考

jdbc.conf:

input {
jdbc {
#set timezone
jdbc_default_timezone => "Asia/Shanghai"
# mysql 數據庫鏈接,dianpingdb為數據庫名
jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/dianping"
# 用戶名和密碼
jdbc_user => "root"
jdbc_password => "root"
# 驅動
jdbc_driver_library => "/opt/logstash-7.6.1/bin/mysql/mysql-connector-java-5.1.35.j
ar"
# 驅動類名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
last_run_metadata_path => "/opt/logstash-7.6.1/bin/mysql/last_value_meta"
# 執行的sql 文件路徑+名稱
statement_filepath => "/opt/logstash-7.6.1/bin/mysql/jdbc.sql"
# 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都
更新
schedule => "* * * * *"
}
}

 

  jdbc.sql:

select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disable_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c
.id = a.seller_id where a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value

last_value_meta(此文件一開始只填入了時間,此時看應該記錄了系統時間,至於怎么記錄更新的值還需要測試):

--- !ruby/object:DateTime '2020-03-29 08:13:00.216387000 Z'   

 

以上是基於最新時間點,用輪訓方式進行更新mysql庫數據到es庫中

 

 

啟動方式到bin下./logstash -f mysql/jdbc.conf


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM