Logstash學習之路(四)使用Logstash將mysql數據導入elasticsearch(單表同步、多表同步、全量同步、增量同步)


一、使用Logstash將mysql數據導入elasticsearch

1、在mysql中准備數據:

mysql> show tables;
+----------------+
| Tables_in_yang |
+----------------+
| im             |
+----------------+
1 row in set (0.00 sec)

mysql> select * from im;
+----+------+
| id | name |
+----+------+
|  2 | MSN  |
|  3 | QQ   |
+----+------+
2 rows in set (0.00 sec)

2、簡單實例配置文件准備:

[root@master bin]# cat mysqles.conf 
input {
        stdin {}
        jdbc {
                type => "jdbc"
                jdbc_connection_string => "jdbc:mysql://192.168.200.100:3306/yang?characterEncoding=UTF-8&autoReconnect=true"
                 # 數據庫連接賬號密碼;
                jdbc_user => "root"
                jdbc_password => "010209"
                 # MySQL依賴包路徑;
                jdbc_driver_library => "/mnt/mysql-connector-java-5.1.38.jar"
                 # the name of the driver class for mysql
                jdbc_driver_class => "com.mysql.jdbc.Driver"
                statement => "SELECT * FROM `im`"
        }
}
output {
        elasticsearch {
                 # 配置ES集群地址
                hosts => ["192.168.200.100:9200"]
                 # 索引名字,必須小寫
                index => "im"
        }
        stdout {
        }
}

3、實例結果:

[root@master bin]# ./logstash -f mysqles.conf

4、更多選項配置如下(單表同步):

input {
    stdin {}
    jdbc {
        type => "jdbc"
         # 數據庫連接地址
        jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true""
         # 數據庫連接賬號密碼;
        jdbc_user => "username"
        jdbc_password => "pwd"
         # MySQL依賴包路徑;
        jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
         # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
         # 數據庫重連嘗試次數
        connection_retry_attempts => "3"
         # 判斷數據庫連接是否可用,默認false不開啟
        jdbc_validate_connection => "true"
         # 數據庫連接可用校驗超時時間,默認3600S
        jdbc_validation_timeout => "3600"
         # 開啟分頁查詢(默認false不開啟);
        jdbc_paging_enabled => "true"
         # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值);
        jdbc_page_size => "500"
         # statement為查詢數據sql,如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑;
         # sql_last_value為內置的變量,存放上次查詢結果中最后一條數據tracking_column的值,此處即為ModifyTime;
         # statement_filepath => "mysql/jdbc.sql"
        statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
         # 是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false);
        lowercase_column_names => false
         # Value can be any of: fatal,error,warn,info,debug,默認info;
        sql_log_level => warn
         #
         # 是否記錄上次執行結果,true表示會將上次執行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
        record_last_run => true
         # 需要記錄查詢結果某字段的值時,此字段為true,否則默認tracking_column為timestamp的值;
        use_column_value => true
         # 需要記錄的字段,用於增量同步,需是數據庫字段
        tracking_column => "ModifyTime"
         # Value can be any of: numeric,timestamp,Default value is "numeric"
        tracking_column_type => timestamp
         # record_last_run上次數據存放位置;
        last_run_metadata_path => "mysql/last_id.txt"
         # 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
        clean_run => false
         #
         # 同步頻率(分 時 天 月 年),默認每分鍾同步一次;
        schedule => "* * * * *"
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
    # convert 字段類型轉換,將字段TotalMoney數據類型改為float;
    mutate {
        convert => {
            "TotalMoney" => "float"
        }
    }
}
output {
    elasticsearch {
         # 配置ES集群地址
        hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
         # 索引名字,必須小寫
        index => "consumption"
    }
    stdout {
        codec => json_lines
    }
}

5、多表同步:

多表配置和單表配置的區別在於input模塊的jdbc模塊有幾個type,output模塊就需對應有幾個type;

input {
    stdin {}
    jdbc {
         # 多表同步時,表類型區分,建議命名為“庫名_表名”,每個jdbc模塊需對應一個type;
        type => "TestDB_DetailTab"
        
         # 其他配置此處省略,參考單表配置
         # ...
         # ...
         # record_last_run上次數據存放位置;
        last_run_metadata_path => "mysql\last_id.txt"
         # 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
        clean_run => false
         #
         # 同步頻率(分 時 天 月 年),默認每分鍾同步一次;
        schedule => "* * * * *"
    }
    jdbc {
         # 多表同步時,表類型區分,建議命名為“庫名_表名”,每個jdbc模塊需對應一個type;
        type => "TestDB_Tab2"
        # 多表同步時,last_run_metadata_path配置的路徑應不一致,避免有影響;
         # 其他配置此處省略
         # ...
         # ...
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
    # output模塊的type需和jdbc模塊的type一致
    if [type] == "TestDB_DetailTab" {
        elasticsearch {
             # host => "192.168.1.1"
             # port => "9200"
             # 配置ES集群地址
            hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
             # 索引名字,必須小寫
            index => "detailtab1"
             # 數據唯一索引(建議使用數據庫KeyID)
            document_id => "%{KeyId}"
        }
    }
    if [type] == "TestDB_Tab2" {
        elasticsearch {
            # host => "192.168.1.1"
            # port => "9200"
            # 配置ES集群地址
            hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
            # 索引名字,必須小寫
            index => "detailtab2"
            # 數據唯一索引(建議使用數據庫KeyID)
            document_id => "%{KeyId}"
        }
    }
    stdout {
        codec => json_lines
    }
}

 

二、使用logstash全量同步(1分鍾同步一次)mysql數據導入到elasticsearch

配置如下:

input {
        stdin {}
        jdbc {
                type => "jdbc"
                jdbc_connection_string => "jdbc:mysql://192.168.200.100:3306/yang?characterEncoding=UTF-8&autoReconnect=true"
                 # 數據庫連接賬號密碼;
                jdbc_user => "root"
                jdbc_password => "010209"
                 # MySQL依賴包路徑;
                jdbc_driver_library => "/mnt/mysql-connector-java-5.1.38.jar"
                 # the name of the driver class for mysql
                jdbc_driver_class => "com.mysql.jdbc.Driver"
                statement => "SELECT * FROM `im`"
                schedule => "* * * * *"
        }
}
output {
        elasticsearch {
                 # 配置ES集群地址
                hosts => ["192.168.200.100:9200"]
                 # 索引名字,必須小寫
                index => "im"
        }
        stdout {
        }
}

第一次同步結果:

[2019-04-25T14:39:03,194][INFO ][logstash.inputs.jdbc     ] (0.100064s) SELECT * FROM `im`
{
      "@version" => "1",
    "@timestamp" => 2019-04-25T06:39:03.338Z,
          "type" => "jdbc",
            "id" => 3,
          "name" => "QQ"
}
{
      "@version" => "1",
    "@timestamp" => 2019-04-25T06:39:03.309Z,
          "type" => "jdbc",
            "id" => 2,
          "name" => "MSN"
}

向mysql插入數據后第二次同步:

[2019-04-25T14:40:00,295][INFO ][logstash.inputs.jdbc     ] (0.001956s) SELECT * FROM `im`
{
      "@version" => "1",
    "@timestamp" => 2019-04-25T06:40:00.310Z,
          "type" => "jdbc",
            "id" => 2,
          "name" => "MSN"
}
{
      "@version" => "1",
    "@timestamp" => 2019-04-25T06:40:00.316Z,
          "type" => "jdbc",
            "id" => 3,
          "name" => "QQ"
}
{
      "@version" => "1",
    "@timestamp" => 2019-04-25T06:40:00.317Z,
          "type" => "jdbc",
            "id" => 4,
          "name" => "dfs"
}
{
      "@version" => "1",
    "@timestamp" => 2019-04-25T06:40:00.317Z,
          "type" => "jdbc",
            "id" => 5,
          "name" => "fdf"
}

三、使用logstash增量同步(1分鍾同步一次)mysql數據導入到elasticsearch

 

input {
        stdin {}
        jdbc {
                type => "jdbc"
                jdbc_connection_string => "jdbc:mysql://192.168.200.100:3306/yang?characterEncoding=UTF-8&autoReconnect=true"
                # 數據庫連接賬號密碼;
                jdbc_user => "root"
                jdbc_password => "010209"
                # MySQL依賴包路徑;
                jdbc_driver_library => "/mnt/mysql-connector-java-5.1.38.jar"
                # the name of the driver class for mysql
                jdbc_driver_class => "com.mysql.jdbc.Driver"
                #是否開啟分頁
                jdbc_paging_enabled => "true"
                #分頁條數
                jdbc_page_size => "50000"
                # 執行的sql 文件路徑+名稱
                #statement_filepath => "/data/my_sql2.sql"
                #SQL語句,也可以使用statement_filepath來指定想要執行的SQL
                statement => "SELECT * FROM `im` where id > :sql_last_value"
                #每一分鍾做一次同步
                schedule => "* * * * *"
                #是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false)
                lowercase_column_names => false
                # 是否記錄上次執行結果,true表示會將上次執行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
                record_last_run => true
                # 需要記錄查詢結果某字段的值時,此字段為true,否則默認tracking_column為timestamp的值;
                use_column_value => true
                # 需要記錄的字段,用於增量同步,需是數據庫字段
                tracking_column => "id"
                # record_last_run上次數據存放位置;
                last_run_metadata_path => "/mnt/sql_last_value"
                #是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false)
                clean_run => false
        }
}
output {
        elasticsearch {
                 # 配置ES集群地址
                hosts => ["192.168.200.100:9200"]
                 # 索引名字,必須小寫
                index => "im"
        }
        stdout {
        }
}
注意標紅色的部分:這些配置是為了達到增量同步的目的,每次同步結束之后會記錄最后一條數據的tracking_column列,比如我們這設置的是id,就會將這個值記錄在last_run_metadata_path中。
下次在執行同步的時候會將這個值,賦給sql_last_value

說明:

由於我上一次最后sql_last_value文件中記錄的id為5,當向mysql插入id=6的值時,結果:

插入id=8,7時;

因為我插入的順序,先插入id 為8,后插入id為7,因此最后一次記錄的id為7,當我下一次插入id=9,10時,會重新導入id為8的值。

當我插入id=10的值后,結束,觀察sql_last_value文件的最后記錄:

結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM