Flink基礎(八):DS簡介(8) Flink DataStream API(三)


1 設置並行度

Flink應用程序在一個像集群這樣的分布式環境中並行執行。當一個數據流程序提交到作業管理器執行時,系統將會創建一個數據流圖,然后准備執行需要的操作符。每一個操作符將會並行化到一個或者多個任務中去。每個算子的並行任務都會處理這個算子的輸入流中的一份子集。一個算子並行任務的個數叫做算子的並行度。它決定了算子執行的並行化程度,以及這個算子能處理多少數據量。

算子的並行度可以在執行環境這個層級來控制,也可以針對每個不同的算子設置不同的並行度。默認情況下,應用程序中所有算子的並行度都將設置為執行環境的並行度。執行環境的並行度(也就是所有算子的默認並行度)將在程序開始運行時自動初始化。如果應用程序在本地執行環境中運行,並行度將被設置為CPU的核數。當我們把應用程序提交到一個處於運行中的Flink集群時,執行環境的並行度將被設置為集群默認的並行度,除非我們在客戶端提交應用程序時顯式的設置好並行度。

通常情況下,將算子的並行度定義為和執行環境並行度相關的數值會是個好主意。這允許我們通過在客戶端調整應用程序的並行度就可以將程序水平擴展了。我們可以使用以下代碼來訪問執行環境的默認並行度。

我們還可以重寫執行環境的默認並行度,但這樣的話我們將再也不能通過客戶端來控制應用程序的並行度了。

算子默認的並行度也可以通過重寫來明確指定。在下面的例子里面,數據源的操作符將會按照環境默認的並行度來並行執行,map操作符的並行度將會是默認並行度的2倍,sink操作符的並行度為2。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
int defaultP = env.getParallelism;
env
  .addSource(new CustomSource)
  .map(new MyMapper)
  .setParallelism(defaultP * 2)
  .print()
  .setParallelism(2);

當我們通過客戶端將應用程序的並行度設置為16並提交執行時,source操作符的並行度為16,mapper並行度為32,sink並行度為2。如果我們在本地環境運行應用程序的話,例如在IDE中運行,機器是8核,那么source任務將會並行執行在8個任務上面,mapper運行在16個任務上面,sink運行在2個任務上面。

並行度是動態概念,任務槽數量是靜態概念。並行度<=任務槽數量。一個任務槽最多運行一個並行度。

2 類型

Flink程序所處理的流中的事件一般是對象類型。操作符接收對象輸出對象。所以Flink的內部機制需要能夠處理事件的類型。在網絡中傳輸數據,或者將數據寫入到狀態后端、檢查點和保存點中,都需要我們對數據進行序列化和反序列化。為了高效的進行此類操作,Flink需要流中事件類型的詳細信息。Flink使用了Type Information的概念來表達數據類型,這樣就能針對不同的數據類型產生特定的序列化器,反序列化器和比較操作符。

Flink也能夠通過分析輸入數據和輸出數據來自動獲取數據的類型信息以及序列化器和反序列化器。盡管如此,在一些特定的情況下,例如匿名函數或者使用泛型的情況下,我們需要明確的提供數據的類型信息,來提高我們程序的性能。

在這一節中,我們將討論Flink支持的類型,以及如何為數據類型創建相應的類型信息,還有就是在Flink無法推斷函數返回類型的情況下,如何幫助Flink的類型系統去做類型推斷。

2.1 支持的數據類型

Flink支持Java和Scala提供的所有普通數據類型。最常用的數據類型可以做以下分類:

  • Primitives(原始數據類型)
  • Java和Scala的Tuples(元組)
  • Scala的樣例類
  • POJO類型
  • 一些特殊的類型

接下來讓我們一探究竟。

Primitives

Java和Scala提供的所有原始數據類型都支持,例如Int(Java的Integer),String,Double等等。下面舉一個例子:

DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L);
numbers.map(n -> n + 1);

Tuples

元組是一種組合數據類型,由固定數量的元素組成。

Flink為Java的Tuple提供了高效的實現。Flink實現的Java Tuple最多可以有25個元素,根據元素數量的不同,Tuple都被實現成了不同的類:Tuple1,Tuple2,一直到Tuple25。Tuple類是強類型。

DataStream<Tuple2<String, Integer>> persons = env
  .fromElements(
    Tuple2.of("Adam", 17),
    Tuple2.of("Sarah", 23)
  );

persons.filter(p -> p.f1 > 18);

Tuple的元素可以通過它們的public屬性訪問——f0,f1,f2等等。或者使用getField(int pos)方法來訪問,元素下標從0開始:

import org.apache.flink.api.java.tuple.Tuple2

Tuple2<String, Integer> personTuple = Tuple2.of("Alex", 42);
Integer age = personTuple.getField(1); // age = 42

不同於Scala的Tuple,Java的Tuple是可變數據結構,所以Tuple中的元素可以重新進行賦值。重復利用Java的Tuple可以減輕垃圾收集的壓力。舉個例子:

personTuple.f1 = 42; // set the 2nd field to 42
personTuple.setField(43, 1); // set the 2nd field to 43

POJO

POJO類的定義:

  • 公有類
  • 無參數的公有構造器
  • 所有的字段都是公有的,可以通過getters和setters訪問。
  • 所有字段的數據類型都必須是Flink支持的數據類型。

舉個例子:

public class Person {
  public String name;
  public int age;

  public Person() {}

  public Person(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

DataStream<Person> persons = env.fromElements(
  new Person("Alex", 42),
  new Person("Wendy", 23)
);

其他數據類型

  • Array, ArrayList, HashMap, Enum
  • Hadoop Writable types

2.2 為數據類型創建類型信息

Flink類型系統的核心類是TypeInformation。它為系統在產生序列化器和比較操作符時,提供了必要的類型信息。例如,如果我們想使用某個key來做聯結查詢或者分組操作,TypeInformation可以讓Flink做更嚴格的類型檢查。

Flink針對Java和Scala分別提供了類來產生類型信息。在Java中,類是

org.apache.flink.api.common.typeinfo.Types

舉個例子:

TypeInformation<Integer> intType = Types.INT;

TypeInformation<Tuple2<Long, String>> tupleType = Types
  .TUPLE(Types.LONG, Types.STRING);

TypeInformation<Person> personType = Types
  .POJO(Person.class);

3 定義Key以及引用字段

在Flink中,我們必須明確指定輸入流中的元素中的哪一個字段是key。

3.1 使用字段位置進行keyBy

DataStream<Tuple3<Int, String, Long>> input = ...
KeyedStream<Tuple3<Int, String, Long>, String> keyed = input.keyBy(1);

如果我們想要用元組的第2個字段和第3個字段做keyBy,可以看下面的例子。

input.keyBy(1, 2);

3.2 使用字段表達式來進行keyBy

對於樣例類:

DataStream<SensorReading> sensorStream = ...
sensorStream.keyBy("id");

對於元組:

DataStream<Tuple3<Integer, String, Long>> javaInput = ...
javaInput.keyBy("f2") // key Java tuple by 3rd field

3.3 Key選擇器

方法類型

KeySelector[IN, KEY]
  > getKey(IN): KEY

兩個例子

scala version

val sensorData = ...
val byId = sensorData.keyBy(r => r.id)
val input = ...
input.keyBy(value => math.max(value._1, value._2))

java version

DataStream<SensorReading> sensorData = ...
KeyedStream<SensorReading, String> byId = sensorData.keyBy(r -> r.id);
DataStream<Tuple2<Int, Int>> input = ...
input.keyBy(value -> Math.max(value.f0, value.f1));

4 實現UDF函數,更細粒度的控制流

4.1 函數類

Flink暴露了所有udf函數的接口(實現方式為接口或者抽象類)。例如MapFunction, FilterFunction, ProcessFunction等等。

例子實現了FilterFunction接口

class FilterFilter extends FilterFunction<String> {
  @Override
  public Boolean filter(String value) {
    return value.contains("flink");
  }
}

DataStream<String> flinkTweets = tweets.filter(new FlinkFilter);

還可以將函數實現成匿名類

DataStream<String> flinkTweets = tweets.filter(
  new RichFilterFunction<String> {
    @Override
    public Boolean filter(String value) {
      return value.contains("flink");
    }
  }
)

我們filter的字符串"flink"還可以當作參數傳進去。

DataStream<String> tweets = ...
DataStream<String> flinkTweets = tweets.filter(new KeywordFilter("flink"));

class KeywordFilter(keyWord: String) extends FilterFunction<String> {
  @Override
  public Boolean filter(String value) = {
    return value.contains(keyWord);
  }
}

4.2 匿名函數

匿名函數可以實現一些簡單的邏輯,但無法實現一些高級功能,例如訪問狀態等等。

DataStream<String> tweets = ...
DataStream<String> flinkTweets = tweets.filter(r -> r.contains("flink"));

4.3 富函數

我們經常會有這樣的需求:在函數處理數據之前,需要做一些初始化的工作;或者需要在處理數據時可以獲得函數執行上下文的一些信息;以及在處理完數據時做一些清理工作。而DataStream API就提供了這樣的機制。

DataStream API提供的所有轉換操作函數,都擁有它們的“富”版本,並且我們在使用常規函數或者匿名函數的地方來使用富函數。例如下面就是富函數的一些例子,可以看出,只需要在常規函數的前面加上Rich前綴就是富函數了。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • ...

當我們使用富函數時,我們可以實現兩個額外的方法:

  • open()方法是rich function的初始化方法,當一個算子例如map或者filter被調用之前open()會被調用。open()函數通常用來做一些只需要做一次即可的初始化工作。
  • close()方法是生命周期中的最后一個調用的方法,通常用來做一些清理工作。

另外,getRuntimeContext()方法提供了函數的RuntimeContext的一些信息,例如函數執行的並行度,當前子任務的索引,當前子任務的名字。同時還它還包含了訪問分區狀態的方法。下面看一個例子:

public static class MyFlatMap extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>> {
  private int subTaskIndex = 0;

  @Override
  public void open(Configuration configuration) {
    int subTaskIndex = getRuntimeContext.getIndexOfThisSubtask;
    // 做一些初始化工作
    // 例如建立一個和HDFS的連接
  }

  @Override
  public void flatMap(Integer in, Collector<Tuple2<Integer, Integer>> out) {
    if (in % 2 == subTaskIndex) {
      out.collect((subTaskIndex, in));
    }
  }

  @Override
  public void close() {
    // 清理工作,斷開和HDFS的連接。
  }
}

5 Sink

Flink沒有類似於spark中foreach方法,讓用戶進行迭代的操作。所有對外的輸出操作都要利用Sink完成。最后通過類似如下方式完成整個任務最終輸出操作。

stream.addSink(new MySink(xxxx));

官方提供了一部分的框架的sink。除此以外,需要用戶自定義實現sink。

5.1 Kafka

Kafka版本為0.11

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

Kafka版本為2.0以上

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

主函數中添加sink:

DataStream<String> union = high
  .union(low)
  .map(r -> r.temperature.toString);

union.addSink(
  new FlinkKafkaProducer011<String>(
    "localhost:9092",
    "test",
    new SimpleStringSchema()
  )
);

5.2 Redis

<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.0</version>
</dependency>

定義一個redis的mapper類,用於定義保存到redis時調用的命令:

scala version

object SinkToRedisExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build()

    stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))

    env.execute()
  }

  class MyRedisSink extends RedisMapper[SensorReading] {
    override def getKeyFromData(t: SensorReading): String = t.id

    override def getValueFromData(t: SensorReading): String = t.temperature.toString

    override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "sensor")
  }
}

java version

public class WriteToRedisExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> stream = env.addSource(new SensorSource());


        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();

        stream.addSink(new RedisSink<SensorReading>(conf, new MyRedisSink()));

        env.execute();
    }

    public static class MyRedisSink implements RedisMapper<SensorReading> {
        @Override
        public String getKeyFromData(SensorReading sensorReading) {
            return sensorReading.id;
        }

        @Override
        public String getValueFromData(SensorReading sensorReading) {
            return sensorReading.temperature + "";
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor");
        }
    }
}

5.3 ElasticSearch

在主函數中調用:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

可選依賴:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.9.1</version>

</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.9.1</version>
</dependency>

示例代碼:

scala version

object SinkToES {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val hashMap = new util.HashMap[String, String]()
          hashMap.put("data", t.toString)

          val indexRequest = Requests
            .indexRequest()
            .index("sensor") // 索引是sensor,相當於數據庫
            .source(hashMap)

          requestIndexer.add(indexRequest)
        }
      }

    )
    // 設置每一批寫入es多少數據
    esSinkBuilder.setBulkFlushMaxActions(1)

    val stream = env.addSource(new SensorSource)

    stream.addSink(esSinkBuilder.build())

    env.execute()
  }
}

java version

public class SinkToES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

        ElasticsearchSink.Builder<SensorReading> sensorReadingBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<SensorReading>() {
                    @Override
                    public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        HashMap<String, String> map = new HashMap<>();
                        map.put("data", sensorReading.toString());

                        IndexRequest indexRequest = Requests
                                .indexRequest()
                                .index("sensor") // 索引是sensor,相當於數據庫
                                .source(map);

                        requestIndexer.add(indexRequest);
                    }
                }
        );

        sensorReadingBuilder.setBulkFlushMaxActions(1);

        DataStream<SensorReading> stream = env.addSource(new SensorSource());

        stream.addSink(sensorReadingBuilder.build());

        env.execute();
    }
}

5.4 JDBC自定義sink

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.21</version>
</dependency>

示例代碼:

scala version

object SinkToMySQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream.addSink(new MyJDBCSink)

    env.execute()
  }

  class MyJDBCSink extends RichSinkFunction[SensorReading] {
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/sensor",
        "zuoyuan",
        "zuoyuan"
      )
      insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)")
      updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?")
    }

    override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
      updateStmt.setDouble(1, value.temperature)
      updateStmt.setString(2, value.id)
      updateStmt.execute()

      if (updateStmt.getUpdateCount == 0) {
        insertStmt.setString(1, value.id)
        insertStmt.setDouble(2, value.temperature)
        insertStmt.execute()
      }
    }

    override def close(): Unit = {
      insertStmt.close()
      updateStmt.close()
      conn.close()
    }
  }
}

java version

public class SinkToMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> stream = env.addSource(new SensorSource());

        stream.addSink(new MyJDBCSink());

        env.execute();
    }

    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        private Connection conn;
        private PreparedStatement insertStmt;
        private PreparedStatement updateStmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/sensor",
                    "zuoyuan",
                    "zuoyuan"
            );
            insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)");
            updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?");
        }

        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            updateStmt.setDouble(1, value.temperature);
            updateStmt.setString(2, value.id);
            updateStmt.execute();

            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.id);
                insertStmt.setDouble(2, value.temperature);
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            super.close();
            insertStmt.close();
            updateStmt.close();
            conn.close();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM