Flink 案例整合


1.概述

  Flink 1.1.0 版本已經在官方發布了,官方博客於 2016-08-08 更新了 Flink 1.1.0 的變動。在這 Flink 版本的發布,添加了 SQL 語法這一特性。這對於業務場景復雜,依賴於 SQL 來分析統計數據,算得上是一個不錯的福利。加上之前有同學和朋友郵件中提到,Flink 官方給的示例運行有困難,能否整合一下 Flink 的案例。筆者通過本篇博客來解答一下相關疑問。

2.內容

2.1 集群部署

  首先,集群的部署需要 JDK 環境。下載 JDK 以及配置 JAVA_HOME 環境,這里就不詳述了,比較簡單。然后,我們去下載 Flink 1.1.0 的安裝包,進入到下載頁面,如下圖所示:

  這里需要注意的是,Flink 集群的部署,本身不依賴 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存儲數據,就需要選擇對應的 Hadoop 版本。大家可以根據 Hadoop 集群的版本,選擇相應的 Flink 版本下載。

  下載好 Flink 1.1.0 后,按以下步驟進行:

  • 解壓 Flink 安裝包到 Master 節點
tar xzf flink-*.tgz
cd flink-*
  • 配置 Master 和 Slaves
vi $FLINK_HOME/conf/master
vi $FLINK_HOME/conf/slaves
  • 分發
scp -r flink-1.1.0 hadoop@dn2:/opt/soft/flink
scp -r flink-1.1.0 hadoop@dn3:/opt/soft/flink

  這里只用了2個 slave 節點。另外,在 flink-conf.yaml 文件中,可以按需配置,較為簡單。就不多贅述了。

  • 啟動集群
bin/start-cluster.sh

  注意,這里沒有使用 YARN 來啟動集群,若是需要使用 YARN 啟動集群,可以參考官方文檔進行啟動。地址

  Flink 集群啟動后,系統有一個 WebUI 監控界面,如下圖所示:

2.2 案例

  這里,我們使用 Flink SQL 的 API 來運行一個場景,對一個銷售表做一個聚合計算。這里,筆者將實現代碼進行了分解,首先是獲取操作 Flink 系統的對象,如下所示:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

  接着是讀取數據源,並注冊為表,如下所示:

CsvTableSource csvTableSource = new CsvTableSource(inPath, new String[] { "trans_id", "part_dt", "lstg_format_name", "leaf_categ_id", "lstg_site_id", "slr_segment_cd", "price", "item_count", "seller_id" },
                    new TypeInformation<?>[] { Types.LONG(), Types.STRING(), Types.STRING(), Types.LONG(), Types.INT(), Types.INT(), Types.FLOAT(), Types.LONG(), Types.LONG() });
tableEnv.registerTableSource("user", csvTableSource);
Table tab = tableEnv.scan("user");

  這里 inPath 使用了 HDFS 上的數據路徑。類型可以在 Hive 中使用 desc 命令查看該表的類型。然后,將“表”轉化為數據集,如下所示:

DataSet<KylinSalesDomain> ds = tableEnv.toDataSet(tab, KylinSalesDomain.class);

tableEnv.registerDataSet("user2", ds, "trans_id,part_dt,lstg_format_name,leaf_categ_id,lstg_site_id,slr_segment_cd,price,item_count,seller_id");

Table result = tableEnv.sql("SELECT lstg_format_name as username,SUM(FLOOR(price)) as total FROM user2 group by lstg_format_name");

  最后,對結果進行存儲,這里筆者將結果存在了 HDFS 上。如下所示:

TableSink<?> sink = new CsvTableSink(outPath, "|");
            
result.writeToSink(sink);

env.setParallelism(1);
env.execute("Flink Sales SUM");

  注意,這里並發數是可以設置的,通過 setParallelism 方法來設置並發數。

  完整示例,如下所示:

try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            CsvTableSource csvTableSource = new CsvTableSource(args[0], new String[] { "trans_id", "part_dt", "lstg_format_name", "leaf_categ_id", "lstg_site_id", "slr_segment_cd", "price", "item_count", "seller_id" },
                    new TypeInformation<?>[] { Types.LONG(), Types.STRING(), Types.STRING(), Types.LONG(), Types.INT(), Types.INT(), Types.FLOAT(), Types.LONG(), Types.LONG() });
            tableEnv.registerTableSource("user", csvTableSource);
            Table tab = tableEnv.scan("user");

            DataSet<KylinSalesDomain> ds = tableEnv.toDataSet(tab, KylinSalesDomain.class);

            tableEnv.registerDataSet("user2", ds, "trans_id,part_dt,lstg_format_name,leaf_categ_id,lstg_site_id,slr_segment_cd,price,item_count,seller_id");

            Table result = tableEnv.sql("SELECT lstg_format_name as username,SUM(FLOOR(price)) as total FROM user2 group by lstg_format_name");

            TableSink<?> sink = new CsvTableSink(args[1], "|");
            // write the result Table to the TableSink
            result.writeToSink(sink);

            // execute the program
            env.setParallelism(1);
            env.execute("Flink Sales SUM");
        } catch (Exception e) {
            e.printStackTrace();
        }

  最后,我們將應用提交到 Flink 集群。如下所示:

flink run flink_sales_sum.jar hdfs://master:8020/user/hive/warehouse/kylin_sales/DEFAULT.KYLIN_SALES.csv hdfs://master:8020/tmp/result3

3.Hive 對比

  同樣的語句,在 Hive 下運行之后,與在 Flink 集群下運行之后,結果如下所示:

  • Hive 運行結果:

  • Flink 運行結果:

 

  通過 WebUI 監控界面觀察,任務在 Flink 集群中運行所花費的時間在 2s 以內。其運行速度是比較具有誘惑力的。

4.總結

  總體來說,Flink 集群的部署較為簡單,其 SQL 的 API 編寫需要對官方的文檔比較熟悉,需要注意的是,在本地運行 Flink 代碼,若是要讀取遠程 HDFS 文件,那么獲取 Flink 對象操作環境,需要采用遠程接口(HOST & PORT),或者在本地部署一個開發集群環境,將遠程數據源提交到本地 Flink 集群環境運行。若是,讀取本地文件,則不需要。其中的原因是當你以集群的方式運行,Flink 會檢查本地是否有 Flink 集群環境存在,如若不存在,則會出現遠程數據源(如:HDFS 路徑地址無法解析等錯誤)。

5.結束語

  這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM