Logstash的配置


MySql 到 Elasticsearch

input {
    jdbc {
        # 驅動jar包的位置
        jdbc_driver_library => "/usr/local/logstash/jdbc-driver-library/mysql-connector-java-8.0.19.jar"
        # 驅動類名
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"        
        # 數據庫連接        
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/base_db?characterEncoding=utf8&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull"
        # mysql用戶名
        jdbc_user => "root"
        # mysql密碼
        jdbc_password => "123456"        
        # 數據庫重連嘗試次數        
        connection_retry_attempts => "3"
        # 開啟分頁查詢(默認false不開啟)
        jdbc_paging_enabled => true
        # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值)
        jdbc_page_size => "2"
        # 如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑;
        statement_filepath => "/usr/local/logstash/sql/t_sys_loginperson.sql"
        # 需要記錄查詢結果某字段的值時,此字段為true
        use_column_value => true        
        # 需要記錄的字段,用於增量同步
        tracking_column => "last_modify_time"
        # 字段類型
        tracking_column_type => "timestamp"
        # 記錄上一次運行記錄
        record_last_run => true
        # 上一次運行記錄值的存放文件路徑
        last_run_metadata_path => "/usr/local/logstash/last-run-metadata/t_sys_loginperson.txt"
        # 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
        clean_run => false
        # 設置時區 如果設置會將 last_modify_time 增加8小時
        #jdbc_default_timezone => "Asia/Shanghai"
        # 同步頻率(分 時 天 月 年),默認每分鍾同步一次;
        schedule => "* * * * *"
    }    
}
filter {
    json {
        source => "message"
        remove_field => ["message","@version"]
    }
}
output {        
    elasticsearch { 
        hosts => "127.0.0.1:9200"
        index => "%{table_name}" 
        document_id => "%{id}"
    }
} 

MySql 到 Kafka

input {
    jdbc {
        # 驅動jar包的位置
        jdbc_driver_library => "/usr/local/logstash/jdbc-driver-library/mysql-connector-java-8.0.19.jar"
        # 驅動類名
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"        
        # 數據庫連接        
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/base_db?characterEncoding=utf8&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull"
        # mysql用戶名
        jdbc_user => "root"
        # mysql密碼
        jdbc_password => "123456"        
        # 數據庫重連嘗試次數        
        connection_retry_attempts => "3"
        # 開啟分頁查詢(默認false不開啟)
        jdbc_paging_enabled => true
        # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值)
        jdbc_page_size => "2"
        # 如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑;
        statement_filepath => "/usr/local/logstash/sql/t_sys_loginperson.sql"
        # 需要記錄查詢結果某字段的值時,此字段為true
        use_column_value => true        
        # 需要記錄的字段,用於增量同步
        tracking_column => "last_modify_time"
        # 字段類型
        tracking_column_type => "timestamp"
        # 記錄上一次運行記錄
        record_last_run => true
        # 上一次運行記錄值的存放文件路徑
        last_run_metadata_path => "/usr/local/logstash/last-run-metadata/t_sys_loginperson.txt"
        # 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false
        clean_run => false
        # 設置時區 如果設置會將 last_modify_time 增加8小時
        #jdbc_default_timezone => "Asia/Shanghai"
        # 同步頻率(分 時 天 月 年),默認每分鍾同步一次;
        schedule => "* * * * *"        
    }        
}
filter {
    mutate {
        # 刪除字段
        remove_field => ["@timestamp","@version"]
    }
}
output {    
    kafka {        
        bootstrap_servers => "10.10.6.202:9092"
        topic_id => "base-db"            
        codec => "json"
        client_id => "kafkaOutPut"
    }
} 

Kafka 到 Elasticsearch

input {
    kafka {
        bootstrap_servers => "10.10.6.202:9092"
        client_id => "kafkaInPut"
        # 從最新一條開始讀取
        auto_offset_reset => "latest"
        # 消費線程,一般是這個隊列的partitions數
        consumer_threads => 3
        decorate_events => true
        # 隊列名稱
        topics => ["base-db"]
        codec => "json"        
    }
}
filter {
    mutate {
        # 刪除字段
        remove_field => ["@timestamp","@version"]
    }
}
output {    
    elasticsearch {
        hosts => "10.10.6.202:9200"
        index => "%{table_name}"
        document_id => "%{id}"
    }    
}

t_sys_loginperson.sql 文件

select id,person_name,date_format(create_time, '%Y-%m-%d %H:%i:%s') as create_time,date_format(last_modify_time, '%Y-%m-%d %H:%i:%s') as last_modify_time,'t_sys_loginperson' as table_name from t_sys_loginperson where last_modify_time>:sql_last_value
注意 所有的日期類型需轉為字符串類型
#帶時分秒
date_format(create_time, '%Y-%m-%d %H:%i:%s')
#不帶時分秒
date_format(create_time, '%Y-%m-%d')

啟動

#單配置文件啟動(后台啟動)
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2elasticsearch.conf > /dev/null 2>&1 &

多個pipeline啟動多個配置文件

配置pipelines.yml

vi /usr/local/logstash/config/pipelines.yml
- pipeline.id: mysql
   pipeline.workers: 1
   pipeline.batch.size: 100
   path.config: "/usr/local/logstash/customize-config/mysql2kafka.conf"
 - pipeline.id: es
   queue.type: persisted
   path.config: "/usr/local/logstash/customize-config/kafka2elasticsearch.conf"

啟動

#無需指定配置文件,默認走pipelines.yml的配置,如果使用-e或-f時,Logstash會忽略pipelines.yml文件
nohup /usr/local/logstash/bin/logstash > /dev/null 2>&1 &

多個pipeline備忘
https://blog.csdn.net/UbuntuTouch/article/details/100995868?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

https://segmentfault.com/a/1190000016592277


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM