本例介紹NiFI ExecuteScript處理器的使用,使用的腳本引擎ECMScript
FlowFile I / O簡介
NiFi中的流文件由兩個主要組件構成,即屬性和內容。屬性是關於內容/流文件的元數據,我們在本系列的第1部分中看到了如何使用ExecuteScript來操作它們。流文件的內容本質上只是一個字節集合,沒有固有的結構,模式,格式等。各種NiFi處理器假設傳入的流文件具有特定的模式/格式(或者從屬性中確定它作為“mime.type”或以其他方式推斷它。然后,這些處理器可以基於文件確實具有該格式的假設來對內容起作用(並且如果它們不這樣,則經常轉移到“失敗”關系)。處理器也可以輸出指定格式的流文件,這在處理器中有描述。NiFi文檔。
流文件內容的輸入和輸出(I / O)通過ProcessSession API提供,因此是ExecuteScript的“session”變量(有關更多信息,請參閱第1部分)。一種機制是將回調對象傳遞給對session.read()或session.write()的調用。將為FlowFile對象創建InputStream和/或OutputStream,並使用相應的回調接口調用回調對象,並傳入InputStream和/或OutputStream引用以供回調使用。有三個主要的回調接口,每個接口都有自己的用例:
InputStreamCallback
session.read(flowFile,inputStreamCallback)方法使用此接口 提供InputStream,從中讀取流文件的內容。界面有一個方法:
1 void process(InputStream in) throws IOException
此接口提供托管輸入流以供使用。雖然可以手動關閉流,但輸入流會自動打開和關閉。如果您只是從特定的流文件中讀取而不是寫回來,那么這是您將使用的表單。
例如,當您想要處理傳入的流文件,但創建許多輸出流文件時,如 SplitText處理器。
OutputStreamCallback
session.write(flowFile,outputStreamCallback)方法使用此接口 來提供要寫入流文件內容的OutputStream。界面有一個方法:
1 void process(OutputStream out) throws IOException
此接口提供托管輸出流以供使用。盡管可以手動關閉流,但輸出流會自動打開和關閉 - 如果包含這些流的任何流打開應該清除的資源,則非常重要。
例如,ExecuteScript將從內部或外部文件生成數據,但不生成流文件。然后你將使用session.create()創建一個新的FlowFile,然后使用session.write( flowFile,outputStreamCallback)來插入內容。
StreamCallback
session.write(flowFile,streamCallback)方法使用此接口 來提供InputStream和OutputStream,從中讀取和/或寫入流文件的內容。界面有一個方法:
1 void process(InputStream in, OutputStream out) throws IOException
此接口提供托管輸入和輸出流以供使用。雖然可以手動關閉流,但輸入流會自動打開和關閉 - 如果包含這些流的任何流打開應該清除的資源,則非常重要。
例如,當您想要處理傳入的流文件並用新的東西覆蓋其內容時,例如 EncryptContent處理器。
由於這些回調是Java對象,因此腳本必須創建一個並將其傳遞給會話方法,還有其他讀取和寫入流文件的方法,包括:
-
- 使用session.read(flowFile)返回一個InputStream。這減輕了對InputStreamCallback的需求,而是返回可以讀取的InputStream。作為交換,您必須手動管理(關閉,例如)InputStream。
-
- 使用session.importFrom(inputStream,flowFile)從InputStream寫入FlowFile。這取代了傳遞了OutputStreamCallback的session.write()的需要。
ExecuteScript介紹
ExecuteScript是一個多功能處理器,允許用戶使用編程語言編寫自定義邏輯,每次觸發ExecuteScript處理器時都會執行該編程語言。為腳本提供以下變量綁定以啟用對NiFi組件的訪問:
session:這是對分配給處理器的ProcessSession的引用。該會話允許您對流文件執行操作,如create(),putAttribute()和transfer(),以及read()和write()。
context:這是對處理器的ProcessContext的引用。它可用於檢索處理器屬性,關系,Controller Services和StateManager。
log:這是對處理器的ComponentLog的引用。使用它將消息記錄到NiFi,例如log.info('Hello world!')
REL_SUCCESS:這是對為處理器定義的“成功”關系的引用。它也可以通過引用父類的靜態成員(ExecuteScript)來繼承,但是某些引擎(如Lua)不允許引用靜態成員,因此這是一個便利變量。它還節省了必須使用關系的完全限定名稱。
REL_FAILURE:這是對為處理器定義的“失敗”關系的引用。與REL_SUCCESS一樣,它也可以通過引用父類的靜態成員(ExecuteScript)來繼承,但是某些引擎(如Lua)不允許引用靜態成員,因此這是一個便利變量。它還節省了必須使用關系的完全限定名稱。
動態屬性:ExecuteScript中定義的任何動態屬性都將作為設置為與動態屬性對應的PropertyValue對象的變量傳遞給腳本引擎。這允許您獲取屬性的String值,還可以根據NiFi表達式語言評估屬性,將值轉換為適當的數據類型(例如布爾值等)等。因為動態屬性名稱變為腳本的變量名,您必須知道所選腳本引擎的變量命名屬性
ExecuteScript使用
1、從會話中獲取傳入的流文件
目的:有到ExecuteScript的傳入連接,並希望從隊列中檢索一個流文件以進行處理
方法:使用會話對象中的get()方法。此方法返回要處理的下一個最高優先級FlowFile的FlowFile。如果沒有要處理的FlowFile,則該方法將返回null。請注意,即使FlowFiles有穩定的流入處理器,也可能返回null。如果處理器有多個並發任務,並且其他任務已經檢索到FlowFiles,則會發生這種情況。如果腳本需要FlowFile繼續處理,那么如果從session.get()返回null,它應立即返回
Examples
Javascript
1 var flowFile = session.get(); 2 if (flowFile != null) { 3 // All processing code goes here 4 }
2、從會話中獲取多個傳入流文件
目的:有到ExecuteScript的傳入連接,並希望從隊列中檢索多個流文件以進行處理
方法:使用會話對象中的get(maxResults)方法。此方法從工作隊列返回maxResults FlowFiles。如果沒有可用的FlowFiles,則返回一個空列表(該方法不返回null)。注意:如果存在多個傳入隊列,則根據是否在單個調用中輪詢所有隊列或僅輪詢單個隊列,未指定行為。話雖如此,這里描述了觀察到的行為(對於NiFi 1.1.0+和之前)
Examples
Javascript
1 flowFileList = session.get(100) 2 if(!flowFileList.isEmpty()) { 3 for each (var flowFile in flowFileList) { 4 // Process each FlowFile here 5 } 6 }
3、創建一個新的FlowFile
目的:生成一個新的FlowFile以發送到下一個處理器
方法:使用會話對象中的create()方法。此方法返回一個新的FlowFile對象,您可以對其執行進一步處理
Examples
Javascript
1 var flowFile = session.create(); 2 // Additional processing here
4、從父FlowFile創建新的FlowFile
目的:希望基於傳入的FlowFile生成新的FlowFile
方法:使用會話對象中的create(parentFlowFile)方法。此方法采用父FlowFile引用並返回新的子FlowFile對象。新創建的FlowFile將繼承除UUID之外的所有父屬性。此方法將自動生成Provenance FORK事件或Provenance JOIN事件,具體取決於在提交ProcessSession之前是否從同一父級生成其他FlowFiles
Examples
Javascript
1 var flowFile = session.get(); 2 if (flowFile != null) { 3 var newFlowFile = session.create(flowFile); 4 // Additional processing here 5 }
5、向流文件添加屬性
目的:有一個要添加自定義屬性的流文件
方法:使用會話對象中的putAttribute(flowFile,attributeKey,attributeValue)方法。此方法使用給定的鍵/值對更新給定的FlowFile屬性。注意:“uuid”屬性對於FlowFile是固定的,不能修改; 如果密鑰名為“uuid”,則將被忽略。
Examples
Javascript
1 var flowFile = session.get(); 2 if (flowFile != null) { 3 flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue') 4 }
6、向流文件添加多個屬性
目的:有一個要添加自定義屬性的流文件
方法:使用會話對象中的putAllAttributes(flowFile,attributeMap)方法。此方法使用給定Map中的鍵/值對更新給定的FlowFile屬性。注意:“uuid”屬性對於FlowFile是固定的,不能修改; 如果密鑰名為“uuid”,則將被忽略。
Examples
Javascript
1 var number2 = 2; 2 var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()} 3 var flowFile = session.get() 4 if (flowFile != null) { 5 flowFile = session.putAllAttributes(flowFile, attrMap) 6 }
7、從流文件中獲取屬性
目的:有一個流文件,您可以從中檢查屬性
方法:使用FlowFile對象中的getAttribute(attributeKey)方法。此方法返回給定attributeKey的String值,如果未找到attributeKey,則返回null。這些示例顯示了“filename”屬性值的檢索。
Examples
Javascript
1 var flowFile = session.get() 2 if (flowFile != null) { 3 var myAttr = flowFile.getAttribute('filename') 4 }
1 var flowFile = session.get() 2 if (flowFile != null) { 3 var attrs = flowFile.getAttributes(); 4 for each (var attrKey in attrs.keySet()) { 5 // Do something with attrKey (the key) and/or attrs[attrKey] (the value) 6 } 7 }
8、將流文件傳輸到關系
目的:處理流文件(新文件或傳入文件)后,您希望將流文件傳輸到關系(“成功”或“失敗”)。在這個簡單的情況下,讓我們假設有一個名為“errorOccurred”的變量,它指示FlowFile應該傳輸到哪個關系。
方法:使用會話對象中的transfer(flowFile,relationship)方法。從文檔中:此方法根據給定的關系將給定的FlowFile傳輸到適當的目標處理器工作隊列。如果關系導致多個目標,則復制FlowFile的狀態,使得每個目標都接收FlowFile的精確副本,盡管每個目標都具有其自己的唯一標識。
注意:ExecuteScript將在每次執行結束時執行session.commit()以確保已提交操作。您不需要(也不應該)在腳本中執行session.commit()。
Examples
Javascript
1 var flowFile = session.get(); 2 if (flowFile != null) { 3 // All processing code goes here 4 if(errorOccurred) { 5 session.transfer(flowFile, REL_FAILURE) 6 } 7 else { 8 session.transfer(flowFile, REL_SUCCESS) 9 } 10 }
9、以指定的日志記錄級別向日志發送消息
目的:將處理期間發生的某些事件報告給日志記錄框架。
方法:將log變量與warn(),trace(),debug(),info()或error()方法一起使用。這些方法可以使用單個String,或者后跟對象數組的String,或者后跟對象數組后跟Throwable的String。第一個用於簡單消息。當您有一些要記錄的動態對象/值時,將使用第二個。要在消息字符串中引用這些,請在消息中使用“{}”。這些是按照外觀的順序針對Object數組進行評估的,因此如果消息顯示為“Found these things:{} {} {}”並且Object數組為['Hello',1,true],則記錄的消息將為“找到這些東西:你好1真的”。這些日志記錄方法的第三種形式也采用Throwable參數
Examples
Javascript
1 var ObjectArrayType = Java.type("java.lang.Object[]"); 2 var objArray = new ObjectArrayType(3); 3 objArray[0] = 'Hello'; 4 objArray[1] = 1; 5 objArray[2] = true; 6 log.info('Found these things: {} {} {}', objArray)
10、使用回調讀取傳入流文件的內容
目的:有到ExecuteScript的傳入連接,並希望從隊列中檢索流文件的內容以進行處理
方法:使用read(flowFile,inputStreamCallback)來自會話對象的方法。傳入read()方法需要一個InputStreamCallback對象。請注意,因為InputStreamCallback是一個對象,所以默認情況下內容只對該對象可見。如果需要使用read()方法之外的數據,請使用更全局范圍的變量。這些示例將傳入流文件的完整內容存儲到String中(使用Apache Commons的IOUtils類)。注意:對於大流量文件,這不是最好的技術; 相反,您應該只讀取您需要的數據,並根據需要進行處理。對於像SplitText這樣的東西,你可以一次讀取一行並在InputStreamCallback中處理它,或者使用前面提到的session.read(flowFile)方法來獲得在回調之外使用的InputStream引用。
Examples
Javascript
1 var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback") 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils") 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets") 4 5 var flowFile = session.get(); 6 if(flowFile != null) { 7 // Create a new InputStreamCallback, passing in a function to define the interface method 8 session.read(flowFile, 9 new InputStreamCallback(function(inputStream) { 10 var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8); 11 // Do something with text here 12 })); 13 }
11、使用回調將內容寫入傳出流文件
目的:傳出流文件生成內容
方法:使用會話對象中的write(flowFile,outputStreamCallback)方法。傳遞給write()方法需要一個OutputStreamCallback對象。請注意,因為OutputStreamCallback是一個對象,所以默認情況下內容只對該對象可見。如果需要使用write()方法之外的數據,請使用更全局范圍的變量。這些示例將示例String寫入flowFile。
Examples
Javascript
1 var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback"); 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils"); 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); 4 5 var flowFile = session.get(); 6 if(flowFile != null) { 7 // Create a new OutputStreamCallback, passing in a function to define the interface method 8 flowFile = session.write(flowFile, 9 new OutputStreamCallback(function(outputStream) { 10 outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8)) 11 })); 12 }
12、使用回調覆蓋帶有更新內容的傳入流文件
目的:重用傳入的流文件,但希望修改其傳出流文件的內容。
方法:使用write(flowFile,streamCallback)來自會話對象的方法。傳遞給write()方法需要StreamCallback對象。StreamCallback提供InputStream(來自傳入流文件)和outputStream(用於該流文件的下一個版本),因此您可以使用InputStream獲取流文件的當前內容,然后修改它們並將它們寫回到流文件。這會覆蓋流文件的內容,因此對於追加,您必須通過附加到讀入內容來處理它,或者使用不同的方法(使用session.append()而不是session.write())。請注意,由於StreamCallback是一個對象,因此默認情況下內容僅對該對象可見。如果需要使用write()方法之外的數據,請使用更全局范圍的變量。
Examples
Javascript
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback"); var IOUtils = Java.type("org.apache.commons.io.IOUtils"); var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); var flowFile = session.get(); if(flowFile != null) { // Create a new StreamCallback, passing in a function to define the interface method flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream) { var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8)) })); }
13、處理腳本處理過程中的錯誤
目的:腳本中發生錯誤(通過數據驗證或拋出異常),並且您希望腳本正常處理它。
方法:對於異常,使用腳本語言的異常處理機制(通常它們是try / catch塊)。對於數據驗證,您可以使用類似的方法,但定義一個布爾變量,如“valid”和if / else子句而不是try / catch子句。ExecuteScript定義“成功”和“失敗”關系; 通常,您的處理將“好”流文件轉移到成功,“壞”流文件轉換為失敗(在后一種情況下記錄錯誤)
Examples
Javascript
1 var flowFile = session.get(); 2 if(flowFile != null) { 3 try { 4 // Something that might throw an exception here 5 6 // Last operation is transfer to success (failures handled in the catch block) 7 session.transfer(flowFile, REL_SUCCESS) 8 } catch(e) { 9 log.error('Something went wrong', e) 10 session.transfer(flowFile, REL_FAILURE) 11 } 12 }
ExecuteScript-Demo
1、頁面如下圖
2、GenerateFlowFile
2、ExecuteScript
腳本內容:
1 var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback"); 2 var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback"); 3 var IOUtils = Java.type("org.apache.commons.io.IOUtils"); 4 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); 5 6 var flowFile = session.get(); 7 8 9 if(flowFile != null) { 10 11 try { 12 13 var text = ""; 14 15 // 讀取flowFile中內容 16 session.read(flowFile,new InputStreamCallback(function(inputStream) { 17 var str = IOUtils.toString(inputStream, StandardCharsets.UTF_8); 18 19 20 //由JSON字符串轉換為JSON對象 21 var obj = JSON.parse(str); 22 obj.age = 18 23 24 //將JSON對象轉化為JSON字符 25 text = JSON.stringify(obj); 26 27 })); 28 29 // 向flowFile中寫入內容 30 flowFile = session.write(flowFile, new OutputStreamCallback(function(outputStream) { 31 32 outputStream.write(text.getBytes(StandardCharsets.UTF_8)) 33 34 })); 35 36 session.transfer(flowFile, REL_SUCCESS) 37 38 } catch(e) { 39 log.error('Something went wrong', e) 40 session.transfer(flowFile, REL_FAILURE) 41 } 42 43 }
3、PutFile
輸出文件內容:{"id":1,"name":"god","age":18}
其他腳本引擎,參考以下地址
參考文檔鏈接:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html