Flink實戰(1) - Apache Flink安裝和示例程序的執行


在Windows上安裝

  • 官方網站下載需要的二進制包
  • 比如我下載的是flink-1.2.0-bin-hadoop2-scala_2.10.tgz,解壓后進入bin目錄
  • 可以執行bat文件,也可以使用cygwin執行sh文件

安裝執行

安裝執行

創建和執行wordcount示例程序

使用idea新建一個Maven工程

我這里使用Intellij IDEA進行開發

  • 使用"new project"創建一個maven工程

Maven01

  • 指定示例程序的groupId和artifactId

Maven02

  • 指定示例程序的工程名和路徑

Maven03

  • 在pom.xml添加依賴關系,更新后IDEA會自動下載jar包至本地倉庫 (由於markdown解析問題,換成圖片)
    Maven03

  • 創建一個wordcountexample類文件
    Maven04

示例程序解讀

  • 基本同標准的Java程序類似,並且含有一個main()方法。每個程序基本由以下5個部分組成:
  • 獲取一個ExecutionEnvironment
  • 載入或者創建初始輸入數據
  • 指定數據變換的方式
  • 制定計算后的數據輸出位置
  • 程序執行
  • 對照上面的WordCountExample
  • 獲取一個ExecutionEnvironment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  • 初始數據:

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
  • 變換方式:
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
  • 輸出方式
        wordCounts.print();
  • 程序執行
        env.execute("Word Count Example");

本地執行

  • 直接使用菜單欄上的Build進行編譯,使用Run執行程序
  • 若直接按照樣例執行,可能出現以下錯誤:
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  • 參照此文,原因是print()方法自動會調用execute()方法,造成錯誤,所以注釋掉env.execute()即可

上傳flink后台運行

  • 首先build jar包,注意將META-INF目錄放在src/main/java/resource目錄下,否則可能出現找不到main class的問題
  • 將jar包上傳至flink后台
    上傳jar包
  • 點擊提交之后,可以將任務提交給后台執行,執行完成后可以看到執行統計信息。
    執行結果

--EOF--


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM