背景介紹:
Nginx為app打點數據,打點日志每小時滾動一次。目錄結構如下
文件中的數據如下( cat -A 2019072414r.log 后的結果,-A為顯示隱形的符號,下方^A為指定的分隔符。$為行尾結束符,換行的時候會自帶,不用關注。)
61.140.204.111^A20190724145548^A1563951348^A^A8671a9d406bd8733bf42d9644a009660^AJYH^Awin7^A2^Ahc_GC5H6A^A^A3^A1.0^ALSEARCH^AOTHER^A1563951348^A$
123.147.250.151^A20190724145552^A1563951352^A^A8a0fc9239dd100880053b1e1d0678a37^AYEAR^Awin10^A2^Ahc_GC5H6A^A^A3^A1.0^AOPENSINPUT^AOTHER^A1563951352^A$
182.142.98.33^A20190724145553^A1563951350^Aac74b3d92fdfea6249a8188556de2215^A380b0e9844c5aa4905a952908dc7ddf9^ALZQ^Awin7^A2^Ahc_GC5H6A^A^A3^A1.0^AOPENSINPUT^AOTHER^A1563949711^A$
182.142.98.33^A20190724145553^A1563951350^Aac74b3d92fdfea6249a8188556de2215^A380b0e9844c5aa4905a952908dc7ddf9^ALZQ^Awin7^A2^Ahc_GC5H6A^A^A3^A1.0^AOPENSINPUT^AOTHER^A1563951350^A$
^A分隔的15列數據依次對應如下列。
ip,date,upload_time,uid,uuid,pbv,opv,av,ch,mac,sc,st,event_type,pg,action_time
現在需要將這部分數據收集到hive中,以供后續分析計算使用。
實施步驟一.配置flume將數據存到HDFS
1.配置flume
flume需要在nginx所在服務器上
###source a2.sources = s2#ngxlog source a2.sources.s2.type = TAILDIR a2.sources.s2.channels = c3 a2.sources.s2.filegroups = f2 a2.sources.s2.filegroups.f2 = /data/logs/nginx/event_log/.*log a2.sources.s2.positionFile=/data/logs/nginx/event_log/taildir_position.json a2.sources.s2.fileHeader = true a2.sources.s2.fileHeader=true a2.sources.s2.fileHeaderKey=file a2.sources.s2.interceptors = i3 a2.sources.s2.interceptors.i3.type = regex_extractor a2.sources.s2.interceptors.i3.regex =(\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d\\d) a2.sources.s2.interceptors.i3.serializers = s2 a2.sources.s2.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a2.sources.s2.interceptors.i3.serializers.s2.name = timestamp a2.sources.s2.interceptors.i3.serializers.s2.pattern = yyyyMMddHHmmss ###channel a2.channels = c3 #huochai ngxlog channel a2.channels.c3.type = memory a2.channels.c3.capacity = 50000 a2.channels.c3.transactionCapacity = 10000 a2.channels.c3.byteCapacityBufferPercentage = 20 a2.channels.c3.byteCapacity = 134217728 ###sinks a2.sinks = k3 #k3-huochai ngxlog sink a2.sinks.k3.type = hdfs a2.sinks.k3.channel = c3 a2.sinks.k3.hdfs.path = /user/flume/huochai-events/day=%Y%m%d a2.sinks.k3.hdfs.filePrefix =%Y%m%d%H a2.sinks.k3.hdfs.rollInterval = 0 a2.sinks.k3.hdfs.batchSize=1000 a2.sinks.k3.hdfs.idleTimeout=300 a2.sinks.k3.hdfs.threadsPoolSize=10 a2.sinks.k3.hdfs.rollSize = 134217728 a2.sinks.k3.hdfs.rollCount=0 ##fileType SequenceFile DataStream CompressedStream a2.sinks.k3.hdfs.fileType= SequenceFile a2.sinks.k3.hdfs.codeC= snappy
2.對flume配置的一些說明
1) source類型為TAILDIR,這個類型是flume1.7.0才出來的,不過在CDH中1.6.0版本的flume就有這個功能了,具體說明見
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#taildir-source
2) positionFile配置的文件,如果不存在會自動生成,並且如果不存在,會從頭讀所有文件的。
3) regex_extractor這個攔截器的作用是把每條log中 20190724145548 這個格式的時間 改為 timestamp的時間,放到flume的Event消息的Header中。
意即使用regex_extractor后Header中會產生一個key-value對為(timestamp:1563951348000)
4) a2.sinks.k3.hdfs.path = /user/flume/huochai-events/day=%Y%m%d
%Y%m%d 會取timestamp中的時間,轉成20190724這個格式,也就是說,一條log最終會放入到哪個路徑下,是由這條log中的時間決定的。
總體流程就是: log中20190724145548 被通過 regex_extractor提取成key-value對(timestamp:1563951348000),再由sinks.k3中配置的(%Y%m%d)解析出來,獲得最終的路徑。
3.刷新flume配置/重啟flume,查看HDFS中的結果
路徑正常按照日期生成
每個日期下每個小時會生成一個文件。
注解:為什么要每個小時生成一次文件
因為正在寫入的文件后綴為tmp,tmp后綴的文件在hive的外部表中是沒有辦法讀到數據的。每小時生成一次,可以保證數據最多只延遲一個小時。
實施步驟二.根據HDFS路徑創建hive外部表
1.在shell客戶端中連接hive。
beeline !connect jdbc:hive2://bigdata.node2:10000 hadoop 回車
2.創建外部表
CREATE EXTERNAL TABLE huochai_events ( ip STRING, date STRING, upload_time bigint, uid string, uuid string, pbv string, opv string, av string, ch string, mac string, sc string, st string, event_type string, pg string, action_time bigint ) PARTITIONED BY (day STRING) row format delimited fields terminated by '\u0001' STORED AS SequenceFile LOCATION 'hdfs:///user/flume/huochai-events';
3.關於外部表的一些說明
1) 一個目錄一個分區:PARTITIONED BY (day STRING) 對應的是HDFS目錄中的 /user/flume/huochai_evetns/day=20190723
2) '\u0001' 就是^A的unicode碼。row format delimited fields terminated by '\u0001'
3) STORED AS SequenceFile。Snappy+SequenceFile Hive的一種比較流行的壓縮存儲組合。日志是csv類型的,因此用這種組合。
4) 建議不要用json數據格式,否則你會遇到很多問題。比如Hive找不到JsonSerDe,或者Hive能用了Spark On Hive有問題,又或者前兩者都能用了,但Spark2 On Hive找不到。
如果執意要用是可以通過配置來解決的,但是配置步驟比較繁瑣,這里不單獨講。
4.修復分區
此時外部表已經創建完成了,但如果你select一下這個表,會發現沒有記錄。
原因是外部表下新增的分區是無法自動被發現的,需要在beeline中執行下面語句。
msck repair table huochai_events;
之后再select,便能查到數據了。
需要注意的是,明天生成新的目錄之后,你需要重新再執行一次這條命令。
你可以將這句話寫在離線任務中。
比如在spark應用代碼中加上如下一句:
sql("msck repair table huochai_events").show
注意引號中的sql語句結尾沒有分號。
總結
每個實際業務中可能有不同的實踐場景,數據格式也不盡相同,處理方式也有很多選擇(你可能還想用hive-stream),但是目前這種處理方式應該是flume-hive的最佳實踐了。
另外數據格式方面,最好使用扁平的csv類型格式的數據,hive可以處理json/數組/struct這類的數據,但並不代表它擅長處理這些。