centos7配置Logstash同步Mysql數據到Elasticsearch


Logstash 是開源的服務器端數據處理管道,能夠同時從多個來源采集數據,轉換數據,然后將數據發送到您最喜歡的“存儲庫”中。個人認為這款插件是比較穩定,容易配置的使用Logstash之前,我們得明確自己的需求場景是什么,從哪種類型的數據源同步數據到哪種存儲庫。Logstash版本迭代較快,每個版本的插件都有點區別,比如6.3版本以后output到沒有jdbc的插件,然而你如果想使用output的jdbc插件就需要去安裝插件(logstash-output-jdbc),也就是說,如果你想用output的jdbc,你就必須使用6.3以下(最好5.x)的版本。這里以Logstash5.3.1版本為例。

1.下載安裝

Logstash不支持jdk1.10,建議使用1.8。Logstash版本要與Es版本保持一致。下載地址:https://www.elastic.co/cn/downloads/past-releases/logstash-5-3-1

上傳到目錄,例如:usr/local/src

解壓  tar -zxvf  logstash-5.3.1.tar.gz

重命名 mv logstash-5.3.1 logstash

啟動驗證:

cd logstash

bin/logstash -e 'input{stdin{}} output{stdout{}}',輸出如下,代表啟動成功:

關閉

ps -ef | grep logstash

kill pid

2.配置目錄

配置文件目錄:logstash/config

新建配置文件test.conf

下載mysql-connector-java-5.1.30.jar,下載地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.30

logstash目錄下創建一個jar目錄,用來存放jar文件 

mkdir jar
在logstash目錄下創建一個sql目錄,用來存放查詢sql文件test.sql

mkdir sql

3.配置文件

Logstash同步數據方式有全量同步和增量同步兩種,不同方式配置文件有細微不同。第一次同步時需要全量的數據,之后則需要定時去同步增量數據,使用logstash需要了解一下事項:

1.凡是SQL可以實現的logstash均可以實現(本就是通過sql查詢數據)

2.支持每次全量同步或按照特定字段(如自增ID、更新時間)增量同步

3.同步頻率可控,最快同步頻率每分鍾一次(如果對實效性要求較高,慎用)

4.不支持被物理刪除的數據同步物理刪除ES中的數據(可在表設計中增加邏輯刪除字段標識數據刪除)

全量同步

test.conf文件內容如下:

input {
  jdbc {
      # mysql 數據庫鏈接,test為數據庫名
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
      jdbc_user => "root"
      jdbc_password => "root"

      # 驅動路徑
      jdbc_driver_library => "/usr/local/src/logstash/jar/mysql-connector-java-5.1.30-bin.jar"

      # 驅動類名
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      #是否分頁
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #直接執行sql語句
      #statement =>"select * from test"
      # 執行的sql 文件路徑+名稱
      statement_filepath => "/usr/local/src/logstash/sql/test.sql"

      #設置監聽間隔  各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新
      schedule => "* * * * *"

      # 索引類型
      #type => "jdbc"
    }

}

output {
  elasticsearch {
        #es的ip和端口
        hosts => ["http://127.0.0.1:9200"]
        #ES索引名稱(自己定義的)
        index => "test_logstash"
        #文檔類型(自己定義的)
        document_type => "test"
        #設置es中數據的id為數據庫中的字段(一般設置為mysql中主鍵字段)
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }

}

test.sql文件內容如下:

#sql查詢語句,mysql中怎樣寫,此處就怎樣寫
select
* from table

指定配置文件啟動Logstash:

cd logstash

bin/logstash -f config/test.conf

會看到mysql中數據會在屏幕顯示,如果有報錯,一般為配置文件出錯。

后台啟動:nohup bin/logstash -f config/test.conf 

增量同步

test.conf文件內容如下:

input {
  jdbc {
      # mysql 數據庫鏈接,test為數據庫名
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
      jdbc_user => "root"
      jdbc_password => "root"

      # 驅動路徑
      jdbc_driver_library => "/usr/local/src/logstash/jar/mysql-connector-java-5.1.30-bin.jar"

      # 驅動類名
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      #是否分頁
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      
      record_last_run => true
      use_column_value => true
      tracking_column => "id"
      last_run_metadata_path => "/usr/local/src/logstash/last_run_record"


      #直接執行sql語句
      statement =>"statement =>"select * from test where id >:sql_last_value""
      # 執行的sql 文件路徑+名稱
      #statement_filepath => "/usr/local/src/logstash/sql/test.sql"

      #設置監聽間隔  各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新
      schedule => "* * * * *"

      # 索引類型
      #type => "jdbc"
    }

}

output {
  elasticsearch {
        #es的ip和端口
        hosts => ["http://127.0.0.1:9200"]
        #ES索引名稱(自己定義的)
        index => "test_logstash"
        #文檔類型(自己定義的)
        document_type => "test"
        #設置es中數據的id為數據庫中的字段(一般設置為mysql中主鍵字段)
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }

}

紅色部分是跟全量同步有區別的地方:

record_last_run:記錄最后運行結果

use_column_value:記錄字段

tracking_column:記錄字段名,這里就是id

last_run_metadata_path:記錄數據保存位置

sql_last_value:下次在執行同步的時候會將這個值,賦給sql_last_value

啟動Logstash后,以后就是根據查詢條件增量同步數據了。

到此,使用Logstash同步Mysql數據到Es就算完成了,各位如果覺得還有點意義,煩請點一下推薦,加個關注,互相交流,如果安裝過程有任何問題或者發現錯誤,都可以留言交流,共同進步! 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM