Flink Case Studies


Execution Flow of a Flink Program

1-env: obtain the Flink execution environment

Batch:  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Stream: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2-source: load the data

1) socketTextStream – reads a data stream from a socket
2) readTextFile() – reads a text file line by line, returning each line as a String
3) fromCollection() – creates a data stream from a collection
4) fromElements() – creates a data stream from the given objects; all elements must have the same type
5) addSource() – attaches a custom source function, e.g. to read from Kafka (see the Kafka reading case)

3-transformation: transform the loaded data

4-sink: save or print the result

1) writeAsText() – writes elements line by line as Strings, obtained by calling toString() on each element
2) writeAsCsv() – writes tuples to a comma-separated CSV file. Note: this only works on tuple data
3) print() – prints results directly to the console, using each object's toString()
4) addSink() – attaches a custom sink function, e.g. to save results to Kafka (see the Kafka case)

5-execute: trigger execution of the Flink program

The code must follow the order source -> transformation -> sink. Transformations are lazily executed; the job is triggered at the end by env.execute(), or by print(), count(), or collect().
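
Putting the five steps together, a minimal sketch of a streaming job (the socket host and port, the class name SkeletonJob, and the job name are placeholder assumptions, not taken from the original text):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkeletonJob {
    public static void main(String[] args) throws Exception {
        // 1-env: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2-source: load data, here from a socket (placeholder host/port)
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // 3-transformation: map each line to its length
        DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });

        // 4-sink: print the results to the console
        lengths.print();

        // 5-execute: trigger execution of the streaming job
        env.execute("skeleton-job");
    }
}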

Notes

Flink's programming model is not based on key/value pairs; instead, a "virtual" key is specified on the data in one of several ways (by field position, field expression, or a key selector function).

Flink tuples support at most 25 fields (Tuple1 through Tuple25); fields are indexed starting from 0.
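
For example (a hedged sketch; wordStream is a placeholder for a DataStream<Tuple2<String, Integer>>), a tuple stream can be keyed by field position, with fields indexed from 0:

// f0 (index 0) holds the word, f1 (index 1) holds the count
wordStream
        .keyBy(0)  // the "virtual key" is the first tuple field
        .sum(1);   // aggregate on the second tuple field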

Operators

The intermediate processing and transformation steps are carried out by different operators.

An operator transforms one or more DataStreams into a new DataStream.

Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
Filter
DataStream → DataStream

Evaluates a boolean function for each element and keeps those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are several ways to specify keys.

This transformation returns a KeyedStream, which (among other things) is required in order to use keyed state.

dataStream.keyBy("someKey") // Key by the field "someKey"
dataStream.keyBy(0)         // Key by the first field of a Tuple

Note: a type cannot be used as a key if

  1. it is a POJO type that does not override hashCode() and relies on the Object.hashCode() implementation, or
  2. it is an array of any type.
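
To illustrate the first restriction, a minimal sketch of a POJO that does qualify as a key because it overrides hashCode() (OrderKey is a hypothetical class, not from the original text):

// Usable as a key: public no-arg constructor, public fields,
// and hashCode()/equals() overridden instead of relying on Object.hashCode().
public class OrderKey {
    public String product;

    public OrderKey() {
    }

    @Override
    public int hashCode() {
        return product == null ? 0 : product.hashCode();
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof OrderKey
                && java.util.Objects.equals(((OrderKey) other).product, product);
    }
}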
Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

Case 1: Element Processing

env: batch

Source: fromElements

Sink: print

Operator: Map

public class MapTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> dataSet = env.fromElements(1, 2, -3, 0, 5, -9, 8);
        // Double each element with a user-defined MapFunction
        DataSet<Integer> dataSet2 = dataSet.map(new Tokenizer());
//        DataSet<Integer> dataSet2 = dataSet.map(i -> i * 2); // equivalent lambda form
        dataSet2.print(); // print() triggers execution, so env.execute() is not needed here
    }

    // MapFunction that doubles each input value
    public static class Tokenizer implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer in) {
            return in * 2;
        }
    }
}
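
Running this job prints each doubled element, i.e. 2, 4, -6, 0, 10, -18, 16 (the output order may vary with the environment's parallelism).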

Case 2: Word Count

env: batch

Source: readTextFile

Sink: writeAsCsv

Operators: FlatMap, groupBy, sum

public class WordCountJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSet = env.readTextFile("/yourpath/in.txt");

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                dataSet.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        String outputPath = "/yourpath/out.txt";
        counts.writeAsCsv(outputPath, "\n", " ");
        env.execute("myflink");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.split(" ");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
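
With the delimiters passed to writeAsCsv, each (word, count) tuple is written as one output row of the form "word count": "\n" is the row delimiter and " " replaces the default comma as the field delimiter. Because the job ends in a file sink rather than print(), the final env.execute("myflink") call is what actually triggers execution.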

Case 3: Stream Aggregation

env: stream

Source: addSource

Sink: print

Operators: keyBy, Reduce

public class ReduceTest {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceTest.class);
    private static final String[] TYPE = {"apple", "pear", "watermelon", "grape", "dragon fruit"};

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Add a custom source that emits one order per second: (product name, quantity)
        DataStreamSource<Tuple2<String, Integer>> orderSource = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(Tuple2.of(TYPE[random.nextInt(TYPE.length)], 1));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }

        }, "order-info");

        orderSource.keyBy(0)
                // Combine the previous reduced value with the current element and emit the new running total
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                            throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}
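
Every second this job prints the updated running total for the product that was drawn, e.g. (apple,1), then (apple,2) once a second apple order arrives. With parallelism greater than 1, print() prefixes each line with the index of the emitting subtask.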

 



