一、概述
網站日志流量分析系統之(日志埋點)這里我們已經將相關數據通過ajax發送至日志服務器,這里我只用了一台日志服務器(本機Windows環境),日志收集主要分為以下幾個步驟:
①日志服務器集結合logback,並自定義日志過濾器,將日志發給對應FlumeAgent客戶端
②FlumeAgent客戶端根據接收器策略分發至中心服務器
③中心服務器將數據分別落地至HDFS及Kafka(這里先做離線分析,中心服務器落地HDFS;實時分析中心服務器的Flume策略暫時不加,后續實時分析時加上)
二、服務器規划
三、日志收集實現
①日志服務器結合logback,並自定義日志過濾器,將日志發送至FlumeAgent客戶端
繼續編寫日志服務器代碼(代碼已經上傳Github:https://github.com/Simple-Coder/log-demo),增加logback.xml配置如下:

<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration> <configuration> <appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyy MMM dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L- %msg%n </pattern> </encoder> </appender> <!-- name:自取即可, class:加載指定類(ch.qos.logback.core.rolling.RollingFileAppender類會將日志輸出到>>>指定的文件中), patter:指定輸出的日志格式 file:指定存放日志的文件(如果無,則自動創建) rollingPolicy:滾動策略>>>每天結束時,都會將該天的日志存為指定的格式的文件 FileNamePattern:文件的全路徑名模板 (注:如果最后結尾是gz或者zip等的話,那么會自動打成相應壓縮包) --> <appender name="fileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 把日志文件輸出到:項目啟動的目錄下的log文件夾(無則自動創建)下 --> <file>log/logFile.log</file> <!-- 把日志文件輸出到:name為logFilePositionDir的property標簽指定的位置下 --> <!-- <file>${logFilePositionDir}/logFile.log</file> --> <!-- 把日志文件輸出到:當前磁盤下的log文件夾(無則自動創建)下 --> <!-- <file>/log/logFile.log</file> --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- TimeBasedRollingPolicy策略會將過時的日志,另存到指定的文件中(無該文件則創建) --> <!-- 把因為 過時 或 過大 而拆分后的文件也保存到目啟動的目錄下的log文件夾下 --> <fileNamePattern>log/logFile.%d{yyyy-MM-dd}.%i.log </fileNamePattern> <!-- 設置過時時間(單位:<fileNamePattern>標簽中%d里最小的時間單位) --> <!-- 系統會刪除(分離出去了的)過時了的日志文件 --> <!-- 本人這里:保存以最后一次日志為准,往前7天以內的日志文件 --> <MaxHistory> 7 </MaxHistory> <!-- 滾動策略可以嵌套; 這里嵌套了一個SizeAndTimeBasedFNATP策略, 主要目的是: 在每天都會拆分日志的前提下, 當該天的日志大於規定大小時, 也進行拆分並以【%i】進行區分,i從0開始 --> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>5MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> </rollingPolicy> <encoder> <pattern>%d{yyy MMM dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L- %msg%n </pattern> </encoder> </appender> <appender name="flumeagent" class="com.teambytes.logback.flume.FlumeLogstashV1Appender"> <filter class="com.logs.filter.StrFilter"></filter> <flumeAgents> 192.168.229.132:33333 </flumeAgents> <flumeProperties> connect-timeout=4000; request-timeout=8000 </flumeProperties> <batchSize>1000</batchSize> <reportingWindow>1000</reportingWindow> <additionalAvroHeaders> myHeader = myValue </additionalAvroHeaders> <application>flumeagent</application> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex</pattern> </layout> </appender> <logger name="com" level="info"> <appender-ref ref="flumeagent"/> </logger> <root level="info"> <appender-ref ref="consoleAppender"/> </root> </configuration>
②編輯FlumeAgent客戶端配置文件(flumeagentlog.conf)

#聲明Agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
#聲明source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port =33333
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^(?:[^\\|]*\\|){14}\\d+_\\d+_(\\d+)\\|.*$
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
#聲明sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname =centerlogserver
a1.sinks.k1.port =33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname =centerlogserver2
a1.sinks.k2.port =33333
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
#聲明channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#綁定關系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
在flume/bin目錄下啟動該flume客戶端: ./flume-ng agent --conf ../conf/ --conf-file ../conf/flumeagentlog.conf --name a1 -Dflume.root.logger=INFO,console
③中心日志服務器代碼(centerlogserver.conf)(2台中心日志服務器代碼相同)

#配置agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#聲明Source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 33333
#聲明sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoopalone:9000/logdemo/reportTime=%Y-%m-%d
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.timeZone = GMT+8
#聲明channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#綁定關系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在flume/bin目錄下分別啟動兩個中心日志服務器flume:./flume-ng agent --conf ../conf/ --conf-file ../conf/centerlogserver.conf --name a1 -Dflume.root.logger=INFO,console
④啟動hadoopalone服務器,啟動hadoop的偽分布式:start-all.sh
⑤測試
(1)分別啟動:AppServer和LogServer,瀏覽器輸入:http://localhost:8080/appserver/a.jsp,分別點擊對應連接
(2)使用IDEA連接Hadoop偽分布式,查看結果如下:說明數據已經落地至HDFS,說明測試成功!
四、遇到的問題
(1)找不到Hadoop jar包,flume中的hdfs sink需要hadoop相關jar包的支持:
要么手動將hadoop相關jar包放置到flume的lib目錄下;
要么在本機中解壓hadoop並將hadoop路徑配置為HADOOP_HOME環境變量,使flume可以自動找到這些jar。
(2)產生大量小的文件
hdfs sink的滾動條件設置不合理。修改即可
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
(3)文件內容為亂碼(序列化文件無法直接查看)
hdfs sink默認產生SequenceFile文件,無法直接查看,修改即可
a1.sinks.k1.hdfs.fileType = DataStream
(4)按日期分目錄存儲
為了支持hive的分區處理,hdfs sink在將日志寫入到hdfs的過程中,希望按照日期分目錄存儲。
a1.sinks.k1.hdfs.path = hdfs://ns/flux/reportTime=%Y-%m-%d
並且通過攔截器在日志頭中增加timestamp頭
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.regex = ^(?:[^\\|]*\\|){14}\\d+_\\d+_(\\d+)\\|.*$
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
(5)生成的目錄時間不正確
配置hdfs采用的時區
a1.sinks.k1.hdfs.timeZone = GMT+8
五、總結
至此,完成了日志的收集,並落地至HDFS(落地至Kafka后續加),以供下節離線分析的數據來源,數據清洗處理之離線分析:網站日志流量分析系統之數據清洗處理(離線分析)