Flink基礎(七):DS簡介(7) Flink DataStream API(二)


1 產生傳感器讀數代碼編寫(讀取數據源)

1.1 從批讀取數據

scala version

val stream = env
  .fromElements(
    SensorReading("sensor_1", 1547718199, 35.80018327300259),
    SensorReading("sensor_6", 1547718199, 15.402984393403084),
    SensorReading("sensor_7", 1547718199, 6.720945201171228),
    SensorReading("sensor_10", 1547718199, 38.101067604893444)
  )

java version

DataStream<SensorReading> stream = env
  .fromElements(
    new SensorReading("sensor_1", 1547718199, 35.80018327300259),
    new SensorReading("sensor_6", 1547718199, 15.402984393403084),
    new SensorReading("sensor_7", 1547718199, 6.720945201171228),
    new SensorReading("sensor_10", 1547718199, 38.101067604893444)
  )

1.2 從文件讀取數據

scala version

val stream = env.readTextFile(filePath)

java version

DataStream<String> stream = env.readTextFile(filePath);

1.3 以Kafka消息隊列的數據為數據來源

scala version

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty(
  "key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty(
  "value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty("auto.offset.reset", "latest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
  // source為來自Kafka的數據,這里我們實例化一個消費者,topic為hotitems
  .addSource(
    new FlinkKafkaConsumer011[String](
      "hotitems",
      new SimpleStringSchema(),
      properties
    )
  )

java version

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty(
  "key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
);
properties.setProperty(
  "value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
);
properties.setProperty("auto.offset.reset", "latest");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<String> stream = env
  // source為來自Kafka的數據,這里我們實例化一個消費者,topic為hotitems
  .addSource(
    new FlinkKafkaConsumer011<String>(
      "hotitems",
      new SimpleStringSchema(),
      properties
    )
  );

1.4 自定義數據源

scala version

import java.util.Calendar

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.util.Random

// 泛型是`SensorReading`,表明產生的流中的事件的類型是`SensorReading`
class SensorSource extends RichParallelSourceFunction[SensorReading] {
  // 表示數據源是否正常運行
  var running: Boolean = true

  // 上下文參數用來發出數據
  override def run(ctx: SourceContext[SensorReading]): Unit = {
    val rand = new Random

    var curFTemp = (1 to 10).map(
      // 使用高斯噪聲產生隨機溫度值
      i => ("sensor_" + i, (rand.nextGaussian() * 20))
    )

    // 產生無限數據流
    while (running) {
      curFTemp = curFTemp.map(
        t => (t._1, t._2 + (rand.nextGaussian() * 0.5))
      )

      // 產生ms為單位的時間戳
      val curTime = Calendar.getInstance.getTimeInMillis

      // 使用ctx參數的collect方法發射傳感器數據
      curFTemp.foreach(t => ctx.collect(SensorReading(t._1, curTime, t._2)))

      // 每隔100ms發送一條傳感器數據
      Thread.sleep(1000)
    }
  }

  // 定義當取消flink任務時,需要關閉數據源
  override def cancel(): Unit = running = false
}

使用方法

val sensorData = env.addSource(new SensorSource)

java version

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Calendar;
import java.util.Random;

public class SensorSource extends RichParallelSourceFunction<SensorReading> {

    private boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> srcCtx) throws Exception {

        Random rand = new Random();

        String[] sensorIds = new String[10];
        double[] curFTemp = new double[10];
        for (int i = 0; i < 10; i++) {
            sensorIds[i] = "sensor_" + i;
            curFTemp[i] = 65 + (rand.nextGaussian() * 20);
        }

        while (running) {
            long curTime = Calendar.getInstance().getTimeInMillis();
            for (int i = 0; i < 10; i++) {
                curFTemp[i] += rand.nextGaussian() * 0.5;
                srcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));
            }

            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

使用方法
// 攝入數據流
DataStream<SensorReading> sensorData = env.addSource(new SensorSource());

2 轉換算子

在這一小節我們將大概看一下DataStream API的基本轉換算子。與時間有關的操作符(例如窗口操作符和其他特殊的轉換算子)將會在后面的章節敘述。一個流的轉換操作將會應用在一個或者多個流上面,這些轉換操作將流轉換成一個或者多個輸出流。編寫一個DataStream API簡單來說就是將這些轉換算子組合在一起來構建一個數據流圖,這個數據流圖就實現了我們的業務邏輯。

大部分的流轉換操作都基於用戶自定義函數UDF。UDF函數打包了一些業務邏輯並定義了輸入流的元素如何轉換成輸出流的元素。像MapFunction這樣的函數,將會被定義為類,這個類實現了Flink針對特定的轉換操作暴露出來的接口。

DataStream<String> sensorIds = filteredReadings
        .map(r -> r.id);

函數接口定義了需要由用戶實現的轉換方法,例如上面例子中的map()方法。

大部分函數接口被設計為Single Abstract Method(單獨抽象方法)接口,並且接口可以使用Java 8匿名函數來實現。Scala DataStream API也內置了對匿名函數的支持。當講解DataStream API的轉換算子時,我們展示了針對所有函數類的接口,但為了簡潔,大部分接口的實現使用匿名函數而不是函數類的方式。

DataStream API針對大多數數據轉換操作提供了轉換算子。如果你很熟悉批處理API、函數式編程語言或者SQL,那么你將會發現這些API很容易學習。我們會將DataStream API的轉換算子分成四類:

  • 基本轉換算子:將會作用在數據流中的每一條單獨的數據上。
  • KeyedStream轉換算子:在數據有key的情況下,對數據應用轉換算子。
  • 多流轉換算子:合並多條流為一條流或者將一條流分割為多條流。
  • 分布式轉換算子:將重新組織流里面的事件。

2.1 基本轉換算子

基本轉換算子會針對流中的每一個單獨的事件做處理,也就是說每一個輸入數據會產生一個輸出數據。單值轉換,數據的分割,數據的過濾,都是基本轉換操作的典型例子。我們將解釋這些算子的語義並提供示例代碼。

MAP

map算子通過調用DataStream.map()來指定。map算子的使用將會產生一條新的數據流。它會將每一個輸入的事件傳送到一個用戶自定義的mapper,這個mapper只返回一個輸出事件,這個輸出事件和輸入事件的類型可能不一樣。圖5-1展示了一個map算子,這個map將每一個正方形轉化成了圓形。

 

MapFunction的類型與輸入事件和輸出事件的類型相關,可以通過實現MapFunction接口來定義。接口包含map()函數,這個函數將一個輸入事件恰好轉換為一個輸出事件。

// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
    > map(T): O

下面的代碼實現了將SensorReading中的id字段抽取出來的功能。

scala version

val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new IdExtractor)

class IdExtractor extends MapFunction[SensorReading, String] {
    override def map(r: SensorReading) : String = r.id
}

當然我們更推薦匿名函數的寫法。

val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)

java version

DataStream<SensorReading> readings = ...
DataStream<String> sensorIds = readings.map(new IdExtractor());

public static class IdExtractor implements MapFunction<SensorReading, String> {
    @Override
    public String map(SensorReading r) throws Exception {
        return r.id;
    }
}

當然我們更推薦匿名函數的寫法。

DataStream<String> sensorIds = filteredReadings.map(r -> r.id);

FILTER

filter轉換算子通過在每個輸入事件上對一個布爾條件進行求值來過濾掉一些元素,然后將剩下的元素繼續發送。一個true的求值結果將會把輸入事件保留下來並發送到輸出,而如果求值結果為false,則輸入事件會被拋棄掉。我們通過調用DataStream.filter()來指定流的filter算子,filter操作將產生一條新的流,其類型和輸入流中的事件類型是一樣的。圖5-2展示了只產生白色方框的filter操作。

 

布爾條件可以使用函數、FilterFunction接口或者匿名函數來實現。FilterFunction中的泛型是輸入事件的類型。定義的filter()方法會作用在每一個輸入元素上面,並返回一個布爾值。

// T: the type of elements
FilterFunction[T]
    > filter(T): Boolean

下面的例子展示了如何使用filter來從傳感器數據中過濾掉溫度值小於25華氏溫度的讀數。

scala version

val filteredReadings = readings.filter(r => r.temperature >= 25)

java version

DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);

FLATMAP

flatMap算子和map算子很類似,不同之處在於針對每一個輸入事件flatMap可以生成0個、1個或者多個輸出元素。事實上,flatMap轉換算子是filtermap的泛化。所以flatMap可以實現mapfilter算子的功能。圖5-3展示了flatMap如何根據輸入事件的顏色來做不同的處理。如果輸入事件是白色方框,則直接輸出。輸入元素是黑框,則復制輸入。灰色方框會被過濾掉。

 

flatMap算子將會應用在每一個輸入事件上面。對應的FlatMapFunction定義了flatMap()方法,這個方法返回0個、1個或者多個事件到一個Collector集合中,作為輸出結果。

// T: the type of input elements
// O: the type of output elements
FlatMapFunction[T, O]
    > flatMap(T, Collector[O]): Unit

下面的例子展示了在數據分析教程中經常用到的例子,我們用flatMap來實現。使用_來切割傳感器ID,比如sensor_1

scala version

class IdSplitter extends FlatMapFunction[String, String] {
    override def flatMap(id: String, out: Collector[String]) : Unit = {
        val arr = id.split("_")
        arr.foreach(out.collect)
    }
}

匿名函數寫法

val splitIds = sensorIds
  .flatMap(r => r.split("_"))

java version

public static class IdSplitter implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String id, Collector<String> out) {

        String[] splits = id.split("_");

        for (String split : splits) {
            out.collect(split);
        }
    }
}

匿名函數寫法:

DataStream<String> splitIds = sensorIds
        .flatMap((FlatMapFunction<String, String>)
                (id, out) -> { for (String s: id.split("_")) { out.collect(s);}})
        // provide result type because Java cannot infer return type of lambda function
        // 提供結果的類型,因為Java無法推斷匿名函數的返回值類型
        .returns(Types.STRING);

2.2 鍵控流轉換算子

很多流處理程序的一個基本要求就是要能對數據進行分組,分組后的數據共享某一個相同的屬性。DataStream API提供了一個叫做KeyedStream的抽象,此抽象會從邏輯上對DataStream進行分區,分區后的數據擁有同樣的Key值,分區后的流互不相關。

針對KeyedStream的狀態轉換操作可以讀取數據或者寫入數據到當前事件Key所對應的狀態中。這表明擁有同樣Key的所有事件都可以訪問同樣的狀態,也就是說所以這些事件可以一起處理。

要小心使用狀態轉換操作和基於Key的聚合操作。如果Key的值越來越多,例如:Key是訂單ID,我們必須及時清空Key所對應的狀態,以免引起內存方面的問題。稍后我們會詳細講解。

KeyedStream可以使用map,flatMap和filter算子來處理。接下來我們會使用keyBy算子來將DataStream轉換成KeyedStream,並講解基於key的轉換操作:滾動聚合和reduce算子。

KEYBY

keyBy通過指定key來將DataStream轉換成KeyedStream。基於不同的key,流中的事件將被分配到不同的分區中去。所有具有相同key的事件將會在接下來的操作符的同一個子任務槽中進行處理。擁有不同key的事件可以在同一個任務中處理。但是算子只能訪問當前事件的key所對應的狀態。

如圖5-4所示,把輸入事件的顏色作為key,黑色的事件輸出到了一個分區,其他顏色輸出到了另一個分區。

 

 

 

keyBy()方法接收一個參數,這個參數指定了key或者keys,有很多不同的方法來指定key。我們將在后面講解。下面的代碼聲明了id這個字段為SensorReading流的key。

scala version

 
val keyed: KeyedStream[SensorReading, String] = readings.keyBy(r => r.id) 

匿名函數r => r.id抽取了傳感器讀數SensorReading的id值。

java version

 
KeyedStream<SensorReading, String> keyed = readings.keyBy(r -> r.id);

匿名函數r -> r.id抽取了傳感器讀數SensorReading的id值。

滾動聚合

滾動聚合算子由KeyedStream調用,並生成一個聚合以后的DataStream,例如:sum,minimum,maximum。一個滾動聚合算子會為每一個觀察到的key保存一個聚合的值。針對每一個輸入事件,算子將會更新保存的聚合結果,並發送一個帶有更新后的值的事件到下游算子。滾動聚合不需要用戶自定義函數,但需要接受一個參數,這個參數指定了在哪一個字段上面做聚合操作。DataStream API提供了以下滾動聚合方法。

滾動聚合算子只能用在滾動窗口,不能用在滑動窗口。

  • sum():在輸入流上對指定的字段做滾動相加操作。
  • min():在輸入流上對指定的字段求最小值。
  • max():在輸入流上對指定的字段求最大值。
  • minBy():在輸入流上針對指定字段求最小值,並返回包含當前觀察到的最小值的事件。
  • maxBy():在輸入流上針對指定字段求最大值,並返回包含當前觀察到的最大值的事件。

滾動聚合算子無法組合起來使用,每次計算只能使用一個單獨的滾動聚合算子。

下面的例子根據第一個字段來對類型為Tuple3<Int, Int, Int>的流做分流操作,然后針對第二個字段做滾動求和操作。

scala version

val inputStream = env.fromElements((1, 2, 2), (2, 3, 1), (2, 2, 4), (1, 5, 3))

val resultStream = inputStream.keyBy(0).sum(1)

java version

DataStream<Tuple3<Integer, Integer, Integer>> inputStream = env.fromElements(new Tuple3(1, 2, 2), new Tuple3(2, 3, 1), new Tuple3(2, 2, 4), new Tuple3(1, 5, 3));

DataStream<Tuple3<Integer, Integer, Integer>> resultStream = inputStream
  .keyBy(0) // key on first field of the tuple
  .sum(1);   // sum the second field of the tuple in place

在這個例子里面,輸入流根據第一個字段來分流,然后在第二個字段上做計算。對於key 1,輸出結果是(1,2,2),(1,7,2)。對於key 2,輸出結果是(2,3,1),(2,5,1)。第一個字段是key,第二個字段是求和的數值,第三個字段未定義。

滾動聚合操作會對每一個key都保存一個狀態。因為狀態從來不會被清空,所以我們在使用滾動聚合算子時只能使用在含有有限個key的流上面。

REDUCE

reduce算子是滾動聚合的泛化實現。它將一個ReduceFunction應用到了一個KeyedStream上面去。reduce算子將會把每一個輸入事件和當前已經reduce出來的值做聚合計算。reduce操作不會改變流的事件類型。輸出流數據類型和輸入流數據類型是一樣的。

reduce函數可以通過實現接口ReduceFunction來創建一個類。ReduceFunction接口定義了reduce()方法,此方法接收兩個輸入事件,輸入一個相同類型的事件。

// T: the element type
ReduceFunction[T]
    > reduce(T, T): T

下面的例子,流根據傳感器ID分流,然后計算每個傳感器的當前最大溫度值。

scala version

val maxTempPerSensor = keyed.reduce((r1, r2) => r1.temperature.max(r2.temperature))

java version

DataStream<SensorReading> maxTempPerSensor = keyed
        .reduce((r1, r2) -> {
            if (r1.temperature > r2.temperature) {
                return r1;
            } else {
                return r2;
            }
        });

reduce作為滾動聚合的泛化實現,同樣也要針對每一個key保存狀態。因為狀態從來不會清空,所以我們需要將reduce算子應用在一個有限key的流上。

2.3 多流轉換算子

許多應用需要攝入多個流並將流合並處理,還可能需要將一條流分割成多條流然后針對每一條流應用不同的業務邏輯。接下來,我們將討論DataStream API中提供的能夠處理多條輸入流或者發送多條輸出流的操作算子。

UNION

DataStream.union()方法將兩條或者多條DataStream合並成一條具有與輸入流相同類型的輸出DataStream。接下來的轉換算子將會處理輸入流中的所有元素。圖5-5展示了union操作符如何將黑色和白色的事件流合並成一個單一輸出流。

事件合流的方式為FIFO方式。操作符並不會產生一個特定順序的事件流。union操作符也不會進行去重。每一個輸入事件都被發送到了下一個操作符。

下面的例子展示了如何將三條類型為SensorReading的數據流合並成一條流。

scala version

val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream
  .union(tokyoStream, rioStream)

java version

DataStream<SensorReading> parisStream = ...
DataStream<SensorReading> tokyoStream = ...
DataStream<SensorReading> rioStream = ...
DataStream<SensorReading> allCities = parisStream
  .union(tokyoStream, rioStream)

CONNECT, COMAP和COFLATMAP

  聯合兩條流的事件是非常常見的流處理需求。例如監控一片森林然后發出高危的火警警報。報警的Application接收兩條流,一條是溫度傳感器傳回來的數據,一條是煙霧傳感器傳回來的數據。當兩條流都超過各自的閾值時,報警。

DataStream API提供了connect操作來支持以上的應用場景。DataStream.connect()方法接收一條DataStream,然后返回一個ConnectedStreams類型的對象,這個對象表示了兩條連接的流。

scala version

val first = ...
val second = ...
val connected = first.connect(second)

java version

// first stream
DataStream<Integer> first = ...
// second stream
DataStream<String> second = ...

// connect streams
ConnectedStreams<Integer, String> connected = first.connect(second);

ConnectedStreams提供了map()flatMap()方法,分別需要接收類型為CoMapFunctionCoFlatMapFunction的參數。

以上兩個函數里面的泛型是第一條流的事件類型和第二條流的事件類型,以及輸出流的事件類型。還定義了兩個方法,每一個方法針對一條流來調用。map1()flatMap1()會調用在第一條流的元素上面,map2()flatMap2()會調用在第二條流的元素上面。

// IN1: 第一條流的事件類型
// IN2: 第二條流的事件類型
// OUT: 輸出流的事件類型
CoMapFunction[IN1, IN2, OUT]
    > map1(IN1): OUT
    > map2(IN2): OUT

CoFlatMapFunction[IN1, IN2, OUT]
    > flatMap1(IN1, Collector[OUT]): Unit
    > flatMap2(IN2, Collector[OUT]): Unit

函數無法選擇讀某一條流。我們是無法控制函數中的兩個方法的調用順序的。當一條流中的元素到來時,將會調用相對應的方法。

對兩條流做連接查詢通常需要這兩條流基於某些條件被確定性的路由到操作符中相同的並行實例里面去。在默認情況下,connect()操作將不會對兩條流的事件建立任何關系,所以兩條流的事件將會隨機的被發送到下游的算子實例里面去。這樣的行為會產生不確定性的計算結果,顯然不是我們想要的。為了針對ConnectedStreams進行確定性的轉換操作,connect()方法可以和keyBy()或者broadcast()組合起來使用。我們首先看一下keyBy()的示例。

scala version

val one = ...
val two = ...

val keyedConnect1 = one.connect(two).keyBy(0, 0)

val keyedConnect2 = one.keyBy(0).connect(two.keyBy(0))

java version

DataStream<Tuple2<Integer, Long>> one = ...
DataStream<Tuple2<Integer, String>> two = ...

// keyBy two connected streams
ConnectedStreams<Tuple2<Int, Long>, Tuple2<Integer, String>> keyedConnect1 = one
  .connect(two)
  .keyBy(0, 0); // key both input streams on first attribute

// alternative: connect two keyed streams
ConnectedStreams<Tuple2<Integer, Long>, Tuple2<Integer, String>> keyedConnect2 = one
  .keyBy(0)
  .connect(two.keyBy(0));

無論使用keyBy()算子操作ConnectedStreams還是使用connect()算子連接兩條KeyedStreams,connect()算子會將兩條流的含有相同Key的所有事件都發送到相同的算子實例。兩條流的key必須是一樣的類型和值,就像SQL中的JOIN。在connected和keyed stream上面執行的算子有訪問keyed state的權限。

下面的例子展示了如何連接一條DataStream和廣播過的流。

scala version

val one = ...
val two = ...

val keyedConnect = first.connect(second.broadcast())

java version

DataStream<Tuple2<Integer, Long>> one = ...
DataStream<Tuple2<Int, String>> two = ...

// connect streams with broadcast
ConnectedStreams<Tuple2<Int, Long>, Tuple2<Int, String>> keyedConnect = first
  // broadcast second input stream
  .connect(second.broadcast());

一條被廣播過的流中的所有元素將會被復制然后發送到下游算子的所有並行實例中去。未被廣播過的流僅僅向前發送。所以兩條流的元素顯然會被連接處理。

例子:

警告類:

scala version

case class Alert(message: String, timestamp: Long)

java version

public class Alert {

    public String message;
    public long timestamp;

    public Alert() { }

    public Alert(String message, long timestamp) {
        this.message = message;
        this.timestamp = timestamp;
    }

    public String toString() {
        return "(" + message + ", " + timestamp + ")";
    }
}

煙霧傳感器讀數類:

public enum SmokeLevel {
    LOW,
    HIGH
}

產生煙霧傳感器讀數的自定義數據源:

public class SmokeLevelSource implements SourceFunction<SmokeLevel> {

    private boolean running = true;

    @Override
    public void run(SourceContext<SmokeLevel> srcCtx) throws Exception {

        Random rand = new Random();

        while (running) {

            if (rand.nextGaussian() > 0.8) {
                srcCtx.collect(SmokeLevel.HIGH);
            } else {
                srcCtx.collect(SmokeLevel.LOW);
            }

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

監控一片森林然后發出高危的火警警報。報警的Application接收兩條流,一條是溫度傳感器傳回來的數據,一條是煙霧傳感器傳回來的數據。當兩條流都超過各自的閾值時,報警。

scala version

object MultiStreamTransformations {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tempReadings = env.addSource(new SensorSource)
        val smokeReadings = env
                .addSource(new SmokeLevelSource)
                .setParallelism(1)
        val keyedTempReadings = tempReadings
                .keyBy(r => r.id)
        val alerts = keyedTempReadings
                .connect(smokeReadings.broadcast())
                .flatMap(new RaiseAlertFlatMap)

        alerts.print()

        env.execute("Multi-Stream Transformations Example")
    }

    class RaiseAlertFlatMap extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {
        private var smokeLevel = "LOW"

        override def flatMap1(tempReading: SensorReading, out: Collector[Alert]) : Unit = {
            if (smokeLevel == "HIGH" && tempReading.temperature > 100) {
                out.collect(Alert("Risk of fire! " + tempReading, tempReading.timestamp))
            }
        }

        override def flatMap2(sl: String, out: Collector[Alert]) : Unit = {
            smokeLevel = sl
        }
    }
}

java version

public class MultiStreamTransformations {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> tempReadings = env
                .addSource(new SensorSource());

        DataStream<SmokeLevel> smokeReadings = env
                .addSource(new SmokeLevelSource())
                .setParallelism(1);

        KeyedStream<SensorReading, String> keyedTempReadings = tempReadings
                .keyBy(r -> r.id);

        DataStream<Alert> alerts = keyedTempReadings
                .connect(smokeReadings.broadcast())
                .flatMap(new RaiseAlertFlatMap());

        alerts.print();

        env.execute("Multi-Stream Transformations Example");
    }

    public static class RaiseAlertFlatMap implements CoFlatMapFunction<SensorReading, SmokeLevel, Alert> {

        private SmokeLevel smokeLevel = SmokeLevel.LOW;

        @Override
        public void flatMap1(SensorReading tempReading, Collector<Alert> out) throws Exception {
            // high chance of fire => true
            if (this.smokeLevel == SmokeLevel.HIGH && tempReading.temperature > 100) {
                out.collect(new Alert("Risk of fire! " + tempReading, tempReading.timestamp));
            }
        }

        @Override
        public void flatMap2(SmokeLevel smokeLevel, Collector<Alert> out) {
            // update smoke level
            this.smokeLevel = smokeLevel;
        }
    }
}

2.4 分布式轉換算子

  分區操作對應於我們之前講過的“數據交換策略”這一節。這些操作定義了事件如何分配到不同的任務中去。當我們使用DataStream API來編寫程序時,系統將自動的選擇數據分區策略,然后根據操作符的語義和設置的並行度將數據路由到正確的地方去。有些時候,我們需要在應用程序的層面控制分區策略,或者自定義分區策略。例如,如果我們知道會發生數據傾斜,那么我們想要針對數據流做負載均衡,將數據流平均發送到接下來的操作符中去。又或者,應用程序的業務邏輯可能需要一個算子所有的並行任務都需要接收同樣的數據。再或者,我們需要自定義分區策略的時候。在這一小節,我們將展示DataStream的一些方法,可以使我們來控制或者自定義數據分區策略。

keyBy()方法不同於分布式轉換算子。所有的分布式轉換算子將產生DataStream數據類型。而keyBy()產生的類型是KeyedStream,它擁有自己的keyed state。

Random

隨機數據交換由DataStream.shuffle()方法實現。shuffle方法將數據隨機的分配到下游算子的並行任務中去。

Round-Robin

rebalance()方法使用Round-Robin負載均衡算法將輸入流平均分配到隨后的並行運行的任務中去。圖5-7為round-robin分布式轉換算子的示意圖。

Rescale

rescale()方法使用的也是round-robin算法,但只會將數據發送到接下來的並行運行的任務中的一部分任務中。本質上,當發送者任務數量和接收者任務數量不一樣時,rescale分區策略提供了一種輕量級的負載均衡策略。如果接收者任務的數量是發送者任務的數量的倍數時,rescale操作將會效率更高。

rebalance()rescale()的根本區別在於任務之間連接的機制不同。 rebalance()將會針對所有發送者任務和所有接收者任務之間建立通信通道,而rescale()僅僅針對每一個任務和下游算子的一部分子並行任務之間建立通信通道。rescale的示意圖為圖5-7。

 

 

 

 

Broadcast

broadcast()方法將輸入流的所有數據復制並發送到下游算子的所有並行任務中去。

Global

global()方法將所有的輸入流數據都發送到下游算子的第一個並行任務中去。這個操作需要很謹慎,因為將所有數據發送到同一個task,將會對應用程序造成很大的壓力。

Custom

當Flink提供的分區策略都不適用時,我們可以使用partitionCustom()方法來自定義分區策略。這個方法接收一個Partitioner對象,這個對象需要實現分區邏輯以及定義針對流的哪一個字段或者key來進行分區。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM