前言
要問 Hortonworks 這家公司最有產品力的產品是什么,我覺得是 Apache NiFi.去年Cloudera 和 Hortonworks 合並之后,以 Cloudera 為主,兩家公司進行產品整合.Cloudera 從 Hortonworks 家沒拿啥東西,唯獨拿來了 Apache NiFi ,並借鑒了 HDF 平台,整合成了全新的產品 Cloudera DataFlow(CDF)產品,並且大有把 CDF 做成自家拳頭產品之勢.Cloudera 官網 PRODUCTS 展示的第一個產品就是 Data Flow.Apache NiFi 是數據領域非常有潛力,也很優秀的一款產品.但目前來看,NiFi 在數據處理領域還不火.一是目前數據處理領域內生態很繁榮,選擇很多.另一個是我們離真正的 data driven 還有一些時間和距離.目前很多公司都熱衷於搭建各自的數據 pipline,這方面的技術選型着實很多.我覺得 NiFi 在這方面是比較超前的,NiFi 把數據 pipline 在往前推進成--data flow.這也是 NiFi 的核心思想.隨着5G和DT時代的到來,data flow 將會成為一個非常普遍的技術概念.
NiFi 是什么
NiFi 的簡介這里不贅述,詳細可見 上一篇文章 或者是 官網 .這里從源碼的角度上來講講 NiFi 的本質,在我看來,NiFi 是一套借鑒 flow-based programming (FBP) 編程范式寫成的 Java 編程工具箱.首先它是用 Java 語言寫成的系統級應用或者說是框架,對於 FBP,它是一種編程范式,如面向過程,面向對象,函數式,流式編程等一樣.FBP是流式編程的一種,簡單理解就是把程序看作是信息數據包,處理器以及處理器間的連接組成的有向圖.這是 NiFi 的使用方式,也可以稱這是使用 NiFi 來進行數據編程.說它是工具箱,是因為它內置了很多的 Java 組件,框架和技術.它將各種 Java 框架技術包裝成 NiFi 中的處理器或者服務等組件,開箱即用,組合成不同的 data flow ,對於開發者來說,也可以叫編寫成不同的應用程序.
比如說,NiFi 把 jetty server 包裝成 handlehttprequest 和 handlehttpresponse 兩個處理器,我們可以基於這兩個處理器加上一個 execute sql 處理器來連一個有向圖,這其實就是一個 restful 接口了.
為什么要研究 NiFi 源碼
研究 NiFi 的源碼主要有兩個目的,也可以說是有兩個好處.
- 進行 NiFi 的二次開發,對於 NiFi 的二次開發其實有兩個層面.
- 第一個層面是利用 NiFi 提供的擴展機制,如前所述, NiFi 是一個 java 工具箱,包裝了很多豐富的 java 工具.那么我們也可以開發我們自己的 java 工具,把它放進 NiFi 的工具箱里面來進行使用.在這個層面上,也是可以完成很多特定的二次開發的需求的.
- 第二個層面那就是修改 NiFi 的源碼,然后重新進行編譯,打包了.
- NiFi 源碼是絕好的 Java 學習項目. NiFi 核心框架對 IO,並發,異步,緩沖緩存,容錯等技術方面的處理和設計都是非常值得學習和研究的. 此外, NiFi Java 工具箱里的工具也是很多 Java 框架和技術的封裝(這里深刻明白了 Java 的生態是如此的強大和繁榮),通過閱讀處理器的代碼也能加深對各種 Java 框架和技術的理解.
編譯 NiFi 源碼
要研究 NiFi 源碼,首先得編譯 NiFi 源碼. NiFi 源碼可以從 github 上面 clone 下來,它是一個 maven 工程,使用 maven 編譯打包.官網給出了非常詳細的 編譯步驟 ,這也是我非常喜歡 NiFi 的一點,它的官方文檔真的非常非常細致. 為了便於概覽和閱讀,我將 NiFi 工程導入到 eclipse 里面進行編譯.在 windows 平台下使用 eclipse 編譯 NiFi 有幾處地方要注意下:
- 可能會由於 eclipse 的插件版本過低導致報錯
- 可能要反復下載 pom 聲明的依賴. NiFi 工程編譯要下載比較多的 jar 包依賴和 build 插件.所以要保證網絡暢通,修改默認的 maven 倉庫鏡像,建議使用阿里雲的鏡像.
<mirror> <id>nexus-ali</id> <mirrorOf>central</mirrorOf> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </mirror>
- 另外工程比較繁雜,有些 pom 里面的 maven 插件生命周期有重疊,可以忽略一些插件的錯誤.
NiFi 源碼框架分析
當 NiFi 源碼編譯好之后,就開始正式進入源碼分析.在 eclipse 里面左邊視圖拉一下列表,可以看到,NiFi 總共有 394 個 pom 工程.一下子感覺無從下手,這個時候我們還是要回過頭來看 NiFi 官網上的架構圖:
記住這個圖, NiFi 首先是一個 JVM 應用,其次最上層是一個 web server.我們就從這里入手開始閱讀 NiFi 的源碼.
再回到 eclipse 里面來,要分析 maven 工程的源碼,首要看的就是 pom 文件,通過 pom 文件,我們能夠得知這個工程的大概.可以發現,這394個 pom 工程大部分是子工程,都是一層套一層的父子pom 工程.那么我們首先找到最頂級的pom 工程,也就是 nifi:
這么一看,整個 NiFi 工程還是清晰了一些.
- nifi-api 就是 nifi 的應用程序接口,里面就是定義了整個工程所需要用到的接口,注解,抽象類和枚舉等基本的接口和信息.
- nifi-assembly 負責 nifi 的成品裝配, 工程打包最后形成的可供部署的壓縮包就在這個工程里的 target 目錄內.
- nifi-bootstarp 負責 nifi 這個 jvm 應用程序的啟動相關事宜
- nifi-commons nifi 諸多特性,比如data-provenance,expression-language,s2s 傳輸 的實現就在這里,同時也是 nifi 的工具類集合
- nifi-docker nifi 的 docker 應用相關
- nifi-docs nifi 的文檔 實現相關
- nifi-external nifi 內部元信息和外部交換,主要用於集群模式下
- nifi-framework-api 這就是nifi 核心框架層的api,也就是架構圖中的 Flow Controller 那一層,注意這里只是各種接口信息定義,不是實現.
- nifi-maven-archetypes 這里只是為了生成兩個 maven archetype,一個是 nifi 自定義處理器的腳手架,一個是 nifi 自定義服務的腳手架.這些腳手架在 maven 的中央倉庫都有提供.
- nifi-mock 用於 nifi 的 mock 測試
- nifi-nar-bundles 之前一直說的 nifi java 工具箱就是這里了.整個 nifi 里面大部分的 maven 工程都是這個工程的子工程.在這個工程里面,一個 bundle 就是一個工具,也對應着上面架構圖里的 Extension
- nifi-toolkit 這里面是 nifi 的命令行工具的實現.nifi 也提供了比較豐富的命令行指令.
以上就是 NiFi 源碼的總體結構了.有了總體的概覽以后,我們需要研究哪一方面的源碼實現,就可以直接去相應的 module 里面看了.
進入源碼
接下來,正式進入源碼閱讀.很直觀的,對於 NiFi 這個應用程序,我們首先關心的是它是如何啟動的,以及它的啟動和初始化流程.所以,我們首先進入的是 nifi-bootstrap 這個 module :
我們很快就找到了 RunNiFi 這個類了,從名字上我們就能很直觀的了解到就是這個類啟動了 NiFi .這里說下,閱讀框架源碼的時候,不一定要每一行代碼都去讀,只看關鍵部分或者看注釋,一般質量比較高的源碼都會有比較詳盡的注釋和准確的命名.拉開 eclipse 右邊 outline 視圖,發現了 main 方法,這里就是 jvm 應用的入口了.
我們再回頭看 nifi 的 啟動腳本 nifi.sh 里面,可以找到這個命令:
正好對上了.
接着看這個 main 方法, 里面獲取上面啟動腳本的命令行輸入參數,找到具體的 start 方法:
switch (cmd.toLowerCase()) { case "start": runNiFi.start(); break;
接着看 start 方法,里面做了很多前期的准備性工作,主要是加載 bootstrap.conf 里配置的屬性,以及在里面構建另外一個 java cmd 命令:
final ProcessBuilder builder = new ProcessBuilder();
builder.command(cmd);
Process process = builder.start();
這個構建的命令行如下(可從 nifi 啟動日志里面找到):
/opt/jdk1.8.0_131/bin/java
-classpath /opt/nifi-1.7.1/./conf:/opt/nifi-1.7.1/./lib/logback-core-1.2.3.jar:/opt/nifi-1.7.1/./lib/jetty-schemas-3.1.jar:/opt/nifi-1.7.1/./lib/logback-classic-1.2.3.jar:/opt/nifi-1.7.1/./lib/jul-to-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/jcl-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-properties-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-runtime-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-framework-api-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-nar-utils-1.7.1.jar:/opt/nifi-1.7.1/./lib/javax.servlet-api-3.1.0.jar:/opt/nifi-1.7.1/./lib/log4j-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/slf4j-api-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-api-1.7.1.jar
-Dorg.apache.jasper.compiler.disablejsr199=true
-Xmx3g -Xms3g
-Djavax.security.auth.useSubjectCredsOnly=true
-Djava.security.egd=file:/dev/urandom
-Dsun.net.http.allowRestrictedHeaders=true
-Djava.net.preferIPv4Stack=true
-Djava.awt.headless=true -XX:+UseG1GC
-Djava.protocol.handler.pkgs=sun.net.www.protocol
-Duser.timezone=Asia/Shanghai
-Dnifi.properties.file.path=/opt/nifi-1.7.1/./conf/nifi.properties
-Dnifi.bootstrap.listen.port=56653
-Dapp=NiFi
-Dorg.apache.nifi.bootstrap.config.log.dir=/opt/nifi-1.7.1/logs org.apache.nifi.NiFi
所以這個 start 方法總的做的事只是啟動了另外一個 java 進程,這個進程才是真正的 NiFi runtime ,注意上面命令的最后一行, org.apache.nifi.NiFi
這就是 NiFi 真正的實例程序.
NiFi 實例的本質探索
這個 NiFi 在哪里了?我們可以逐步找到它,這個 NiFi 在 nifi-framework 里面找到,nifi-framework-api 在根工程下,但 nifi-framework 的實現在 nifi-nar-bundles 里面,也就是 nifi-framework-bundle:
nifi-framework 里面就是 nifi 的核心框架代碼了:
找到里面的 nifi-runtime , org.apache.nifi.NiFi
就在這個里面了:
可以看到 nifi 的命名非常規范,通過命名我們基本就可以找到對應的功能和源碼.
打開 NiFi ,看到 main 方法,非常簡單:
/**
* Main entry point of the application.
*
* @param args things which are ignored
*/
public static void main(String[] args) {
LOGGER.info("Launching NiFi...");
try {
NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
new NiFi(properties);
} catch (final Throwable t) {
LOGGER.error("Failure to launch NiFi due to " + t, t);
}
}
生成了一個 NiFi 實例.這個構造方法調用了另外一個構造方法:
下面這個就是 NiFi 的真正實例構造方法,里面進行了很多初始化的操作.
看這個方法源碼,也是直接先看注釋,不需要一行行去讀.先找到關鍵性的代碼:
final Bundle systemBundle = SystemBundle.create(properties);
// expand the nars
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
// load the extensions classloaders
NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();
narClassLoaders.init(rootClassLoader,
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
上面就是非常關鍵性的代碼,上面說過,NiFi 是一個 java 工具箱,里面包裝了很多 java 工具.從這里開始,我們就來探究這些 java 工具的本質.
這個 java 工具箱在 NiFi 在用戶看來其實就是安裝目錄下面的 lib 文件夾:
java 工具其實就是 lib 目錄下的一個個的 nar 包:
這些 nar 其實就是一種壓縮文件,我們可以把它解壓縮:
可以看到,里面的本質還是 jar 包:
可以得出結論, nar 包 其實就是 nifi 包裝了其他額外信息的 jar 包集合的壓縮包而已.所以 nar 包本質還是 jar 包,所以就回到了我們熟悉的領域了.看上面的源碼注釋.第一步就是先把這些 nar 包全部解開 ExtensionMapping
里面去.
然后加載並初始化這些 nar 包擴展的類加載器:
// load the extensions classloaders
NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();
narClassLoaders.init(rootClassLoader,
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader
final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
if (frameworkClassLoader == null) {
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
}
final Set<Bundle> narBundles = narClassLoaders.getBundles();
在 NiFi 的官方介紹中,有兩處它的特性介紹是擴展和類加載隔離,這里我們可以對它這兩個特性的實現一探究竟了.它為每一個 nar 包構造了一個獨立的自定義的類加載器: NarClassLoader
目前基本清晰, NiFi 的 擴展性是由自定義的壓縮文件 nar 包 和 自定義的類加載器來提供的. 接着往下看:
// load the server from the framework classloader
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class);
final long startTime = System.nanoTime();
nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles);
nifiServer.setExtensionMapping(extensionMapping);
nifiServer.setBundles(systemBundle, narBundles);
看到這里回想到一開始的架構圖,圖中 JVM 的最上層是 web server , 這個 web server 就是在這里被加載了,可以看到,這是一個 jetty server ,接着往下看:
if (shutdown) {
LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
} else {
nifiServer.start();
if (bootstrapListener != null) {
bootstrapListener.sendStartedStatus(true);
}
final long duration = System.nanoTime() - startTime;
LOGGER.info("Controller initialization took " + duration + " nanoseconds "
+ "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds).");
}
start 這個 nifiServer ,這個 NiFi 對象的構造方法這里就全部走完了.
NiFi Web
上面的代碼走完之后, NiFi 實例化並沒有完全完成,我們先放一放,回頭理一下工程項目結構.上面的 NiFiServer 的 start() 方法之后,代碼就跳轉到了 nifi-framework 下面的另外一個子工程里面了,這個子工程就是 nifi-web :
可以看到, 跟 nifi web 相關的源碼都在這個子工程里面了,包括 nifi 的 server 代碼和界面代碼,而上面所說的 NiFiServer 這個類就在 nifi-jetty 工程里面:
接着看 JettyServer 這個類,上面的 NiFi 構造方法里面最后是先實例化了這個 JettyServer ,然后調用了 start 方法.先看它的構造方法,只看注釋,找到了核心方法:
// load wars from the bundle
Handler warHandlers = loadWars(bundles);
大致可以看到,其實就是把 war 包加載進來了,這些 war 包就是 nifi-web 下面的子工程,有幾個子工程的 pom 文件中配置的就是 <packaging>war</packaging>
接着看這個 start 方法:
第一句就是 ExtensionManager.discoverExtensions(systemBundle, bundles);
就是這里把所有的擴展類加載進 JVM 了, 看到看到 ExtensionManager
的注釋,這個注釋就說明了一切
Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
@ThreadSafe - is immutable
這個 ExtensionManager 在加載類的時候,用到了java 的一種比較高級的機制, java SPI(service provider interface),這種機制在很多框架中比如 spring 中大量使用
final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), bundle.getClassLoader());
有興趣的可以自己去查閱相關資料,這個機制解釋了為什么寫自定義的處理器的時候要在 /resources /META-INF/services 目錄下面寫上配置文件.在自定義處理開發的時候,一定要注意寫這個配置文件,否則類是加載不進來的(我第一次寫自定義處理器就跳坑了)
接着 start 這個 jetty server,接着往下看,只看注釋,可以看到,大致就是做了 server context 以及 filter 的注入工作了:
// ensure the appropriate wars deployed successfully before injecting the NiFi context and security filters // this must be done after starting the server (and ensuring there were no start up failures)
基本到這里, NiFi 的實例化和初始化流程基本就有個大致了解了.我們可以接着再進一步,看到 nifi-web-api 這個工程,這個工程其實就是 nifi 的 restful 接口工程,nifi 的所有 restful 接口都是這里實現的,包括處理器的新增,處理器的連接以及處理器的 start 等.
在里面隨便打開一個以 resource 結尾的類:
這里我們看到了非常熟悉的注解了,接着打開 resources 文件夾,看到了 context.xml 文件 :
原來這是一個 spring 的 web 工程.然后找到一個關鍵的 configuration 類:
這里基本就清楚了, NiFi 實例內所有的對象都是通過 spring ioc 注入的.
總結
現在為止,從開發角度對 NiFi 就有了一個基本的認識了,它是一個 JVM 應用,它通過獨立的類加載器加載類,使用 spring ioc 注入和管理對象.從以上的分析,我們了解到了 NiFi 的擴展性特性的大致實現,也了解了架構圖最上面的一部分源碼.至於它其他諸多特性的源碼和實現,則需要花更多的時間研究 nifi-framework-core 工程了.