【NIFI】 Apache NiFI 之 ExecuteScript處理(一)


   本例介紹NiFI ExecuteScript處理器的使用,使用的腳本引擎ECMScript

FlowFile I / O簡介

  NiFi中的流文件由兩個主要組件構成,即屬性和內容。屬性是關於內容/流文件的元數據,我們在本系列的第1部分中看到了如何使用ExecuteScript來操作它們流文件的內容本質上只是一個字節集合,沒有固有的結構,模式,格式等。各種NiFi處理器假設傳入的流文件具有特定的模式/格式(或者從屬性中確定它作為“mime.type”或以其他方式推斷它。然后,這些處理器可以基於文件確實具有該格式的假設來對內容起作用(並且如果它們不這樣,則經常轉移到“失敗”關系)。處理器也可以輸出指定格式的流文件,這在處理器中有描述。NiFi文檔

流文件內容的輸入和輸出(I / O)通過ProcessSession API提供,因此是ExecuteScript的“session”變量(有關更多信息,請參閱第1部分)。一種機制是將回調對象傳遞給對session.read()或session.write()的調用。將為FlowFile對象創建InputStream和/或OutputStream,並使用相應的回調接口調用回調對象,並傳入InputStream和/或OutputStream引用以供回調使用。有三個主要的回調接口,每個接口都有自己的用例:

  InputStreamCallback

    session.read(flowFileinputStreamCallback)方法使用此接口 提供InputStream,從中讀取流文件的內容。界面有一個方法:

1 void process(InputStream in) throws IOException

 

 

   此接口提供托管輸入流以供使用。雖然可以手動關閉流,但輸入流會自動打開和關閉。如果您只是從特定的流文件中讀取而不是寫回來,那么這是您將使用的表單。

   例如,當您想要處理傳入的流文件,但創建許多輸出流文件時,如 SplitText處理器。

  OutputStreamCallback

    session.write(flowFileoutputStreamCallback)方法使用此接口 來提供要寫入流文件內容的OutputStream。界面有一個方法:

1 void process(OutputStream out) throws IOException

    此接口提供托管輸出流以供使用。盡管可以手動關閉流,但輸出流會自動打開和關閉 - 如果包含這些流的任何流打開應該清除的資源,則非常重要。

    例如,ExecuteScript將從內部或外部文件生成數據,但不生成流文件。然后你將使用session.create()創建一個新的FlowFile,然后使用session.write( flowFileoutputStreamCallback)來插入內容。

  StreamCallback

    session.write(flowFilestreamCallback)方法使用此接口 來提供InputStream和OutputStream,從中讀取和/或寫入流文件的內容。界面有一個方法:

1 void process(InputStream in, OutputStream out) throws IOException

 

 

    此接口提供托管輸入和輸出流以供使用。雖然可以手動關閉流,但輸入流會自動打開和關閉 - 如果包含這些流的任何流打開應該清除的資源,則非常重要。

    例如,當您想要處理傳入的流文件並用新的東西覆蓋其內容時,例如 EncryptContent處理器。

  由於這些回調是Java對象,因此腳本必須創建一個並將其傳遞給會話方法,還有其他讀取和寫入流文件的方法,包括:

    • 使用session.read(flowFile)返回一個InputStream。這減輕了對InputStreamCallback的需求,而是返回可以讀取的InputStream。作為交換,您必須手動管理(關閉,例如)InputStream。
    • 使用session.importFrom(inputStreamflowFile)從InputStream寫入FlowFile。這取代了傳遞了OutputStreamCallback的session.write()的需要。

 ExecuteScript介紹

  ExecuteScript是一個多功能處理器,允許用戶使用編程語言編寫自定義邏輯,每次觸發ExecuteScript處理器時都會執行該編程語言。為腳本提供以下變量綁定以啟用對NiFi組件的訪問:

  session:這是對分配給處理器的ProcessSession的引用。該會話允許您對流文件執行操作,如create(),putAttribute()和transfer(),以及read()和write()。

  context:這是對處理器的ProcessContext的引用。它可用於檢索處理器屬性,關系,Controller Services和StateManager。

  log:這是對處理器的ComponentLog的引用。使用它將消息記錄到NiFi,例如log.info('Hello world!')

  REL_SUCCESS:這是對為處理器定義的“成功”關系的引用。它也可以通過引用父類的靜態成員(ExecuteScript)來繼承,但是某些引擎(如Lua)不允許引用靜態成員,因此這是一個便利變量。它還節省了必須使用關系的完全限定名稱。

  REL_FAILURE:這是對為處理器定義的“失敗”關系的引用。與REL_SUCCESS一樣,它也可以通過引用父類的靜態成員(ExecuteScript)來繼承,但是某些引擎(如Lua)不允許引用靜態成員,因此這是一個便利變量。它還節省了必須使用關系的完全限定名稱。

  動態屬性:ExecuteScript中定義的任何動態屬性都將作為設置為與動態屬性對應的PropertyValue對象的變量傳遞給腳本引擎。這允許您獲取屬性的String值,還可以根據NiFi表達式語言評估屬性,將值轉換為適當的數據類型(例如布爾值等)等。因為動態屬性名稱變為腳本的變量名,您必須知道所選腳本引擎的變量命名屬性

ExecuteScript使用

  1、從會話中獲取傳入的流文件

    目的:有到ExecuteScript的傳入連接,並希望從隊列中檢索一個流文件以進行處理

    方法:使用會話對象中的get()方法。此方法返回要處理的下一個最高優先級FlowFile的FlowFile。如果沒有要處理的FlowFile,則該方法將返回null。請注意,即使FlowFiles有穩定的流入處理器,也可能返回null。如果處理器有多個並發任務,並且其他任務已經檢索到FlowFiles,則會發生這種情況。如果腳本需要FlowFile繼續處理,那么如果從session.get()返回null,它應立即返回

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     // All processing code goes here
4 }

  2、從會話中獲取多個傳入流文件

    目的:有到ExecuteScript的傳入連接,並希望從隊列中檢索多個流文件以進行處理

    方法:使用會話對象中的get(maxResults)方法。此方法從工作隊列返回maxResults FlowFiles。如果沒有可用的FlowFiles,則返回一個空列表(該方法不返回null)。注意:如果存在多個傳入隊列,則根據是否在單個調用中輪詢所有隊列或僅輪詢單個隊列,未指定行為。話雖如此,這里描述了觀察到的行為(對於NiFi 1.1.0+和之前)

    Examples

      Javascript

1 flowFileList = session.get(100)
2 if(!flowFileList.isEmpty()) {
3   for each (var flowFile in flowFileList) { 
4        // Process each FlowFile here
5   }
6 }

  3、創建一個新的FlowFile

    目的:生成一個新的FlowFile以發送到下一個處理器

    方法:使用會話對象中的create()方法。此方法返回一個新的FlowFile對象,您可以對其執行進一步處理

    Examples

      Javascript

1 var flowFile = session.create();
2 // Additional processing here

  4、從父FlowFile創建新的FlowFile

    目的:希望基於傳入的FlowFile生成新的FlowFile

    方法:使用會話對象中的create(parentFlowFile)方法。此方法采用父FlowFile引用並返回新的子FlowFile對象。新創建的FlowFile將繼承除UUID之外的所有父屬性。此方法將自動生成Provenance FORK事件或Provenance JOIN事件,具體取決於在提交ProcessSession之前是否從同一父級生成其他FlowFiles

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     var newFlowFile = session.create(flowFile);
4     // Additional processing here
5 }

  5、向流文件添加屬性

    目的:有一個要添加自定義屬性的流文件

    方法:使用會話對象中的putAttribute(flowFileattributeKeyattributeValue)方法。此方法使用給定的鍵/值對更新給定的FlowFile屬性。注意:“uuid”屬性對於FlowFile是固定的,不能修改; 如果密鑰名為“uuid”,則將被忽略。

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
4 }

  6、向流文件添加多個屬性

    目的:有一個要添加自定義屬性的流文件

    方法:使用會話對象中的putAllAttributes(flowFileattributeMap)方法。此方法使用給定Map中的鍵/值對更新給定的FlowFile屬性。注意:“uuid”屬性對於FlowFile是固定的,不能修改; 如果密鑰名為“uuid”,則將被忽略。

    Examples

      Javascript

1 var number2 = 2;
2 var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
3 var flowFile = session.get() 
4 if (flowFile != null) {
5     flowFile = session.putAllAttributes(flowFile, attrMap)
6 }

  7、從流文件中獲取屬性

    目的:有一個流文件,您可以從中檢查屬性

    方法:使用FlowFile對象中的getAttribute(attributeKey)方法。此方法返回給定attributeKey的String值,如果未找到attributeKey,則返回null。這些示例顯示了“filename”屬性值的檢索。

    Examples

      Javascript

1 var flowFile = session.get() 
2 if (flowFile != null) {
3     var myAttr = flowFile.getAttribute('filename')
4 }

 

1 var flowFile = session.get() 
2 if (flowFile != null) {
3     var attrs = flowFile.getAttributes();
4     for each (var attrKey in attrs.keySet()) { 
5        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
6   }
7 }

  8、將流文件傳輸到關系

    目的:處理流文件(新文件或傳入文件)后,您希望將流文件傳輸到關系(“成功”或“失敗”)。在這個簡單的情況下,讓我們假設有一個名為“errorOccurred”的變量,它指示FlowFile應該傳輸到哪個關系。

    方法:使用會話對象中的transfer(flowFilerelationship)方法。從文檔中:此方法根據給定的關系將給定的FlowFile傳輸到適當的目標處理器工作隊列。如果關系導致多個目標,則復制FlowFile的狀態,使得每個目標都接收FlowFile的精確副本,盡管每個目標都具有其自己的唯一標識。

    注意:ExecuteScript將在每次執行結束時執行session.commit()以確保已提交操作。您不需要(也不應該)在腳本中執行session.commit()。

    Examples

      Javascript

 1 var flowFile = session.get();
 2 if (flowFile != null) {
 3    // All processing code goes here
 4    if(errorOccurred) {
 5      session.transfer(flowFile, REL_FAILURE)
 6    }
 7    else {
 8      session.transfer(flowFile, REL_SUCCESS)
 9    }
10 }

 

  9、以指定的日志記錄級別向日志發送消息

 

    目的:將處理期間發生的某些事件報告給日志記錄框架。

    方法:將log變量與warn(),trace(),debug(),info()或error()方法一起使用。這些方法可以使用單個String,或者后跟對象數組的String,或者后跟對象數組后跟Throwable的String。第一個用於簡單消息。當您有一些要記錄的動態對象/值時,將使用第二個。要在消息字符串中引用這些,請在消息中使用“{}”。這些是按照外觀的順序針對Object數組進行評估的,因此如果消息顯示為“Found these things:{} {} {}”並且Object數組為['Hello',1,true],則記錄的消息將為“找到這些東西:你好1真的”。這些日志記錄方法的第三種形式也采用Throwable參數
    
Examples

      Javascript

1 var ObjectArrayType = Java.type("java.lang.Object[]");
2 var objArray = new ObjectArrayType(3);
3 objArray[0] = 'Hello';
4 objArray[1] = 1;
5 objArray[2] = true;
6 log.info('Found these things: {} {} {}', objArray)

 

   10、使用回調讀取傳入流文件的內容

    目的:有到ExecuteScript的傳入連接,並希望從隊列中檢索流文件的內容以進行處理

    方法:使用read(flowFileinputStreamCallback)來自會話對象的方法。傳入read()方法需要一個InputStreamCallback對象。請注意,因為InputStreamCallback是一個對象,所以默認情況下內容只對該對象可見。如果需要使用read()方法之外的數據,請使用更全局范圍的變量。這些示例將傳入流文件的完整內容存儲到String中(使用Apache Commons的IOUtils類)。注意:對於大流量文件,這不是最好的技術; 相反,您應該只讀取您需要的數據,並根據需要進行處理。對於像SplitText這樣的東西,你可以一次讀取一行並在InputStreamCallback中處理它,或者使用前面提到的session.read(flowFile)方法來獲得在回調之外使用的InputStream引用。

    Examples

      Javascript

 1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback")
 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils")
 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
 4  
 5 var flowFile = session.get();
 6 if(flowFile != null) {
 7   // Create a new InputStreamCallback, passing in a function to define the interface method
 8   session.read(flowFile,
 9     new InputStreamCallback(function(inputStream) {
10         var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
11         // Do something with text here
12     }));
13 }

 

   11、使用回調將內容寫入傳出流文件

    目的:傳出流文件生成內容

    方法:使用會話對象中的write(flowFileoutputStreamCallback)方法。傳遞給write()方法需要一個OutputStreamCallback對象。請注意,因為OutputStreamCallback是一個對象,所以默認情況下內容只對該對象可見。如果需要使用write()方法之外的數據,請使用更全局范圍的變量。這些示例將示例String寫入flowFile。

    Examples

      Javascript

 1 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 4  
 5 var flowFile = session.get();
 6 if(flowFile != null) {
 7   // Create a new OutputStreamCallback, passing in a function to define the interface method
 8   flowFile = session.write(flowFile,
 9     new OutputStreamCallback(function(outputStream) {
10         outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
11     }));
12 }

 

   12、使用回調覆蓋帶有更新內容的傳入流文件

    目的:重用傳入的流文件,但希望修改其傳出流文件的內容。

    方法:使用write(flowFilestreamCallback)來自會話對象的方法。傳遞給write()方法需要StreamCallback對象。StreamCallback提供InputStream(來自傳入流文件)和outputStream(用於該流文件的下一個版本),因此您可以使用InputStream獲取流文件的當前內容,然后修改它們並將它們寫回到流文件。這會覆蓋流文件的內容,因此對於追加,您必須通過附加到讀入內容來處理它,或者使用不同的方法(使用session.append()而不是session.write())。請注意,由於StreamCallback是一個對象,因此默認情況下內容僅對該對象可見。如果需要使用write()方法之外的數據,請使用更全局范圍的變量

    Examples

      Javascript

var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 
var flowFile = session.get();
if(flowFile != null) {
  // Create a new StreamCallback, passing in a function to define the interface method
  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
    }));
}

 

   13、處理腳本處理過程中的錯誤

    目的:腳本中發生錯誤(通過數據驗證或拋出異常),並且您希望腳本正常處理它。

    方法:對於異常,使用腳本語言的異常處理機制(通常它們是try / catch塊)。對於數據驗證,您可以使用類似的方法,但定義一個布爾變量,如“valid”和if / else子句而不是try / catch子句。ExecuteScript定義“成功”和“失敗”關系; 通常,您的處理將“好”流文件轉移到成功,“壞”流文件轉換為失敗(在后一種情況下記錄錯誤)

    Examples

      Javascript

 1 var flowFile = session.get();
 2 if(flowFile != null) {
 3   try {
 4     // Something that might throw an exception here
 5  
 6     // Last operation is transfer to success (failures handled in the catch block)
 7     session.transfer(flowFile, REL_SUCCESS)
 8 } catch(e) {
 9   log.error('Something went wrong', e)
10   session.transfer(flowFile, REL_FAILURE)
11 }
12 }

 

 

 ExecuteScript-Demo

  1、頁面如下圖

  

  2、GenerateFlowFile

    

  2、ExecuteScript

    

    腳本內容:

 1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
 2 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
 3 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
 4 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 5  
 6 var flowFile = session.get();
 7 
 8 
 9 if(flowFile != null) {
10 
11     try {
12 
13         var text = "";
14 
15         // 讀取flowFile中內容
16         session.read(flowFile,new InputStreamCallback(function(inputStream) {
17             var str = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
18             
19 
20             //由JSON字符串轉換為JSON對象
21             var obj = JSON.parse(str); 
22             obj.age = 18
23 
24             //將JSON對象轉化為JSON字符
25             text = JSON.stringify(obj); 
26 
27         }));
28 
29         // 向flowFile中寫入內容
30         flowFile = session.write(flowFile, new OutputStreamCallback(function(outputStream) {
31 
32             outputStream.write(text.getBytes(StandardCharsets.UTF_8))
33 
34         }));
35 
36         session.transfer(flowFile, REL_SUCCESS)
37 
38     } catch(e) {
39         log.error('Something went wrong', e)
40         session.transfer(flowFile, REL_FAILURE)
41     }
42     
43 }

  3、PutFile

    

    輸出文件內容:{"id":1,"name":"god","age":18}

  

  其他腳本引擎,參考以下地址 

  參考文檔鏈接:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM