input {
jdbc {
#jdbc驅動包位置
jdbc_driver_library => "D:\tools\elk\logstash-7.6.1\ojdbc8-12.2.0.1.jar"
#jdbc驅動類
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
# 數據庫相關配置
jdbc_connection_string => "jdbc:oracle:thin:callcard/huawei123@//callcardsitoracle01.beta.hic.cloud:1521/callcardsit_srv"
jdbc_user => "callcard"
jdbc_password => "huawei123"
# 是否清除sql_last_value的記錄,需要增量同步時此字段必須為false;
clean_run => true
# 同步頻率(分 時 天 月 年),默認每分鍾同步一次;
schedule => "*/10 * * * * *"
use_column_value => true
tracking_column_type => timestamp
tracking_column => updated_at
# 同步SQL
statement =>"select *
from user n
where updated_at>:sql_last_value and updated_at<sysdate order by updated_at desc"
# 索引類型,不需要指定type,否則會在同步ES后生成type字段
#type => "user"
# 設置時區
#jdbc_default_timezone =>"Asia/Shanghai"
}
}
filter {
# 刪除無用字段
mutate {
remove_field => "@timestamp"
remove_field => "@version"
}
# 時間+8個時區,思想是找臨時變量,最后+8后替換
ruby {
code => "event.set('@created_at', event.get('created_at').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('created_at',event.get('@created_at'))"
}
mutate {
remove_field => ["@created_at"]
}
}
output {
stdout {
codec => rubydebug
}
#if [type]=="user" {
elasticsearch {
# ES host:port
hosts => ["127.0.0.1:9200"]
#將mysql數據加入blog索引下,會自動創建
index => "user"
# 自增ID 需要關聯的數據庫中有有一個id字段,對應索引的id號_id
document_id => "%{id}"
}
# }
}