利用Logstash插件進行Elasticsearch與Mysql的數據


Logstash與Elasticsearch的安裝就不多說了,我之前有兩篇文章寫的比較詳細了ElasticSearch + Logstash + Kibana 搭建筆記Filebeat+Logstash+ElasticSearch+Kibana搭建Apache訪問日志解析平台

Mysql Connector沒有包含在ELK的包中,需要自己下載

配置文件

最主要的配置文件是 Logstash 的配置,我們命名為 mysql.conf 樣例如下

input {
    stdin {
    
    }
    jdbc {
        # 數據庫地址  端口  數據庫名
        jdbc_connection_string => "jdbc:mysql://localhost:3306/shen"
        # 數據庫用戶名      
        jdbc_user => "root"
        # 數據庫密碼
        jdbc_password => "rootroot"
        # mysql java驅動地址 
        jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar"
        # 驅動類的名稱
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        jdbc_paging_enabled => "true"
        
        jdbc_page_size => "50000"
        statement => "SELECT * FROM TABLE"
        # sql 語句文件,對於復雜的查詢,可以放在文件中。
        # statement_filepath => "filename.sql"
        # 設置監聽間隔,語法與Linux系統Cron相同
        schedule => "* * * * *"
    }
}
output {
     stdout {
        codec => json_lines
    }
   elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts"
     document_type => "contact"
        document_id => "%{id}"
    }
}

進行數據同步

進行數據同步只需要將 Logstash 啟動,並且通過-f參數指定我們創建的 mysql.conf 配置文件即可,可以通過終端中輸出的信息查看同步是否成功。

./bin/logstash -f mysql.conf

本例是對一個數據庫表進行同步,如果需要同步多個表的數據,可以創建多個配置文件,也可以在一個配置文件中指定多個 jdbc input。配置中的所有項目都必須重新復制一遍。

增量更新

這個例子中的SQL執行的全量更新,如果需要進行增量更新,就需要對SQL進行一些修改。

input {
    stdin {
    
    }
    jdbc {
        # 數據庫地址  端口  數據庫名
        jdbc_connection_string => "jdbc:mysql://localhost:3306/shen"
        # 數據庫用戶名      
        jdbc_user => "root"
        # 數據庫密碼
        jdbc_password => "rootroot"
        # mysql java驅動地址 
        jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar"
        # 驅動類的名稱
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"

        #是否記錄上次運行的結果
        record_last_run => true
        #記錄上次運行結果的文件位置
        last_run_metadata_path => ""
        #是否使用數據庫某一列的值,
        use_column_value => true
        tracking_column => "id"
        #numeric或者timestamp
        tracking_column_type => "numeric"
        
        #如果為true則會清除 last_run_metadata_path 的記錄,即重新開始同步數據
        clean_run => false

        #sql_last_value根據tracking類型,默認為0或者1970-1-1
        statement => "SELECT * FROM TABLE WHERE id > :last_sql_value"
        # sql 語句文件,對於復雜的查詢,可以放在文件中。
        # statement_filepath => "filename.sql"
        # 設置監聽間隔,語法與Linux系統Cron相同
        schedule => "* * * * *"
    }
}
output {
     stdout {
        codec => json_lines
    }
   elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts"
     document_type => "contact"
        document_id => "%{id}"
    }
}

增量更新會忽略對歷史數據的更新,如果業務中歷史數據經常發生變化,則可以通過全量更新的方法。

重要參數說明

參數 類型 說明
clean_run boolean
jdbc_connection_string string
jdbc_driver_class string
jdbc_user string
jdbc_fetch_size number
jdbc_page_size number 默認值100000
jdbc_paging_enabled boolean
sequel_opts hash 可以傳入到SQL中的參數

本文配置在 ELK 6.0 beta 環境下測試通過。

本文為作者原創,未經允許不得轉載。如果您覺得本文對您有幫助,請隨意打賞,您的支持將鼓勵我繼續創作。

參考資料:
1、Mysql Connector
2、ElasticSearch5+logstash的logstash-input-jdbc實現mysql數據同步
3、logstash-input-jdbc實現mysql 與elasticsearch實時同步深入詳解
4、logstash input jdbc連接數據庫
5、JDBC Plugin


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM