Apache Flink -Streaming(DataStream API)


綜述:

  • 在Flink中DataStream程序是在數據流上實現了轉換的常規程序。

1.示范程序

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
nc -lk 9999

2.數據源

  • 程序從源讀取輸入。可以通過StreamExecutionEnvironment.addSource(sourceFunction)給程序附上源。
  • 在StreamExecutionEnvironment中有一些可訪問的預定義的流數據源:                                                                              readTextFile(path)   逐行作為字符串讀取文本文件                                                                                                              readFile(fileInputFormat, path)    通過指定的文件輸入格式(the specified file input format)讀取文件                                    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)     這是一個被前兩個方法內部調用的方法。它基於給定fileInputFormat在path下讀取文件,根據提供的watchType,這個源會定期監測(每 interval ms)新數據的路徑。
  • 基於套接字的  socketTextStream   從套接字讀取。元素可以由一個分隔符分開。
  • 基於集合的                                                                                                                                                                              fromCollection(Collection)     從Java Java.util.Collection創建一個數據流,集合中的所有元素必須是相同類型的。            fromCollection(Iterator, Class)     從一個迭代器創建一個數據流,類指定迭代器返回的元素的數據類型。                          fromElements(T ...)      從給定的對象的序列創建一個數據流,所有對象必須是相同類型的。                                            fromParallelCollection(SplittableIterator, Class)   在並行執行中,從一個迭代器創建一個數據流,類指定迭代器返回的元素的數據類型。                                                                                                                                                                      generateSequence(from, to)       在給定的時間間隔內,生成的數字序列,並行執行。
  • 自定義的          addSource    附上一個新的源函數。例如要從Apache Kafka讀取,可以用addSource(new FlinkKafkaConsumer08<>(...))。

3.DataStream Transformations     參照運算符。

4.Data Sinks 數據接收

  • .Data Sinks中消耗數據流,把他們發給文件、套接字、外部系統或打印。Flink帶有多種封裝在操作符中的內置輸出格式:   writeAsText() / TextOutputFormat    把元素當做字符串(通過toString()獲得)順序寫入                                                     writeAsCsv(...) / CsvOutputFormat    將元組寫為逗號分隔值文件。                                                                               print() / printToErr()    在stdout/stderr上輸出每個元素的toString()值。                                                                       writeUsingOutputFormat() / FileOutputForma      方法和基類定義文件輸出。                                                             writeToSocket   根據SerializationSchema把元素輸出到套接字。                                                                                   addSink    調用自定義sink函數。

5.Iterations  迭代次數

  • 迭代流程序實現一個階躍函數並且被嵌入到一個IterativeStream。需要指定的哪一部分流反饋迭代和哪一部分流用split轉換或filter轉發下游。                                                                                                                                                                  首先,我們定義一個IterativeStream:
    IterativeStream<Integer> iteration = input.iterate();

    然后,我們用一系列的轉換指定將在循環內執行的邏輯:

    DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

    關閉一個迭代和定義迭代尾巴,調用IterativeStream中closeWith(feedbackStream) 方法。一種常見模式是使用一個filter(過濾器)來分離流的一部分(用於反饋)和流的一部分(向前傳播):

    iteration.closeWith(iterationBody.filter(/* one part of the stream */));
    DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

    例如,這是程序是一系列整數不斷減去1,直到他們達到零:

    DataStream<Long> someIntegers = env.generateSequence(0, 1000);
    
    IterativeStream<Long> iteration = someIntegers.iterate();
    
    DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
      @Override
      public Long map(Long value) throws Exception {
        return value - 1 ;
      }
    });
    
    DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
      @Override
      public boolean filter(Long value) throws Exception {
        return (value > 0);
      }
    });
    
    iteration.closeWith(stillGreaterThanZero);
    
    DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
      @Override
      public boolean filter(Long value) throws Exception {
        return (value <= 0);
      }
    });

     

6.執行參數

  • StreamExecutionEnvironment包含ExecutionConfig (允許在運行時給job設定具體的配置值)
  • 延遲控制  默認情況下,元素並不一個接一個的傳輸到網絡上(這將導致不必要的網絡流量)而是緩沖。盡管緩沖可以優化吞吐量,但是當傳入的流不夠快他可能導致延遲問題。為了控制吞吐量和延遲,您可以使用env.setBufferTimeout(timeoutMillis)去設置一個最大等待時間使緩沖區填滿。
    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setBufferTimeout(timeoutMillis);
    
    env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

    為了使吞吐量最大化,設置setBufferTimeout(-1)會刪除超時並且只會當緩沖都滿了才刷新。為了使最小化延遲,設置Timeout為 一個值接近於0(例如5或10 ms)的值。應該避免緩沖區Timeout為0,因為它可能導致性能嚴重的下降。

 7.調試

  • Flink提供通過從IDE中支持本地調試顯著地減輕數據分析程序的開發,注入測試數據和結果數據的集合。
  • 本地執行環境:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    
    DataStream<String> lines = env.addSource(/* some source */);
    // build your program
    
    env.execute();

    LocalEnvironment創建和使用。

  • 收集數據來源:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    
    // Create a DataStream from a list of elements
    DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
    
    // Create a DataStream from any Java collection
    List<Tuple2<String, Integer>> data = ...
    DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
    
    // Create a DataStream from an Iterator
    Iterator<Long> longIt = ...
    DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

    目前,收集數據源要求數據類型和迭代器實現Serializable(可串行化)。此外,收集數據源不能並行執行(parallelism(平行)= 1)。

  • Iterator Data Sink:迭代數據接收器。Flink還提供了一個sink收集DataStream數據結果進行測試和調試:
    import org.apache.flink.streaming.experimental.DataStreamUtils
    
    DataStream<Tuple2<String, Integer>> myResult = ...
    Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

     

Event Time:

1.Time

  • Processing time :處理時間。處理時間是指機器執行相應的操作的系統時間。當流程序運行在處理時間時,所有基於時間的操作(如時間窗口)將使用機器的系統時鍾運行各自的操作符。
  •  Event time:事件時間 。事件時間是在其生產設備每個事件發生的時間。這個時間通常是嵌入在他們進入Flink之前的記錄中,事件的時間戳可以從每個記錄提取出。時間的進展取決於數據。
  • Ingestion time:攝取時間。攝取時間是事件進入Flink的時間。源操作符的每個記錄源的當前時間作為一個時間戳,基於時間的操作(如時間窗)指這個時間戳。
  • Flink DataStream程序的第一部分通常設置基本時間特性該設置定義了數據流源的行為方式(例如,它們是否將分配時間戳),以及窗口操作應該使用的概念像KeyedStream.timeWindow(Time.seconds(30))以下示例顯示了一個Flink程序,它在每小時時間窗口中合計事件。窗戶的行為適應時間特征。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
    // alternatively:
    // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
    
    stream
        .keyBy( (event) -> event.getUser() )
        .timeWindow(Time.hours(1))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

     

2.事件時間和水印

  • 支持事件時間的流處理器需要一種方法來衡量事件時間的進度。例如,當事件時間超過一小時時,需要通知構建每小時窗口的窗口操作員,以便操作員可以關閉正在進行的窗口。事件時間可以獨立於處理時間(由掛鍾測量)進行。
  • Flink中用於衡量事件時間進度的機制是水印。水印作為數據流的一部分流動並帶有時間戳(t)。
  • 下圖顯示了具有(邏輯的)時間戳和(內聯的)水印的事件流。在該示例中,事件按順序(關於它們的時間戳),意味着水印僅是流中的周期性標記。
  • 水印對於亂序流是至關重要的,如下圖所示,這里的活動不是由他們的時間戳排序。通常,水印是一種聲明,在該流中到達某個時間戳的所有事件都應該到達。一旦水印到達操作員,操作員就可以將其內部事件時鍾提前到水印的值

3.並行流中的水印

  • 水印或直接在源函數生成。每個源函數的平行子任務通常獨立生成的水印。這些水印定義特定並行來源的事件時間。
  • 某些元素可能違反水印條件,這意味着即使在水印(t)發生之后,也會出現更多具有時間戳t'<= t的元素。某些元素可以任意延遲,從而無法指定某個事件時間戳的所有元素將發生的時間。

4.生成時間戳/水印

  • 使用事件時間,流程序需要設置相應的時間特性。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 了處理事件時間,Flink需要知道事件的時間戳,這意味着流中的每個元素都需要分配其事件時間戳。時間戳分配與生成水印密切相關,水印告訴系統事件時間的進展。時間戳和水印都指定為毫秒。有兩種方法可以分配時間戳並生成水印:
    1. 直接在數據流源中。
    2. 通過時間戳分配器/水印生成器:在Flink中,時間戳分配器還定義要發出的水印。
  • 數據流源可以直接為它們生成的元素分配時間戳,也可以發出水印。完成此操作后,不需要時間戳分配器。請注意,如果使用時間戳分配器,則源將提供的任何時間戳和水印都將被覆蓋。要直接在源中為每一個元素分配一個時間戳,必須使用SourceContext上的collectWithTimestamp(...)方法,為了生成水印,源必須調用emitWatermark(Watermark)方法。下面是一個(非檢查點)的簡單示例,它分配時間戳並生成水印:
    @Override
    public void run(SourceContext<MyType> ctx) throws Exception {
        while (/* condition */) {
            MyType next = getNext();
            ctx.collectWithTimestamp(next, next.getEventTimestamp());
    
            if (next.hasWatermarkTime()) {
                ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
            }
        }
    }

     

  • 時間戳分配器獲取流並生成帶時間戳和水印的新流。如果原始流已經有時間戳和/或水印,則時間戳分配器會覆蓋它們。
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    DataStream<MyEvent> stream = env.readFile(
            myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
            FilePathFilter.createDefaultFilter(), typeInfo);
    
    DataStream<MyEvent> withTimestampsAndWatermarks = stream
            .filter( event -> event.severity() == WARNING )
            .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
    
    withTimestampsAndWatermarks
            .keyBy( (event) -> event.getGroup() )
            .timeWindow(Time.seconds(10))
            .reduce( (a, b) -> a.add(b) )
            .addSink(...);

    AssignerWithPeriodicWatermarks 分配時間戳並定期生成水印

State & Fault Tolerance:

1.Keyed StateOperator State

  • Flink有兩種基本的狀態:Keyed StateOperator State
  • 鍵控狀態總是相對於鍵,只能用於KeyedStream上的函數和運算符。
  • 與操作符狀態(或非鍵狀態),每個操作符綁定到一個並行算子實例狀態。

2.The Broadcast State Pattern

  • 使用State描述運算符狀態,其在運算符的並行任務中恢復時均勻分布,或者聯合,整個狀態用於初始化已恢復的並行任務。

3.檢查點

  • Flink中的每個函數和運算符都可以是有狀態的有狀態函數通過各個元素/事件的處理存儲數據,使狀態成為任何類型的更精細操作的關鍵構建模塊。
  • 為了能使狀態容錯,Flink需要checkpoint狀態。檢查點允許Flink恢復流中的狀態和位置,從而為應用程序提供與無故障執行相同的語義。

操作符:

  •  操作符可以把一個或多個流轉換為新流,程序可以將多個轉換組合成復雜的數據流拓撲。
  • 數據流轉換:
    轉換 描述

    Map

    DataStream → DataStream

    采用一個元素並生成一個元素。一個map函數,它將輸入流的值加倍:
    DataStream<Integer> dataStream = //...
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });

     

     FlatMap

    DataStream→DataStream

     采用一個元素並生成零個,一個或多個元素。將句子分割為單詞的flatmap函數:
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out)
            throws Exception {
            for(String word: value.split(" ")){
                out.collect(word);
            }
        }
    });

     

     Filter

    DataStream → DataStream

     計算每個元素的布爾函數,並保留函數返回true的元素。過濾掉零值的過濾器:
    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });

     

     KeyBy

    DataStream → KeyedStream

     邏輯上將流分區為不相交的分區。具有相同密鑰key的記錄都分配給同一分區。在內部KeyBy()是通過哈希分區實現的。有不同的方式去指定keys。

    此轉換返回KeyedStream,其中包括使用鍵控狀態所需KeyedStream

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple

    下列類型不能成為key:

    1.是一個POJO類型但是沒有重寫hashCode()方法並且依賴於Object.hashCode()實現。

    2.是任何類型的數組。

     Reduce

    KeyedStream → DataStream

    鍵控數據流上的“滾動”減少。將當前元素與最后一個減少的值組合並發出新值。  

    reduce函數,用於創建部分和的流:

    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2)
        throws Exception {
            return value1 + value2;
        }
    });

     

     Flod

    KeyedStream → DataStream

    具有初始值的鍵控數據流上的“滾動”折疊。將當前元素與最后折疊的值組合並發出新值。 

    折疊函數,當應用於序列(1,2,3,4,5)時,發出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..

    DataStream<String> result =
      keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
      });

     

     Aggregations

    KeyedStream → DataStream

     在鍵控數據流上滾動聚合。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的元素(max和maxBy相同)。
    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");

     

     Window

    KeyedStream → WindowedStream

     可以在已經分區的KeyedStream上定義Windows。Windows根據某些特征(例如,在最后5秒內到達的數據)對每個key中的數據進行分組。
    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

     

     WindowALL

    DataStream → AllWindowedStream

     Windows可以在常規DataStream上定義。Windows根據某些特征(例如,在最后5秒內到達的數據)對所有流事件進行分組。

    警告:在許多情況下,這是非並行轉換。所有記錄將收集在windowAll運算符的一個任務中。

    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

     

     Window Apply

    WindowedStream→DataStream 
    AllWindowedStream→DataStream

     將一般功能應用於整個窗口。下面是一個手動求和窗口元素的函數。

    注意:如果您正在使用windowAll轉換,則需要使用AllWindowFunction。

    windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
        public void apply (Tuple tuple,
                Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (value t: values) {
                sum += t.f1;
            }
            out.collect (new Integer(sum));
        }
    });
    
    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
        public void apply (Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (value t: values) {
                sum += t.f1;
            }
            out.collect (new Integer(sum));
        }
    });

     

     Window Reduce

    WindowedStream→DataStream

     將reduce函數應用於window並返回減少后的值。
    windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
        }
    });

     

     Window Flod

    WindowedStream → DataStream

     將功能折疊功能應用於window並返回折疊值。示例函數應用於序列(1,2,3,4,5)時,將序列折疊為字符串“start-1-2-3-4-5”:
    windowedStream.fold("start", new FoldFunction<Integer, String>() {
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });

     

     Aggregations on windows

    WindowedStream → DataStream

     聚合窗口的內容。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的元素(max和maxBy相同)。
    windowedStream.sum(0);
    windowedStream.sum("key");
    windowedStream.min(0);
    windowedStream.min("key");
    windowedStream.max(0);
    windowedStream.max("key");
    windowedStream.minBy(0);
    windowedStream.minBy("key");
    windowedStream.maxBy(0);
    windowedStream.maxBy("key");

     

     Union

    DataStream *→DataStream

     兩個或多個數據流的聯合,創建包含來自所有流的所有元素的新流。注意:如果將數據流與自身聯合,則會在結果流中獲取兩次元素。
    dataStream.union(otherStream1, otherStream2, ...);

     

     Window Join

    DataStream,DataStream→DataStream

     在給定密鑰和公共窗口上連接兩個數據流。
    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {...});

     

     Interval Join

    KeyedStream,KeyedStream→DataStream

     在給定的時間間隔內使用公共密鑰加入兩個鍵控流的兩個元素e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive(true) // optional
        .lowerBoundExclusive(true) // optional
        .process(new IntervalJoinFunction() {...});

     

     Window CoGroup

    DataStream,DataStream → DataStream

     在給定密鑰和公共窗口上對兩個數據流進行Cogroup。
    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new CoGroupFunction () {...});

     

     Connect

    DataStream,DataStream → ConnectedStreams

     “連接”兩個保留其類型的數據流。連接允許兩個流之間共享狀態。
    DataStream<Integer> someStream = //...
    DataStream<String> otherStream = //...
    
    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

     

     CoMap,CoFlatMap

    ConnectedStreams → DataStream

     類似於連接數據流上的map和flatMap。
    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }
    
        @Override
        public Boolean map2(String value) {
            return false;
        }
    });
    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    
       @Override
       public void flatMap1(Integer value, Collector<String> out) {
           out.collect(value.toString());
       }
    
       @Override
       public void flatMap2(String value, Collector<String> out) {
           for (String word: value.split(" ")) {
             out.collect(word);
           }
       }
    });

     

     Split

    DataStream → SplitStream

     根據一些標准把一個流拆分成多個流。
    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            }
            else {
                output.add("odd");
            }
            return output;
        }
    });

     

     Select

    SplitStream → DataStream

     從拆分流中選擇一個或多個流。
    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd");

     

     Iterate

    DataStream → IterativeStream → DataStream

     通過將一個運算符的輸出重定向到某個先前的運算符,在流中創建“反饋”循環。這對於定義不斷更新模型的算法特別有用。以下代碼以流開頭並連續應用迭代體。大於0的元素將被發送回反饋通道,其余元素將向下游轉發。
    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map (/*do something*/);
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Integer value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Integer value) throws Exception {
            return value <= 0;
        }
    });

     

     Extract Timestamps

    DataStream → DataStream

     從記錄中提取時間戳,以便使用使用事件時間語義的窗口。
    stream.assignTimestamps (new TimeStampExtractor() {...});

     

  • 以下轉換可用於元組的數據流:
    轉換 描述

    Project

    DataStream→DataStream

    從元組中選擇字段的子集。
    DataStream<Tuple3<Integer, Double, String>> in = // [...]
    DataStream<Tuple2<String, Integer>> out = in.project(2,0);

     

  • 鏈接兩個后續轉換意味着將它們共同定位在同一個線程中以獲得更好的性能。如果可能的話,Flink默認鏈操作符(例如,兩個后續的map轉換)。

 5.視窗windows

  • Windows是處理無限流的核心。Windows將流拆分為有限大小的“buckets(桶)”,我們可以在其上應用計算。
  • 窗口Flink程序的一般結構如下所示。第一個片段指的是鍵控流,而第二個片段指的是非鍵控流。
    鍵控Windows:
    
    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    
    非鍵控Windows:
    
    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    
    在上面,方括號([...])中的命令是可選的。這表明Flink允許您以多種不同方式自定義窗口邏輯,以便最適合您的需求。
  • 窗口生命周期:只要屬於此窗口的第一個元素到達,就會創建一個窗口當時間(事件或處理時間)超過其結束時間戳加上用戶指定允許遲到時,窗口將被完全刪除。Flink保證僅刪除基於時間的窗口而不是其他類型。
  • 翻滾窗口( tumbling windows):翻滾窗口分配器分配每個元素給指定窗口大小的窗口。例如,如果指定大小為5分鍾的翻滾窗口,則將評估當前窗口,並且每五分鍾將啟動一個新窗口,如下圖所示。 
    DataStream<T> input = ...;
    
    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
  • 滑動窗口(Sliding Windows):滑動窗口分配器分配元素以固定長度的窗口。與翻滾窗口分配器類似,窗口大小由窗口大小參數配置。附加的窗口滑動參數控制滑動窗口的啟動頻率。                                                                                                                   

    DataStream<T> input = ...;
    
    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
  • 會話窗口(Session Windows):在會話窗口中按活動會話分配組中的元素。與翻滾窗口和滑動窗口相比,會話窗口不重疊並且沒有固定的開始和結束時間。相反,當會話窗口在一段時間內沒有接收到元素時,即當發生不活動的間隙時,會關閉會話窗口。   

    DataStream<T> input = ...;
    
    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
        
    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);
    
    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
        
    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            // determine and return session gap
        }))
        .<windowed transformation>(<window function>);
  • window join:window join連接兩個共享公共密鑰並位於同一窗口中的流的元素。一般用法可概括如下:
    stream.join(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
    

    關於語義的一些注釋:

    • 兩個流的元素的成對組合的創建表現得像內部連接,意味着如果它們沒有來自要連接的另一個流的對應元素,則不會發出來自一個流的元素。
    • 那些加入的元素將在其時間戳中包含仍位於相應窗口中的最大時間戳。例如,[5, 10)具有其邊界的窗口將導致連接的元素具有9作為其時間戳。
  • 異步IO訪問外部數據:與數據庫的異步交互意味着單個並行函數實例可以同時處理許多請求並同時接收響應。這樣,可以通過發送其他請求和接收響應來覆蓋等待時間。至少,等待時間在多個請求上攤銷。這導致大多數情況下流量吞吐量更高。

Streaming Connectors:

  • Flink內置了一些基本數據源和接收器,並且始終可用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM