日志收集之--將Kafka數據導入elasticsearch


     最近需要搭建一套日志監控平台,結合系統本身的特性總結一句話也就是:需要將Kafka中的數據導入到elasticsearch中。那么如何將Kafka中的數據導入到elasticsearch中去呢,總結起來大概有如下幾種方式:

  • Kafka->logstash->elasticsearch->kibana(簡單,只需啟動一個代理程序)
  • Kafka->kafka-connect-elasticsearch->elasticsearch->kibana(與confluent綁定緊,有些復雜)
  • Kafka->elasticsearch-river-kafka-1.2.1-plugin->elasticsearch->kibana(代碼很久沒更新,后續支持比較差)

elasticsearch-river-kafka-1.2.1-plugin插件的安裝及配置可以參考:http://hqiang.me/2015/08/將kafka的數據導入至elasticsearch/

 根據以上情況,項目決定采用方案一將Kafka中的數據存入到elasticsearch中去。

一、拓撲圖

    項目拓撲圖如下所示:

  此時消息的整體流向為:日志/消息整體流向Flume => kafka => logstash => elasticsearch => kibana

 A.Flume日志收集

agent.sources = r1
agent.channels = c1
agent.sinks = s1

agent.sources.r1.type = exec
agent.sources.r1.command = tail -F -n 0 /data01/monitorRequst.log
agent.sources.r1.restart = true  //解決tail -F進程被殺死問題


agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sinks.s1.type = avro
agent.sinks.s1.port = 50001
agent.sinks.s1.hostname = IP
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

Flume日志收集過程中踩過的坑可以參考:http://www.digitalsdp.com/Experiencebbs/maintenance/506.jhtml

 B.Kafka Sink

agent.sources = r1
agent.channels = c2
agent.sinks = s2

agent.sources.r1.type = avro
agent.sources.r1.bind = IP
agent.sources.r1.port = 50001

agent.channels.c2.type = memory
agent.channels.c2.capacity = 1000
agent.channels.c2.transactionCapacity = 100

agent.sinks.s2.type = org.apace.flume.sink.kafka.KafkaSink
agent.sinks.s2.topic = XXX
agent.sinks.s2.brokerList = IP:9091,IP:9092
agent.sinks.s2.batchSize = 20

agent.sources.r1.channels = c2
agent.sinks.s2.channel = c2

二、環境搭建

 關於Kafka及Flume的搭建在這里不再詳細論述,如有需要請參見本文其它說明。在這里重點說明logstash的安裝及配置。

  A.下載logstash的安裝包; 

  B.新建kafka-logstash-es.conf置於logstash/conf目錄下;

  C.配置kafka-logstash-es.conf如下:

logstash的配置語法如下:

input {
  ...#讀取數據,logstash已提供非常多的插件,可以從file、redis、syslog等讀取數據
}

filter{
  ...#想要從不規則的日志中提取關注的數據,就需要在這里處理。常用的有grok、mutate等
}

output{
  ...#輸出數據,將上面處理后的數據輸出到file、elasticsearch等
}

示例:

input {
    kafka {
        zk_connect => "c1:2181,c2:2181,c3:2181"
        group_id => "elasticconsumer"   ---隨意取
        topic_id => "xxxlog"  ---與flume中的Channel保持一致
        reset_beginning => false 
        consumer_threads => 5  
        decorate_events => true 
        codec => "json"
        }
    }
output {
    elasticsearch {
        hosts => ["c4:9200","c5:9200"]
        index => "traceid"--與Kafka中json字段無任何關聯關系,注意:index必須小寫
        index => "log-%{+YYYY-MM-dd}"
        workers => 5
        codec => "json"
		  }
     }

 運行logstash命令為:nohup bin/logstash -f /XXX/logstash/conf/kafka-logstash-es.conf &

三、調測過程中遇到的一些坑

A.在集成ELK過程中總以為head插件是必須的,其實head插件為非必需品。elasticsearch僅提供了一個數據存儲的煤介,head為了讓大家更方便的去查看數據; 

B.采用以上方案進行布署時,當系統正常運行時,可以在elasticsearch服務器上http://IP:9200/*搜索index是否創建成功

參考:https://www.slahser.com/2016/04/21/日志監控平台搭建-關於Flume-Kafka-ELK/

          http://www.jayveehe.com/2017/02/01/elk-stack/

          http://wdxtub.com/2016/11/19/babel-log-analysis-platform-1/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM