使用logstash從Kafka中拉取數據並傳輸給elasticsearch且創建相應索引的操作


注意事項:默認Kafka傳遞給elastci的數據是在'data'字段,且不包含其他數據,所以需要使用額外的操作進行處理

logstash配置文件操作

input {

  kafka {
    bootstrap_servers => "172.17.107.187:9092,172.17.107.187:9093,172.17.107.187:9094"  # 字符串形式,kafka集群地址
    auto_offset_reset => "latest" # 拉取最近數據
    consumer_threads => 5 # 使用的線程數
    decorate_events => true   # 傳遞給elastci的數據增加附加數據
    topics => ["test_canal_topic"] # 拉取的kafka的指定topic
    tags => ["canal"] # 標簽,額外使用該參數可以在elastci中創建不同索引
  }
  
}


filter {
  # 把默認的data字段重命名為message字段,方便在elastic中顯示
  mutate {
    rename => ["data", "message"]
  }
  
  # 還可以使用其他的處理方式,在此就不再列出來了
}

output {

  elasticsearch {
    hosts => ["http://172.17.107.187:9203", "http://172.17.107.187:9201","http://172.17.107.187:9202"]
    index => "filebeat_%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}" # decorate_events=true的作用,可以使用metadata中的數據
    user => "elastic"
    password => "escluter123456"
  }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM