Nifi生產環境使用
1、服務器日志目錄內的 log 文件中,我們使用 Apache Flume 這個工具將原始數據抽取出來 kafka sink ,
2、Nifi接入kafka數據。
首先做驗證,然后過濾格式錯誤記錄,然后路由不同的日志類型. nifi能做到這些的關鍵在於它的 flowfile 這個概念. 每一條數據記錄進入到nifi中就叫flowfile. 每一個flowfile 由兩部分組成,一個是content, 文件內容. 一個是 attributes ,文件屬性. 在 nifi 中, 我們可以對文件屬性進行增刪改查等操作,甚至我們可以使用 nifi 提供的DSL,特定領域語言 對 attributes 進行編程. 這樣的設定使得可以對數據記錄進行任何想要的邏輯處理. 所以,一般是先把日志記錄的內容轉換到 flow file 的屬性值當中去,然后進行后續的不同處理. 如下圖:
接着繼續判斷,對於通過網絡獲取手機號碼成功的日志,將原始的日志記錄保存到 hbase 中,之后供業務方做即席查詢. 如下圖:
對於獲取手機號碼失敗的日志,手動去查用戶的地址和所屬運營商信息.這里是強業務相關的,因為屬於其他運營商,所以是獲取不到號碼的.這里的處理真正體現了nifi的強大和靈活.
因為對於失敗的日志,實際上是缺少必備字段的.缺少了字段, 這在日志文件批量處理中是多么坑爹的事情.然而使用 nifi 卻能很輕松的解決這個問題.直接拿這條記錄的用戶ip 去調 一個內部的服務化查詢接口,把字段查出來.並把值賦給flow file 相應的attribute. 把這樣的日志記錄變成正常的日志記錄后,再匯入到處理的主流程當中,接着流動下去.
接下的處理流程主要就是分發及轉存了.對於同一條日志記錄,一份數據要提取字段,將之轉成 hive表結構對應的 csv 格式 ,保存到 hdfs 中, 也就是將所有處理好的數據落地到hdp集群環境中去.這是數據清洗后一大終點,也是結果.之后可以拿來直接做其他處理了,比如做批量查詢,供其他工具使用(比如 kylin 等),也可以用來做模型訓練.因為這里就算是干凈的數據了.
另一份數據(假定歷史數據都已經處理完成,現在都是實時的數據)要提取業務需要的字段,導入到kafka的topic 中. 供業務需要的即時統計分析使用.
至此,整個 nifi 的數據處理流程都走完了. 歸功了 nifi 的強大 ,數據從起點到終點,雖然處理流程多,但流向非常清晰.用 nifi 拖拽幾下,一套特定業務日志的處理系統就完成了. 右鍵點擊 start ,系統就跑起來,你可以在界面看到數據的流動,可以監控,可以暫停,可以調試某段,可以查看中間結果.這些都是可以在界面上完成的. 用很簡單的使用方式,去做很復雜的事情,最牛逼的工具莫過於此了.當然, nifi 也有很多高級的用法. 甚至可以 搭一個 nifi 的集群 ,來處理更加海量的數據,這里就不細說了.
當然,nifi中一個很長的數據處理流程是需要花時間觀察,調試及驗證的.從一個處理器到另一個處理器是否通暢.數據是否阻塞在了流程圖中的某一段等問題.需要調試單個處理器的並發量及run schedule等信息.以及處理器之間的緩存隊列容量和大小等信息.這些需要耐心的調試,就像一個流動的人群一樣,慢慢疏導.
最后,附上實時數據統計的代碼,這是這個項目寫的唯一代碼了,使用的是 spark 技術棧 ,spark streaming 和 spark sql . 關鍵是這個group by 函數.把聚合后的數據保存mysql 里面,供業務應用查詢.這里后來把時間窗口改為了一小時,因為當時聚合后的數據量也還蠻大,2天存了35萬條到mysql里面,這樣的量放mysql 里面不太合適.
可能一開始的方案就不正確,后來也沒繼續跟進了.
以上,就是這個項目的一個完整記錄,做得有點粗糙,最后也沒有繼續跟進.很多細節沒有考慮到 ,比如由於網絡的問題,在整個流程中,是否會有數據的丟失?當由於某種原因導致流程中斷,是否會有數據重復.最后的結果,怎樣去校驗數據的完整性.這些問題理論上是聚焦於各個組件,比如sqoop, kafka ,nifi的機制上的,但實際跑起來之后,會有什么樣的問題.當時並沒有過細的研究.