Logstash同步Mysql數據到Es


logstash同步Mysql數據到Es步驟

1.運行依賴環境

  • logstash需要依賴JDK環境,需要提前安裝好JDK環境

2.安裝logstash

2.1上傳logstash壓縮包,並解壓和改名

cd /usr/local
tar -zxvf logstash-6.8.9.tar.gz
mv logstash-6.8.9 logstash

2.2安裝mysql和es插件

cd /usr/local/logstash/bin
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch

2.3上傳mysql的jar包,提供依賴

mkdir -p /usr/local/logstash/jar
cd /usr/local/logstash/jar

2.4 創建配置文件,配置數據庫需要收集的表信息和es信息

mkdir -p /usr/local/logstash/myConfig
cd  /usr/local/logstash/config/myConfig
touch mysql.conf
vi mysql.conf


mysql.conf
-------------
input {
  jdbc {
    jdbc_driver_library => "/usr/local/logstash/jar/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.58.222:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    statement => "SELECT * FROM user WHERE updated_time >= :sql_last_value"
    # 一定要設置timezone,否則會因為時差相差8小時,導致一直同步
    jdbc_default_timezone =>"Asia/Shanghai"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updated_time"
    last_run_metadata_path => "syncpoint_table"
  }
}


output {
    elasticsearch {
        # ES的IP地址及端口,集群就寫所有的es服務器
        hosts => ["192.168.58.222:9200","192.168.58.222:9201"]
        # 索引名稱 可自定義
        index => "user"
        # 需要關聯的數據庫中有有一個id字段,對應類型中的id
        document_id => "%{id}"
        document_type => "user"
    }
    stdout {
        # JSON格式輸出
        codec => json_lines
    }
}

2.5 指定配置文件,啟動logstash(前台啟動並跟蹤日志)

/usr/local/logstash/bin/logstash -f /usr/local/logstash/myConfig/mysql.conf 

2.6 多文件方式同步ES數據

一個 logstash 實例可以借助 pipelines 機制同步多個表,只需要寫多個配置文件就可以了,假設我們有兩個表 table1 和 table2,對應兩個配置文件 mysql.conf 和 mysql_3.conf

在 config/pipelines.yml 中添加配置,添加配置后,啟動logstash時就不用指定配置文件了

cd /usr/local/logstash/config
vi pipelines.yml

pipelines.yml
--------------------
- pipeline.id: table1
  path.config: "/usr/local/logstash/myConfig/mysql.conf"
- pipeline.id: table2
  path.config: "/usr/local/logstash/myConfig/mysql_3.conf" 
  
  
mysql_3.conf
------------
input {
  jdbc {
    jdbc_driver_library => "/usr/local/logstash/jar/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.58.222:3306/meite_goods?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    statement => "SELECT * FROM meite_product WHERE updated_time >= :sql_last_value"
	jdbc_default_timezone =>"Asia/Shanghai"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updated_time"
    last_run_metadata_path => "syncpoint_table"
  }
}


output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.58.222:9200","192.168.58.222:9201"]
        # 索引名稱 可自定義
        index => "goods"
        # 需要關聯的數據庫中有有一個id字段,對應類型中的id
        document_id => "%{id}"
        document_type => "goods"
    }
    stdout {
        # JSON格式輸出
        codec => json_lines
    }
}


2.7 logstash 自定義的配置文件說明

jdbc_driver_library: jdbc mysql 驅動的路徑,在上一步中已經下載
jdbc_driver_class: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就好了
jdbc_connection_string: mysql 地址
jdbc_user: mysql 用戶
jdbc_password: mysql 密碼
schedule: 執行 sql 時機,類似 crontab 的調度
statement: 要執行的 sql,以 “:” 開頭是定義的變量,可以通過 parameters 來設置變量,這里的 sql_last_value 是內置的變量,表示上一次 sql 執行中 update_time 的值,這里 update_time 條件是 >= 因為時間有可能相等,沒有等號可能會漏掉一些增量
use_column_value: 使用遞增列的值
tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型
tracking_column: 遞增字段的名稱,這里使用 update_time 這一列,這列的類型是 timestamp
last_run_metadata_path: 同步點文件,這個文件記錄了上次的同步點,重啟時會讀取這個文件,這個文件可以手動修改


schedule => "* * * * *"
這里schedule是Crontab,工具網址:https://tool.lu/crontab/  注意:Crontab表達式以分為單位


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM