Apache Flink - 基本API概念


  • Flink程序是實現分布式集合轉換的常規程序。集合最初是從源創建的。通過接收器(slink)返回結果,接收器可以將數據寫到某個文件或stdout。Flink可以在各種環境(context)中運行,本地JVM或集群。

1.數據集和數據流

  • Flink用特殊的類DataSet and DataStream來表示程序中的數據。可以認為他們是可以包含重復數據的不可變數據集合。在DataSet中數據是有限的,而在DataStream中數據是無限的。
  • 這些集合不同於java里的集合,他們是不可變的,一旦被創造就不能改動,也不能簡單的抽查里面的元素。
  • 最初的集合是通過在Flink程序里添加一個源被創造的,新的集合是使用API方法(如mapfilter)通過轉換得到的。

2.剖析一個Flink程序

  • 每個程序包含相同的基本部分:
  1. 獲得一個執行環境(execution environment).
  2. 加載/創建初始數據。
  3. 指定轉換這些數據。
  4. 指定放置計算結果的位置。
  5. 觸發程序執行。
  • StreamExecutionEnvironment是所有Flink程序的基礎。可以通過以下靜態方法獲得:
    getExecutionEnvironment()
    
    createLocalEnvironment()
    
    createRemoteEnvironment(String host, int port, String... jarFiles)

    通常只需要使用getExecutionEnvironment()方法,因為這將根據環境做出正確的事:如果你執行你的程序在IDE上或着作為一個普通Java程序,它將創建一個本地環境,將在本地機器上執行程序。如果您從您的程序創建了一個JAR文件,並通過命令行調用它,Flink集群管理者將執行你的main方法並且getExecutionEnvironment()將返回一個在一個集群上執行程序的執行環境。

  • 用於指定數據源,執行環境有幾個方法來從文件讀取:你可以逐行閱讀,像CSV文件,或者使用完全自定義數據輸入格式。要讀取一個文本文件的順序,您可以使用:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<String> text = env.readTextFile("file:///path/to/file");

     這將給你一個數據流,然后,您可以通過轉換創建新的派生數據流。

  • 你可以通過調用DataStream數據轉換方法轉換。例如,一個map轉換看起來像這樣:
    DataStream<String> input = ...;
    
    DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    });

    通過將原始集合中的每個字符串轉換為整數,這將創建一個新的數據流。

  • 一旦你有了一個包含你的最終結果的數據流,通過創建一個sink可以把它寫一個外部系統。這些是一些創建一個sink的的方法示例:
    writeAsText(String path)
    
    print()

     

  • 一旦你指定完整程序,你需要通過調用StreamExecutionEnvironment上的execute()去觸發程序執行。根據ExecutionEnvironment的類型,執行將會被觸發在你的本地機器上或提交程序在集群上執行。這個execute()方法返回一個JobExecutionResult,它包含執行時間和累加結果。

3.延遲計算

  • 所有Flink程序都是延遲執行的:當一個程序的main方法被執行時,數據加載和轉換沒有被立刻發生,相反,每個操作被創造,添加到該程序的計划里。當執行明確地被execute() 觸發時這些操作被真正執行。

4.指定的keys

  • 一些轉換(join, coGroup, keyBy, groupBy)要求的集合上定義的一個key。其他的轉換(Reduce, GroupReduce, Aggregate, Windows)允許數據在被應用前通過一個key被分組。
  • 一個DataSet被分組:
    DataSet<...> input = // [...]
    DataSet<...> reduced = input
      .groupBy(/*define key here*/)
      .reduceGroup(/*do something*/);

    一個key在DataStream中被指定:

    DataStream<...> input = // [...]
    DataStream<...> windowed = input
      .keyBy(/*define key here*/)
      .window(/*window specification*/);

    Flink的數據模型不是基於 鍵值( key-value)對的。keys是虛擬的:他們被定義為實際數據的函數來引導分組操作符。

5.指定轉換函數

  • 許多轉換需要用戶自定義函數。
  • 實現一個接口:
    class MyMapFunction implements MapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    data.map(new MyMapFunction());

     

  • 匿名類:
    data.map(new MapFunction<String, Integer> () {
      public Integer map(String value) { return Integer.parseInt(value); }
    });

     

  • 需要用戶定義函數的所有轉換都可以將rich函數作為參數。相比:
    class MyMapFunction implements MapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };

    可以寫成:

    class MyMapFunction extends RichMapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };

    像往常一樣傳遞函數到map轉換:

    data.map(new MyMapFunction());

    rich函數也可以被定義為一個匿名類:

    data.map (new RichMapFunction<String, Integer>() {
      public Integer map(String value) { return Integer.parseInt(value); }
    });

    rich函數除了提供像map一樣的用戶自定義功能外,還提供四個方法:openclosegetRuntimeContext, and setRuntimeContext。這些是有用的對於向函數傳遞參數,創建並最終確定本地狀態,訪問廣播變量,和用於訪問運行時信息,如累加器和計數器,和信息迭代。

 6.支持的數據類型

  1. Java Tuples and Scala Case Classes:java元組和Scala case類    元組是包含固定數量的含有各種類型的字段的復合類型。
    DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
        new Tuple2<String, Integer>("hello", 1),
        new Tuple2<String, Integer>("world", 2));
    
    wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer map(Tuple2<String, Integer> value) throws Exception {
            return value.f1;
        }
    });
    
    wordCounts.keyBy(0); // also valid .keyBy("f0")

     

  2. java POJOs:Java和Scala類被Flink作為一種特殊的POJO數據類型對待,並且類必須是public的,它必須有一個沒有參數的公共構造函數,所有的字段是公共或必須通過getter和setter函數來訪問,對於一個被叫做foo的字段,getter和setter函數必須命名為getFoo()setFoo()。
    public class WordWithCount {
    
        public String word;
        public int count;
    
        public WordWithCount() {}
    
        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
    
    DataStream<WordWithCount> wordCounts = env.fromElements(
        new WordWithCount("hello", 1),
        new WordWithCount("world", 2));
    
    wordCounts.keyBy("word"); // key by field expression "word"

     

  3. Primitive Types   基本數據類型   Flink支持java和Scala基本數據類型。
  4. General class types  一般類類型   Flink支持許多Java和Scala類(API和自定義)。所有不確定為POJO類類型(參見上面的POJO需求)是Flink一般類類型。Flink將這些數據類型視為黑盒,無法訪問其內容。
  5. Value    值類型通過實現org.apache.flinktypes.Value接口的讀和寫方法為那些操作提供定制代碼。當通用串行化是效率無效率的時候使用值類型是合理的。一個例子是一個數據類型實現稀疏向量元素的數組。知道數組大多是零,一個可以用一個特殊的編碼輸出非零元素,而通用串行化只會寫所有的數組元素。    Flink用預定義的值類型,對應於基本數據類型。 (ByteValueShortValueIntValueLongValueFloatValueDoubleValueStringValueCharValueBooleanValue)。這些值類型作為可變的基本數據類型的變量,其價值可以更改,允許程序員重用對象和減輕垃圾收集器的壓力。
  6. Hadoop Writables
  7. Special Types

7.Accumulators & Counters

  • Accumulators有簡單的結構包括添加操作和工作結束后可用的最后累積的結果。
  • 最簡單的accumulator是counter:你可以用Accumulator.add(V value)方法使他自己增量。工作結束后Flink將合並所有部分結果並將結果發送給客戶端。Accumulators在調試或你想很快知道關於你的數據更多的時候是有用的。
  • Flink有以下內置的Accumulators,每一個都實現了Accumulator接口:IntCounter,LongCounter, and IDoubleCounter。
  • 如何使用accumulators:                                                                                                                                                        首先你應該你想使用accumulator在用戶定義的轉換函數處必須創建一個accumulator對象(這里是一個計數器)。
    private IntCounter numLines = new IntCounter();

    第二,你必須注冊accumulator對象,通常在rich函數的open()方法。在這里你也可以定義名字。

    getRuntimeContext().addAccumulator("num-lines", this.numLines);

    您現在可以在運算符函數任何地方使用這個accumulator,包括open()和close()方法。

    this.numLines.add(1);

    總的結果將被存儲在從執行環境的execute()方法返回的JobExecutionResult對象中。

    myJobExecutionResult.getAccumulatorResult("num-lines")

    每個job中所有的accumulator共享一個命名空間,因此,您可以使用相同的accumulator在你的job的不同運算符函數中。Flink將內部合並(合)並具有相同名稱的accumulator。

  • 一般的,accumulators的結果只有在這個job結束之后才是可用的。
  • Accumulator<V,R>是最靈活的:它定義了一個類型V用於值的增加,和最終結果類型R用於最終結果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM