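Requirement 1: generate words in real time through a socket, receive the data in real time with Flink, aggregate the data within a specified time window (for example, 2 seconds), and print the result computed for each window.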
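The article does not show how the words are produced on the socket side. Flink's socketTextStream connects as a client, so something must already be listening on the host and port (hadoop100:9000 in the code below). A common choice is a netcat listener such as `nc -lk 9000` with words typed by hand (flags differ between netcat variants). As an alternative, here is a minimal plain-Java sketch of such a listener. It assumes a single client connection and a hypothetical fixed list of lines; the class name WordSocketServer and the 500 ms pause are illustrative only.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical test-data source: a tiny socket server that a Flink socket source can connect to.
public class WordSocketServer {

    // Accepts ONE client and sends each line terminated by '\n',
    // which is the delimiter the Flink job uses to split records.
    static void serveLines(ServerSocket server, List<String> lines, long pauseMillis)
            throws IOException, InterruptedException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                // print + explicit '\n' instead of println, which would use the platform separator
                out.print(line + "\n");
                out.flush();
                Thread.sleep(pauseMillis);
            }
        } // the connection is closed after the last line
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Waiting for a client on port " + port);
            serveLines(server, List.of("a a c", "a b", "c"), 500);
        }
    }
}
```

Run it on the host the Flink job connects to before starting the job. Because the sketch closes the connection after the last line, the job's socket source sees end of input, which with the default settings should end the source; netcat is more convenient when you want to keep typing words interactively.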
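Steps for developing a Flink program:
1. Obtain an execution environment.
2. Load or create the initial data.
3. Specify the transformation operators that process the data.
4. Specify where the computed results should go (the sink).
5. Call execute() to trigger execution of the program.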
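Note: Flink programs are lazily evaluated. The program is actually triggered only when execute() is finally called.
The benefit of lazy evaluation is that you can build a complex program, and Flink turns the whole program into a plan and executes that plan as a single unit.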
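To illustrate the five steps and lazy evaluation without a Flink dependency, here is a toy model in plain Java. MiniEnv and its methods (fromElements, transform, sinkTo, execute) are hypothetical and are not Flink's API; the sketch only shows the idea that transformations are recorded into a plan and nothing runs until execute() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;

public class LazyPlanSketch {

    // Hypothetical toy environment (NOT Flink's API): it only records what to do.
    static class MiniEnv {
        private final List<String> source = new ArrayList<>();
        private final List<UnaryOperator<List<String>>> plan = new ArrayList<>();
        private List<String> sink = null;
        private int executedOps = 0;

        // Step 2: register the input data; nothing is processed yet.
        MiniEnv fromElements(String... items) {
            source.addAll(List.of(items));
            return this;
        }

        // Step 3: record a transformation in the plan without running it.
        MiniEnv transform(UnaryOperator<List<String>> op) {
            plan.add(op);
            return this;
        }

        // Step 4: record where the results should go.
        MiniEnv sinkTo(List<String> target) {
            sink = target;
            return this;
        }

        int executedOps() {
            return executedOps;
        }

        // Step 5: only now is the whole recorded plan run, as one unit.
        void execute() {
            List<String> data = new ArrayList<>(source);
            for (UnaryOperator<List<String>> op : plan) {
                data = op.apply(data);
                executedOps++;
            }
            if (sink != null) {
                sink.addAll(data);
            }
        }
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        MiniEnv env = new MiniEnv();                    // step 1
        env.fromElements("a", "b", "a")                 // step 2
           .transform(xs -> {                           // step 3
               List<String> out = new ArrayList<>();
               for (String x : xs) out.add(x.toUpperCase(Locale.ROOT));
               return out;
           })
           .sinkTo(results);                            // step 4
        System.out.println("before execute: " + results + ", ops run = " + env.executedOps());
        env.execute();                                  // step 5
        System.out.println("after execute: " + results + ", ops run = " + env.executedOps());
    }
}
```

Running main prints `before execute: [], ops run = 0` followed by `after execute: [A, B, A], ops run = 1`: declaring the pipeline does no work, and the single call to execute() runs it end to end. Real Flink additionally optimizes and distributes the plan, which this toy model does not attempt.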
Implementation:
Java version:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding-window computation.
 *
 * Word data is simulated through a socket,
 * and Flink computes statistics over it.
 *
 * Every 1 second, aggregate the data received in the most recent 2 seconds.
 */
public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // Read the port from the arguments (e.g. --port 9000); fall back to 9000
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // Example: the line "a a c" becomes
        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                // Split on runs of whitespace and skip empty tokens,
                // so repeated spaces do not produce an empty "word"
                for (String word : value.split("\\s+")) {
                    if (!word.isEmpty()) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            }
        })
                // keyBy(String) and timeWindow(...) belong to the older API this article targets;
                // newer Flink releases deprecate them in favor of keyBy(KeySelector)
                // and window(SlidingProcessingTimeWindows.of(...))
                .keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, slide 1 s
                .sum("count"); // either sum or reduce works here
                /* Equivalent reduce (needs import org.apache.flink.api.common.functions.ReduceFunction):
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                })*/

        // Print the results to the console with parallelism 1
        windowCounts.print().setParallelism(1);

        // This call is required; without it the job never runs
        env.execute("Socket window count");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
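The sliding window configured by timeWindow(Time.seconds(2), Time.seconds(1)) can be checked by hand with a small plain-Java simulation. It assumes windows aligned to multiples of the slide with zero offset, which, as far as I know, matches how Flink's sliding window assigners compute window start times; it ignores triggering and timing, so it only shows which windows each word is counted in. The class SlidingWindowSketch and the timestamps are hypothetical; in the real job the timestamps are the processing times at which lines arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlidingWindowSketch {

    // Start times of all windows [start, start + size) that contain ts,
    // assuming zero offset and ts >= 0 (windows aligned to multiples of slide).
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Word counts per window: window start -> (word -> count).
    static Map<Long, Map<String, Long>> count(long[] timestamps, String[] lines, long size, long slide) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            for (String word : lines[i].split("\\s+")) {
                if (word.isEmpty()) continue; // leading whitespace yields an empty token
                for (long start : windowStarts(timestamps[i], size, slide)) {
                    result.computeIfAbsent(start, k -> new TreeMap<>()).merge(word, 1L, Long::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical input: "a a c" arrives at t = 2.5 s and "a" at t = 3.2 s
        long[] ts = {2500, 3200};
        String[] lines = {"a a c", "a"};
        count(ts, lines, 2000, 1000).forEach((start, counts) ->
                System.out.println("[" + start + ", " + (start + 2000) + ") " + counts));
    }
}
```

With a window size of 2 s and a slide of 1 s, every word falls into two overlapping windows, so main prints:
[1000, 3000) {a=2, c=1}
[2000, 4000) {a=3, c=1}
[3000, 5000) {a=1}
This is why the same word can appear in two consecutive printed results of the Flink job.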
Scala version:
To be added later......
Please credit the source when reposting: https://www.cnblogs.com/wynjauu/articles/10542807.html