Multiple configuration files
user.conf
input {
  kafka {
    bootstrap_servers => "hadoop003:9092"
    client_id => "user"                  # must be unique per Kafka input
    group_id => "yiyang_user"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => "yiyang_2"
    type => "user"
    codec => "json"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  if [type] == "user" {
    elasticsearch {
      hosts => ["hadoop003:9200"]
      index => "user_%{+YYYY-MM-dd}"     # "+" enables the sprintf date format for daily indices
    }
  }
  stdout { codec => rubydebug }
}
record_traffic.conf
input {
  kafka {
    bootstrap_servers => "hadoop003:9092"
    client_id => "record_traffic"        # must be unique per Kafka input
    group_id => "yiyang_record_traffic"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => "yiyang"
    type => "record_traffic"
    codec => "json"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  if [type] == "record_traffic" {
    elasticsearch {
      hosts => ["hadoop003:9200"]
      index => "record_traffic-%{+YYYY-MM-dd}"   # "+" enables the sprintf date format for daily indices
    }
  }
  stdout { codec => rubydebug }
}
Place both configuration files in the config directory.
**Note: because there are two Kafka inputs, each one must specify its own client_id.**
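Before starting, the merged configuration can optionally be checked with Logstash's built-in config test, which parses the files and exits (a minimal sketch using the standard flag):
bin/logstash -f config/ --config.test_and_exit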
Start Logstash
[root@hadoop003 logstash-7.6.2]# bin/logstash -f config/
Once Logstash is up, we can start testing.
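One way to push a test record into the user topic is with Kafka's console producer. This is only a sketch: the broker address and topic name follow the configs above, the JSON payload is an illustrative example, and on newer Kafka releases `--bootstrap-server` replaces the deprecated `--broker-list`:
bin/kafka-console-producer.sh --broker-list hadoop003:9092 --topic yiyang_2
>{"user_id": 1, "user_name": "test"}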
First we send a user record to Kafka. We send a single record, and the Logstash console output prints it twice.
As the screenshot shows, an extra type field has been added.
In the end, though, only one document is stored. Why the log is printed twice is not yet clear; most likely it is because `-f config/` merges both files into a single pipeline, so every event passes through both `stdout` outputs.
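To confirm that only one document was actually stored, the daily index can be queried directly in Elasticsearch (a sketch; the index name follows the pattern configured above):
curl "http://hadoop003:9200/user_*/_count?pretty"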