mysql到es的全量更新方案可以通過 logstash來實現
logstash可以做基於全量的定時更新,也可以做基於時間的定時更新。
logstash的使用方式如下(本人用的是logstash-7.6.1,不同版本在使用上應該有細微區別,區別百度可解決)
1.https://artifacts.elastic.co/downloads/logstash/logstash-7.6.1.zip 下載安裝包
2.上傳到linux 通過unzip 解壓到bin目錄下新建mysql目錄(存放配置文件,由於ES7.6.1默認不讓安裝root,我也一起放到新用戶es下,目錄權限也全部賦值了)
3.安裝logstash的JDBC插件(bin/logstash-plugin install logstash-integration-jdbc)
4.在mysql目錄中放入jdbc.conf(連接數據庫的配置) jdbc.sql(編寫sql的配置,通過此語句查詢到mysql數據) last_value_meta(基於時間點更新需要用到) mysql-connector-java-5.1.35.jar(JDBC包)
三個文件的內容貼這里以供參考
jdbc.conf:
input {
jdbc {
#set timezone
jdbc_default_timezone => "Asia/Shanghai"
# mysql 數據庫鏈接,dianpingdb為數據庫名
jdbc_connection_string => "jdbc:mysql://192.168.1.4:3306/dianping"
# 用戶名和密碼
jdbc_user => "root"
jdbc_password => "root"
# 驅動
jdbc_driver_library => "/opt/logstash-7.6.1/bin/mysql/mysql-connector-java-5.1.35.j
ar"
# 驅動類名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
last_run_metadata_path => "/opt/logstash-7.6.1/bin/mysql/last_value_meta"
# 執行的sql 文件路徑+名稱
statement_filepath => "/opt/logstash-7.6.1/bin/mysql/jdbc.sql"
# 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都
更新
schedule => "* * * * *"
}
}
jdbc.sql:
select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disable_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c
.id = a.seller_id where a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value
last_value_meta(此文件一開始只填入了時間,此時看應該記錄了系統時間,至於怎么記錄更新的值還需要測試):
--- !ruby/object:DateTime '2020-03-29 08:13:00.216387000 Z'
以上是基於最新時間點,用輪訓方式進行更新mysql庫數據到es庫中
啟動方式到bin下./logstash -f mysql/jdbc.conf