作者:LittleMagic
大數據領域 SQL 化開發的風潮方興未艾(所謂"Everybody knows SQL"),Flink 自然也不能“免俗”。Flink SQL 是 Flink 系統內部最高級別的 API,也是流批一體思想的集大成者。用戶可以通過簡單明了的 SQL 語句像查表一樣執行流任務或批任務,屏蔽了底層 DataStream/DataSet API 的復雜細節,降低了使用門檻。
但是,Flink SQL 的默認開發方式是通過 Java/Scala API 編寫,與純 SQL 化、平台化的目標相去甚遠。目前官方提供的 Flink SQL Client 僅能在配備 Flink 客戶端的本地使用,局限性很大。而 Ververica 開源的 Flink SQL Gateway 組件是基於 REST API 的,仍然需要二次開發才能供給上層使用,並不是很方便。
鑒於有很多企業都無法配備專門的團隊來解決 Flink SQL 平台化的問題,那么到底有沒有一個開源的、開箱即用的、功能相對完善的組件呢?答案就是本文的主角——Apache Zeppelin。
Flink SQL on Zeppelin!
Zeppelin 是基於 Web 的交互式數據分析筆記本,支持 SQL、Scala、Python 等語言。Zeppelin 通過插件化的 Interpreter(解釋器)來解析用戶提交的代碼,並將其轉化到對應的后端(計算框架、數據庫等)執行,靈活性很高。其架構簡圖如下所示。
Flink Interpreter 就是 Zeppelin 原生支持的眾多 Interpreters 之一。只要配置好 Flink Interpreter 以及相關的執行環境,我們就可以將 Zeppelin 用作 Flink SQL 作業的開發平台了(當然,Scala 和 Python 也是沒問題的)。接下來本文就逐步介紹 Flink on Zeppelin 的集成方法。
配置 Zeppelin
目前 Zeppelin 的最新版本是 0.9.0-preview2,可以在官網下載包含所有 Interpreters 的 zeppelin-0.9.0-preview2-bin-all.tgz,並解壓到服務器的合適位置。
接下來進入 conf 目錄。將環境配置文件 zeppelin-env.sh.template 更名為 zeppelin-env.sh,並修改:
# JDK目錄
export JAVA_HOME=/opt/jdk1.8.0_172
# 方便之后配置Interpreter on YARN模式。注意必須安裝Hadoop,且hadoop必須配置在系統環境變量PATH中
export USE_HADOOP=true
# Hadoop配置文件目錄
export HADOOP_CONF_DIR=/etc/hadoop/hadoop-conf
將服務配置文件 zeppelin-site.xml.template 更名為 zeppelin-site.xml,並修改:
<!-- 服務地址。默認為127.0.0.1,改為0.0.0.0使得可以在外部訪問 -->
<property>
<name>zeppelin.server.addr</name>
<value>0.0.0.0</value>
<description>Server binding address</description>
</property>
<!-- 服務端口。默認為8080,如果已占用,可以修改之 -->
<property>
<name>zeppelin.server.port</name>
<value>18080</value>
<description>Server port.</description>
</property>
最基礎的配置就完成了。運行 bin/zeppelin-daemon.sh start 命令,返回 Zeppelin start [ OK ]的提示之后,訪問<服務器地址>:18080,出現下面的頁面,就表示 Zeppelin 服務啟動成功。
當然,為了一步到位適應生產環境,也可以適當修改 zeppelin-site.xml 中的以下參數:
<!-- 將Notebook repo更改為HDFS存儲 -->
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo</value>
<description>Hadoop compatible file system notebook persistence layer implementation, such as local file system, hdfs, azure wasb, s3 and etc.</description>
</property>
<!-- Notebook在HDFS上的存儲路徑 -->
<property>
<name>zeppelin.notebook.dir</name>
<value>/zeppelin/notebook</value>
<description>path or URI for notebook persist</description>
</property>
<!-- 啟用Zeppelin的恢復功能。當Zeppelin服務掛掉並重啟之后,能連接到原來運行的Interpreter -->
<property>
<name>zeppelin.recovery.storage.class</name>
<value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
<description>ReoveryStorage implementation based on hadoop FileSystem</description>
</property>
<!-- Zeppelin恢復元數據在HDFS上的存儲路徑 -->
<property>
<name>zeppelin.recovery.dir</name>
<value>/zeppelin/recovery</value>
<description>Location where recovery metadata is stored</description>
</property>
<!-- 禁止使用匿名用戶 -->
<property>
<name>zeppelin.anonymous.allowed</name>
<value>true</value>
<description>Anonymous user allowed by default</description>
</property>
Zeppelin 集成了 Shiro 實現權限管理。禁止使用匿名用戶之后,可以在 conf 目錄下的 shiro.ini 中配置用戶名、密碼、角色等,不再贅述。注意每次修改配置都需要運行 bin/zeppelin-daemon.sh restart 重啟 Zeppelin 服務。
配置 Flink Interpreter on YARN
在使用 Flink Interpreter 之前,我們有必要對它進行配置,使 Flink 作業和 Interpreter 本身在 YARN 環境中運行。
點擊首頁用戶名區域菜單中的 Interpreter 項(上一節圖中已經示出),搜索 Flink,就可以看到參數列表。
Interpreter Binding
首先,將 Interpreter Binding 模式修改為 Isolated per Note,如下圖所示。
在這種模式下,每個 Note 在執行時會分別啟動 Interpreter 進程,類似於 Flink on YARN 的 Per-job 模式,最符合生產環境的需要。
Flink on YARN 參數
以下是需要修改的部分基礎參數。注意這些參數也可以在 Note 中指定,每個作業自己的配置會覆蓋掉這里的默認配置。
- FLINK_HOME:Flink 1.11所在的目錄;
- HADOOP_CONF_DIR:Hadoop 配置文件所在的目錄;
- flink.execution.mode:Flink 作業的執行模式,指定為 YARN 以啟用 Flink on YARN;
- flink.jm.memory:JobManager 的內存量(MB);
- flink.tm.memory:TaskManager 的內存量(MB);
- flink.tm.slot:TaskManager 的 Slot 數;
- flink.yarn.appName:YARN Application 的默認名稱;
- flink.yarn.queue:提交作業的默認 YARN 隊列。
Hive Integration 參數
如果我們想訪問 Hive 數據,以及用 HiveCatalog 管理 Flink SQL 的元數據,還需要配置與 Hive 的集成。
- HIVE_CONF_DIR:Hive 配置文件(hive-site.xml)所在的目錄;
- zeppelin.flink.enableHive:設為 true 以啟用 Hive Integration;
- zeppelin.flink.hive.version:Hive 版本號。
- 復制與 Hive Integration 相關的依賴到 $FLINK_HOME/lib 目錄下,包括:
- flink-connector-hive_2.11-1.11.0.jar
- flink-hadoop-compatibility_2.11-1.11.0.jar
- hive-exec-..jar
- 如果 Hive 版本是1.x,還需要額外加入 hive-metastore-1.*.jar、libfb303-0.9.2.jar 和 libthrift-0.9.2.jar
- 保證 Hive 元數據服務(Metastore)啟動。注意不能是 Embedded 模式,即必須以外部數據庫(MySQL、Postgres等)作為元數據存儲。
Interpreter on YARN 參數
在默認情況下,Interpreter 進程是在部署 Zeppelin 服務的節點上啟動的。隨着提交的任務越來越多,就會出現單點問題。因此我們需要讓 Interpreter 也在 YARN 上運行,如下圖所示。
- zeppelin.interpreter.yarn.resource.cores:Interpreter Container 占用的vCore 數量;
- zeppelin.interpreter.yarn.resource.memory:Interpreter Container 占用的內存量(MB);
- zeppelin.interpreter.yarn.queue:Interpreter 所處的 YARN 隊列名稱。
配置完成之后,Flink on Zeppelin 集成完畢,可以測試一下了。
測試 Flink SQL on Zeppelin
創建一個 Note,Interpreter 指定為 Flink。然后寫入第一個 Paragraph:
以 %flink.conf 標記的 Paragraph 用於指定這個 Note 中的作業配置,支持 Flink 的所有配置參數(參見 Flink 官網)。另外,flink.execution.packages 參數支持以 Maven GAV 坐標的方式引入外部依賴項。
接下來創建第二個 Paragraph,創建 Kafka 流表:
%flink.ssql 表示利用 StreamTableEnvironment 執行流處理 SQL,相對地,%flink.bsql 表示利用 BatchTableEnvironment 執行批處理 SQL。注意表參數中的 properties.bootstrap.servers 利用了 Zeppelin Credentials 來填寫,方便不同作業之間復用。
執行上述 SQL 之后會輸出信息:
同時在 Hive 中可以看到該表的元數據。
最后寫第三個 Paragraph,從流表中查詢,並實時展現出來:
點擊右上角的 FLINK JOB 標記,可以打開作業的 Web UI。上述作業的 JobGraph 如下。
除 SELECT 查詢外,通過 Zeppelin 也可以執行 INSERT 查詢,實現更加豐富的功能。關於 Flink SQL on Zeppelin 的更多應用,筆者在今后的文章中會繼續講解。
原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。