將nginx搜集到的日志通過flume轉到hive


背景介紹:

Nginx為app打點數據,打點日志每小時滾動一次。目錄結構如下

 

文件中的數據如下( cat -A 2019072414r.log 后的結果,-A為顯示隱形的符號,下方^A為指定的分隔符。$為行尾結束符,換行的時候會自帶,不用關注。)

61.140.204.111^A20190724145548^A1563951348^A^A8671a9d406bd8733bf42d9644a009660^AJYH^Awin7^A2^Ahc_GC5H6A^A^A3^A1.0^ALSEARCH^AOTHER^A1563951348^A$
123.147.250.151^A20190724145552^A1563951352^A^A8a0fc9239dd100880053b1e1d0678a37^AYEAR^Awin10^A2^Ahc_GC5H6A^A^A3^A1.0^AOPENSINPUT^AOTHER^A1563951352^A$
182.142.98.33^A20190724145553^A1563951350^Aac74b3d92fdfea6249a8188556de2215^A380b0e9844c5aa4905a952908dc7ddf9^ALZQ^Awin7^A2^Ahc_GC5H6A^A^A3^A1.0^AOPENSINPUT^AOTHER^A1563949711^A$
182.142.98.33^A20190724145553^A1563951350^Aac74b3d92fdfea6249a8188556de2215^A380b0e9844c5aa4905a952908dc7ddf9^ALZQ^Awin7^A2^Ahc_GC5H6A^A^A3^A1.0^AOPENSINPUT^AOTHER^A1563951350^A$ 

^A分隔的15列數據依次對應如下列。

ip,date,upload_time,uid,uuid,pbv,opv,av,ch,mac,sc,st,event_type,pg,action_time

現在需要將這部分數據收集到hive中,以供后續分析計算使用。

實施步驟一.配置flume將數據存到HDFS

1.配置flume

flume需要在nginx所在服務器上

###source
a2.sources = s2#ngxlog source
a2.sources.s2.type = TAILDIR
a2.sources.s2.channels = c3
a2.sources.s2.filegroups = f2
a2.sources.s2.filegroups.f2 = /data/logs/nginx/event_log/.*log
a2.sources.s2.positionFile=/data/logs/nginx/event_log/taildir_position.json
a2.sources.s2.fileHeader = true
a2.sources.s2.fileHeader=true
a2.sources.s2.fileHeaderKey=file
a2.sources.s2.interceptors = i3
a2.sources.s2.interceptors.i3.type = regex_extractor
a2.sources.s2.interceptors.i3.regex =(\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d)
a2.sources.s2.interceptors.i3.serializers = s2
a2.sources.s2.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a2.sources.s2.interceptors.i3.serializers.s2.name = timestamp
a2.sources.s2.interceptors.i3.serializers.s2.pattern = yyyyMMddHHmmss

###channel
a2.channels = c3
#huochai ngxlog channel
a2.channels.c3.type = memory
a2.channels.c3.capacity = 50000
a2.channels.c3.transactionCapacity = 10000
a2.channels.c3.byteCapacityBufferPercentage = 20
a2.channels.c3.byteCapacity = 134217728

###sinks
a2.sinks =  k3
#k3-huochai ngxlog sink
a2.sinks.k3.type = hdfs
a2.sinks.k3.channel = c3
a2.sinks.k3.hdfs.path = /user/flume/huochai-events/day=%Y%m%d
a2.sinks.k3.hdfs.filePrefix =%Y%m%d%H
a2.sinks.k3.hdfs.rollInterval = 0
a2.sinks.k3.hdfs.batchSize=1000
a2.sinks.k3.hdfs.idleTimeout=300
a2.sinks.k3.hdfs.threadsPoolSize=10
a2.sinks.k3.hdfs.rollSize = 134217728
a2.sinks.k3.hdfs.rollCount=0
##fileType SequenceFile DataStream CompressedStream
a2.sinks.k3.hdfs.fileType= SequenceFile
a2.sinks.k3.hdfs.codeC= snappy

 

2.對flume配置的一些說明

1) source類型為TAILDIR,這個類型是flume1.7.0才出來的,不過在CDH中1.6.0版本的flume就有這個功能了,具體說明見

http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#taildir-source 

2) positionFile配置的文件,如果不存在會自動生成,並且如果不存在,會從頭讀所有文件的。

3) regex_extractor這個攔截器的作用是把每條log中 20190724145548 這個格式的時間 改為 timestamp的時間,放到flume的Event消息的Header中。

     意即使用regex_extractor后Header中會產生一個key-value對為(timestamp:1563951348000)

4) a2.sinks.k3.hdfs.path = /user/flume/huochai-events/day=%Y%m%d

    %Y%m%d 會取timestamp中的時間,轉成20190724這個格式,也就是說,一條log最終會放入到哪個路徑下,是由這條log中的時間決定的。

   總體流程就是: log中20190724145548 被通過 regex_extractor提取成key-value對(timestamp:1563951348000),再由sinks.k3中配置的(%Y%m%d)解析出來,獲得最終的路徑。

 

3.刷新flume配置/重啟flume,查看HDFS中的結果

 

 路徑正常按照日期生成

 

 

每個日期下每個小時會生成一個文件。

注解:為什么要每個小時生成一次文件

因為正在寫入的文件后綴為tmp,tmp后綴的文件在hive的外部表中是沒有辦法讀到數據的。每小時生成一次,可以保證數據最多只延遲一個小時。

實施步驟二.根據HDFS路徑創建hive外部表

1.在shell客戶端中連接hive。 

beeline
!connect jdbc:hive2://bigdata.node2:10000
hadoop
回車

2.創建外部表

 CREATE EXTERNAL TABLE huochai_events (
       ip STRING,
       date STRING,
       upload_time bigint,
       uid string,
       uuid string,
       pbv string,
       opv string,
       av string,
       ch string,
       mac string,
       sc string,
       st string,
       event_type string,
       pg string,
       action_time bigint
     ) 
     PARTITIONED BY (day STRING)
row format delimited fields terminated by '\u0001'
STORED AS 
SequenceFile
     LOCATION 'hdfs:///user/flume/huochai-events';

 

3.關於外部表的一些說明

1)  一個目錄一個分區:PARTITIONED BY (day STRING) 對應的是HDFS目錄中的 /user/flume/huochai_evetns/day=20190723

2)  '\u0001' 就是^A的unicode碼。row format delimited fields terminated by '\u0001'

3) STORED AS SequenceFile。Snappy+SequenceFile  Hive的一種比較流行的壓縮存儲組合。日志是csv類型的,因此用這種組合。

4) 建議不要用json數據格式,否則你會遇到很多問題。比如Hive找不到JsonSerDe,或者Hive能用了Spark On Hive有問題,又或者前兩者都能用了,但Spark2 On Hive找不到。

    如果執意要用是可以通過配置來解決的,但是配置步驟比較繁瑣,這里不單獨講。

4.修復分區

此時外部表已經創建完成了,但如果你select一下這個表,會發現沒有記錄。

原因是外部表下新增的分區是無法自動被發現的,需要在beeline中執行下面語句。

msck repair table huochai_events;

之后再select,便能查到數據了。

需要注意的是,明天生成新的目錄之后,你需要重新再執行一次這條命令。

你可以將這句話寫在離線任務中。

比如在spark應用代碼中加上如下一句:

sql("msck repair table huochai_events").show

 注意引號中的sql語句結尾沒有分號。

 

總結

每個實際業務中可能有不同的實踐場景,數據格式也不盡相同,處理方式也有很多選擇(你可能還想用hive-stream),但是目前這種處理方式應該是flume-hive的最佳實踐了。

另外數據格式方面,最好使用扁平的csv類型格式的數據,hive可以處理json/數組/struct這類的數據,但並不代表它擅長處理這些。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM