寫入到kafka
input {
stdin { }
}
output {
kafka {
bootstrap_servers => "10.0.0.200:9092"
topic_id => "test"
}
}
##從kafka中將數據讀出,寫入到 elasticsearch中
input {
kafka {
bootstrap_servers => ["10.0.0.200:9092"] ##kafka地址,可以是集群
client_id => "test"
auto_offset_reset => "latest" ##從最新的偏移量開始消費
topics => ["test"] ## 數組類型,可配置多個topic
decorate_events => true #此屬性會將當前topic、offset、group、partition等信息也帶到message中
consumer_threads => 5
}
}
output {
elasticsearch {
hosts => ["10.0.0.140:9200"]
index => "kafka-%{+YYYY.MM.dd}"
}
stdout{
codec => rubydebug ##輸出到屏幕上
}
}
