【NIFI】 開發自定義Nifi Processor


  本例需要基礎知識:【NIFI】 Apache NiFI 安裝及簡單的使用

  Nifi不光可以使用自帶的Processor,還可以自定義Processor。本例簡單介紹開發一個Processor

開發

  1、新建一個Maven工程,這里采用的是eclipse的模板原型來創建。

    a、創建

    

    b、添加模板,內容:

      • Archetype Group Id:org.apache.nifi
      • Archetype Artifact Id:nifi-processor-bundle-archetype
      • Archetype Version:1.2.0

    

    c、根據模板創建,maven項目

    

  2、創建后,項目目錄如下:

    

    其中3個pom文件如下

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <!--
 3   Licensed to the Apache Software Foundation (ASF) under one or more
 4   contributor license agreements. See the NOTICE file distributed with
 5   this work for additional information regarding copyright ownership.
 6   The ASF licenses this file to You under the Apache License, Version 2.0
 7   (the "License"); you may not use this file except in compliance with
 8   the License. You may obtain a copy of the License at
 9   http://www.apache.org/licenses/LICENSE-2.0
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 -->
16 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
17     <modelVersion>4.0.0</modelVersion>
18 
19     <parent>
20         <groupId>org.apache.nifi</groupId>
21         <artifactId>nifi-nar-bundles</artifactId>
22         <version>1.2.0</version>
23     </parent>
24 
25     <groupId>com.test</groupId>
26     <artifactId>my-processor</artifactId>
27     <version>1.0-SNAPSHOT</version>
28     <packaging>pom</packaging>
29 
30     <modules>
31         <module>nifi-my-processor-processors</module>
32         <module>nifi-my-processor-nar</module>
33     </modules>
34     
35 </project>
my-processor pom.xml 

  

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <!--
 3   Licensed to the Apache Software Foundation (ASF) under one or more
 4   contributor license agreements. See the NOTICE file distributed with
 5   this work for additional information regarding copyright ownership.
 6   The ASF licenses this file to You under the Apache License, Version 2.0
 7   (the "License"); you may not use this file except in compliance with
 8   the License. You may obtain a copy of the License at
 9   http://www.apache.org/licenses/LICENSE-2.0
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 -->
16 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
17     <modelVersion>4.0.0</modelVersion>
18 
19     <parent>
20         <groupId>com.test</groupId>
21         <artifactId>my-processor</artifactId>
22         <version>1.0-SNAPSHOT</version>
23     </parent>
24 
25     <artifactId>nifi-my-processor-nar</artifactId>
26     <packaging>nar</packaging>
27     <properties>
28         <maven.javadoc.skip>true</maven.javadoc.skip>
29         <source.skip>true</source.skip>
30     </properties>
31 
32     <dependencies>
33         <dependency>
34             <groupId>com.test</groupId>
35             <artifactId>nifi-my-processor-processors</artifactId>
36             <version>1.0-SNAPSHOT</version>
37         </dependency>
38     </dependencies>
39 
40 </project>
nifi-my-processor-nar pom.xml

 

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <!--
 3   Licensed to the Apache Software Foundation (ASF) under one or more
 4   contributor license agreements. See the NOTICE file distributed with
 5   this work for additional information regarding copyright ownership.
 6   The ASF licenses this file to You under the Apache License, Version 2.0
 7   (the "License"); you may not use this file except in compliance with
 8   the License. You may obtain a copy of the License at
 9   http://www.apache.org/licenses/LICENSE-2.0
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 -->
16 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
17     <modelVersion>4.0.0</modelVersion>
18 
19     <parent>
20         <groupId>com.test</groupId>
21         <artifactId>my-processor</artifactId>
22         <version>1.0-SNAPSHOT</version>
23     </parent>
24 
25     <artifactId>nifi-my-processor-processors</artifactId>
26     <packaging>jar</packaging>
27 
28     <dependencies>
29         <dependency>
30             <groupId>org.apache.nifi</groupId>
31             <artifactId>nifi-api</artifactId>
32         </dependency>
33         <dependency>
34             <groupId>org.apache.nifi</groupId>
35             <artifactId>nifi-utils</artifactId>
36         </dependency>
37         <dependency>
38             <groupId>org.apache.nifi</groupId>
39             <artifactId>nifi-mock</artifactId>
40             <scope>test</scope>
41         </dependency>
42         <dependency>
43             <groupId>org.slf4j</groupId>
44             <artifactId>slf4j-simple</artifactId>
45             <scope>test</scope>
46         </dependency>
47         <dependency>
48             <groupId>junit</groupId>
49             <artifactId>junit</artifactId>
50             <scope>test</scope>
51         </dependency>
52     </dependencies>
53 </project>
nifi-my-processor-processors pom.xml

 

  

  3、修改項目,因環境引起的錯誤

    a、刪除nifi-my-processor-processors子項目中,src/test中的測試文件(打包可能出現錯誤)

    b、在org.apache.nifi.processor.Processor文件中配置自己的Porcessor

      

  4、代碼編寫,編輯MyProcessor.java文件,文件在項目創建的時候已經生成,做適當修改即可。其中有設置狀態,屬性,及處理方法(onTrigger)等

    

    內容:

  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 package com.test.processors;
 18 
 19 import org.apache.nifi.components.PropertyDescriptor;
 20 import org.apache.nifi.flowfile.FlowFile;
 21 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 22 import org.apache.nifi.annotation.behavior.ReadsAttributes;
 23 import org.apache.nifi.annotation.behavior.WritesAttribute;
 24 import org.apache.nifi.annotation.behavior.WritesAttributes;
 25 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 26 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 27 import org.apache.nifi.annotation.documentation.SeeAlso;
 28 import org.apache.nifi.annotation.documentation.Tags;
 29 import org.apache.nifi.processor.exception.ProcessException;
 30 import org.apache.nifi.processor.AbstractProcessor;
 31 import org.apache.nifi.processor.ProcessContext;
 32 import org.apache.nifi.processor.ProcessSession;
 33 import org.apache.nifi.processor.ProcessorInitializationContext;
 34 import org.apache.nifi.processor.Relationship;
 35 import org.apache.nifi.processor.util.StandardValidators;
 36 
 37 import java.io.InputStreamReader;
 38 import java.io.StringWriter;
 39 import java.util.ArrayList;
 40 import java.util.Collections;
 41 import java.util.HashSet;
 42 import java.util.List;
 43 import java.util.Set;
 44 import java.util.concurrent.atomic.AtomicReference;
 45 
 46 @Tags({"example"})
 47 @CapabilityDescription("Provide a description")
 48 @SeeAlso({})
 49 @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
 50 @WritesAttributes({@WritesAttribute(attribute="", description="")})
 51 public class MyProcessor extends AbstractProcessor {
 52 
 53     public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
 54             .Builder().name("MY_PROPERTY")
 55             .displayName("My property")
 56             .description("Example Property")
 57             .required(true)
 58             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 59             .build();
 60 
 61     public static final Relationship MY_RELATIONSHIP_SUCCESS = new Relationship.Builder()
 62             .name("sucess")
 63             .description("Example relationship Success")
 64             .build();
 65     
 66     public static final Relationship MY_RELATIONSHIP_FAILURE = new Relationship.Builder()
 67             .name("failure")
 68             .description("Example relationship Failure")
 69             .build();
 70 
 71     private List<PropertyDescriptor> descriptors;
 72 
 73     private Set<Relationship> relationships;
 74 
 75     @Override
 76     protected void init(final ProcessorInitializationContext context) {
 77         final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
 78         descriptors.add(MY_PROPERTY);
 79         this.descriptors = Collections.unmodifiableList(descriptors);
 80 
 81         final Set<Relationship> relationships = new HashSet<Relationship>();
 82         relationships.add(MY_RELATIONSHIP_SUCCESS);
 83         relationships.add(MY_RELATIONSHIP_FAILURE);
 84         this.relationships = Collections.unmodifiableSet(relationships);
 85     }
 86 
 87     @Override
 88     public Set<Relationship> getRelationships() {
 89         return this.relationships;
 90     }
 91 
 92     @Override
 93     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 94         return descriptors;
 95     }
 96 
 97     @OnScheduled
 98     public void onScheduled(final ProcessContext context) {
 99 
100     }
101 
102     @Override
103     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
104         FlowFile flowFile = session.get();
105         if ( flowFile == null ) {
106             return;
107         }
108         // TODO implement
109         final AtomicReference<String> value = new AtomicReference<>();
110         session.read(flowFile, in -> {
111             try{
112                 StringWriter sw = new StringWriter();
113                 InputStreamReader inr = new InputStreamReader(in);
114                 char[] buffer = new char[1024];
115                 int n = 0;
116                 while (-1 != (n = inr.read(buffer))) {
117                     sw.write(buffer, 0, n);
118                 }
119                 String str = sw.toString();
120                 
121                 String result = "處理了:" + str + context.getProperty("MY_PROPERTY").getValue();
122                 value.set(result);
123             }catch(Exception ex){
124                 ex.printStackTrace();
125                 getLogger().error("Failed to read json string.");
126             }
127         });
128 
129         String results = value.get();
130         if(results != null && !results.isEmpty()){
131             flowFile = session.putAttribute(flowFile, "match", results);
132         }
133 
134         flowFile = session.write(flowFile, out -> out.write(value.get().getBytes()));
135 
136         session.transfer(flowFile, MY_RELATIONSHIP_SUCCESS);
137         
138     }
139 }

  5、打包,使用maven命令:mvn clean package

  6、將nifi-my-processor-nar工程target目錄中的 nifi-my-processor-nar-1.0-SNAPSHOT.nar 文件,拷貝到 nifi\lib 目錄中 

  7、啟動 NIFI 項目,使用自定義的Process:MyProcessor

    配置如下:

      a、拉入三個Processor

        

      b、配置三個Processor,

        下圖是GenerateFlowFile的配置,主要配置了執行的時間(10s)及產生的字符串(123)

          

        下圖是MyProcessor配置

        

      c、啟動三個Processor

      d、查看輸出,可以看到字符串 123 經過處理成 : "處理了:123abc"

        

        

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM