Flink程序的執行過程
no-desc | 說明 | 詳情 |
1-env | 獲取flink的執行環境 | 批處理:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 流處理:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
2-source | 加載數據 | 1) socketTextStream – 讀取Socket 數據流 2) readTextFile() – 逐行讀取文本文件獲取數據流,每行都返回字符串 3) fromCollection() – 從集合中創建數據流 4) fromElements() – 從給定的數據對象創建數據流,所有數據類型要一致 5) addSource() – 添加新的源函數,例如從kafka 中讀取數據,參見讀取kafka 數據案例 |
3-transformation | 對加載的數據進行轉換 | |
4-sink | 對結果進行保存或者打印 | 1) writeAsText() – 以字符串的形式逐行寫入文件,調用每個元素的toString()得到寫入的字符串 2) writeAsCsv() – 將元組寫出以逗號分隔的csv 文件。注意:只能作用到元組數據上 3) print() – 控制台直接輸出結果,調用對象的toString()方法得到輸出結果。 4) addSink() – 自定義接收函數。例如將結果保存到kafka 中,參見kafka 案例 |
5-execute | 觸發flink程序的執行 | 代碼流程必須符合 source ->transformation -> sink transformation 都是懶執行,需要最后使用env.execute()或者使用 print(),count(),collect() 觸發執行 |
注意
Flink編程不是基於K,V格式的編程,通過某些方式來指定虛擬key
Flink中的tuple最多支持25個元素,每個元素是從0開始
算子
中間處理、轉換的環節是通過不同的算子完成的。
算子將一個或多個DataStream轉換為新的DataStream
轉型 | 描述 |
---|---|
Map DataStream→DataStream |
采用一個數據元並生成一個數據元。一個map函數,它將輸入流的值加倍: |
FlatMap DataStream→DataStream |
采用一個數據元並生成零個,一個或多個數據元。將句子分割為單詞的flatmap函數: |
Filter DataStream→DataStream |
計算每個數據元的布爾函數,並保存函數返回true的數據元。過濾掉零值的過濾器: |
KeyBy DataStream→KeyedStream |
邏輯上將流分區為不相交的分區。具有相同Keys的所有記錄都分配給同一分區。在內部,keyBy()是使用散列分區實現的。指定鍵有不同的方法。 此轉換返回KeyedStream,其中包括使用被Keys化狀態所需的KeyedStream。 注意 如果出現以下情況,則類型不能成為關鍵:
|
Reduce KeyedStream→DataStream |
被Keys化數據流上的“滾動”Reduce。將當前數據元與最后一個Reduce的值組合並發出新值。 reduce函數,用於創建部分和的流: |
案例1: 元素處理
env: 批
Source:fromElements
Sink:print
算子:Map
public class MapTest { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> dataSet = env.fromElements(1, 2, -3, 0, 5, -9, 8); DataSet<Integer> dataSet2 = dataSet.map(new Tokenizer()); // DataSet<Integer> dataSet2 = dataSet.map(i->i * 2); dataSet2.print(); } public static class Tokenizer implements MapFunction<Integer, Integer> { @Override public Integer map(Integer in) { return in * 2; } } }
案例2: 詞頻統計
env: 批
Source:readTextFile
Sink:writeAsCsv
算子:Map
public class SocketWindowWordCountJava { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> dataSet = env.readTextFile("/yourpath/in.txt"); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) dataSet.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1); String outputPath = "/yourpath/out.txt"; counts.writeAsCsv(outputPath, "\n", " "); env.execute("myflink"); } public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.split(" "); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
案例3:數據流匯總
env: 流
Source:addSource
Sink:print
算子:keyBy、Reduce
public class ReduceTest { private static final Logger LOG = LoggerFactory.getLogger(ReduceTest.class); private static final String[] TYPE = {"蘋果", "梨", "西瓜", "葡萄", "火龍果"}; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //添加自定義數據源,每秒發出一筆訂單信息{商品名稱,商品數量} DataStreamSource<Tuple2<String, Integer>> orderSource = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { private volatile boolean isRunning = true; private final Random random = new Random(); @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { TimeUnit.SECONDS.sleep(1); ctx.collect(Tuple2.of(TYPE[random.nextInt(TYPE.length)], 1)); } } @Override public void cancel() { isRunning = false; } }, "order-info"); orderSource.keyBy(0) //將上一元素與當前元素相加后,返回給下一元素處理 .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .print(); env.execute("Flink Streaming Java API Skeleton"); } }
Source:readTextFile
Sink:writeAsCsv
算子:Map
參考
https://blog.csdn.net/qq_40929921/article/details/99603150
https://flink.sojb.cn/dev/stream/operators/