簡介
https://www.cnblogs.com/ronnieyuan/p/11935871.html
https://github.com/apache/nifi
Apache NiFi是一個基於流編程概念的數據流Data Flow 框架系統。
可以對來自多種數據源的流數據進行處理
- Data Flow 數據流 :解決的是數據端到端傳輸的問題。
數據流中的數據可以來自很多種類型,比如 CSV、JSON、HTTP、IoT 和音視頻流等等。 - Data Pipeline 數據管道 :提供多渠道數據來源來進行實時攝取、數據清洗、任務流管理、元數據管理、流批一體等功能。
講的可以:https://www.pianshen.com/article/2812347473/
可以看看:
https://www.e-learn.cn/content/qita/627407
https://www.e-learn.cn/content/wangluowenzhang/2193412
https://www.e-learn.cn/topic/1447804
https://www.e-learn.cn/tag/nifi
NiFi有一個基於web的用戶界面,用於設計、控制、反饋和監視數據流。
它在服務質量的幾個維度上是高度可配置的,比如容錯與保證交付、低延遲與高吞吐量以及基於優先級的隊列。
NiFi為所有接收的、分叉的、連接的、克隆的、修改的、發送的和最終到達配置的最終狀態時丟棄的數據提供細粒度的數據來源。
特點
-
基於 Web 的用戶界面
設計、控制、反饋和監控之間的無縫體驗 -
高度可配置
2.1 容錯與可靠投遞
2.2 低延遲與高吞吐量
2.3 動態優先級
2.4 流可以在運行時修改
2.5 背壓 -
數據來源
從頭到尾跟蹤數據流 -
專為擴展而設計
4.1 構建自己的處理器等
4.2 實現快速開發和有效測試 -
安全
5.1 SSL、SSH、HTTPS、加密內容等...
5.2 多租戶授權和內部授權/策略管理
————————————————
版權聲明:本文為CSDN博主「DataFlow范式」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
一個大佬的講解:https://blog.csdn.net/jiangshouzhuang/article/details/105609041
安裝
linux: wget https://archive.apache.org/dist/nifi/1.11.4/nifi-1.11.4-bin.tar.gz
-b 后台執行,wget-log 記錄日志
https://www.cnblogs.com/h--d/p/10079418.html
nifi.properties:
# web properties #
nifi.web.http.host=
nifi.web.http.port=8080
nifi.web.http.network.interface.default=
nifi.web.https.host=
nifi.web.https.port=
其他配置參考:https://blog.csdn.net/weibokong789/article/details/88554855
mac:brew install nifi
日志:/nifi/1.11.4/libexec/logs/nifi-app.log
啟動命令
./bin/nifi.sh start
訪問 localhost:8080/nifi/
stop、status
mac:nifi start
k8s安裝
文檔
在線:https://nifi.apache.org/docs/nifi-docs/html/getting-started.html#for-linux-macos-users
別人翻譯的:https://nifichina.gitee.io/3-Processors/AttributesToCSV.html#屬性配置
工作流組成
https://blog.csdn.net/memoordit/article/details/78804594
- Processor 處理器,是用於監聽傳入數據的 NiFi 組件、從外部獲取數據、對外發布數據,以及從 FlowFiles 中路由、轉換或提取信息。
可以用來創建、發送、接受、轉換、路由、割裂、合並、處理FlowFiles。
包含的類型很多,多達 兩百多個,比如傳統數據庫、大數據組件、日志文件、消息流、aws 雲服務的產品(DynamoDB、Lambda、S3 等)等。
處理器可以使用標准調度方法將此處理器調度為在計時器或使用CRON,也可以由傳入的流文件觸發(有些組件不支持,有的是experimental)。
CRON即是Crontab的應用,CRON的各參數含義分別代表:秒、分、時、日、月、周、年,需要配合、?和L共同執行(代表字段的值都有效;?代表對於指定的字段不指定值;L代表長整形)。如:“0 0 13 * * ?”代表想要在每天下午1點進行調度執行。
https://www.cnblogs.com/zxbdboke/p/11299506.html
- FlowFile 表示 NiFi 中的單個數據塊。
一個 FlowFile 由兩個組件組成: - FlowFile Content
Content 是 FlowFile 數據流的實際內容,比如通過 GetFile、GetHTTP 等方式獲取文件的實際內容。 - FlowFile Attributes
FlowFile 的元數據信息,包含 Content 的信息有:
FlowFile 什么時候創建、FlowFile 名字、FlowFile 來自哪里、FlowFile 表示什么等。
Processor 可以添加、更新或刪除 FlowFile attributes,以及修改 FlowFile content。
-
Connection 是指 Processor 或 Process Group 之間的連接,從而創建一個自動化的數據流。每個 Connection 都包含一個 FlowFile Queue,用於緩存傳輸的流數據,並可設置 Back Pressure和數據流的優先級方案(先進先出等)。
-
Processor Group 針對一個復雜的業務處理數據流,建議最好使用邏輯的 Process Group 來組織這個復雜的 processes,方便維護這些數據流。相當於系統中的文件夾,作用就是使數據流的各個部分看起來更工整,思路更清晰,不至於從頭到尾一條線閱讀起來十分不方便。
另外 Process Group 還有 Input/Ouput Port(組與組之間的數據流連接的傳入點與輸出點),可以用來在它們之間移動數據。 -
Controller Service 控制器,例如數據庫連接,XML讀取,JSON讀取器,用來被 processes 使用,比如一個 process 需要寫入或讀取數據庫的數據,需要使用一個 Controller Service 用來建立數據庫的連接信息。
NiFi 提供了 幾十 種 Controller Service。另外在權限控制方面,可以對每個 Controller Service 進行授權操作。
https://blog.csdn.net/wangmin1983/article/details/80466736
Processors 分類
-
數據攝取 Processors
-
數據轉換 Processors
-
數據流出/發送數據 Processors
-
路由和中轉 Processors
-
數據庫訪問 Processors
-
Attribute 抽取 Processors
-
系統交互 Processors
-
切分和聚合 Processors
-
HTTP 和 UDP Processors
-
Amazon Web Services Processors
詳見:https://zhuanlan.zhihu.com/p/299075280
文本處理組件
ReplaceText:
使用正則表達式修改文本內容(Replacement Value替換 Search Value);
可以利用屬性,進行轉換:
ExtractText:
使用一個或多個正則表達是有FlowFile內容中提取數據,以替換FlowFile內容或將該值提取到用戶命名的Attribute中。
原始數據A,B,C,D
匹配后得到dataFlow屬性:
數據流被解析為 CSV 格式,以逗號分隔。
ReplaceText
替代方案,可以用replaceText自己生成插入的sql語句。
SplitJson
對json進行拆分?根據jsonPath(JsonPath中的“根成員對象”始終稱為$,無論是對象還是數組 https://www.cnblogs.com/jpfss/p/10973590.html)允許用戶根據JSON元素將由數組或許多子對象組成的JSON對象拆分為FlowFile。
JsonPath Expression:選擇json中需要拆分的字段名稱,該字段名稱中的value為json格式的多條數據,組件會將value中的多條數據拆分成數量相等的數據流,並舍棄拆分字段名稱value值之外的所有數據。
jsonPath:$.name[]
原始數據:{“name”: [{“last”: “li”},{“first”: “wang”}],”testdata”: “test”}
分割后后變成兩條:
{“last”:”li”}
{“first”:”wang”}
如果整體數據就是jsonArray的形式,jsonPah可以用$.。另外如果文件名重復,可以使用updateAttribute更改filename,設置為uuid。
————————————————
版權聲明:本文為CSDN博主「quguang1011」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/quguang65265/article/details/78698088
EvaluateJsonPath
提取json中的某個屬性作為參數,用戶提供JSONPath表達式(類似於XPath,用於XML解析/提取),然后根據表達式內容對這些json進行計算提取,以替換FlowFile內容或將值提取到用戶命名的Attribute中(Destination)。
UpdateAttribute:
向FlowFile添加或更新任意數量的用戶定義屬性。
這對於添加靜態配置的值以及使用表達式語言動態派生屬性值很有用。
該處理器還提供了一個“高級用戶界面”,允許用戶根據用戶提供的規則有條件地更新屬性。
如修改 filename 的格式為${filename}-${now():toNumber():format("yyyy-MM-dd_HHmmss")}.json
:
https://blog.csdn.net/u010022051/article/details/51276170
https://stackovernet.xyz/cn/q/12472401
AddSchemaNameAttribute:對處理的flowfile添加一個屬性
UpdateRecord
https://zhuanlan.zhihu.com/p/257009588
https://cloud.tencent.com/developer/ask/154354/answer/267084
服務日志組件
LogMessage 用它可以在服務端打日志,內容是自己設置的,少用
LogAttribute 用它可以在服務端打日志,內容是FlowFile的屬性,少用
數據庫組件
SelectHiveQL 執行hql查詢語句,結果格式是Avro or CSV
HiveConnectionPool
PutSQL 執行INSERT、UPDATE 的sql,sql可直接寫,也可來自flowFile
ExecuteSQL 執行sql查詢語句,結果格式是Avro。參數支持?如果它是由傳入的FlowFile觸發的,則在評估選擇查詢時,該FlowFile的屬性將可用,並且該查詢可以使用?,參數必須作為FlowFile屬性存在
HiveConnectionPool
DBCPConnectionPool
DBCPConnectionPoolLookup DBCPConnectionPoolLookup
DBCPConnectionPoolLookup這個Controller Service很簡單,也非常有用,說白了,它就是保存了一個我們使用者定義的Map,key是我們自己命名的,value是我們選擇的當前流程可用的DBCPConnectionPool,然后在流程運行過程中,DBCPConnectionPoolLookup根據FlowFile中一個叫database.name的屬性去這個Map里查找DBCPConnectionPool。使用DBCPConnectionPoolLookup的最大優點是什么?靈活啊!組件不綁定於一個數據庫,根據流文件中的屬性動態去查找對應的數據庫。
https://blog.csdn.net/weixin_36048246/article/details/87359310
ConvertJSONToSQL
(注意:該處理器有一個特性,只能處理flat json,所謂flat是指it consists of a single JSON element and each field maps to a simple type)
另外ConvertJSONToSQL 不支持hive 這是nifi的一個bug https://blog.csdn.net/aiyinsitan215/article/details/93617838
大量數據插入,這個Processor的效率很低的,為什么這么說呢,一個流的數據如果是json,也應該是json數組,但一個json數組通過這個processor得到的結果是若干個insert語句,每一個insert語句中只有一條數據;可以改進成insert into table ()values ()()。。。的形式;也可以使用PutDatabaseRecord 做大量數據的insert,PutDatabaseRecord的優勢是內置reader,減少了流程的中間落地(當然PutDatabaseRecord 也沒有做到最好,還可以再優化,我自己改過一版PutDatabaseRecordQuicklyForInsert ,有空再更一下)
————————————————
版權聲明:本文為CSDN博主「酷酷的誠(公眾號:Panda誠)」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_36048246/article/details/87363263
ExecuteSQLRecord 執行sql查詢語句,結果格式可通過Record Writer自定義 https://blog.csdn.net/weixin_36048246/article/details/87359310
可選連接池同上
Record Writer:
FreeFormTextRecordSetWriter
ParquetRecordSetWriter
XMLRecordSetWriter
JsonRecordSetWriter
CSVRecordSetWriter
AvroRecordSetWriter
ScriptedRecordSetWriter
使用有點難度還用好很高級的組件
RouteOnAttribute
http://www.blogjava.net/tw-ddm/articles/433714.html
RouteOnContent
Nifi表達語言
https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html#replaceempty
if else語句
https://www.javaroad.cn/questions/124812
${uuid:replaceEmpty(${device_id})}
${field.value:toDate("yyyy-MM-dd HH:mm:ss", "GMT"):format("yyyy/MM/dd", "GMT")}
${field.value:toDate("yyyy-MM-dd HH:mm:ss", "GMT"):format("HH:mm:ss", "GMT")}
${$area:isEmpty():not()}
RecordPath Domain-Specific Language (DSL)
http://192.168.0.40:18080/nifi-docs/html/record-path-guide.html
${gId:equals(1):ifElse('男', '女')}
${gender:equals(1)}
${field.value:toDate("yyyy-MM-dd HH:mm:ss", "GMT+8"):format("yyyy/MM/dd HH:mm:ss", "GMT+8")}
https://www.renfei.net/posts/1003284
一些例子
https://note.youdao.com/ynoteshare1/index.html?id=298cd2d8df4131161f350ef328ccbcb9&type=note
使用例子
這里很多別人的使用經驗:
https://ask.hellobi.com/blog/seng/category/3137
文件讀取、輸出
組件:GetFile、PutFile
https://www.cnblogs.com/h--d/p/10079418.html
這種,相當於在動態MV或COPY文件,同名文件可以有替換(會一直循環不斷替換)、忽略、報錯三種處理方式。
FTP讀取 、修改、輸出到 HDFS
組件:GetFTP、CompressContent(壓縮或解壓FlowFile的內容)、ReplaceText(通過定義正則表達式,可對正則表達式匹配到的內容進行替換)、PutHDFS
https://www.jianshu.com/p/d2ed34060dfd?from=singlemessage
hdfs讀取 ,輸出到 hive
https://www.cnblogs.com/fengwenit/p/5795580.html
文件內容處理
Excel to ES
組件:GetFile Processor拉取表格、
ConvertExcelToCSVProcessor 將Excel多sheet表格轉為多個csv Flowfile、
ConvertRecord Processor 將csv Flowfile轉換為Avro Flowfile、
SplitRecord Processor 將從csv取到的Avro Flowfile按行split為單條Avro、
Record Processor為每一條數據添加source字段標明數據來源,並將Avro Flowfile轉為json Flowfile、
PutElasticsearch Processor 將數據存入ES
https://zhuanlan.zhihu.com/p/257009588
用到的控制器:
CSVReader:
https://blog.csdn.net/qinqinyijia/article/details/77775054
AvroSchemaRegistry:
{ "name": "people", "namespace": "nifi", "type": "record", "fields":
[ {"name": "id" , "type" : "int"}
, {"name": "name" , "type" : "string"}
, {"name": "age" , "type" : "string"}
, {"name": "address" , "type" : "string"}
, {"name": "祖籍" , "type" : "string"}
] }
es:
https://www.cnblogs.com/lightsong/p/12688996.html
CSV 格式 轉為 JSON 格式
json to csv
https://www.cnblogs.com/fengwenit/p/5795576.html
數據庫查詢
mysql 查詢
組件:ExecuteSQL(執行SQL語句,返回avro格式數據。)、ConvertAvroToJSON、ConvertJSONToSQL、PutSQL
一個同步表的例子:https://blog.csdn.net/zhulu52166/article/details/83380717
https://www.cnblogs.com/h--d/p/10079418.html
一個處理文本然后插入表的例子(ExecuteScript寫腳本處理數據 ):https://blog.csdn.net/qinqinyijia/article/details/77869566
ExecuteSQLRecord 需要配置 RecordWriter ,可以靈活地指定輸出格式,(如添加JsonRecordSetWriter -> 編輯JsonRecordSetWriter -> 在JsonRecordSetWriter添加AvroSchemaRegistry -> 編輯AvroSchemaRegistry -> AvroSchemaRegistry添加內容(內容是Avro格式)--> 激活AvroSchemaRegistry -> 激活JsonRecordSetWriter 參考:https://www.cnblogs.com/h--d/p/10081777.html)
mysql to hive
https://blog.csdn.net/aiyinsitan215/article/details/93617838
Change Data Capture (CDC)
https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-1-of-3/ta-p/246623
- Mysql導入至HBase
https://www.meiwen.com.cn/subject/fkzxlqtx.html
kafka讀取
https://www.jianshu.com/p/626c32c43294
web api 取數據到 hdfs
組件: InvokeHttp可以通過http請求獲取數據
https://www.cnblogs.com/fengwenit/p/5589397.html
更多例子:
https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
https://www.cnblogs.com/fengwenit/p/5557035.html
https://blog.csdn.net/quguang65265/article/details/78698088
https://blog.csdn.net/quguang65265/article/details/78697678
https://note.youdao.com/ynoteshare1/index.html?id=298cd2d8df4131161f350ef328ccbcb9&type=note
https://github.com/xmlking/nifi-examples
https://github.com/simonellistonball/masterclass-hdf
資料大全:https://my.oschina.net/u/2306127/blog/858096
csv to mysql
https://blog.csdn.net/u012261499/article/details/109612107
PutDatabaseRecord
https://blog.csdn.net/weixin_36048246/article/details/108781872
更多例子
https://blog.csdn.net/xuzhaoa/article/details/107042660
https://www.cnblogs.com/adolfmc/archive/2012/10/07/2713562.html
https://www.iteye.com/blog/hck-1566801
https://blog.csdn.net/lamp113/article/details/79176410
https://www.cnblogs.com/mkl34367803/p/10566827.html
https://blog.csdn.net/xiaoguangtouqiang/article/details/82182654
https://blog.csdn.net/weixin_40763897/article/details/105048517
- 如果要使用NiFi提供Web服務,請查看HandleHTTPRequest和HandleHTTPResponse處理器。通過使用兩個處理器的組合,您將通過HTTP接收來自外部客戶端的請求。您將能夠對請求中的數據進行處理,並將自定義答案/結果發送回客戶端。例如,您可以使用NiFi通過HTTP訪問外部系統,例如FTP服務器。您將使用兩個處理器並通過HTTP發出請求。當您在NIFi中收到查詢時,NiFi會針對FTP服務器進行查詢以獲取文件,然后將文件發送回客戶端。
- 使用NiFi,所有這些獨特的請求都可以很好地擴展。在這種用例中,NiFi將根據需求進行水平擴展,並在NiFi實例的前面設置負載均衡器,以平衡集群中NiFi節點之間的負載。
nifi中的數據聚合?
nifi集群
nifi架構原理
https://www.jianshu.com/p/115e6771ed5a 講解了一些參數
https://www.jianshu.com/p/d2ed34060dfd?from=singlemessage 里面還有ReplaceText組件使用例子。
https://www.cnblogs.com/zskblog/p/7284198.html
https://www.cnblogs.com/ronnieyuan/p/11935871.html
https://www.cnblogs.com/cobub-yx/p/7772196.html
一些靈魂拷問
如果可以使用Kafka作為群集的入口點,為什么還要使用NiFi?
https://my.oschina.net/bigdatagrocery/blog/4901507
我不知道這個blog說了些啥?
https://blogs.apache.org/nifi/
nifi的容器化部署,遇到數據不能持久化的問題
1、發現數據(流配置文件)存儲在 conf/ 下,然而conf/ 又沒有進行掛載
嘗試掛載conf目錄,然而容器啟動報錯
replacing target file /opt/nifi/nifi-current/conf/nifi.properties
sed: can't read /opt/nifi/nifi-current/conf/nifi.properties: No such file or directory
2、數據存儲的位置設置到一個被掛載的牡蠣,比如 work/ 下
在configmap里,掛載了bootstrap.conf文件,
在文件里修改 nifi.flow.configuration.file 更改為 ./work/flow.xml.gz,還有archive.dir地址,然而並不生效。
bootstrap.conf文件是配置NiFi應該如何啟動的地方,不知道誰把nifi.properties的設置項也考進來了
3、添加nifi.properties文件掛載,並設置流配置文件路徑。
然而容器啟動報錯
replacing target file /opt/nifi/nifi-current/conf/nifi.properties
sed: cannot rename /opt/nifi/nifi-current/conf/sedB1aWUR: Device or resource busy
因為nifi啟動會 replacing target file /opt/nifi/nifi-current/conf/nifi.properties
4、指定環境變量來設置
NIFI_VARIABLE_REGISTRY_PROPERTIES=./conf/bootstrap.conf
似乎仍不起效
因為這里的變量並不是針對nifi服務的變量,參考
5、研究啟動腳本,start.sh,發現 這一段
prop_replace () {
target_file=${3:-${nifi_props_file}}
echo 'replacing target file ' ${target_file}
sed -i -e "s|^$1=.*$|$1=$2|" ${target_file}
}
export nifi_props_file=${NIFI_HOME}/conf/nifi.properties
prop_replace 'nifi.cluster.flow.election.max.wait.time' "${NIFI_ELECTION_MAX_WAIT:-5 mins}"
有了點靈感
- 先在容器里添加環境變量
2)再把環境變量里的值,通過start.sh腳本,修改在 nifi.properties里
Configmap添加:
prop_replace 'nifi.flow.configuration.file' "${NIFI_FLOW_CONFIGURATION_FILE:-}"
prop_replace 'nifi.flow.configuration.archive.dir' "${NIFI_FLOW_CONFIGURATION_ARCHIVE.DIR:-}"
prop_replace 'nifi.templates.directory' "${NIFI_TEMPLATES_DIRECTORY:-}"
容器env里添加:
env:
- name: NIFI_FLOW_CONFIGURATION_FILE
value: ./work/flow.xml.gz
- name: NIFI_FLOW_CONFIGURATION_ARCHIVE_DIR
value: ./work/archive/
- name: NIFI_TEMPLATES_DIRECTORY
value: ./work/templates