最近需要搭建一套日志監控平台,結合系統本身的特性總結一句話也就是:需要將Kafka中的數據導入到elasticsearch中。那么如何將Kafka中的數據導入到elasticsearch中去呢,總結起來大概有如下幾種方式:
- Kafka->logstash->elasticsearch->kibana(簡單,只需啟動一個代理程序)
- Kafka->kafka-connect-elasticsearch->elasticsearch->kibana(與confluent綁定緊,有些復雜)
- Kafka->elasticsearch-river-kafka-1.2.1-plugin->elasticsearch->kibana(代碼很久沒更新,后續支持比較差)
elasticsearch-river-kafka-1.2.1-plugin插件的安裝及配置可以參考:http://hqiang.me/2015/08/將kafka的數據導入至elasticsearch/
根據以上情況,項目決定采用方案一將Kafka中的數據存入到elasticsearch中去。
一、拓撲圖
項目拓撲圖如下所示:
此時消息的整體流向為:日志/消息整體流向Flume => kafka => logstash => elasticsearch => kibana
A.Flume日志收集
agent.sources = r1 agent.channels = c1 agent.sinks = s1 agent.sources.r1.type = exec agent.sources.r1.command = tail -F -n 0 /data01/monitorRequst.log agent.sources.r1.restart = true //解決tail -F進程被殺死問題 agent.channels.c1.type = memory agent.channels.c1.capacity = 1000 agent.channels.c1.transactionCapacity = 100 agent.sinks.s1.type = avro agent.sinks.s1.port = 50001 agent.sinks.s1.hostname = IP agent.sources.r1.channels = c1 agent.sinks.s1.channel = c1
Flume日志收集過程中踩過的坑可以參考:http://www.digitalsdp.com/Experiencebbs/maintenance/506.jhtml
B.Kafka Sink
agent.sources = r1 agent.channels = c2 agent.sinks = s2 agent.sources.r1.type = avro agent.sources.r1.bind = IP agent.sources.r1.port = 50001 agent.channels.c2.type = memory agent.channels.c2.capacity = 1000 agent.channels.c2.transactionCapacity = 100 agent.sinks.s2.type = org.apace.flume.sink.kafka.KafkaSink agent.sinks.s2.topic = XXX agent.sinks.s2.brokerList = IP:9091,IP:9092 agent.sinks.s2.batchSize = 20 agent.sources.r1.channels = c2 agent.sinks.s2.channel = c2
二、環境搭建
關於Kafka及Flume的搭建在這里不再詳細論述,如有需要請參見本文其它說明。在這里重點說明logstash的安裝及配置。
A.下載logstash的安裝包;
B.新建kafka-logstash-es.conf置於logstash/conf目錄下;
C.配置kafka-logstash-es.conf如下:
logstash的配置語法如下:
input { ...#讀取數據,logstash已提供非常多的插件,可以從file、redis、syslog等讀取數據 } filter{ ...#想要從不規則的日志中提取關注的數據,就需要在這里處理。常用的有grok、mutate等 } output{ ...#輸出數據,將上面處理后的數據輸出到file、elasticsearch等 }
示例:
input { kafka { zk_connect => "c1:2181,c2:2181,c3:2181" group_id => "elasticconsumer" ---隨意取 topic_id => "xxxlog" ---與flume中的Channel保持一致 reset_beginning => false consumer_threads => 5 decorate_events => true codec => "json" } } output { elasticsearch { hosts => ["c4:9200","c5:9200"] index => "traceid"--與Kafka中json字段無任何關聯關系,注意:index必須小寫 index => "log-%{+YYYY-MM-dd}" workers => 5 codec => "json" } }
運行logstash命令為:nohup bin/logstash -f /XXX/logstash/conf/kafka-logstash-es.conf &
三、調測過程中遇到的一些坑
A.在集成ELK過程中總以為head插件是必須的,其實head插件為非必需品。elasticsearch僅提供了一個數據存儲的煤介,head為了讓大家更方便的去查看數據;
B.采用以上方案進行布署時,當系統正常運行時,可以在elasticsearch服務器上http://IP:9200/*中搜索index是否創建成功
參考:https://www.slahser.com/2016/04/21/日志監控平台搭建-關於Flume-Kafka-ELK/