Flume integration with Kafka


 Background: as the system's data volume keeps growing, logs can no longer simply be kept as plain files; they grow ever larger and become hard to search and analyze. After weighing the options, we chose Flume to collect the logs and forward them to Kafka as messages. The concrete configuration is given below.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,   
# in this case called 'agent'

agent.sources = r1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.r1.type = netcat
agent.sources.r1.bind = localhost
agent.sources.r1.port = 10245
agent.sources.r1.charset = UTF-8

# The channel can be defined as follows.
agent.sources.r1.channels = c1

# Each sink's type must be defined
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = test
agent.sinks.s1.brokerList = ip:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.sinks.s1.channel = c1

# Each channel's type is defined.
agent.channels.c1.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.c1.capacity = 100
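The sink above writes to a topic named test. Depending on the broker settings Kafka may auto-create the topic on the first write (auto.create.topics.enable); if it does not, create it up front. A minimal sketch for an older, ZooKeeper-managed Kafka, with ip:2181 standing in for your ZooKeeper address (newer releases take --bootstrap-server ip:9092 instead of --zookeeper):

   bin/kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 1 --topic test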

How to start the agent:

   bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name agent -Dflume.root.logger=INFO,console
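Once both Kafka and the agent are up, the pipeline can be checked end-to-end by pushing a line into the netcat source and reading it back with the Kafka console consumer. A rough sketch, reusing the host/port placeholders from the configuration above (older Kafka versions use --zookeeper ip:2181 instead of --bootstrap-server):

   echo "hello flume" | nc localhost 10245
   bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning

If everything is wired correctly, the consumer prints hello flume.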

Note that Kafka must already be running before you start the agent; even then, you may hit the following error:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired

This happens because, by default, Kafka advertises localhost as its address, so if the client is not on the same machine as the broker you need to change the broker configuration.

Set advertised.listeners=PLAINTEXT://ip:9092 in the broker configuration, replacing the default localhost with the machine's IP address, then restart Kafka and the error goes away.
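For reference, a sketch of the relevant part of the broker's config/server.properties after the change; the ip placeholder is the broker's routable address, and the listeners line is just one common setup, not something required by the original post:

   # listen on all interfaces, advertise the routable IP to clients
   listeners=PLAINTEXT://0.0.0.0:9092
   advertised.listeners=PLAINTEXT://ip:9092

Then restart the broker, for example:

   bin/kafka-server-stop.sh
   bin/kafka-server-start.sh -daemon config/server.properties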

