1. 引言
Apache NiFi是一個易於使用、功能強大、安全可靠的數據處理和分發系統, 可用於自動化管理不同系統之間的數據流(dataflow)。NiFi 基於 Web 界面定 義數據處理流程,並由后台在服務器上進行調度。NiFi支持從多種數據源動態拉 取數據,可以輕松完成數據同步工作。本文檔將簡單介紹一些NiFi的基礎概念, 並說明如何安裝配置和實現一些常見的數據處理場景。 2. 核心概念
NiFi的設計理念與基於流程的編程fbp(Flow Based Programming)的 主要思想密切相關。 2.1 基本術語
2.1.1 FlowFile
FlowFile表示在數據處理過程中流動的每個對象,也就是我們要處理的數 據,它包括一組屬性鍵值對(attributes,如uuid、filename、path等)和字 節內容(content,即數據本身)。 2.1.2 FlowFile Processor
FlowFile處理器用於完成對數據的實際處理工作,包括但不限於對數據內容 和屬性的加載、路由、轉換、輸出等。 NiFi內置了上百個處理器,基本覆蓋了常見的數據處理場景。從功能上來看, 處理器可以大致分為數據提取、數據轉換、數據路由、數據庫訪問、屬性提取等 類型,我們將簡單介紹一些典型的處理器。 2.1.2.1 數據提取
GetFile:監控本地磁盤或者網絡連接磁盤中的文件夾,讀取文件內容並 將其封裝為一個FlowFile,會忽略沒有讀取權限的文件; GetFTP:獲取 FTP 服務器上的文件,封裝 FlowFile 並將源文件刪除, 主要用於移動文件,而不是復制文件; GetSFTP:獲取SFTP服務器上的文件,封裝FlowFile並將源文件刪除,
主要用於移動文件,而不是復制文件; GetJMSQueue:從ActiveMQ JMS消息隊列讀取消息; GetJMSTopic:從 ActiveMQ JMS消息隊列的Topic讀取消息,包括持 久訂閱和非持久訂閱; ListenUDP:監聽傳入的UDP數據包,並為每個數據包或每一組數據包 創建一個FlowFile; ListenTCP:監聽TCP連接,並使用行分隔符讀取數據; GetHDFS:監視HDFS指定的目錄,每當有新文件進入HDFS時,就會 創建一個 FlowFile 並將其從 HDFS 中刪除;如果該處理器在群集中運行,為了 保持數據的完整性,只能允許在主節點上運行; ConsumeKafka:消費Kafka消息。 2.1.2.2 數據轉換
CompressContent:使用指定的壓縮算法壓縮或解壓縮流文件內容; ConvertCharacterSet:轉換流文件內容的字符集; EncryptContent:加密或解密流文件; ReplaceText:通過正則表達式來替換匹配規則的部分流文件內容; TransformXml:將 XSLT 文件(可擴展樣式表轉換語言,Extensible Stylesheet Language Transformations)轉換為XML文件; JoltTransformJSON:使用Jolt規范轉換JSON內容。 2.1.2.3 數據路由
ControlRate: 控制數據流傳輸到后續處理器的速率; DetectDuplicate: 根據自定義的條件,監測重復的FlowFile; DistributeLoad:根據分發策略將流文件分發給下游處理器,可用於實 現負載均衡或數據抽樣; MonitorActivity:當我們定義的時間范圍內沒有收到任何數據流時發送 通知,或者在數據流恢復時發送通知; RouteOnAttribute:基於FlowFile的屬性路由數據;
ScanAttribute: 掃描FlowFile上的自定義屬性,檢查是否有屬性與自定 義的詞典相匹配,從而路由數據; ScanContent:掃描FlowFile的內容,檢查是否存在自定義的詞典條目, 從而路由數據,該詞典可以包含文本條目或二進制條目; RouteOnContent:搜索FlowFile的內容,檢查是否與自定義的正則表 達式匹配,根據匹配結果路由數據; ValidateXml:使用 XML Schema 驗證 FlowFile 的內容,根據數據的 有效性來路由。 2.1.2.4 數據庫訪問
ExecuteSQL:執行自定義的 SQL 語句,並將返回結果轉換為 Avro 格 式的FlowFile; PutSQL:執行FlowFile內容中定義的SQL DML(Insert、Update 或 Delete)語句,FlowFile屬性可以作為參數,用於構建SQL語句; ConvertJSONToSQL:將 JSON 轉化為 Insert 或者 Update 語句,並 將其發送給PutSQL處理器; GetMongo: 在 Mongo 數據庫中執行自定義的查詢語句,並將返回結 果封裝為一個FlowFile; PutMongo:將FlowFile內容寫入到MongoDB。 SelectHiveQL:在 Apache Hive 數據庫中執行自定義的 HiveQL 查詢 語句,將並結果封裝為Avro或CSV格式的FlowFile; PutHiveQL:在 Apache Hive 數據庫中執行 FlowFile 內容中定義的 HiveQL DDM語句。 2.1.2.5 屬性提取
EvaluateXPath: 使用 XPath 路徑表達式對 XML 格式的 FlowFile 內容 進行計算,用計算結果替換FlowFile內容,或者將其提取到自定義的屬性中; EvaluateXQuery:使用 XQuery 語言對 XML 格式的 FlowFile 內容進 行計算,用計算結果替換FlowFile內容,或者將其提取到自定義的屬性中;
EvaluateJsonPath: 使用 JSONPath(XPath 在 JSON 中的應用)表達 式對JSON格式的FlowFile內容進行計算,用計算結果替換FlowFile內容,或 者將其提取到自定義的屬性中; ExtractText: 使用一個或多個正則表達式從FlowFile內容中提取數據, 用於替換FlowFile內容,或者將其提取到自定義的屬性中; HashAttribute:對自定義的屬性列表串聯執行哈希函數; HashContent:對FlowFile內容執行哈希函數,並將結果添加為屬性;
IdentifyMimeType:基於FlowFile內容判定其MIME類型; UpdateAttribute:向FlowFile添加或更新任意數量的自定義屬性,常 用於添加靜態屬性值或使用表達式語言動態計算的屬性值。 2.1.2.6 系統交互
ExecuteProcess: 執行操作系統命令(比如Linux的Shell命令),並將 執行結果封裝為一個FlowFile;此處理器為源處理器,且不接受參數輸入; ExecuteStreamCommand:將輸入的FlowFile內容作為命令,並將命 令的執行結果輸出為新的FlowFile。 2.1.2.7 拆分與合並
SplitText:根據配置的規則(比如用換行符分割)將接收到的單個文本 FlowFile拆分為1個或多個FlowFile; SplitJson:將復雜的JSON對象(比如由數組或多個子對象組成)的拆 分為單個JSON元素的FlowFile; SplitXml:將XML格式的FlowFile拆分為多個; UnpackContent:解壓縮不同類型的存檔格式,比如 Zip、Tar 等,將 解壓縮后的每個文件作為新的FlowFile; SegmentContent:根據配置的數據大小,基於字節偏移將FlowFile拆 分為較小的FlowFile;可用於並行發送不同的片段來提高傳輸性能,下游接收到 的這些 FlowFiles還可以通過MergeContent處理器重新組裝; MergeContent:根據規則將多個FlowFile合並為一個FlowFile;
SplitContent:根據指定內容拆分FlowFile。 2.1.2.8 數據輸出
PutEmail:向配置的郵件地址發送電子郵件,FlowFile內容可以作為附 件發送; PutFile:將FlowFile內容寫入本地(或網絡連接)的文件夾中; PutFTP:將FlowFile內容復制到遠程FTP服務器中; PutSFTP:將FlowFile內容復制到遠程SFTP服務器中; PutJMS:將 FlowFile 內容作為 JMS 消息發送到 JMS 代理,還可以選 擇將FlowFile屬性作為JMS屬性; PublishKafka:將FlowFile內容作為消息發送到Kafka。 2.1.2.9 HTTP
GetHTTP:執行 HTTP 或 HTTPS 請求,並將結果作為一個 FlowFile, 處理器可以記錄上次請求時間,以確保不會重復獲取數據; PostHTTP:將FlowFile內容作為參數執行HTTP POST請求; ListenHTTP:啟動HTTP或HTTPS服務器並監聽傳入的連接,將POST 請求內容將作為FlowFile內容,並返回200響應; InvokeHTTP:執行已配置的HTTP請求; HandleHttpRequest/HandleHttpResponse:HandleHttpRequest 處 理器是一個源處理器,會啟動一個類似於 ListenHTTP 的嵌入式 HTTP(S)服 務器, 用HTTP請求創建一個FlowFile, HandleHttpResponse能夠在FlowFile 處理完成后將響應發送回客戶端。 2.1.3 Connection
Connection用於連接處理器,其充當了一個隊列的角色,可以用來臨時保 存FlowFile在JVM內存中的位置。Connection可以設置負載上限(FlowFile 個數或占用JVM內存總大小),如果超出閾值,則上游處理器將停止執行(即為 背壓機制),所以也可以將Connection理解為一個緩沖區。另外,還可以在 Connection中配置FlowFile的處理優先級。
2.1.4 Flow Controller
Flow Controller是NiFi的核心,它為處理器(Processor)提供線程資源, 同時管理處理器的調度執行。 2.1.5 Process Group
顧名思義,一堆處理器及其連接可以組成一個Process Group,我們可以 通過簡單地組合不同的組件,來得到一個具有全新功能的組件。 2.2 整體架構
NiFi的整體架構如下圖所示:
2.2.1 Web Server
Web Server提供了基於HTTP的命令和可視化用戶界面,使得我們可以方 便地創建、修改和監控數據處理流程。 2.2.2 Flow Controller
如上所述,Flow Controller是整個數據處理流程的核心。 2.2.3 Extensions
NiFi支持自定義擴展Processor。
2.2.4 Repositories
NiFi使用了三個存儲庫(Repositories), 均為本地存儲上的目錄,用於保 存Flow File數據。 2.2.4.1 FlowFile Repository
FlowFile Repository存儲的是當前使用的所有FlowFile的元數據,包括屬 性、狀態(比如此時屬於哪個隊列)和內容的指針,相當於為我們提供了數據處 理流程的最新狀態,因此FlowFile Repository可以用來做故障恢復工具。 在數據處理流程的每個步驟中,對FlowFile進行修改之前,NiFi會將其以 預寫日志的方式(Write-Ahead Log)記錄在FlowFile Repository中。 2.2.4.2 Content Repository
Content Repository是存儲FlowFile內容的庫,因此其數據量要遠遠大於 另外兩個庫。Content Repository中存放的內容是只讀的,也就說FlowFile的 內容一旦寫到磁盤上,就不會被更新,直到數據過期,如果需要修改FlowFile 的內容,NiFi將復制一份新的數據進行操作,這樣可以最大幅度的保證數據處理 的吞吐量和數據的任意狀態重放,也便於歷史數據的查詢和回放。 2.2.4.3 Provenance Repository
Provenance Repository是存儲所有FlowFile歷史記錄的地方,為每一條 數據都提供了數據血統圖。每當FlowFile被創建、克隆、修改時,就會產生一 條Provenance Event,每一條Provenance Event都存放了FlowFile的所有屬 性和指向FlowFile內容的指針。Provenance Repository 會存放一段時間內的 Provenance Event,可用於查看血統圖和回放數據。