MySQL數據以全量和增量方式,向ElasticSearch搜索引擎同步流程


一、配置詳解

場景描述:MySQL數據表以全量和增量的方式向ElasticSearch搜索引擎同步。

1、下載內容

  • elasticsearch 版本 6.3.2
  • logstash 版本 6.3.2
  • mysql-connector-java-5.1.13.jar

2、核心配置

路徑:/usr/local/logstash

新建配置目錄:sync-config

1)、配置全文

/usr/local/logstash/sync-config/cicadaes.conf

input {
 stdin {}
 jdbc {
 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cicada?characterEncoding=utf8"
 jdbc_user => "root"
 jdbc_password => "root123"
 jdbc_driver_library => "/usr/local/logstash/sync-config/mysql-connector-java-5.1.13.jar"
 jdbc_driver_class => "com.mysql.jdbc.Driver"
 jdbc_paging_enabled => "true"
 jdbc_page_size => "50000"
 jdbc_default_timezone => "Asia/Shanghai"
 statement_filepath => "/usr/local/logstash/sync-config/user_sql.sql"
 schedule => "* * * * *"
 type => "User"
 lowercase_column_names => false
 record_last_run => true
 use_column_value => true
 tracking_column => "updateTime"
 tracking_column_type => "timestamp"
 last_run_metadata_path => "/usr/local/logstash/sync-config/user_last_time"
 clean_run => false
 }
 jdbc {
 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cicada?characterEncoding=utf8"
 jdbc_user => "root"
 jdbc_password => "root123"
 jdbc_driver_library => "/usr/local/logstash/sync-config/mysql-connector-java-5.1.13.jar"
 jdbc_driver_class => "com.mysql.jdbc.Driver"
 jdbc_paging_enabled => "true"
 jdbc_page_size => "50000"
 jdbc_default_timezone => "Asia/Shanghai"
 statement_filepath => "/usr/local/logstash/sync-config/log_sql.sql"
 schedule => "* * * * *"
 type => "Log"
 lowercase_column_names => false
 record_last_run => true
 use_column_value => true
 tracking_column => "updateTime"
 tracking_column_type => "timestamp"
 last_run_metadata_path => "/usr/local/logstash/sync-config/log_last_time"
 clean_run => false
 }
}
filter {
 json {
 source => "message"
 remove_field => ["message"]
 }
}
output {
 if [type] == "User" {
 elasticsearch {
 hosts => ["127.0.0.1:9200"]
 index => "cicada_user_search"
 document_type => "user_search_index"
 }
 }
 if [type] == "Log" {
 elasticsearch {
 hosts => ["127.0.0.1:9200"]
 index => "cicada_log_search"
 document_type => "log_search_index"
 }
 }
}

2)、SQL文件

基於binlog,update_time字段

- user_sql.sql

SELECT
    id,
    user_name userName,
    user_phone userPhone,
    create_time createTime,
    update_time updateTime
FROM c_user
WHERE update_time > : sql_last_value

- log_sql.sql

SELECT
    id,
    param_value paramValue,
    request_ip requestIp,
    create_time createTime,
    update_time updateTime
FROM c_log
WHERE update_time > : sql_last_value

 

3)、配置參數說明

- input參數

statement_filepath:讀取SQL語句位置
schedule :這里配置每分鍾執行一次
type :類型,寫入ES的標識
lowercase_column_names :字段是否轉小寫
record_last_run :記錄上次執行時間
use_column_value :使用列的值
tracking_column :根據寫入ES的updateTime字段區分增量數據
tracking_column_type :區分的字段類型

- output參數

hosts :ES服務地址
index :Index名稱,類比理解數據庫名稱
document_type :Type名稱,類比理解表名稱

3、啟動進程

/usr/local/logstash/bin/logstash 
-f 
/usr/local/logstash/sync-config/cicadaes.conf

二、ES客戶端工具

1、下載軟件

  • kibana-6.3.2-windows-x86_64

2、修改配置

kibana-6.3.2-windows-x86_64\config\kibana.yml

添加配置:

elasticsearch.url: "http://127.0.0.1:9200"

3、雙擊啟動

kibana-6.3.2-windows-x86_64\bin\kibana.bat

4、訪問地址

http://localhost:5601
非原創


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM