本例介紹NiFI ExecuteScript處理器的使用,使用的腳本引擎ECMScript
接上一篇【NIFI】 Apache NiFI 之 ExecuteScript處理(一)
ExecuteScript使用
1、動態屬性
其中一個功能是動態屬性的概念,也稱為用戶定義屬性。這些是處理器的屬性,用戶可以為其設置屬性名稱和值。並非所有處理器都支持/使用動態屬性,但ExecuteScript會將動態屬性作為變量傳遞,這些變量引用與屬性值對應的PropertyValue對象。這里有兩件重要的事情需要注意:
-
- 由於屬性名稱按原樣綁定到變量名稱,因此必須為指定的編程語言支持動態屬性的命名約定。例如,Groovy不支持句點(。)作為有效的變量字符,因此動態屬性(如“my.value”)將導致處理器失敗。在這種情況下,有效的替代方案是“myValue”。
- 使用PropertyValue對象(而不是值的String表示形式)以允許腳本在將屬性值評估為String之前對屬性的值執行各種操作。如果已知屬性包含文字值,則可以對變量調用getValue()方法以獲取其String表示形式。相反,如果值可能包含表達式語言,或者您希望將值轉換為除String之外的值(例如對布爾對象的值為'true'),則還有這些操作的方法。這些示例在下面的配方中說明,假設我們有兩個屬性'myProperty1'和'myProperty2'定義如下:
目的:用戶輸入了動態屬性以供腳本使用(例如,配置參數)。
方法:使用變量的PropertyValue對象中的getValue()方法。此方法返回動態屬性值的String表示形式。請注意,如果值中存在表達式語言,則getValue()將不對其進行求值
Examples
Javascript
1 var myValue1 = myProperty1.getValue() 2 var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
2、添加模塊
ExecuteScript的另一個功能是能夠向類路徑添加外部“模塊”,這允許您利用各種第三方庫,腳本等。但是每個腳本引擎都以不同方式處理模塊的概念,因此我將討論它們分別。一般來說,有兩種類型的模塊,Java庫(JAR)和腳本(用與ExecuteScript中相同的語言編寫
配置如下:
目的:腳本執行時,使用Java庫
方法:配置ExecuteScript,添加Module Directory目錄(里面都是jar包),腳本中獲取jar包中的類,使用jar包中的類方法
Examples
Javascript
1 var ObjectMapper = Java.type("com.fasterxml.jackson.databind.ObjectMapper"); 2 var Map = Java.type("java.util.Map"); 3 4 5 var objectMapper = new ObjectMapper(); 6 7 8 // java對象轉 json字符串 9 var str = objectMapper.writeValueAsString(obj); 10 11 // json字符串 轉 java對象 12 var map = objectMapper.readValue(str, Map.class);
3、狀態存儲
NiFi(我相信0.5.0)為處理器和其他NiFi組件提供了持久保存一些信息的能力,以便在組件周圍實現某些狀態功能,例如,QueryDatabaseTable處理器跟蹤它在指定列中看到的最大值,這樣下次運行時,它只會獲取其值大於目前已見過的行(即存儲在州經理)。
NiFi組件可以選擇將其狀態存儲在集群級別或本地級別。請注意,在獨立的NiFi實例中,“群集范圍”與“本地范圍”相同。范圍的選擇通常是關於在流中,每個節點上的相同處理器是否可以共享狀態數據。如果群集中的實例不需要共享狀態,則使用本地范圍。在Java中,這些選項作為名為Scope的枚舉提供,因此當我引用Scope.CLUSTER和Scope.LOCAL時,我分別表示集群和本地作用域。
要在ExecuteScript中使用狀態管理功能(下面是特定於語言的示例),您可以通過調用ProcessContext的getStateManager()方法獲得對StateManager的引用(回想一下,每個引擎都獲得一個名為“context”的變量,其中包含ProcessContext實例)。然后,您可以在StateManager對象上調用以下方法:
void setState(Map <String,String> state,Scope scope) - 更新組件在給定范圍內的狀態值,並將其設置為給定值。請注意,該值是一個Map; “組件狀態”的概念是每個構成較低級別狀態的所有鍵/值對的映射。Map會立即更新以提供原子性。
StateMap getState(作用域范圍) - 返回給定范圍內組件的當前狀態。此方法永遠不會返回null; 相反,它是一個StateMap對象,如果尚未設置狀態,StateMap的版本將為-1,值的映射將為空。通常會創建一個新的Map <String,String>來存儲更新的值,然后調用setState()或replace()。
boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope) - 當且僅當值與給定的oldValue相同時,才更新組件狀態(在給定范圍內)的值到新值。如果狀態已更新為新值,則返回true; 否則,如果狀態的值不等於oldValue,則返回false。
void clear(范圍范圍) - 清除給定范圍內組件狀態的所有鍵和值。
目的1:腳本需要從狀態管理器獲取當前鍵/值對以供腳本使用(例如,更新)
方法1:使用ProcessContext中的getStateManager()方法,然后使用StateManager中的getStateMap(),然后使用toMap()轉換為鍵/值對的Map <String,String>。請注意,StateMap還有一個簡單檢索值的get(key)方法,但這種方法並不常用,因為Map通常會更新,一旦完成,就必須為StateManager設置。
Examples
Javascript
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
目的2:腳本希望使用新的鍵/值對映射更新狀態映射。
方法2:要獲取當前StateMap對象,請再次使用ProcessContext中的getStateManager()方法,然后使用StateManager中的getStateMap()。這些示例假設一個新的Map,但使用上面的配方(使用toMap()方法),您可以使用現有值創建一個新的Map,然后只更新所需的條目。請注意,如果沒有當前映射(即StateMap.getVersion()返回-1),則replace()將不起作用,因此示例將相應地檢查並調用setState()或replace()。從新的ExecuteScript實例運行時,StateMap版本將為-1,因此在單次執行后,如果右鍵單擊ExecuteScript處理器並選擇View State,您應該看到如下內容:
Examples
Javascript
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var stateManager = context.stateManager; 3 var stateMap = stateManager.getState(Scope.CLUSTER); 4 var newMap = {'myKey1': 'myValue1'}; 5 if (stateMap.version == -1) { 6 stateManager.setState(newMap, Scope.CLUSTER); 7 } else { 8 stateManager.replace(stateMap, newMap, Scope.CLUSTER); 9 }
ExecuteScript-Demo
Demo流程:先存入緩存myKey1 =》 第二次取出myKey1的值 =》 myKey1的值修改 =》 再次存入緩存中
ExecuteScript內容如下:
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var stateManager = context.stateManager; 3 var stateMap = stateManager.getState(Scope.CLUSTER); 4 5 if (stateMap.version == -1) { 6 var newMap = {'myKey1': "1"}; 7 stateManager.setState(newMap, Scope.CLUSTER); 8 } else { 9 10 11 var myValue1 = stateMap.toMap().get("myKey1"); 12 myValue1 = myValue1*1 + 1; 13 var newMap = {'myKey1': myValue1 + ""}; 14 15 // 替換 16 stateManager.replace(stateMap, newMap, Scope.CLUSTER); 17 }
其他腳本引擎,參考以下地址
參考文檔鏈接:https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html