Flink (三)DataStream API


第五章 DataStream API

Flink有非常靈活的分層 API設計,其中的核心層就是 DataStream/DataSet API。由於新版本已經實現了流批一體, DataSet API將被棄用,官方推薦統一使用 DataStream API處理流數據和批數據。由於內容較多,我們將會用幾章的篇幅來做詳細講解,本章主要介紹基本的DataStream API用法。

DataStream(數據流)本身是 Flink中一個用來表示數據集合的類( Class),我們編寫的Flink代碼其實就是基於這種數據類型的處理,所以這套核心 API就以 DataStream命名。對於批處理和流處理,我們都可以用這同一套 API來實現。

DataStream在用法上有些類似於常規的 Java集合,但又有所不同。我們在代碼中往往並不關心集合中具體的數據,而只是用 API定義出一連串的操作來處理它們;這就叫作數據流的 “轉換 transformations)。
一個Flink程序,其實就是對 DataStream的各種轉換。具體來說,代碼基本上都由以下幾部分構成,如圖

  • 獲取執行環境 execution environment
  • 讀取數據源 source
  • 定義基於數據的轉換操作 transformations
  • 定義計算結果的輸出位置 sink
  • 觸發程序執行 execute

其中,獲取環境和觸發執行,都可以認為是針對執行環境的操作。所以本章我們就從執行環境、數據源( source)、轉換操作 transformation)、輸出( sink)四大部分,對常用的 DataStream API做基本介紹。

image-20220405194001510

5.1 執行環境

Flink程序可以在各種上下文環境中運行:我們可以在本地 JVM中執行程序,也可以提交到遠程集群上運行。

不同的環境,代碼的提交運行的過程會有所不同。這就要求我們在提交作業執行計算時,首先必須獲取當前 Flink的運行環境,從而建立起與 Flink框架之間的聯系。只有獲取了環境上下文信息,才能將具體的任務調度到不同的 TaskManager執行。

1.1.1 創建執行環境

編寫Flink程序的第一步,就是創建執行環境。我們要獲取的執行環境,是StreamExecutionEnvironment類的對象,這是所有 Flink程序的基礎。在代碼中創建執行環境的方式,就是調用這個類的靜態方法,具體有以下三種。

  1. getExecutionEnvironment

最簡單的方式,就是直接調用getExecutionEnvironment方法。它會根據當前運行的上下文直接得到正確的結果:如果程序是獨立運行的,就返回一個本地執行環境;如果是創建了 jar包,然后從命令行調用它並提交到集群執行,那么就返回集群的執行環境。也就是說,這個方法會根據當前運行的方式,自行決定該返回什么樣的運行環境 。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

這種“智能”的方式不需要我們額外做判斷,用起來簡單高效,是最常用的一種創建執行環境的方式。

  1. createLocalEnvironment

這個方法返回一個本地執行環境。可以在調用時傳入一個參數,指定默認的並行度;如果不傳入,則默認並行度就是本地的 CPU核心數。

StreamExecutionEnvironment localE nv =StreamExecutionEnvironment.createLocalEnvironment();
  1. createRemoteEnvironment

這個方法返回集群執行環境。需要在調用時指定JobManager的主機名和端口號,並指定要在集群中運行的 Jar包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
    .createRemoteEnvironment(
    "host", // JobManager 主機名
    1234, // JobManager 進程端口號
    "path/to/jarFile. // 提交給 JobManager 的 JAR 包
);

在獲取到程序執行環境后,我們還可以對執行環境進行靈活的設置。比如可以全局設置程序的並行度、禁用算子鏈,還可以定義程序的時間語義、配置容錯機制。關於時間語義和容錯機制,我們會在后續的章節介紹。

5.1.2 執行模式

上節中我們獲取到的執行環境,是一個StreamExecutionEnvironment,顧名思義它應該是做流處理的。那對於批處理,又應該怎么獲取執行環境呢?

在之前的Flink版本中,批處理的執行環境與流處理類似,是調用類 ExecutionEnvironment的靜態方法,返回它的對象:

// 批處理環境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流處理環境
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

基於ExecutionEnvironment讀入數據創建的數據集合,就是 DataSet;對應的調用的一整套轉換方法,就 是 DataSet API。這些我們在第二章的批處理 word count程序中已經有了基本了解。

而從1.12.0版本起, Flink實現了 API上的流批統一。 DataStream API新增了一個重要特性:可以支持不同的“執行模式”( execution mode),通過簡單的設置就可以讓一段Flink程序在流處理和批處理之間切換。這樣一來, DataSet API也就沒有存在的必要了。

  • 流執行模式( STREAMING )

    這是DataStream API最 經典的模式,一般用於需要持續實時處理的無界數據流。默認情況下,程序使用的就是STREAMING執行模式。

  • 批執行模式( BATCH )

    專門用於批處理的執行模式 , 這種模式下, Flink處理作業的方式類似於 MapReduce框架 。
    對於不會持續計算的 有界數據,我們用這種模式處理會更方便。

  • 自動模式( AUTOMATIC )

    在這種模式下,將由程序根據輸入數據源是否有界,來自動選擇執行模式。

  1. BATCH模式的配置方法

由於Flink程序默認是 STREAMING模式,我們這里重點介紹一下 BATCH模式的配置。
主要有兩種方式:

(1)通過命令行配置

bin/flink run Dexecution.runtime mode=BATCH ...

在提交作業時,增加execution.runtime mode參數,指定值為 BATCH。

(2 )通過代碼配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代碼中,直接基於執行環境調用setRuntimeMode方法,傳入 BATCH模式。

建議: 不要在代碼中配置 而是使用命令行 。這同設置並行度是類似的: 在提交作業時指定參數可以更加靈活,同一段應用程序寫好之后,既可以用於批處理也可以用於流處理。而在代碼中硬編碼( hard code)的方式可擴展性比較差,一般都不推薦。

  1. 什么時候選擇 BATCH模式

我們知道,Flink本身持有的就是流處理的世界觀,即使是批量數據,也可以看作“有界流”來進行處理。所以 STREAMING 執行模式對於有界數據和無界數據都是有效的;而 BATCH模式僅能用於有界數據。

看起來BATCH模式似乎被 STREAMING模式全覆蓋了,那還有必要存在嗎?我們能不能所有情況下都用流處理模式呢?

當然是可以的,但是這樣有時不夠高效。

我們可以仔細回憶一下word count程序中,批處理和流處理輸出的不同:在 STREAMING模式下,每來一條數據,就會輸出一次結果(即使輸入數據是有界的);而 BATCH模式下,只有數據全部處理完之后,才會一次性輸出結果(攢批的過程)。最終的結果兩者是一致的,但是流處理模式會將更多的中間結果輸出。在本來輸入有界、只希望通過批處理得到最終的結果的場景下,STREAMING模式的逐個輸出結果就沒有必要了。

所以總結起來,一個簡單的原則就是:用BATCH模式處理批量數據,用 STREAMING模式處理流式數據。因為數據有界的時候,直接輸出結果會更加高效;而當數據無界的時候 , 我們沒得選擇——只有 STREAMING模式才能處理持續的數據流。

當然,在后面的示例代碼中,即使是有界的數據源,我們也會統一用 STREAMING模式處理。這是因為我們的主要目標還是構建實時處理流數據的程序,有界數據源也只是我們用來測試的手段。

5.1.3 觸發程序執行

有了執行環境,我們就可以構建程序的處理流程了:基於環境讀取數據源,進而進行各種轉換操作,最后輸出結果到外部系統。

需要注意的是,寫完輸出(sink)操作並不代表程序已經結束。因為當 main()方法被調用時,其實只是定義了作業的每個執行操作,然后添加到數據流圖中;這時並沒有真正處理數據因為數據可能還沒來。 Flink是由事件驅動的,只有等到數據到來,才會觸發真正的計算,這也被稱為“延遲執行”或“懶執行”( lazy execution)。

所以我們需要顯式地調用執行環境的 execute()方法,來觸發程序執行。 execute()方法將一直等待作業完成,然后返回一個執行結果( JobExecutionResult)。

env.execute();

5.2 源算子

image-20220405195801583

創建環境之后,就可以構建數據處理的業務邏輯了,如圖所示,本節將主要講解 Flink的源算子( Source)。想要處理數據,先得有數據,所以首要任務就是把數據讀進來。

Flink可以從各種來源獲取數據,然后構建 DataStream進行轉換處理。一般將數據的輸入來源稱為數據源 (data source),而讀取數據的算子就是源算子 source operator)。所以 source就是我們整個處理程序的輸入端。

Flink代碼中通用的添加 source的方式,是調用執行環境的 addSource()方法:

DataStream<String> stream = env.addSource(...);

方法傳入一個對象參數,需要實現SourceFunction接口;返回 DataStreamSource。這里的DataStreamSource類繼承自 SingleOutputStreamOperator類,又進一步繼承自 DataStream。所以很明顯,讀取數據的 source操作是一個算子,得到的是一個數據流( DataStream)。

這里可能會有些麻煩:傳入的參數是一個“源函數”(source function),需要實現SourceFunction接口。這是何方神聖,又該怎么實現呢?

自己去實現它顯然不會是一件容易的事。好在 Flink直接提供了很多預實現的接口,此外還有很多外部連接工具也幫我們實現了對應的 source function,通常情況下足以應對我們的實際需求。接下來我們就詳細展開講解。

5.2.1 准備工作

為了更好地理解,我們先構建一個實際應用場景。比如網站的訪問操作,可以抽象成一個三元組(用戶名,用戶訪問的 url,用戶訪問 url的時間戳),所以在這里,我們可以創建一個類 Event,將用戶行為包裝成它的一個對象。 Event包含了以下一些字段,如表所示:

字段名 數據類型 說明
user String 用戶名
url String 用戶訪問的url
timestamp Long 用戶訪問url的時間戳

具體代碼如下:

import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

這里需要注意,我們定義的Event,有這樣幾個特點

  • 類是公有( public)的

  • 有一個無參的構造方法

  • 所有屬性都是公有( public)的

  • 所有屬性的類型都是可以序列化的

Flink會把這樣的類作為一種特殊的 POJO數據類型來對待,方便數據的解析和序列化。

另外我們在類中還重寫了 toString方法,主要是為了測試輸出顯示更清晰。關於 Flink支持的數據類型,我們會在后面章節做詳細說明。
我們這里自定義的Event POJO類會在后面的代碼中頻繁使用,所以在后面的代碼中碰到Event,把這里的 POJO類導入就好了。

注:Java編程比較好的實踐是重寫每一個類的 toString方法,來自 Joshua Bloch編寫的《 Effective Java》。

5.2.2 從集合中讀取數據

最簡單的讀取數據的方式,就是在代碼中直接創建一個Java集合,然后調用執行環境的fromCollection方法進行讀取。這相當於將數據臨時存儲到內存中,形成特殊的數據結構后,作為數據源使用,一般用於測試。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

ArrayList<Integer> nums = new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);

ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Mary", "./home", 1000L));
events.add(new Event("Bob", "./cart", 2000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);

stream2.print();
env.execute();

我們也可以不構建集合,直接將元素列舉出來,調用fromElements方法進行讀取數據:

DataStreamSource<Event> stream2 = env.fromElements(
    new Event("Mary", "./home", 1000L),
    new Event("Bob", "./cart", 2000L)
);

5.2.3 從文件讀取數據

真正的實際應用中,自然不會直接將數據寫在代碼中。通常情況下,我們會從存儲介質中獲取數據,一個比較常見的方式就是讀取日志文件。這也是批處理中最常見的讀取方式。

DataStreamSource<String> stream1 = env.readTextFile("input/clicks.csv");

說明:

  • 參數可以是目錄,也可以是文件;

  • 路徑可以是相對路徑,也可以是絕對路徑;

  • 相對路徑是從系統屬性 user.dir獲取路徑 : idea下是 project的根目錄 , standalone模式下是集群節點根目錄;

  • 也可以從 hdfs目錄下讀取 , 使用路徑 hdfs://..., 由於 Flink沒有提供 hadoop相關依賴 , 需要 pom中添加相關依賴 :

    <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>2.7.5</version>
     <scope>provided</scope>
    </dependency>
    

5.2.4 從 Socket讀取數據

不論從集合還是文件,我們讀取的其實都是有界數據。在流處理的場景中,數據往往是無界的。這時又從哪里讀取呢?

一個簡單的方式,就是我們之前用到的讀取socket文本流。這種方式由於吞吐量小、穩定性較差,一般也是用於測試。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

5.2.5 從 Kafka讀取數據

那對於真正的流數據,實際項目應該怎樣讀取呢?
Kafka作為分布式消息傳輸隊列,是一個高吞吐、易於擴展的消息系統。而消息隊列的傳輸方式,恰恰和流處理是完全一致的。所以可以說 Kafka和 Flink天生一對,是當前處理流式數據的雙子星。在如今的實時流處理應用中,由 Kafka進行數據的收集和傳輸, Flink 進行分析計算,這樣的架構已經 成為眾多企業的首選,如圖所示。

image-20220405201110047

略微遺憾的是,與Kafka的連接比較復雜, Flink內部並沒有提供預實現的方法。所以我們只能采用通用的 addSource方式、實現一個 SourceFunction了。

好在Kafka與 Flink確實是非常契合,所以 Flink官方提供了連接工具 flink connector kafka直接幫我們實現了一個消費者 FlinkKafkaConsumer,它就是用來讀取 Kafka數據的SourceFunction。

所以想要以Kafka作為數據源獲取數據,我們只需要引入 Kafka連接器的依賴。 Flink官方提供的是一個通用的 Kafka連接器,它會自動跟蹤最新版本的 Kafka客戶端。 目前最新版本只支持 0.10.0版本以上的 Kafka,讀者使用時可以根據自己安裝的 Kafka版本選定連接器的依賴版本。這里我們需要導入的依賴如下。

   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
    </dependency>

然后調用env.addSource(),傳入 FlinkKafkaConsumer的對象實例就可以了。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SourceKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));
        stream.print("Kafka");

        env.execute();
    }
}

創建FlinkKafkaConsumer時需要傳入三個參數

  • 第一個參數 topic,定義了從哪些主題中讀取數據。可以是一個 topic,也可以是 topic列表,還可以是匹配所有想要讀取的 topic的正則表達式。當從多個 topic中讀取數據時, Kafka連接器將會處理所有 topic的分區,將這些分區的數據放到一條流中去。

  • 第二個參數是一個 DeserializationSchema或者 KeyedDeserializationSchema。 Kafka消息被存儲為原始的字節數據,所以需要反序列化成 Java或者 Scala對象。上面代碼中使用的 SimpleStringSchema,是一個內置的 DeserializationSchema,它只是將字節數組簡單地反序列化成字符串。 DeserializationSchema和 KeyedDeserializationSchema是公共接口,所以我們也可以自定義反序列化邏輯。

  • 第三個參數是一個 Properties對象,設置了 Kafka客戶端的一些屬性。

5.2.6 自定義 Source

大多數情況下,前面的數據源已經能夠滿足需要。但是凡事總有例外,如果遇到特殊情況,我們想要讀取的數據源來自某個外部系統,而 flink既沒有預實現的方法、也沒有提供連接器,又該怎么辦呢?

那就只好自定義實現SourceFunction了。

接下來我們創建一個自定義的數據源,實現SourceFunction接口。主要重寫兩個關鍵方法:run()和 cancel()。

  • run()方法:使用運行時上下文對象( SourceContext)向下游發送數據
  • cancel()方法:通過標識位控制退出循環,來達到中斷數據源的效果。

代碼如下:
我們先來自定義一下數據源:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // 聲明一個布爾變量,作為控制數據生成的標識位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();    // 在指定的數據集中隨機選取數據
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // 隔1秒生成一個點擊事件,方便觀測
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }

}

這個數據源,我們后面會頻繁使用,所以在后面的代碼中涉及到ClickSource()數據源,使用上面的代碼就可以了。

下面的代碼我們來讀取一下自定義的數據源。有了自定義的source function,接下來只要調用 addSource()就可以了:

env.addSource(new ClickSource())

下面是完整的代碼:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //有了自定義的source function,調用addSource方法
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        stream.print("SourceCustom");

        env.execute();
    }
}

運行結果:

SourceCustom> Event{user='Mary', url='./prod?id=2', timestamp=2022-04-05 20:26:19.724}
SourceCustom> Event{user='Alice', url='./home', timestamp=2022-04-05 20:26:20.724}
SourceCustom> Event{user='Mary', url='./cart', timestamp=2022-04-05 20:26:21.724}
SourceCustom> Event{user='Bob', url='./home', timestamp=2022-04-05 20:26:22.739}
SourceCustom> Event{user='Cary', url='./cart', timestamp=2022-04-05 20:26:23.754}
SourceCustom> Event{user='Alice', url='./fav', timestamp=2022-04-05 20:26:24.759}
SourceCustom> Event{user='Bob', url='./home', timestamp=2022-04-05 20:26:25.774}
SourceCustom> Event{user='Alice', url='./home', timestamp=2022-04-05 20:26:26.787}
SourceCustom> Event{user='Cary', url='./home', timestamp=2022-04-05 20:26:27.799}
......

這里要注意的是SourceFunction接口定義的數據源,並行度只能設置為 1,如果數據源設置為大於 1的並行度,則會拋出異常。

所以如果我們想要自定義並行的數據源的話,需要使用 ParallelSourceFunction,示例程序如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

public class SourceCustomParallelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new CustomSource()).setParallelism(2).print();

        env.execute();
    }

    public static class CustomSource implements ParallelSourceFunction<Integer> {
        private boolean running = true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                sourceContext.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

5.2.7 Flink支持的數據類型

我們已經了解了Flink怎樣從不同的來源讀取數據。在之前的代碼中,我們的數據都是定義好的 UserBehavior類型,而且在5.2.1小節中特意說明了對這個類的要求。那還有沒有其他更靈活的類型可以用呢? Flink支持的數據類型到底有哪些?

1. Flink的類型系統

為什么會出現“不支持”的數據類型呢?因為Flink作為一個分布式處理框架,處理的是以數據對象作為元素的流。如果用水流來類比,那么我們要處理的數據元素就是隨着水流漂動的物體。在這條流動的河里,可能漂浮着小木塊,也可能行駛着內部錯綜復雜的大船。要分布式地處理這些數據,就不可避免地要面對數據的網絡傳輸、狀態的落盤和故障恢復等問題,這就需要對數據進行序列化和反序列化。小木塊是容易序列化的;而大船想要序列化之后傳輸,就需要將它拆解、清晰地知道其中每一個零件的類型。

為了方便地處理數據,Flink有自己一整套類型系統。 Flink使用“類型信息”(TypeInformation)來統一表示數據類型。 TypeInformation類是 Flink中所有類型描述符的基類。它涵蓋了類型的一些基本屬性, 並為每個數據類型生成特定的序列化器、反序列化器和比較器。

2. Flink支持的數據類型

簡單來說,對於常見的Java和 Scala數據類型, Flink都是支持的。 Flink在內部, Flink對支持不同的類型進行了划分,這些類型可以在Types工具類中找到:
(1)基本類型
所有Java基本類型及其包裝類,再加上 Void、 String、 Date、 BigDecimal和 BigInteger。

(2)數組類型
包括基本類型數組(PRIMITIVE_ARRAY)和對象數組 (OBJECT_ARRAY)

(3)復合數據類型

  • Java元組類型( TUPLE):這是 Flink內置的元組類型,是 Java API的一部分。最多25個字段,也就是從 Tuple0~Tuple25,不支持空字段
  • Scala 樣例類及 Scala元組:不支持空字段
  • 行類型( ROW):可以認為是具有任意個字段的元組 ,並支持空字段
  • POJO Flink自定義的類似於 Java bean模式的類

(4)輔助類型
Option、 Either、 List、 Map等

(5)泛型類型 GENERIC

Flink支持所有的 Java類和 Scala類。不過如果沒有按照上面 POJO類型的要求來定義,就會被 Flink當作泛型類來處理。 Flink會把泛型類型當作黑盒,無法獲取它們內部的屬性;它們也不是由 Flink本身序列化的,而是由 Kryo序列化的。

在這些類型中,元組類型和POJO類型最為靈活,因為它們支持創建復雜類型。而相比之下, POJO還支持在鍵( key)的定義中直接使用字段名,這會讓我們的代碼可讀性大大增加。

所以,在項目實踐中 ,往往會將流處理程序中的元素類型定為 Flink的 POJO類型。

Flink對 POJO類型的要求如下:

  • 類是公共的( public)和獨立的 standalone,也就是說沒有非靜態的內部類
  • 類有一個公共的無參構造方法;
  • 類中的所有字段是 public且非 final的;或者有一個公共的 getter和 setter方法,這些方法需要符合 Java bean的命名規范。

所以我們看到,之前的UserBehavior,就是我們創建的符合 Flink POJO定義的數據類型。

3. 類型提示( Type Hints )

Flink還具有一個類型提取系統,可以分析函數的輸入和返回類型,自動獲取類型信息,從而獲得對應的序列化器和反序列化器。但是,由於 Java中泛型擦除的存在,在某些特殊情況下(比如 Lambda表達式中),自動提取的信息是不夠精細的 只告訴 Flink當前的元素由“船頭、船身、船尾”構成,根本無法重建出“大船”的模樣;這時就需要顯式地提供類型信息,才能使應用程序正常工作或提高其性能。

為了解決這類問題,Java API提供了專門的“類型提示”( type hints) 。

回憶一下之前的word count流處理程序,我們在將 String類型的每個詞轉換成( wordcount)二元組后,就明確地用 returns指定了返回的類型。因為對於 map里傳入的 Lambda表達式,系統只能推斷出返回的是 Tuple2類型,而無法得到 Tuple2<String, Long>。只有顯式地告訴系統當前的返回類型,才能正確地解析出完整數據。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

這是一種比較簡單的場景,二元組的兩個元素都是基本數據類型。那如果元組中的一個元素又有泛型,該怎么處理呢?

Flink專門提供了 TypeHint類,它可以捕獲泛型的類型信息,並且一直記錄下來,為運行時提供足夠的信息。我們同樣可以通過 .returns()方法,明確地指定轉換之后的 DataStream里元素的類型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

5.3 轉換算子

image-20220405204833175

數據源讀入數據之后,我們就可以使用各種轉換算子,將一個或多個DataStream轉換為新的 DataStream,如圖所示。一個 Flink程序的核心,其實就是所有的轉換操作,它們決定了處理的業務邏輯。

我們可以針對一條流進行轉換處理,也可以進行分流、合流等多流轉換操作,從而組合成復雜的數據流拓撲。在本節中,我們將重點介紹基本的單數據流的轉換,多流轉換的內容我們將在后續章節展開。

5.3.1 基本轉換算子

首先我們來介紹一些基本的轉換算子,它們的概念和使用想必讀者不會陌生。

  1. 映射( map )
    map是大家非常熟悉的大數據操作算子,主要用於將數據流中的數據進行轉換,形成新的數據流。簡單來說,就是一個“一一映射”,消費一個元素就產出一個元素,如圖所示。

image-20220405205003696

我們只需要基於DataStrema調用 map()方法就可以進行轉換處理。方法需要傳入的參數是接口 MapFunction的實現;返回值類型還是 DataStream,不過泛型(流中的元素類型)可能改變。

下面的代碼用不同的方式,實現了提取Event中的 user字段的功能。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransMapTest {
  public static void main(String[] args) throws Exception{
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);

      DataStreamSource<Event> stream = env.fromElements(
              new Event("Mary", "./home", 1000L),
              new Event("Bob", "./cart", 2000L)
      );

      // 傳入匿名類,實現MapFunction
      stream.map(new MapFunction<Event, String>() {
          @Override
          public String map(Event e) throws Exception {
              return e.user;
          }
      });

      // 傳入MapFunction的實現類
      stream.map(new UserExtractor()).print();

      env.execute();
  }
  public static class UserExtractor implements MapFunction<Event, String> {
      @Override
      public String map(Event e) throws Exception {
          return e.user;
      }
  }
}

上面代碼中,MapFunction實現類的泛型類型,與輸入數據類型和輸出數據的類型有關。在實現 MapFunction接口的時候,需要指定兩個泛型,分別是輸入事件輸出事件的類型,還需要重寫一個 map()方法,定義從一個輸入事件轉換為另一個輸出事件的具體邏輯。

另外,細心的讀者通過查看Flink源碼可以發現,基於 DataStream調用 map方法,返回的其實是一個SingleOutputStreamOperator。

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
    return this.map(mapper, outType);
}

這表示map是一個用戶可以自定義的轉換( transformation)算子,它作用於一條數據流上,轉換處理的結果是一個確定的輸出類型。當然,SingleOutputStreamOperator 類本身也繼承自 DataStream類,所以說 map是將一個 DataStream轉換成另一個 DataStream是完全正確的。

  1. 過濾( filter )
    filter轉換操作,顧名思義是對數據流執行一個過濾,通過一個布爾條件表達式設置過濾條件,對於每一個流內元素進行判斷,若為 true則元素正常輸出,若為 false則元素被過濾掉,如圖所示。

image-20220405205812498

進行filter轉換之后的新數據流的數據類型與原數據流是相同的。 filter轉換需要傳入的參數需要實現 FilterFunction接口,而 FilterFunction內要實現 filter()方法,就相當於一個返回布爾類型的條件表達式。

下面的代碼會將數據流中用戶Mary的瀏覽行為過濾出來 。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransFilterTest {
  public static void main(String[] args) throws Exception{
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);

      DataStreamSource<Event> stream = env.fromElements(
              new Event("Mary", "./home", 1000L),
              new Event("Bob", "./cart", 2000L)
      );

      // 傳入匿名類實現FilterFunction
      stream.filter(new FilterFunction<Event>() {
          @Override
          public boolean filter(Event e) throws Exception {
              return e.user.equals("Mary");
          }
      });

      // 傳入FilterFunction實現類
      stream.filter(new UserFilter()).print();

      env.execute();
  }
  public static class UserFilter implements FilterFunction<Event> {
      @Override
      public boolean filter(Event e) throws Exception {
          return e.user.equals("Mary");
      }
  }
}

  1. 扁平映射( flatMap )

flatMap操作又稱為扁平映射,主要是將數據流中的整體(一般是集合類型)拆分成一個一個的個體使用。消費一個元素,可以產生 0到多個元素。 flatMap可以認為是“扁平化”( flatten)和“映射”( map)兩步操作的結合,也就是先按照某種規則對數據進行打散拆分,再對拆分后的元素做轉換處理,如圖所示。我們此前 WordCount程序的第一步分詞操作,就用到了flatMap。

image-20220405210003727

同map一樣, flatMap也可以使用 Lambda表達式或者 FlatMapFunction接口實現類的方式來進行傳參,返回值類型取決於所傳參數的具體邏輯,可以與原數據流相同,也可以不同。

flatMap操作會應用在每一個輸入事件上面, FlatMapFunction接口中定義了 flatMap方法,用戶可以重寫這個方法,在這個方法中對輸入數據進行處理,並決定是返回 0個、 1個或多個結果數據。因此 flatMap並沒有直接定義返回值類型,而是通過一個“收集器”( Collector)來指定輸出。希望輸出結果時,只要調用收集器的 .collect()方法就可以了;這個方法可以多次調用,也可以不調用。所以 flatMap方法也可以實現 map方法和 filter方法的功能,當返回結果是 0個的時候,就相當於對數據進行了過濾,當返回結果是 1個的時候,相當於對數據進行了簡單的轉換操作。

flatMap的使用非常靈活,可以對結果進行任意輸出,下面就是一個例子:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransFlatmapTest {
  public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);

      DataStreamSource<Event> stream = env.fromElements(
              new Event("Mary", "./home", 1000L),
              new Event("Bob", "./cart", 2000L)
      );

      stream.flatMap(new MyFlatMap()).print();

      env.execute();
  }
  public static class MyFlatMap implements FlatMapFunction<Event, String> {
      @Override
      public void flatMap(Event value, Collector<String> out) throws Exception {
          if (value.user.equals("Mary")) {
              out.collect(value.user);
          } else if (value.user.equals("Bob")) {
              out.collect(value.user);
              out.collect(value.url);
          }
      }
  }
}

5.3.2 聚合算子

直觀上看,基本轉換算子確實是在“轉換”因為它們都是基於當前數據,去做了處理和輸出。而在實際應用中,我們往往需要對大量的數據進行統計或整合,從而提煉出更有用的信息。比如之前 word count程序中,要對每個詞出現的頻次進行疊加統計。這種操作,計算的結果不僅依賴當前數據,還跟之前的數據有關,相當於要把所有數據聚在一起進行匯總合並這就是所謂的“聚合”( Aggregation),也對應着 MapReduce中的 reduce操作。

  1. 按鍵分區( keyBy )

對於Flink而言, DataStream是沒有直 接進行聚合的 API的。因為我們對海量數據做聚合肯定要進行分區並行處理,這樣才能提高效率。所以在 Flink中,要做聚合,需要先進行分區;這個操作就是通過 keyBy來完成的。

keyBy是聚合前必須要用到的一個算子。 keyBy通過指定鍵( key),可以將一條流從邏輯上划分成不同的分區( partitions)。這里所說的分區,其實就是並行處理的子任務,也就對應着任務槽( task slot)。

基於不同的key,流中的數據將被分配到不同的分區中去,如圖所示;這樣一來,所有具有相同的 key的數據,都將被發往同一 個分區,那么下一步算子操作就將會在同一個 slot中進行處理了。

image-20220405210501441在內部,是通過計算key的哈希值( hash code),對分區數進行取模運算來實現的。所以這里 key如果是 POJO的話,必須要重寫 hashCode()方法。

keyBy()方法需要傳入一個參數,這個參數指定了一個或一組 key。有很多不同的方法來指定 key:比如對於 Tuple數據類型,可以指定字段的位置或者多個位置的組合;對於 POJO類型,可以指定字段的名稱( String);另外,還可以傳入 Lambda表達式或者實現一個鍵選擇器KeySelector),用於說明從數據中提取 key的邏輯。

我們可以以id作為 key做一個分區操作,代碼實現如下:

DataStreamSource<Event> stream = env.fromElements(
    new Event("Mary", "./home", 1000L),
    new Event("Bob", "./cart", 2000L)
);

// 使用 Lambda 表達式
KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);
env.execute();

需要注意的是,keyBy得到的結果將不再是 DataStream,而是會將 DataStream轉換為KeyedStream。 KeyedStream可以認為是“分區流”或者“鍵控流”,它是對 DataStream按照key的一個邏輯分區,所以泛型有兩個類型:除去當前流中的元素類型外,還需要指定 key的類型。

KeyedStream也繼承自 DataStream,所以基於它的操作也都歸屬於 DataStream API。但它跟之前的轉換操作得到的 SingleOutputStreamOperator不同,只是一個流的分區操作,並不是一個轉換算子。 KeyedStream是一個非常重要的數據結構,只有基於它才可以做后續的聚合操作(比如 sum reduce);而且它可以將當前算子任務的狀態 state)也按照 key進行划分、限定為僅對 當前 key有效。關於狀態的相關知識我們會在后面章節繼續討論。

  1. 簡單聚合

有了按鍵分區的數據流KeyedStream,我們就可以基於它進行聚合操作了。 Flink為我們內置實現了一些最基本、最簡單的聚合 API,主要有以下幾種

  • sum():在輸入流上,對指定的字段做疊加求和的操作。
  • min():在輸入流上,對指定的字段求最小值。
  • max():在輸入流上,對指定的字段求最大值。
  • minBy():與 min()類似,在輸入流上針對指定字段求最小值。不同的是, min()只計算指定字段的最小值,其他字段會保留最初第 一個數據的值;而 minBy()則會返回包含字段最小值的整條數據。
  • maxBy():與 max()類似,在輸入流上針對指定字段求最大值。兩者區別與min()/minBy()完全一致。

簡單聚合算子使用非常方便,語義也非常明確。這些聚合方法調用時,也需要傳入參數;但並不像基本轉換算子那樣需要實現自定義函數,只要說明聚合指定的字段就可以了。指定字段的方式有兩種:指定位置,和指定名稱。

對於元組類型的數據,同樣也可以使用這兩種方式來指定字段。需要注意的是,元組中字段的名稱,是以 f0、 f1、 f2、 …來命名的。

例如,下面就是對元組數據流進行聚合的測試:

public class TransTupleAggreationTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 3),
                Tuple2.of("b", 3),
                Tuple2.of("b", 4)
        );

//        stream.keyBy(r -> r.f0).sum(1).print();
//        stream.keyBy(r -> r.f0).sum("f1").print();
//        stream.keyBy(r -> r.f0).max(1).print();
//        stream.keyBy(r -> r.f0).max("f1").print();
//        stream.keyBy(r -> r.f0).min(1).print();
//        stream.keyBy(r -> r.f0).min("f1").print();
//        stream.keyBy(r -> r.f0).maxBy(1).print();
//        stream.keyBy(r -> r.f0).maxBy("f1").print();
//        stream.keyBy(r -> r.f0).minBy(1).print();
        stream.keyBy(r -> r.f0).minBy("f1").print();

        env.execute();
    }
}

而如果數據流的類型是POJO類,那么就只能通過字段名稱來指定,不能通過位置來指定了。

public class TransPojoAggregationTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Mary", "./cart", 3000L),
                new Event("Mary", "./fav", 4000L)
        );

        stream.keyBy(e -> e.user)
                .min("timestamp")    // 指定字段名稱
                .print();

        env.execute();
    }

簡單聚合算子返回的,同樣是一個SingleOutputStreamOperator,也就是從 KeyedStream又轉換成了常規的 DataStream。所以可以這樣理解: keyBy和聚合是成對出現的,先分區、后聚合,得到的依然是一個 DataStream。而且經過簡單聚合之后的數據流,元素的數據類型保持不變。

一個聚合算子,會為每一個key保存一個聚合的值,在 Flink中我們 把它叫作“狀態”( state)。所以每當有一個新的數據輸入,算子就會更新保存的聚合結果,並發送一個帶有更新后聚合值的事件到下游算子。對於無界流來說,這些狀態是永遠不會被清除的,所以我們使用聚合算子,應該只用在含有有限個 key的數據流上。

  1. 歸約聚合( reduce )

如果說簡單聚合是對一些特定統計需求的實現,那么reduce算子就是一個一般化的聚合統計操作了。從大名鼎鼎的 MapReduce開始,我們對 reduce操作就不陌生:它可以對已有的數據進行歸約處理,把每一個新輸入的數據和當前已經歸約出來的值,再做 一個聚合計算

與簡單聚合類似,reduce操作也會將 KeyedStream轉換為 DataStream。它不會改變流的元素數據類型,所以輸出類型和輸入類型是一樣的。

調用KeyedStream的 reduce方法時,需要傳入一個參數,實現 ReduceFunction接口。接口在源碼中的定義如下:

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction接口里需要實現 reduce()方法,這個方法接收兩個輸入事件,經過轉換處理之后輸出一個相同類型的事件;所以,對於一組數據,我們可以先取兩個進行合並,然后再將合並的結果看作一個數據、再跟后面的數據合並,最終會將它“簡化”成唯一的一個數據,這也就是 reduce“歸約”的含義。在流處理的底層實現過程中,實際上是將中間“合並的結果作為任務的一個狀態保存起來的;之后每來一個新的數據,就和之前的聚合狀態進一步做歸約。

其實,reduce的語義是針對列表進行規約操作,運算規則由 ReduceFunction中的 reduce方法來定義,而在 ReduceFunction內部會維護一個初始值為空的累加器,注意累加器的類型和輸入元素的類型相同,當第一條元素到來時,累加器的值更新為第一條元素的值,當新的元素到來時,新元素會和累加器進行累加操作,這里的累加操作就是 reduce函數定義的運算規則。然后將更新以后的累加器的值向下游輸出。

我們可以單獨定義一個函數類實現ReduceFunction接口,也可以直接傳入一個匿名類。當然,同樣也可以通過傳入 Lambda表達式實現 類似的功能。

與簡單聚合類似,reduce操作也會將 KeyedStream轉換為 DataStrema。它不會改變流的元素數據類型,所以輸出類型和輸入類型是一樣的。

下面我們來看一個稍復雜的例子。

我們將數據流按照用戶id進行分區,然后用一個 reduce算子實現 sum的功能,統計每個用戶訪問的頻次;進而將所有統計結果分到一組,用另一個 reduce算子實現 maxBy的功能,記錄所有用戶中訪問頻次最高的那個,也就是當前訪問量最大的用戶是誰。

public class TransReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 這里的使用了之前自定義數據源小節中的ClickSource()
        env.addSource(new ClickSource())
                // 將Event數據類型轉換成元組類型
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event e) throws Exception {
                        return Tuple2.of(e.user, 1L);
                    }
                })
                .keyBy(r -> r.f0) // 使用用戶名來進行分流
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 每到一條數據,用戶pv的統計值加1
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .keyBy(r -> true) // 為每一條數據分配同一個key,將聚合結果發送到一條流中去
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 將累加器更新為當前最大的pv統計值,然后向下游發送累加器的值
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                })
                .print();

        env.execute();

    }
}

reduce同簡單聚合算子一樣,也要針對每一個 key保存狀態。因為狀態不會清空,所以我們需要將 reduce算子作用在一個有限 key的流上。

5.3.3 用戶自定義函數

在前面的介紹我們可以發現,Flink的 DataStream API編程風格其實是一致的:基本上都是基於 DataStream調用一個方法,表示要做一個轉換操作;方法需要傳入一個參數,這個參數都是需要實現一個接口。我們還可以擴展到 5.2節講到的 Source算子,其實也是需要自定義類實現一個 SourceFunction接口。我們能否從中總結出一些規律呢?

很容易發現,這些接口有一個共同特點:全部都以算子操作名稱 + Function命名,例如源算子需要實現 SourceFunction接口, map算子需要實現 MapFunction接口, reduce算子需要實現 ReduceFunction接口。而且查看源碼會發現,它們都繼承自 Function接口 ;這個接口是空的,主要就是為了方便擴展為單一抽象方法( Single Abstract Method SAM)接口,這就是我們所說的“函數接口” 比如 MapFunction中需要實現一個 map()方法, ReductionFunction中需要實現一個 reduce()方法,它們都是 SAM接口。我們知道, Java 8新增的 Lambda表達式就可以實現 SAM接口;所以這樣的好處就是,我們不僅可以通過自定義函數類或者匿名類來實現接口,也可以直接傳入 Lambda表達式。這就是所謂的用戶自定義函數( user defined function UDF)。

接下來我們就對這幾種編程方式做一個梳理總結。

  1. 函數類( Function Classes )

對於大部分操作而言,都需要傳入一個用戶自定義函數(UDF),實現相關操作的接口來完成處理邏輯的定義。 Flink暴露了所有 UDF函數的接口,具體實現方式為接口或者抽象類,例如 MapFunction、 FilterFunction、 ReduceFunction等。

所以最簡單直接的方式,就是自定義一個函數類,實現對應的接口。之前我們對於API的練習,主要就是基於這種方式。

下面例子實現了 FilterFunction接口,用來篩選 url中 包含 “home”的 事件

public class TransUdfTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        // 1. 傳入實現FilterFunction接口的自定義函數類
        DataStream<Event> stream = clicks.filter(new FlinkFilter());
        
        stream.print();

        env.execute();
    }

    public static class FlinkFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.url.contains("home");
        }
    }
}

當然還可以通過匿名類來實現FilterFunction接口:

        // 2. 傳入匿名類
        DataStream<Event> stream3 = clicks.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.url.contains("home");
            }
        });

為了類可以更加通用,我們還可以將用於過濾的關鍵字"home"抽象出來作為類的屬性,調用構造方法時傳進去。

     // 傳入屬性字段
   DataStream<Event> stream = clicks.filter(new KeyWordFilter("home"));
   
   public static class KeyWordFilter implements FilterFunction<Event> {
        private String keyWord;

        KeyWordFilter(String keyWord) { this.keyWord = keyWord; }

        @Override
        public boolean filter(Event value) throws Exception {
            return value.url.contains(this.keyWord);
        }
    }
  1. 匿名函數( Lambda )

匿名函數(Lambda表達式)是 Java 8 引入的新特性,方便我們更加快速清晰地寫代碼。
Lambda 表達式允許以簡潔的方式實現函數,以及將函數作為參數來進行傳遞,而不必聲明額外的(匿名)類。

Flink 的所有算子都可以使用 Lambda 表達式的方式來進行編碼,但是,當 Lambda 表達式使用 Java 的泛型時,我們需要顯式的聲明類型信息。

下例演示了如何使用Lambda表達式來實現一個簡單的 map() 函數,我們使用 Lambda 表達式來計算輸入的平方。在這里,我們不需要聲明 map() 函數的輸入 i 和輸出參數的數據類型,因為 Java 編譯器會對它們做出類型推斷。

 public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        //map 函數使用 Lambda 表達式, 返回簡單類型, 不需要進行類型聲明
        DataStream< String > stream1 = stream.map(event->event.url);
        stream1.print();
        
        env.execute();
    }

由於OUT 是 String 類型而不是泛型,所以 Flink 可以從函數簽名 OUT map(IN value) 的實現中自動提取出結果的類型信息。

但是對於像flatMap() 這樣的函數,它的函數簽名 void flatMap(IN value, Collector<OUT> out) 被 Java 編譯器編譯成了 void flatMap(IN value, Collector out),也就是說將 Collector的泛型信息擦除掉了。這樣 Flink 就無法自動推斷輸出的類型信息了。

在這種情況下,我們需要顯式地指定類型信息,否則輸出將被視為 Object 類型,這會導致低效的序列化。

// flatMap 使用 Lambda 表達式,必須 通過 returns 明確聲明 返回類型
DataStream<String> stream 2 = clicks.flatMap((Event event, Collector<String>out) -> {
    out.collect(event.url);
}).returns(Types.STRING);

stream2.print();

當使用map() 函數返回 Flink自定義的元組類型時也會發生類似的問題。下例中的函數簽名 Tuple2<String, Long> map(Event value) 被類型擦除為 Tuple2 map(Event value)。

//使用 map 函數也會出現類似問題,以下代碼會報錯
DataStream<Tuple2< String , Long>> stream3 = clicks.map( event -> Tuple2.of(event.user, 1L));
stream3.print();

一般來說,這個問題可以通過多種方式解決:

public class TransReturnTypeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        // 想要轉換成二元組類型,需要進行以下處理
        // 1) 使用顯式的 ".returns(...)"
        DataStream<Tuple2<String, Long>> stream3 = clicks
                .map( event -> Tuple2.of(event.user, 1L) )
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        stream3.print();


        // 2) 使用類來替代Lambda表達式
        clicks.map(new MyTuple2Mapper())
                .print();

        // 3) 使用匿名類來代替Lambda表達式
        clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                return Tuple2.of(value.user, 1L);
            }
        }).print();

        env.execute();
    }

    // 自定義MapFunction的實現類
    public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>>{
        @Override
        public Tuple2<String, Long> map(Event value) throws Exception {
            return Tuple2.of(value.user, 1L);
        }
    }
}

這些方法對於其它泛型擦除的場景同樣適用。

  1. 富函數類( Rich Function Classes )

“富函數類”也是DataStream API提供的一個函數類的接口,所有的 Flink函數類都有其Rich版本。富函數類一般是以抽象類的形式出現的。例如: RichMapFunction、 RichFilterFunction、RichReduceFunction等。

既然“富”,那么它一定會比常規的函數類提供更多、更豐富的功能。與常規函數類的不同主要在於,富函數類可以獲取運行環境的上下文,並擁有一些生命周期方法,所以可以實現更復雜的功能。

注:生命周期的概念在編程中其實非常重要,到處都有體現。例如:對於C語言來說,我們需要手動管理內存的分配和回收,也就是手動管理內存的生命周期。分配內存而不回收,會造成內存泄漏,回收沒有分配過的內存,會造成空指針異常。而在 JVM中,虛擬機會自動幫助我們管理對象的生命周期。對於前端來說,一個頁面也會有生命周期。數據庫連接、網絡連接以及文件描述符的創建和關閉,也都形成了生命周期。所以生命周期的概念在編程中是無處不在的,需要我們多加注意。

Rich Function有生命周期的概念。 典型的生命周期方法有

  • open()方法,是 Rich Function的初始化方法,也就是會開啟一個算子的生命周期。當一個算子的實際工作方法例如 map()或者 filter()方法被調用之前, open()會首先被調用。所以像文件 IO的創建,數據庫連接的創建,配置文件的讀取等等這樣一次性的工作,都適合在 open()方法中完成。。

  • close()方法,是生命周期中的最后一個調用的方法,類似於解構方法。一般用來做一些清理工作。

需要注意的是,這里的生命周期方法,對於一個並行子任務來說只會調用一次;而對應的,實際工作方法,例如 RichMapFunction中的 map(),在每條數據到來后都會觸發一次調用。

來看一個例子:

輸出結果是:

public class TransRichFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 和並行度相關
        env.setParallelism(2);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L)
        );

        // 將點擊事件轉換成長整型的時間戳輸出
        clicks.map(new RichMapFunction<Event, Long>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        System.out.println("索引為 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任務開始");
                    }

                    @Override
                    public Long map(Event value) throws Exception {
                        return value.timestamp;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        System.out.println("索引為 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任務結束");
                    }
                })
                .print();

        env.execute();
    }
}

輸出結果:

索引為 1 的任務開始
索引為 0 的任務開始
2> 1000
1> 2000
2> 5000
1> 60000
索引為 0 的任務結束
索引為 1 的任務結束

一個常見的應用場景就是,如果我們希望連接到一個外部數據庫進行讀寫操作,那么將連接操作放在 map()中顯然不是個好選擇 因為每來一條數據就會重新連接一次數據庫;所以我們可以在 open()中建立連接,在 map()中讀寫數據,而在 close()中關閉連接。所以我們推薦的最佳實踐如下:

public class MyFlatMap extends RichFlatMapFunction<IN, OUT>{ 
    @Override 
    public void open(Configuration configuration){
        // 做一些初始化工作
        // 例如建立一個和 MySQL 的連接
    }
    @Override
    public void flatMap(IN in,Collector<OUT out>){
        // 對數據庫進行讀寫
    }
    @Override
    public void close(){
        // 清理工作,關閉和 MySQL 數據庫的連接。
    }
}

另外,富函數類提供了getRuntimeContext()方法(我們在本節的第一個例子中使用了一下),可以獲取到運行時上下文的一些信息,例如程序執行的並行度,任務名稱,以及狀態(state)。這使得我們可以大大擴展程序的功能,特別是對於狀態的操作,使得 Flink中的算子具備了處理復雜業務的能力。關於 Flink中的狀態管理和狀態編程,我們會在后續章節逐漸展開。

5.3.4 物理分區

本節的最后,我們再來深入了解一下分區操作 。
顧名思義,“分區”(partitioning) 操作 就是要將數據進行重新分布,傳遞到不同的流分區去進行下一步 處理 。其實我們對分區操作並不陌生,前面介紹聚合算子時,已經提到了 keyBy它就是一種按照鍵的哈希值來進行重新分區的 操作 。只不過這種分區操作只能保證把數據按key“分開”,至於分得均不均勻、每個 key的數據具體會分到哪一區去,這些是完全無從控制的 所以我們有時也說, keyBy是一種邏輯分區( logical partitioning)操作。

如果說keyBy這種邏輯分區是一種“軟分區”,那真正硬核的分區就應該是所謂的“物理分區”( physical partitioning)。也就是我們要真正控制分區策略,精准地調配數據,告訴每個數據到底去哪里。其實這種分區方式在一些情況下已經在發生了:例如我們編寫的程序可能對多個處理任務設置了不同的並行度,那么當數據執行的上下游任務並行度變化時,數據就不應該還在當前分區以直通( forward)方式傳輸了 因為如果並行度變小,當前分區可能沒有下游任務了;而如果並行度變大,所有數據還在原先的分區處理就會導致資源的浪費。 所以這種情況下,系統會自動地將數據均勻地發往下游所有的並行任務,保證各個分區的負載均衡。

有些時候,我們還需要手動控制數據分區分配策略。比如當發生數據傾斜的時候,系統無法自動調整,這時就需要我們重新進行負載均衡,將數據流較為平均地發送到下游任務操作分區中去。 Flink對於經過轉換操作之后的 DataStream,提供了一系列的底層操作接口 ,能夠幫我們實現數據流的手動重分區。為了同 keyBy相區別,我們把這些操作統稱為“物理分區”操作。物理分區與 keyBy另一大區別在於, keyBy之后得到的是一個 KeyedStream,而物理分區之后結果仍是 DataStream,且流中元素數據類型保持不變。從這一點也可以看出,分區算子並不對數據進行轉換處理,只是定義了數據的傳輸方式。

常見的物理分區策略有隨機分配(Random)、輪詢分配( Round Robin)、重縮放 Rescale和廣播( Broadcast),下邊我們分別來做了解。

  1. 隨機分區( shuffle )

最簡單的重分區方式就是直接“洗牌”。通過調用DataStream的 .shuffle()方法,將數據隨機地分配到下游算子的並行任務中去。

隨機分區服從均勻分布(uniform distribution),所以可以把流中的數據隨機打亂,均勻地傳遞到下游任務分區,如圖所示。因為是完全隨機的,所以對於同樣的輸入數據 , 每次執行得到的結果也不會相同。

image-20220406124746490

經過隨機分區之后,得到的依然是一個DataStream。
我們可以做個簡單測試:將數據讀入之后直接打印到控制台,將輸出的並行度設置為4,中間經歷一次 shuffle。執行多次,觀察結果是否相同。

public class TransPhysicalPatitioningTest {
    public static void main(String[] args) throws Exception {
        // 創建執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        // 隨機分區
        stream.shuffle().print("shuffle").setParallelism(4);

        env.execute();
    }
}

輸出結果:

shuffle:2> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.5}
shuffle:4> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
shuffle:3> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
shuffle:1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
shuffle:4> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.0}
shuffle:2> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:02.3}
shuffle:3> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:02.5}
shuffle:3> Event{user='Alice', url='./prod?id=300', timestamp=1970-01-01 08:00:03.6}
shuffle:3> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:03.3}
  1. 輪詢分區( Round Robin )

輪詢也是一種常見的重分區方式。簡單來說就是“發牌”,按照先后順序將數據做依次分發,如圖所示。通過調用 DataStream的 .rebalance()方法,就可以實現輪詢重分區。 rebalance使用的是 Round Robin負載均衡算法,可以將輸入流數據平均分配到下游的並行任務中去。

注:Round Robin算法用在了很多地方,例如 Kafka和 Nginx

image-20220406125212873

我們同樣可以在代碼中進行測試:

// 2. 輪詢分區
stream.rebalance().print("rebalance").setParallelism(4);

輸出結果:

rebalance:1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
rebalance:2> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
rebalance:3> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.5}
rebalance:4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
rebalance:2> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.0}
rebalance:1> Event{user='Alice', url='./prod?id=300', timestamp=1970-01-01 08:00:03.6}
rebalance:3> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:02.3}
rebalance:4> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:02.5}
rebalance:4> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:03.3}
  1. 重縮放分區( rescale )

重縮放分區和輪詢分區非常相似。當調用rescale()方法時,其實底層也是使用 Round Robin算法進行輪詢,但是只會將數據輪詢發送到下游並行任務的一部分中,如圖所示。也就是說,“發牌人”如果有多個,那么 rebalance的方式是每個發牌人都面向所有人發牌;而 rescale的做法是分成小團體,發牌人只給自己團體內的所有人輪流發牌。

image-20220406125554271

當下游任務(數據接收方)的數量是上游任務(數據發送方)數量的整數倍時,rescale的效率明顯會更高。比如當上游任務數量是 2,下游任務數量是 6時,上游任務其中一個分區的數據就將會平均分配到下游任務的 3個分區中。

由於rebalance是所有分區數據的“重新平衡”,當 TaskManager數據量較多時,這種跨節點的網絡傳輸必然影響效率;而如果我們配置的 task slot數量合適,用 rescale的方式進行“局部重縮放”,就可以讓數據只在當前 TaskManager的多個 slot之間重新分配,從而避免了網絡傳輸帶來的損耗。

從底層實現上看,rebalance和 rescale的根本區別在於任務之間的連接機制不同。 rebalance將會針對所有上游任務( 發送數據方)和所有下游任務(接收數據方)之間建立通信通道,這是一個笛卡爾積的關系;而 rescale僅僅針對每一個任務和下游對應的部分任務之間建立通信通道,節省了很多資源。

可以在代碼中測試如下:

    // 3. rescale重縮放分區
        env.addSource(new RichParallelSourceFunction<Integer>() {  // 這里使用了並行數據源的富函數版本
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        for (int i = 1; i <= 8; i++) {
                            // 將奇數發送到索引為1的並行子任務
                            // 將偶數發送到索引為0的並行子任務
                            if ( i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                                sourceContext.collect(i);
                            }
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                })
                .setParallelism(2)
                .rescale()
                .print().setParallelism(4);

這里使用rescale方法,來做數據的分區,輸出結果是:

3> 1
2> 4
1> 2
4> 3
1> 6
2> 8
3> 5
4> 7

可以將rescale方法換成 rebalance方法,輸出結果是:

3> 2
2> 1
1> 7
4> 4
3> 3
2> 8
1> 6
4> 5
  1. 廣播( broadcast )

這種方式其實不應該叫做“重分區”,因為經過廣播之后,數據會在不同的分區都保留一份,可能進行重復處理。可以通過調用 DataStream的 broadcast()方法,將輸入數據復制並發送到下游算子的所有並行任務中去。

具體代碼測試如下:

// 4. 廣播
stream.broadcast().print("broadcast").setParallelism(4);

可以看到,數據被復制然后廣播到了下游的所有並行任務中去了。

broadcast:2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
broadcast:1> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
broadcast:4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
broadcast:3> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
broadcast:1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
broadcast:2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
broadcast:4> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
broadcast:3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
...
  1. 全局分區( global )

全局分區也是一種特殊的分區方式。這種做法非常極端,通過調用.global()方法,會將所有的輸入流數據都發送到下游算子的第一個並行子任務中去。這就相當於強行讓下游任務並行度變成了 1,所以使用這個操作需要非常謹慎,可能對程序造成很大的壓力。

  1. 自定義分區( Custom )

當Flink提供的所有分區策略都不能滿足用戶的需求時,我們可以通過使用partitionCustom()方法來自定義分區策略。

在調用時,方法需要傳入兩個參數,第一個是自定義分區器(Partitioner)對象,第二個是應用分區器的字段,它的指定方式與 keyBy指定 key基本一樣:可以通過字段名稱指定,也可以通過字段位置索引來指定,還可以實現一個 KeySelector。

例如,我們可以對一組自然數按照奇偶性進行重分區。代碼如下:

        // 6. 自定義重分區
        // 將自然數按照奇偶分區
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .print().setParallelism(2);

5.4 輸出算子

image-20220406131826672

Flink作為數據處理框架,最終還是要把計算處理的結果寫入外部存儲,為外部應用提供支持,如圖所示,本節將主要講解 Flink中的 Sink操作。我們已經了解了 Flink程序如何對數據進行讀取、轉換等操作,最后一步當然就應該將結果數據保存或輸出到外部系統了。

5.4.1 連接到外部系統

在Flink中,如果我們希望將數據寫入外部系統,其實並不是一件難事。我們知道所有算子都可以通過實現函數類來自定義處理邏輯,所以只要有讀寫客戶端,與外部系統的交互在任何一個處理算子中都可以實現。例如在 MapFunction中,我們完全可以構建一個到 Redis的連接,然后將當前處理的結果保存到 Redis中。如果考慮到只需建立一次連接,我們也可以利用RichMapFunction,在 open() 生命周期中做連接操作。

這樣看起來很方便,卻會帶來很多問題。Flink作為一個快速的分布式實時流處理系統,對穩定性和容錯性要求極高 。一旦出現故障,我們應該有能力恢復之前的狀態,保障處理結果的正確性。這種性質一般被稱作“狀態一致性”。 Flink內部提供了一致性檢查點(checkpoint )來保障我們可以回滾到正確的狀態;但如果我們在處理過程中任意讀寫外部系統,發生故障后就很難回退到從前了。

為了避免這樣的問題,Flink的 DataStream API專門提供了向外部寫入數據的方法:addSink。與 addSource類似, addSink方法對應着一個“ Sink”算子,主要就是用來實現與外部系統連接、並將數據提交寫入的; Flink程序中所有對外的輸出操作,一般都是利用 Sink算子 完成的。

Sink一詞有“下沉”的意思,有些資料會相對於“數據源”把它翻譯為“數據匯”。不論怎樣理解, Sink在 Flink中代表了將結果數據收集起來、輸出到外部的意思,所以我們這里統一把它直觀地叫作“輸出算子”。

之前我們一直在使用的print方法其實就是一種 Sink,它表示將數據流寫入標准控制台打印輸出。查看源碼可以發現, print方法返回的就是一個 DataStreamSink

    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

與Source算子非常類似,除去一些 Flink預實現的 Sink,一般情況下 Sink算子的創建是通過調用 DataStream的 .addSink()方法實現的。

stream.addSink (new SinkFunction(...))

addSource的參數需要實現一個 SourceFunction接口;類似地, addSink方法同樣需要傳入一個參數,實現的是 SinkFunction接口。在這個接口中只需要重寫一個方法 invoke(),用來將指定的值寫入到外部系統中。這個方法在每條數據記錄到來時都會調用:

default void invoke(IN value, Context context) throws Exception

當然,SinkFuntion多數情況下同樣並不需要我們自己實現。 Flink官方提供了一部分的框架的 Sink連接器。如圖所示,列出了 Flink官方目前支持的第三方系統連接器:

image-20220406132500707

我們可以看到,像Kafka之類流式系統, Flink提供了完美對接, source/sink兩端都能連接,可讀可寫;而對於 Elasticsearch、文件系統( FileSystem)、 JDBC等數據存儲系統,則只提供了輸出寫入的 sink連接器。

除Flink官方之外, Apache Bahir作為給 Spark和 Flink提供擴展支持的項目,也實現了一些其他第三方系統與 Flink的連接器,如圖所示。

image-20220406132529194

除此以外,就需要用戶自定義實現sink連接器了。
接下來,我們就選取一些常見的外部系統進行展開講解。

5.4.2 輸出到文件

最簡單的輸出方式,當然就是寫入文件了。對應着讀取文件作為輸入數據源,Flink本來也有一些非常簡單粗暴的輸出到文件的預實現方法:如 writeAsText()、 writeAsCsv(),可以直接將輸出結果保存到文本文件或 Csv文件。但我們知道,這種方式是不支持同時寫入一份文件的;所以我們往往會將最后的 Sink操作並行度設為 1,這就大大拖慢了系統效率;而且對於故障恢復后的狀態一致性,也沒有任何保證。所以目前這些簡單的方法已經要被棄用。

Flink為此專門提供了一個流式文件系統的連接器: StreamingFileSink,它繼承自抽象類RichSinkFunction,而且集成了 Flink的檢查點( checkpoint)機制,用來保證精確一次 exactly once)的一致性語義。

StreamingFileSink為批處理和流處理提供了一個統一的 Sink,它可以將分區文件寫入 Flink支持的文件系統。它可以保證精確一次 的狀態一致性, 大大改進了之前流式文件 Sink的方式。它的主要操作是將數據寫入桶( buckets),每個桶中的數據都可以分割成一個個大小有限的分區文件,這樣一來就實現真正意義上的分布式文件存儲。我們可以通過各種配置來控制“分桶”的操作; 默認的分桶方式是基於時間的,我們每小時寫入一個新的桶。換句話說,每個桶內保存的文件,記錄的都是 1小時的輸出數據 。

StreamingFileSink支持行編碼( Row encoded)和批量編碼 Bulk encoded 比如 Parquet格式。這兩種不同的方式都有各自的構建器( builder),調用方法也非常簡單,可以直接調用StreamingFileSink的靜態方法

  • 行編碼: StreamingFileSink.forRowFormat basePath rowEncoder)。
  • 批量編碼: StreamingFileSink.forBulkFormat basePath bulkWriterFactory)。

在創建行或批量編碼Sink時,我們需要傳入兩個參數,用來指定存儲桶的基本路徑( basePath)和數據的編碼邏輯( rowEncoder或 bulkWriterFactory)。

下面我們就以行編碼為例,將一些測試數據直接寫入文件:

public class SinkToFileTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();

        // 將Event轉換成String寫入文件
        stream.map(Event::toString).addSink(fileSink);

        env.execute();
    }
}

這里我們創建了一個簡單的文件Sink,通過 .withRollingPolicy()方法指定了一個“滾動策略”。“滾動”的概念在日志文件的寫入中經常遇到:因為文件會有內容持續不斷地寫入,所以我們應該給一個標准,到什么時候就開啟新的文件,將之前的內容歸檔保存。也就是說,上面的代碼設置了在以下 3種情況下,我們就會滾動分區文件:

  • 至少包含 15分鍾的數據
  • 最近 5分鍾沒有收到新的數據
  • 文件大小已達到 1 GB

5.4.3 輸出到 Kafka

Kafka是一個分布式的 基於 發布 /訂閱的消息系統,本身處理的也是流式數據,所以跟Flink“天生一對 ”,經常會作為 Flink的輸入數據源和輸出系統。 Flink官方為 Kafka提供了 Source和 Sink的連接器,我們可以用它方便地從 Kafka讀寫數據。如果僅僅是支持讀寫,那還說明不了 Kafka和 Flink關系的親密;真正讓它們密不可分的是, Flink與 Kafka的連接器提供了端到端的精確一次( exactly once)語義保證,這在實際項目中是最高級別的一致性保證。關於這部分內容,我們會在后續章節做更詳細的講解。

現在我們要將數據輸出到
Kafka,整個數據處理的閉環已經形成,所以可以完整測試如下

(1)添加 Kafka 連接器依賴

由於我們已經測試過從Kafka數據源 讀取數據,連接器相關依賴已經引入,這里就不重復
介紹了。
(2)啟動 Kafka集群
(3)編寫輸出到 Kafka的示例代碼
我們可以直接將用戶行為數據保存為文件clicks.csv,讀取后不做轉換直接寫入 Kafka,主題( topic)命名為 clicks”。

public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop102:9092");

        DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");

        stream.addSink(new FlinkKafkaProducer<String>(
                        "clicks",
                        new SimpleStringSchema(),
                        properties
                ));

        env.execute();
    }
}

這里我們可以看到,addSink傳入的參數是一個 FlinkKafkaProducer。這也很好理解,因為需要向 Kafka寫入數據,自然應該創建一個生產者。 FlinkKafkaProducer繼承了抽象類TwoPhaseCommitSinkFunction,這是一個實現了 “兩階段提交 ”的 RichSinkFunction。兩階段提交提供了 Flink向 Kafka寫入數據的事務性保證,能夠真正做到精確一次( exactly once)的狀態一致性。關於這部分內容,我們會在后續章節展開介紹。

(4)運行代碼,在 Linux主機啟動一個消費者 , 查看是否收到數據

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic clicks

我們可以看到消費者可以正常消費數據,證明向Kafka寫入數據成功。另外,我們也可以讀取 5.2節中介紹過的任意數據源,進行更多的完整測試。比較有趣的一個實驗是,我們可以同時將 Kafka作為 Flink程序的數據源和寫入結果的外部系統。只要將輸入和輸出的數據設置為不同的 topic,就可以看到整個系統運行的路徑 Flink從 Kakfa的一個 topic讀取消費數據,然后進行處理轉換,最終將結果數據寫入 Kafka的另一個 topic 數據從 Kafka流入、經 Flink處理后又流回到 Kafka去,這就是所謂的“數據管道” 應用。

5.4.4 輸出到 Redis

Redis是一個開源的內存式的數據存儲,提供了像字符串(string)、哈希表 (hash)、列表 (list)、集合(set)、排序集合 (sorted set)、位圖( bitmap)、地理索引和流 (stream)等一系列常用的數據結構。因為它運行速度快、支持的數據類型豐富,在實際項目中已經成為了架構優化必不可少的一員,一般用作數據庫、緩存,也可以作為消息代理。

Flink沒有直接提供官方的 Redis連接器,不過 Bahir項目還是擔任了合格的輔助角色,為我們提供了 Flink Redis的連接工具。但版本升級略顯滯后,目前連接器版本為 1.0,支持的Scala版本最新到 2.11。由於我們的測試不涉及到 Scala的相關版本變化,所以並不影響使用。實際項目應用中,應該以匹配的組件版本運行。

具體測試步驟如下:

(1)導入的 Redis連接器依賴

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

(2)啟動 Redis集群
這里我們為方便測試,只啟動了單節點Redis。
(3)編寫輸出到 Redis的示例代碼
連接器為我們提供了一個RedisSink,它繼承了抽象類 RichSinkFunction,這就是已經實現好的向 Redis寫入數據的 SinkFunction。我們可以直接將 Event數據輸出到 Redis

public class SinkToRedisTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 創建一個到redis連接的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .build();

        stream.addSink(new RedisSink<Event>(conf, new MyRedisMapper()));

        env.execute();
    }
}

這里RedisSink的構造方法需要傳入兩個參數:

  • JFlinkJedisConfigBase Jedis的連接配置
  • RedisMapper Redis映射類接口,說明怎樣將數據轉換成可以寫入 Redis的類型

接下來主要就是定義一個Redis的映射類,實現 RedisMapper接口。

    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event data) {
            return data.user;
        }

        @Override
        public String getValueFromData(Event data) {
            return data.url;
        }
    }

在這里我們可以看到,保存到Redis時調用的命令是 HSET,所以是保存為哈希表 hash,表名為“clicks”;保存的數據以 user為 key,以 url為 value,每來一條數據就會做一次轉換。
(4)運行代碼 Redis查看是否收到數據。

$ redis cli
hadoop102:6379>hgetall clicks
1) “Mary”
2) “./home”
3) “Bob”
4) “./cart”

我們會發現,發送了多條數據 , Redis中只有 2條數據 . 原因是 hash中的 key重復了 , 后面的會把前面的覆蓋掉。

5.4.5 輸出到 Elasticsearch

ElasticSearch是一個分布式的開源搜索和分析引擎,適用於所有類型的數據。 ElasticSearch有着簡潔的 REST風格的 API,以良好的分布式特性、速度和可擴展性而聞名,在大數據領域應用非常廣泛。

Flink為 ElasticSearch專門提供了官方的 Sink 連接器, Flink 1.13支持當前最新版本的ElasticSearch。

寫入數據的ElasticSearch的測試步驟如下。
(1)添加 Elasticsearch 連接器依賴

  <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
   </dependency>

(2)啟動 Elasticsearch集群
(3)編寫輸出到 Elasticsearch的示例 代碼

public class SinkToEsTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200, "http"));

        // 創建一個ElasticsearchSinkFunction
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> data = new HashMap<>();
                data.put(element.user, element.url);

                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type")    // Es6 必須定義type
                        .source(data);

                indexer.add(request);
            }
        };

        stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}

與RedisSink類似,連接器也為我們實現了寫入到 Elasticsearch的SinkFunction ElasticsearchSink。區別在於,這個類的構造方法是私有( private)的,我們
需要使用 ElasticsearchSink的 Builder內部靜態類,調用它的 build()方法才能創建出真正的SinkFunction。

而Builder的構造方法中又有兩個參數:

  • httpHosts:連接到的 Elasticsearch集群主機列表
  • elasticsearchSinkFunction:這並不是我們所說的 SinkFunction,而是用來說明具體處理邏輯、准備數據向 Elasticsearch發送請求的函數

具體的操作需要重寫中elasticsearchSinkFunction中的 process方法,我們可以將要發送的數據放在一個 HashMap中,包裝成 IndexRequest向外部發送 HTTP請求。
(4)運行代碼,訪問 Elasticsearch查看是否收到數據。

5.4.6 輸出到 MySQL JDBC

關系型數據庫有着非常好的結構化數據設計、方便的SQL查詢,是很多企業中業務數據存儲的主要形式。 MySQL就是其中的典型代表。盡管在大數據處理中直接與 MySQL交互的場景不多,但最終處理的計算結果是要給外部應用消費使用的,而外部應用讀取的數據存儲往往就是 MySQL。所以我們也需要知道如何將數據輸出到 MySQL這樣的傳統數據庫。

寫入數據的MySQL的測試步驟如下。
(1)添加依賴

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

(2)啟動 MySQL,在 database庫下建表 clicks

mysql> create table clicks(
--> user varchar(20) not null,
--> url varchar(100) not null);

(3)編寫輸出到 MySQL的示例代碼

public class SinkToMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        stream.addSink(
                JdbcSink.sink(
                        "INSERT INTO clicks (user, url) VALUES (?, ?)",
                        (statement, r) -> {
                            statement.setString(1, r.user);
                            statement.setString(2, r.url);
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/test")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("root")
                                .build()
                )
        );
        env.execute();
    }
}

(4)運行代碼,用客戶端連接 MySQL,查看是否成功寫入數據。

5.4.7 自定義Sink輸出

如果我們想將數據存儲到我們自己的存儲設備中,而Flink並沒有提供可以直接使用的連接器,又該怎么辦呢?

與Source類似, Flink為我們提供了通用的 SinkFunction接口和對應的 RichSinkDunction抽象類,只要實現它,通過簡單地調用 DataStream的 .addSink()方法 就可以自定義寫入任何外部存儲。之前與外部系統的連接,其實都是連接器幫我們實現了 SinkFunction,現在既然沒有現成的,我們就只好自力更生了。 例如, Flink並沒有提供 HBase的連接器,所以需要我們自己寫。

在實現SinkFunction的時候,需要重寫的一個關鍵方法 invoke(),在這個方法中我們就可以實現將流里的數據發送出去的邏輯。

我們這里使用了SinkFunction的富函數版本,因為這里我們又使用到了生命周期的概念,創建 HBase的連接以及關閉 HBase的連接需要分別放在 open()方法和 close()方法中。

(1)導入依賴

     <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

(2)編寫輸出到 HBase的示例代碼

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import java.nio.charset.StandardCharsets;

public class SinkCustomtoHBase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements("hello", "world").addSink(new RichSinkFunction() {
            public org.apache.hadoop.conf.Configuration configuration; // 管理 Hbase 的配置信息 這里因為 Configuration 的重名問題,將類以完整路徑導入
            public Connection connection; // 管理 Hbase 連接

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                configuration = HBaseConfiguration.create();
                configuration.set("hbase.zookeeper.quorum", "hadoop102:");
                connection = ConnectionFactory.createConnection(configuration);
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                Table table = connection.getTable(TableName.valueOf("test")); // 表名為 test
                Put put = new Put("rowkey" .getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
                put.addColumn("info" .getBytes(StandardCharsets.UTF_8) /// 指定列名
                        , value.getBytes(StandardCharsets.UTF_8) /// 寫入的數據
                        , "1" .getBytes(StandardCharsets.UTF_8)); /// 寫入的數據
                table.put(put); // 執行 put 操作
                table.close(); /// 將表關閉
            }

            @Override
            public void close() throws Exception {
                super.close();
                connection.close(); // 關閉連接
                env.execute();
            }
        });
        env.execute();
    }
}

(3)可以在 HBase查看插入的數據。

5.5 本章總結

本章從編寫Flink程序的基本流程入手,依次講解了執行環境的創建、數據源的讀取、數據流的轉換操作,和最終結果數據的輸出,對各種常見的轉換操作 API和外部系統的連接都做了詳細介紹,並在其中穿插闡述了 Flink中支持的數據類型和 UDF的用法。我們可以自信地說,到目前為止已經充分掌握了 DataStream API的基本用法,熟悉了 Flink的編程習慣,應該說已經真正跨進了 Flink流處理的大門。

當然,本章對於轉換算子只是一個簡單介紹,Flink中的操作遠遠不止這些,還有窗口(Window)、多流轉換、底層的處理函數 Process Function)以及狀態編程等更加高級的用法。另外本章中由於涉及讀寫外部系統,我們不只一次地提到了“精確一次( exactly once)”的狀態一致性,這也是 Flink的高級特性之一。關於這些內容,我們將在后續章節逐一展開。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM