Nifi組件腳本開發—ExecuteScript 使用指南(二)


Part 2 - FlowFile I/O 和 Error Handling

flow File的IO

NiFi 的 Flow files 由兩個主要部件組成:attributes 和 content. Attributes 是關於 content / flow file的元數據, 我們在Nifi組件腳本開發—ExecuteScript 使用指南(一) 看到了如何使用 ExecuteScript 來操縱這個屬性. flow file 的內容, 核心是一個 bytes集合,沒有繼承的 structure, schema, format, 等等. 不同的 NiFi processors 假定輸入的 flow files 具有特定的 schema/format (或者從 attributes確定如 "mime.type" 或者通過其他的方法). 這些 processors 然后按照假定的格式對內容進行處理 (將返回 "failure" 到relationship,如果不是的話). 經常 processors 將輸出  flow files 以特定的格式, 這在 processors' 描述中有相應的說明.

flow files 的 Input 和 Output (I/O) 通過 ProcessSession API 提供,通過 ExecuteScript  的"session" 變量來訪問。一個機制是傳遞一個 callback 對象到session.read() 或 session.write()的調用。對於FlowFile將創建一個 InputStream 和/或 OutputStream, 這個callback 對象將被激活,使用相應的 callback 接口, 然后這個InputStream 和/或 OutputStream 的引用被傳遞到 callback函數使用. 這里有三個 callback 接口, 每一個有自己的應用環境:

InputStreamCallback

這個 interface 用在 session.read( flowFile, inputStreamCallback) 方法中,提供一個 InputStream,用於讀取 flow file的內容. 該 interface 有一個單一方法:

void process(InputStream in) throws IOException

該 interface 提供一個被管理的 input stream. 這個input stream自動打開和關閉,也可以手動關閉. 這是從 flow file讀取的方法, 並且不能被寫回去。

OutputStreamCallback

該 interface 被用於session.write( flowFile, outputStreamCallback) 方法,提供 OutputStream寫入內容到 flow file. 該 interface 具有單一的方法:

void process(OutputStream out) throws IOException

該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關閉,也可以手動關閉。 - 重要的一點是,如果任何 streams 包裝了這個 streams,所有打開的資源應該被清理.

例如, 在ExecuteScript中被創建數據 , 來自於外部文件, 而不是一個 flow file. 然后你可以使用 session.create() 去創建一個新的FlowFile, 然后 session.write( flowFile, outputStreamCallback) 用於添加內容.

StreamCallback

該 interface 用於 session.write( flowFile, streamCallback) 方法,提供 InputStream 和 OutputStream,為 flow file提供內容的讀取和寫入. 該 interface 有一個單一的方法:

void process(InputStream in, OutputStream out) throws IOException

該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關閉,也可以手動關閉。 - 重要的一點是,如果任何 streams 包裝了這個 streams,所有打開的資源應該被清理.

因為這些  callbacks 是 Java objects, 腳本將創建一個並且傳入 session 方法, 下面的方法將使用不同的腳本語言進行演示. 並且,這里還有其他的讀寫 flow files方法, 包括:

  • 使用 session.read(flowFile) 返回 InputStream. 取代 InputStreamCallback, 將返回 InputStream 用於讀取. 你必須 (close, e.g.) 手動管理 InputStream.
  • 使用 session.importFrom(inputStreamflowFile) 從 InputStream 寫入到 FlowFile. 這將替代 借助OutputStreamCallback的session.write() 的使用.

從 flow file 中讀取數據

需求:傳入連接執行 ExecuteScript ,並且從隊列中得到 flow file 的內容進行處理.

方法:使用session的read(flowFileinputStreamCallback) 方法。一個 InputStreamCallback 對象需要被傳入 read() 方法. 注意到,因為InputStreamCallback 是一個對象, 內容只在該對象中可見。 如果你需要在 read() 方法之外訪問, 需要使用更為全局化的變量. 這里的例子講來自flow file的全部內容存儲到 String (使用 Apache Commons' IOUtils class)。

注意: 對於大的 flow files, 這並不是最好的技術方法; 應該只讀取需要的數據,並按照適應的方法處理。比如 SplitText, 你應該一次讀一行並且在 InputStreamCallback中處理, 或者 session.read(flowFile) 方法 得到 InputStream 的引用,從而在 callback之外處理.

例子

Groovy:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()

if(!flowFile)return

def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

// Do something with text here
} as InputStreamCallback)

Jython:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
def __init__(self):
    pass
def process(self, inputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

# Do something with text here
# end class

flowFile = session.get()
if(flowFile != None):
session.read(flowFile, PyInputStreamCallback())
# implicit return at the end

Javascript:

var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

var flowFile = session.get();

if(flowFile != null) {
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,new InputStreamCallback(function(inputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        // Do something with text here
    }));
}

JRuby:

java_import org.apache.commons.io.IOUtils
java_import org.apache.nifi.processor.io.InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class JRubyInputStreamCallback
include InputStreamCallback

def process(inputStream)
    text = IOUtils.toString(inputStream)
    # Do something with text here
    end
end

jrubyInputStreamCallback = JRubyInputStreamCallback.new
flowFile = session.get()
if flowFile != nil
    session.read(flowFile, jrubyInputStreamCallback)
end

寫入數據至 flow file

需求:為輸出的 flow file創建內容.

方法:使用session的write(flowFileoutputStreamCallback) 方法。一個OutputStreamCallback 對象需要傳遞給 write() 方法. 注意,因為 OutputStreamCallback 是一個對象, 因此內容之災對象內部可見. 如果你需要在 write() 方法之外訪問, 使用更為全局化變量. 西面的例子寫入 String 到 flowFile。

例子

Groovy:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets flowFile = session.get() if(!flowFile) return def text = 'Hello world!' // Cast a closure with an outputStream parameter to OutputStreamCallback flowFile = session.write(flowFile, {outputStream ->         outputStream.write(text.getBytes(StandardCharsets.UTF_8))     } as OutputStreamCallback)

Jython:

from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import OutputStreamCallback # Define a subclass of OutputStreamCallback for use in session.write() class PyOutputStreamCallback(OutputStreamCallback): def __init__(self):     pass def process(self, outputStream):     outputStream.write(bytearray('Hello World!'.encode('utf-8'))) # end class flowFile = session.get() if(flowFile != None):     flowFile = session.write(flowFile, PyOutputStreamCallback()) # implicit return at the end

Javascript:

var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback"); var IOUtils = Java.type("org.apache.commons.io.IOUtils"); var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); var flowFile = session.get(); if(flowFile != null) { // Create a new OutputStreamCallback, passing in a function to define the interface method flowFile = session.write(flowFile,new OutputStreamCallback(function(outputStream) {         outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))     })); }

JRuby:

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback

# Define a subclass of OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback include OutputStreamCallback def process(outputStream)     outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8)) end end jrubyOutputStreamCallback = JRubyOutputStreamCallback.new flowFile = session.get() if flowFile != nil     flowFile = session.write(flowFile, jrubyOutputStreamCallback) end

覆蓋 flow file內容

需求:重用輸入 flow file但是希望修改內容並傳遞到輸出的 flow file.

方法:使用session的write(flowFilestreamCallback) 方法。一個StreamCallback 對象需要傳遞給 write() 方法. StreamCallback 同時提供了InputStream (從輸入的 flow file) 和 outputStream (下一版本的 flow file), 因此你可以使用InputStream去取得 flow file的當前內容, 然后修改他們並且寫會到 flow file. 這將覆蓋 flow file 的內容, 因此對於追加內容要采用讀入內容添加的方式, 或者使用不同的方法 (使用 session.append() 而不是session.write() )。

注意,因為 StreamCallback 是一個對象, 因此內容之災對象內部可見. 如果你需要在 write() 方法之外訪問, 使用更為全局化變量.

以下這個例子將反轉輸入flowFile (假定為 String) 的內容,並將反轉后的字符串寫入到新版的 flowFile.

例子

Groovy:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets flowFile = session.get() if(!flowFile) return def text = 'Hello world!' // Cast a closure with an inputStream and outputStream parameter to StreamCallback flowFile = session.write(flowFile, {inputStream, outputStream ->         text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)         outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))     } as StreamCallback) session.transfer(flowFile, REL_SUCCESS)

Jython:

from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import StreamCallback # Define a subclass of StreamCallback for use in session.write() class PyStreamCallback(StreamCallback): def __init__(self): pass def process(self, inputStream, outputStream): text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8'))) # end class flowFile = session.get() if(flowFile != None):     flowFile = session.write(flowFile, PyStreamCallback()) # implicit return at the end

Javascript:

var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback"); var IOUtils = Java.type("org.apache.commons.io.IOUtils"); var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); var flowFile = session.get(); if(flowFile != null) { // Create a new StreamCallback, passing in a function to define the interface method flowFile = session.write(flowFile,new StreamCallback(function(inputStream, outputStream) {     var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)     outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))     })); }

JRuby:

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback

# Define a subclass of StreamCallback for use in session.write()

class JRubyStreamCallback include StreamCallback def process(inputStream, outputStream)     text = IOUtils.toString(inputStream)     outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8)) end end jrubyStreamCallback = JRubyStreamCallback.new flowFile = session.get() if flowFile != nil     flowFile = session.write(flowFile, jrubyStreamCallback) end

處理錯誤

 

需求:在 script ( data validation 或者出現一個 exception)運行時出現錯誤, 處理和拋出錯誤。

方法:對於exceptions, 使用腳本語言的exception-handling 機制  (一般是try/catch 代碼塊). 對於 data validation, 可以使用類似的方法, 但是定義一個boolean 變量,如 "valid" 以及 if/else 語句,而不是try/catch 語句. ExecuteScript 定義了 "success" and "failure" relationships; 一般情況下,你的處理將轉移 "good" flow files 到 success,而 "bad" flow files 到 failure (記錄錯誤在后續的操作中)。

例子

Groovy:

flowFile = session.get()

if(!flowFile) return try {     // Something that might throw an exception here     // Last operation is transfer to success (failures handled in the catch block)     session.transfer(flowFile, REL_SUCCESS) } catch(e) {     log.error('Something went wrong', e)     session.transfer(flowFile, REL_FAILURE) }

Jython:

flowFile = session.get()

if(flowFile != None): try:     # Something that might throw an exception here     # Last operation is transfer to success (failures handled in the catch block)     session.transfer(flowFile, REL_SUCCESS) except:     log.error('Something went wrong', e)     session.transfer(flowFile, REL_FAILURE) # implicit return at the end

Javascript:

var flowFile = session.get(); if(flowFile != null) { try {     // Something that might throw an exception here     // Last operation is transfer to success (failures handled in the catch block)     session.transfer(flowFile, REL_SUCCESS) } catch(e) {     log.error('Something went wrong', e)     session.transfer(flowFile, REL_FAILURE) } }

JRuby:

flowFile = session.get()

if flowFile != nil begin     # Something that might raise an exception here     # Last operation is transfer to success (failures handled in the rescue block)     session.transfer(flowFile, REL_SUCCESS)     rescue Exception => e     log.error('Something went wrong', e)     session.transfer(flowFile, REL_FAILURE) end end

上一篇:Nifi組件腳本開發—ExecuteScript 使用指南(一) 
源:https://www.shangmayuan.com/a/0ba9c44310b04d1dad461790.html

參考:http://nifi.apache.org/developer-guide.html

 

1、Nifi:基本認識

2、Nifi:基礎用法及頁面常識

3、Nifi:ExcuseXXXScript組件的使用(一)

4、Nifi:ExcuseXXXScript組件的使用(二)

5、Nifi:ExcuseXXXScript組件的使用(三)

6、Nifi:自定義處理器的開發

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM