Apache flume+Kafka獲取實時日志信息


Flume簡介以及安裝

  1. Flume是一個分布式的對海量日志進行采集,聚合和傳輸的系統。Flume系統分為三個組件,分別是source,sink,channel:source表明數據的來源,可能來自文件,Avro等,channel作為source和sink的橋梁,作為數據的臨時存儲地,channal是一個完整的事務,這一點保證了數據在收發的時候的一致性,支持的類型有: JDBC channel , File System channel , Memort channel等;sink表明數據的去向,可以把數據再次轉發到HDFS,或者Kafka等。

  2. 本文使用的版本是1.8.0(目前最新)的版本,可以到官網 進行下載。

  3. 解壓、安裝、配置

sudo tar -zxvf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
sudo vim conf/kafka.conf #這個文件剛開始並不存在,要新建

kafka.conf的具體內容:

 

# 分別對應三種基礎組件,起的別名 kafka是在啟動flume的時候,指定的agent的名字
kafka.sources = src
kafka.sinks = sk
kafka.channels = chl

# 表明需要收集的數據來自avro,此處配置會啟動avro server
kafka.sources.src.type = avro
kafka.sources.src.bind = localhost
kafka.sources.src.port = 44446

# Flume 收集的數據轉發到Kafka的關鍵配置
kafka.sinks.sk.type = org.apache.flume.sink.kafka.KafkaSink
kafka.sinks.sk.kafka.bootstrap.servers = localhost:9092  #指定kafka集群的地址
kafka.sinks.sk.partition.key=0
kafka.sinks.sk.partitioner.class=org.apache.flume.plugins.SinglePartition
kafka.sinks.sk.serializer.class=kafka.serializer.StringEncoder
kafka.sinks.sk.request.required.acks=0
kafka.sinks.sk.max.message.size=1000000
kafka.sinks.sk.producer.type=sync
kafka.sinks.sk.topic=log  #指定kafka的topic

# Use a channel which buffers events in memory
kafka.channels.chl.type = memory
kafka.channels.chl.capacity = 1000
kafka.channels.chl.transactionCapacity = 100

# Bind the source and sink to the channel
kafka.sources.src.channels = chl
kafka.sinks.sk.channel = chl

Flume服務啟動:

 

bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name kafka -Dflume.root.logger=INFO,LOGFILE

ps:上述命令也可以通過nohup方式啟動
至此,flume已經啟動完成了!
接下來我們請出另一個主角 Kafka
同樣是下載,安裝,配置的步驟:

 

wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
tar -zxf kafka_2.12-1.0.0.tgz
cd kafka_2.12-1.0.0
vim config/server.properties

下面是config/server.properties的配置和說明:

 

#指定broker的id,數字,但不能過大
broker.id=0
#指定服務的監聽端口,默認是9092
port=9092
#這個地方需要特別注意,在代碼中使用的時候,需要完全復制這個地方的配置,如localhost:9092,但是如果是分布式的,localhost顯然是不符合要求的,最好寫成當前機器的ipv4的地址,這里寫localhost方便單機測試和開發
listeners=PLAINTEXT://localhost:9092
#同上,具體含義看官方源文件中的說明
advertised.listeners=PLAINTEXT://localhost:9092
#指定zk的地址
zookeeper.connect=localhost:2181

配置基本上是可以了,然后開始run

 

bin/kafka-server-start config/server.properties

注意: 在啟動kafka之前需要先啟動zk,可以從官網專門下載一個zk,或者使用kafka自帶的zk,都可以。

 

 

Java代碼實現日志打印並被Flume采集

接下來就需要在代碼實現日志的打印,通過flume的采集,然后發送到kafka,其實flume采集發送到kafka通過上述配置就已經完成了,現在就做第一件事:
我個人采用spring boot進行實現的,在pom中進行如下配置:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
        <exclusions><!--把logback忽略,使用log4j-->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!--flume對log4j的支持,使用avro的關鍵-->
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.5.0</version>
    </dependency>

然后在src/main/resources下面,加入log配置文件:log4j.properties

 

# set log levels
log4j.rootLogger=INFO, stdout, file, flume
log4j.logger.per.flume=INFO

#flume#
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
##重點在這里!!這個地方就是在flume配置的avro的server地址,程序中產生的log都會通過avro的方式,被flume所采集
log4j.appender.flume.Hostname=localhost
log4j.appender.flume.Port=44446

#stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

 #file 
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=INFO
log4j.appender.file.File=/tmp/logs/real-log.log
log4j.appender.file.Append=true
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

關於這個項目的簡單代碼,已經在我的gitee上了,這個demo主要是通過打印日志,flume 收集並publish 到 kafka指定的topic上,然后通過kafka consume接受到,后續會通過storm進行實時計算和處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM