在Windows上安裝
- 從官方網站下載需要的二進制包
- 比如我下載的是flink-1.2.0-bin-hadoop2-scala_2.10.tgz,解壓后進入bin目錄
- 可以執行bat文件,也可以使用cygwin執行sh文件

- 然后可以在瀏覽器中輸入http://localhost:8081打開管理頁面

創建和執行wordcount示例程序
使用idea新建一個Maven工程
我這里使用Intellij IDEA進行開發
- 使用"new project"創建一個maven工程

- 指定示例程序的groupId和artifactId

- 指定示例程序的工程名和路徑

-
在pom.xml添加依賴關系,更新后IDEA會自動下載jar包至本地倉庫 (由於markdown解析問題,換成圖片)

-
創建一個wordcountexample類文件

示例程序解讀
- 基本同標准的Java程序類似,並且含有一個main()方法。每個程序基本由以下5個部分組成:
- 獲取一個
ExecutionEnvironment - 載入或者創建初始輸入數據
- 指定數據變換的方式
- 制定計算后的數據輸出位置
- 程序執行
- 對照上面的
WordCountExample:
-
獲取一個
ExecutionEnvironment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -
初始數據:
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
- 變換方式:
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
- 輸出方式
wordCounts.print();
- 程序執行
env.execute("Word Count Example");
本地執行
- 直接使用菜單欄上的
Build進行編譯,使用Run執行程序 - 若直接按照樣例執行,可能出現以下錯誤:
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
- 參照此文,原因是print()方法自動會調用execute()方法,造成錯誤,所以注釋掉
env.execute()即可
上傳flink后台運行
- 首先build jar包,注意將META-INF目錄放在src/main/java/resource目錄下,否則可能出現找不到main class的問題
- 將jar包上傳至flink后台

- 點擊提交之后,可以將任務提交給后台執行,執行完成后可以看到執行統計信息。

--EOF--
