注意事項:默認Kafka傳遞給elastci的數據是在'data'字段,且不包含其他數據,所以需要使用額外的操作進行處理
logstash配置文件操作
input {
kafka {
bootstrap_servers => "172.17.107.187:9092,172.17.107.187:9093,172.17.107.187:9094" # 字符串形式,kafka集群地址
auto_offset_reset => "latest" # 拉取最近數據
consumer_threads => 5 # 使用的線程數
decorate_events => true # 傳遞給elastci的數據增加附加數據
topics => ["test_canal_topic"] # 拉取的kafka的指定topic
tags => ["canal"] # 標簽,額外使用該參數可以在elastci中創建不同索引
}
}
filter {
# 把默認的data字段重命名為message字段,方便在elastic中顯示
mutate {
rename => ["data", "message"]
}
# 還可以使用其他的處理方式,在此就不再列出來了
}
output {
elasticsearch {
hosts => ["http://172.17.107.187:9203", "http://172.17.107.187:9201","http://172.17.107.187:9202"]
index => "filebeat_%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}" # decorate_events=true的作用,可以使用metadata中的數據
user => "elastic"
password => "escluter123456"
}
}