Flink batch/stream編程套路


DataSet and DataStream 這里以WordCount為例,共同的編程套路如下所示: 1.獲取執行環境(execution environment) final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 2.加載/創建初始數據集 // 讀取輸入數據
DataStream<String> text; if (params.has("input")) { // 讀取text文件
    text = env.readTextFile(params.get("input")); } else { System.out.println("Executing WordCount example with default input data set."); System.out.println("Use --input to specify file input."); // 讀取默認測試數據集
    text = env.fromElements(WordCountData.WORDS); } 3.對數據集進行各種轉換操作(生成新的數據集) DataStream<Tuple2<String, Integer>> counts =
                    // 切分每行單詞
                    text.flatMap(new Tokenizer()) //對每個單詞分組統計詞頻數
                    .keyBy(0).sum(1); 4.指定將計算的結果放到何處去 // 輸出統計結果
if (params.has("output")) { //寫入文件地址
    counts.writeAsText(params.get("output")); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); //數據打印控制台
 counts.print(); } 5.觸發APP執行 // 執行flink 程序
env.execute("Streaming WordCount");

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM