Flink官網文檔翻譯


http://ifeve.com/flink-quick-start/

http://vinoyang.com/2016/05/02/flink-concepts/

http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

並行數據流

程序在Flink內部的執行具有並行分布式的特性。stream被分割成stream partition,operator被分割成operator subtask,這些operator subtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執行。

一個特定operator的subtask的個數被稱之為其parallelism(並行度)。一個stream的並行度總是等同於其producing operator的並行度。一個程序中,不同的operator可能具有不同的並行度。

flink-concepts_parallel-dataflowflink-concepts_parallel-dataflow

Stream在operator之間傳輸數據的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式。

  • One-to-one : strem(比如在source和map operator之間)維護着分區以及元素的順序。那意味着map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同。
  • Redistributing : stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區會發生改變。每一個operator subtask依據所選擇的transformation發送數據到不同的目標subtask。例如,keyBy() (基於hash碼重分區),broadcast()或者rebalance()(隨機redistribution)。在一個redistribution的交換中,只有每一個發送、接收task對的順序才會被維持(比如map()的subtask和keyBy/window的subtask)。

tasks & operator chains

出於分布式執行的目的,Flink將operator的subtask鏈接在一起形成task。每個task在一個線程中執行。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換和基於緩存區的數據交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。

下面這幅圖,展示了5個subtask以5個並行的線程來執行。

flink-concepts_tasks-chainsflink-concepts_tasks-chains

http://wuchong.me/

 

Flink官網文檔翻譯:安裝部署(集群模式)

本文主要介紹如何將Flink以分布式模式運行在集群上(可能是異構的)。

環境准備

Flink 運行在所有類 UNIX 環境上,例如 Linux、Mac OS X 和 Cygwin(對於Windows),而且要求集群由一個master節點和一個或多個worker節點組成。在安裝系統之前,確保每台機器上都已經安裝了下面的軟件:

  • Java 1.7.x或更高版本
  • ssh(Flink的腳本會用到sshd來管理遠程組件)

如果你的集群還沒有完全裝好這些軟件,你需要安裝/升級它們。例如,在 Ubuntu Linux 上, 你可以執行下面的命令安裝 ssh 和 Java :

sudo apt-get install ssh 
sudo apt-get install openjdk-7-jre

SSH免密碼登錄

譯注:安裝過Hadoop、Spark集群的用戶應該對這段很熟悉,如果已經了解,可跳過。*

為了能夠啟動/停止遠程主機上的進程,master節點需要能免密登錄所有worker節點。最方便的方式就是使用ssh的公鑰驗證了。要安裝公鑰驗證,首先以最終會運行Flink的用戶登錄master節點。所有的worker節點上也必須要有同樣的用戶(例如:使用相同用戶名的用戶)。本文會以 flink 用戶為例。非常不建議使用 root 賬戶,這會有很多的安全問題。

當你用需要的用戶登錄了master節點,你就可以生成一對新的公鑰/私鑰。下面這段命令會在 ~/.ssh 目錄下生成一對新的公鑰/私鑰。

ssh-keygen -b 2048 -P '' -f ~/.ssh/id_rsa

接下來,將公鑰添加到用於認證的authorized_keys文件中:

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

最后,將authorized_keys文件分發給集群中所有的worker節點,你可以重復地執行下面這段命令:

scp ~/.ssh/authorized_keys <worker>:~/.ssh/

將上面的<worker>替代成相應worker節點的IP/Hostname。完成了上述拷貝的工作,你應該就可以從master上免密登錄其他機器了。

ssh <worker>

配置JAVA_HOME

Flink 需要master和worker節點都配置了JAVA_HOME環境變量。有兩種方式可以配置。

一種是,你可以在conf/flink-conf.yaml中設置env.java.home配置項為Java的安裝路徑。

另一種是,sudo vi /etc/profile,在其中添加JAVA_HOME

export JAVA_HOME=/path/to/java_home/

然后使環境變量生效,並驗證 Java 是否安裝成功

$ source /etc/profile   #生效環境變量
$ java -version         #如果打印出版本信息,則說明安裝成功
java version "1.7.0_75"
Java(TM) SE Runtime Environment (build 1.7.0_75-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.75-b04, mixed mode)

安裝 Flink

進入下載頁面。請選擇一個與你的Hadoop版本相匹配的Flink包。如果你不打算使用Hadoop,選擇任何版本都可以。

在下載了最新的發布包后,拷貝到master節點上,並解壓:

tar xzf flink-*.tgz
cd flink-*

配置 Flink

在解壓完之后,你需要編輯conf/flink-conf.yaml配置Flink。

設置jobmanager.rpc.address配置項為你的master節點地址。另外為了明確 JVM 在每個節點上所能分配的最大內存,我們需要配置jobmanager.heap.mbtaskmanager.heap.mb,值的單位是 MB。如果對於某些worker節點,你想要分配更多的內存給Flink系統,你可以在相應節點上設置FLINK_TM_HEAP環境變量來覆蓋默認的配置。

最后,你需要提供一個集群中worker節點的列表。因此,就像配置HDFS,編輯*conf/slaves*文件,然后輸入每個worker節點的 IP/Hostname。每一個worker結點之后都會運行一個 TaskManager。

每一條記錄占一行,就像下面展示的一樣:

192.168.0.100
192.168.0.101
.
.
.
192.168.0.150

譯注:conf/master文件是用來做JobManager HA的,在這里不需要配置

每一個worker節點上的 Flink 路徑必須一致。你可以使用共享的 NSF 目錄,或者拷貝整個 Flink 目錄到各個worker節點。

scp -r /path/to/flink <worker>:/path/to/

請查閱配置頁面了解更多關於Flink的配置。

特別的,這幾個

  • TaskManager 總共能使用的內存大小(taskmanager.heap.mb
  • 每一台機器上能使用的 CPU 個數(taskmanager.numberOfTaskSlots
  • 集群中的總 CPU 個數(parallelism.default
  • 臨時目錄(taskmanager.tmp.dirs

是非常重要的配置項。

啟動 Flink

下面的腳本會在本地節點啟動一個 JobManager,然后通過 SSH 連接所有的worker節點(*slaves*文件中所列的節點),並在每個節點上運行 TaskManager。現在你的 Flink 系統已經啟動並運行了。跑在本地節點上的 JobManager 現在會在配置的 RPC 端口上監聽並接收任務。

假定你在master節點上,並在Flink目錄中:

bin/start-cluster.sh

要停止Flink,也有一個 stop-cluster.sh 腳本。

添加 JobManager/TaskManager 實例到集群中

你可以使用 bin/jobmanager.sh 和 bin/taskmanager 腳本來添加 JobManager 和 TaskManager 實例到你正在運行的集群中。

添加一個 JobManager

bin/jobmanager.sh (start cluster)|stop|stop-all

添加一個 TaskManager

bin/taskmanager.sh start|stop|stop-all

確保你是在需要啟動/停止相應實例的節點上運行的這些腳本。

 

 

Flink中的一些核心概念

文章目錄
  1. 1. 程序和數據流
  2. 2. 並行數據流
  3. 3. tasks & operator chains
  4. 4. 分布式執行
    1. 4.1. Master,Worker,Client
  5. 5. Workers,Slots,Resources
  6. 6. 時間和窗口
  7. 7. 時間
  8. 8. 狀態和失效容忍
    1. 8.1. 用於失敗容忍的檢查點
    2. 8.2. 狀態的最終存儲
  9. 9. 基於流的批處理

在源碼解讀前我們有必要先了解一下Flink的一些基本的但卻很關鍵的概念。這有助於幫助我們理解整個架構。在翻譯文檔的同時,對於有爭議的或者不是非常適合用中文表達的地方,我盡量保留原始英文單詞。

程序和數據流

Flink程序的基本構建塊是streamstransformations(注意,DataSet在內部也是一個stream)。一個stream可以看成一個中間結果,而一個transformations是以一個或多個stream作為輸入的某種operation,該operation利用這些stream進行計算從而產生一個或多個result stream

在運行時,Flink上運行的程序會被映射成streaming dataflows,它包含了streamstransformations operators。每一個dataflow以一個或多個sources開始以一個或多個sinks結束。dataflow類似於任意的有向無環圖(DAG),當然特定形式的可以通過iteration構建。在大部分情況下,程序中的transformations跟dataflow中的operator是一一對應的關系。但有時候,一個transformation可能對應多個operator

flink-concepts_parallel-dataflowflink-concepts_parallel-dataflow

並行數據流

程序在Flink內部的執行具有並行分布式的特性。stream被分割成stream partition,operator被分割成operator subtask,這些operator subtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執行。

一個特定operator的subtask的個數被稱之為其parallelism(並行度)。一個stream的並行度總是等同於其producing operator的並行度。一個程序中,不同的operator可能具有不同的並行度。

flink-concepts_parallel-dataflowflink-concepts_parallel-dataflow

Stream在operator之間傳輸數據的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式。

  • One-to-one : strem(比如在source和map operator之間)維護着分區以及元素的順序。那意味着map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同。
  • Redistributing : stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區會發生改變。每一個operator subtask依據所選擇的transformation發送數據到不同的目標subtask。例如,keyBy() (基於hash碼重分區),broadcast()或者rebalance()(隨機redistribution)。在一個redistribution的交換中,只有每一個發送、接收task對的順序才會被維持(比如map()的subtask和keyBy/window的subtask)。

tasks & operator chains

出於分布式執行的目的,Flink將operator的subtask鏈接在一起形成task。每個task在一個線程中執行。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換和基於緩存區的數據交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。

下面這幅圖,展示了5個subtask以5個並行的線程來執行。

flink-concepts_tasks-chainsflink-concepts_tasks-chains

分布式執行

Master,Worker,Client

Flink運行時包含了兩種類型的處理器:

  • master處理器:也稱之為JobManagers用於協調分布式執行。它們用來調度task,協調檢查點,協調失敗時恢復等。

Flink運行時至少存在一個master處理器。一個高可用的運行模式會存在多個master處理器,它們其中有一個是leader,而其他的都是standby。

  • worker處理器:也稱之為TaskManagers用於執行一個dataflow的task(或者特殊的subtask)、數據緩沖和data stream的交換。

Flink運行時至少會存在一個worker處理器。

master和worker處理器可以以如下方式中的任意一種啟動:直接在物理機上啟動,通過容器,或者通過像YARN這樣的資源調度框架。worker連接到master,告知自身的可用性進而獲得任務分配。

客戶端不是運行時和程序執行的一部分。但它用於准備並發送dataflow給master。然后,客戶端斷開連接或者維持連接以等待接收計算結果。客戶端可以以兩種方式運行:要么作為Java/Scala程序的一部分被程序觸發執行,要么以命令行./bin/flink run的方式執行。

flink-concepts_processesflink-concepts_processes

Workers,Slots,Resources

每一個worker(TaskManager)是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收多少個task。worker通過task slot來進行控制(一個worker至少有一個task slot)。

每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內存分成三份給各個slot。資源slot化意味着一個subtask將不需要跟來自其他job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。

通過調整task slot的數量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味着每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的)。而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(基於多路復用)和心跳消息。它們也可能共享數據集和數據結構,因此這減少了每個task的負載。

flink-concepts_tasks-slotsflink-concepts_tasks-slots

默認,如果subtask是來自相同job,但不是相同的task,Flink允許subtask共享slot。結果是,一個slot可能hold住該job的整個pipeline。允許slot共享有兩個好處:

  • Flink集群確實需要許多task slots來讓Job達到最高的並行度。不需要計算一個程序總共包含多少個task。
  • 更容易獲得更好的資源利用。如果沒有slot共享,非密集型的source/map()的subtask將阻塞跟密集型的window的subtask一樣多的占用資源。而如果有slot共享,基本的並發度通過完整地利用共享的slot資源將獲得2到6倍的提升,同時仍然保證每一個TaskManager會在任務繁重的subtask之間進行合理的slot共享。

slot共享行為可以通過API來控制,以防止不合理的共享。這個機制稱之為resource groups,它定義了subtask可能共享的slot是什么資源。

作為一個約定俗成的規則,task slot推薦的默認值是CPU的核數。基於超線程技術,每個slot占用兩個或者更多的實際線程上下文。

flink-concepts_slot-sharingflink-concepts_slot-sharing

時間和窗口

聚合事件(比如count,sum)工作起來比起批處理略微有些不同。例如,它不能一次完成對流中所有元素的數量統計,然后返回結果。因為流通常都是無限的(無邊界)。取而代之的是,在流上的聚合(count,sum等)被隔離到window域中,比如,“統計最近5分鍾的數量”或“對最近100個元素求和”。

窗口可以是時間驅動的(比如,每30秒)也可以是數據驅動的(比如,每100個元素)。通常我們將窗口划分為:tumbing windows(不重疊),sliding windows(有重疊)和session windows(有空隙的活動)。

flink-concepts_windowsflink-concepts_windows

時間

當在流式編程中涉及到時間的(比如定義一個窗口),可能會牽扯到時間的不同定義:

  • Event Time:指一個事件的創建時間。通常在event中用時間戳來描述,比如,可能是由生產事件的傳感器或生產服務來附加。Flink訪問事件時間戳通過時間戳分配器。
  • Ingestion time:指一個事件從source operator進入Flink dataflow的時間。
  • Processing time:每一個執行一個基於時間操作的operator的本地時間。

flink-concepts_event-ingestion-processing-timeflink-concepts_event-ingestion-processing-time

狀態和失效容忍

在dataflow中的許多操作一次只關注一個獨立的事件(比如一個事件解析器),還有一些操作能記住多個獨立事件的信息(比如,window operator),而這些操作被稱為stateful(有狀態的)。

有狀態的操作,其狀態被維護的地方,可以將其看作是一個內嵌的key/value存儲器。狀態和流一起被嚴格得分區和分布以供有狀態的operator讀取。因此,訪問key/value的狀態僅能在keyed streams中(在執行keyBy()函數之后產生keyed stream),並且只能根據當前事件的鍵來訪問其值。對齊stream的鍵和狀態可以確保所有的狀態更新都是本地操作,在不需要事務開銷的情況下保證一致性。這個對齊機制也允許Flink重新分布狀態並顯式調整stream的分區。

flink-concepts_state-partitioningflink-concepts_state-partitioning

用於失敗容忍的檢查點

Flink實現失敗容忍使用了流重放檢查點的混合機制。一個檢查點會在流和狀態中定義一個一致點,在該一致點streaming dataflow可以恢復並維持一致性(exactly-once的處理語義)。在最新的檢查點之后的事件或狀態更新將在input stream中被重放。

檢查點的設置間隔意味着在執行時對失敗容忍產生的額外開銷以及恢復時間(也決定了需要被重放的事件數)。

flink-concepts_checkpointsflink-concepts_checkpoints

狀態的最終存儲

給key/value構建索引的數據結構最終被存儲的地方取決於狀態最終存儲的選擇。其中一個選擇是在內存中基於hash map,另一個是RocksDB。另外用來定義Hold住這些狀態的數據結構,狀態的最終存儲也實現了基於時間點的快照機制,給key/value做快照,並將快照作為檢查點的一部分來存儲。

基於流的批處理

Flink執行批處理程序是將其作為流處理程序的一個特例來看待。它將其看作有界的流(有限數量的元素)。DataSet在內部被當作一個流數據,因此上面的這些適用於流處理的這些概念在批處理中同樣適用,只有很少的幾個例外:

  • DataSet的編程API不適用檢查點。恢復機制是通過重放完整的流數據來進行。那是合理的,因為輸入時有界的。它將開銷更多地引入到恢復操作上,但另一方面也使得運行時的常規流程代價更低,因為它規避了檢查點機制。
  • DataSet的有狀態的operation API簡單地使用in-memory/out-of-core的數據結構,而不是基於key/value的索引機制
  • DataSet的API引進了獨特的同步迭代機制(基於superstep),它僅在有界的流中存在。

 

 

 

 

 

文檔翻譯自Flink DataStream API Programming Guide

-----------------------------------------------------------------------

Flink中的DataStream程序是實現在數據流上的transformation(如filtering,updating state, defining windows,aggregating)的普通程序。創建數據流的來源多種多樣(如消息隊列,socket流,文件等)。程序通過data sink返回結果,如將數據寫入文件,或發送到標准輸出(如命令行終端)。Flink程序可以在多種上下文中運行,如獨立運行或是嵌入在其他程序中執行。程序的執行可以發生在本地JVM,或者在一個擁有許多設備的集群上。

 

有關介紹Flink API基礎概念的文檔,請見basic concepts

 

為了創建你自己的Flink DataStream程序,我們鼓勵你從文檔anatomy of a Flink Program開始,且歡迎你添加自己的transformations。該文檔接下來的部分是額外的operation和進階特性的參考文檔。

一、示例程序

下面的程序是一個完整的流式窗口word count應用,它計算出在web socket的大小為5秒的窗口中的出現各個單詞的數量。你可以復制 & 粘貼代碼並在本地運行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
  .socketTextStream("localhost"9999)
  .flatMap(new Splitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1);

dataStream.print();

env.execute("Window WordCount");

}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override

public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {

for (String word: sentence.split(" ")) {

out.collect(new Tuple2<String, Integer>(word1));

}

}

}

}

要運行該示例程序,首先從終端運行netcat來開始輸入流

nc -lk 9999

 

僅需要輸入一些單詞,這些將是word count程序的輸入數據。如果你想看到count大於1的結果,在5秒內重復輸入同一個單詞。

 

二、DataStream Transformations

Data transformation會將一或多個DataStream轉換成一個新的DataStream。程序可以將多個transformation結合形成復雜的拓撲結構(topology)。

 

本小節給出了所有可用的transformation的描述。

Transformation

描述

Map

DataStream -> DataStream

獲取一個element並產出一個element。下例是一個將輸入*2的map方法:
 

DataStream<Integer> dataStream //...
dataStream.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer value) throws Exception {
    return * value;
  }
});

FlapMap

DataStream -> DataStream

獲取一個element,並產生出0、1或多個element。下例是一個為句子分詞的flatmap方法

 

dataStream.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public void flatMap(String value, Collector<String> outthrows Exception {
    for(String word: value.split(" ")){
    out.collect(word);
    }
  }
});

Filter

DataStream -> DataStream

在每個獲取的element上運行一個boolean方法,留下那些方法返回true的element。下例是一個過濾掉0值的filter
 

dataStream.filter(new FilterFunction<Integer>() {
  @Override
  public boolean filter(Integer value) throws Exception {
    return value != 0;
  }
});

KeyBy
DataStream -> KeyedStream

將流邏輯分為不相交的分區,每個分區包含的都是具有相同key的element,該分區方法使用hash分區實現。定義key的方法見於Keys。下例是一個返回KeyedDataStream的transformation。
 

dataStream.keyBy("someKey"// Key by field "someKey"
dataStream.keyBy(0// Key by the first element of a Tuple

Reduce

KeyedStream -> DataStream

一個在keyed data stream上“滾動”進行的reduce方法。將上一個reduce過的值和當前element結合,產生新的值並發送出。下例是一個創建部分和的reduce方法。
 

keyedStream.reduce(new ReduceFunction<Integer>() {
  @Override
  public Integer reduce(Integer value1, Integer value2throws Exception {
    return value1 + value2;
  }
});

Fold

KeyedStream -> DataStream

一個在帶有初始值的數據流上“滾動”進行的fold方法。將上一個fold的值和當前element結合,產生新的值並發送出。下例是一個fold方法,當應用於序列{1, 2, 3, 4, 5}時,它發出序列{"start-1", "start-1-2", "start-1-2-3" …}。
 

DataStream<String> result keyedStream.fold("start", new FoldFunction<Integer, String>() {
  @Override
  public String fold(String current, Integer value) {
    return current "-" + value;
  }
});

Aggregations

KeyedStream -> DataStream

在一個keyed DataStream上“滾動”進行聚合的方法。其中,min和minBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的element(max和maxBy一樣如此)。
 

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Window

KeyedStream - > WindowedStream

Window可以定義在已經分區的KeyedStream上。窗口將根據一些特征(如最近5秒到達的數據)將數據按其各自的key集合在一起。有關窗口的完整描述見於windows
 

// Last 5 seconds of data

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

WindowAll

DataStream -> AllWindowedStream

Window可以定義在普通的DataStream上。窗口將根據一些特征(如最近5秒到達的數據)將所有Stream事件集合在一起。有關窗口的完整描述見於windows
警告:該transformation在很多情況下都不是並行化的,所有數據將被收集到一個運行windowAll Operator的任務上。

 

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window Apply

WindowedStream -> DataStream

AllWindowedStream -> DataStream

將一個一般函數應用到window整體上去,下面是一個人工計算window中所有element的總和的應用。
注意:如果你正在使用一個windowAll的transformation,你需要使用AllWindowFunction來代替下例中的參數。
 

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
  Window window,
  Iterable<Tuple2<String, Integer>> values,
  Collector<Integer> out) throws Exception {
    int sum 0;
    for (value t: values) {
      sum += t.f1;
    }
    out.collect (new Integer(sum));
  }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
  Iterable<Tuple2<String, Integer>> values,
  Collector<Integer> out) throws Exception {
    int sum 0;
    for (value t: values) {
      sum += t.f1;
    }
    out.collect (new Integer(sum));
  }
});

Window Reduce

WindowedStream -> DataStream

對窗口應用一個功能性reduce方法並返回reduce的結果
 

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() {
  public Tuple2<String, Integerreduce(Tuple2<String, Integer> value1, Tuple2<String, Integer>value2) throws Exception {
  return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
  }
};

Window Fold

Windowed Stream -> DataStream

對窗口應用一個功能性fold方法。下例代碼在應用到序列(1, 2, 3, 4, 5)時,它將該序列fold成為字符串"start-1-2-3-4-5"
 

windowedStream.fold("start-", new FoldFunction<Integer, String>() {
  public String fold(String current, Integer value) {
    return current "-" + value;
  }
};

Aggregations on windows

WindowedStream -> DataStream

對窗口中的內容聚合。其中,min和minBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的element(max和maxBy一樣如此)。
 

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

Union

DataStream* -> DataStream

將2個或多個data stream合並創建出一個新的包含所有stream的element的stream。注意:如果你對一個data stream自己進行union操作,則在返回的結果中,每個element都會出現2個。
 

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream, DataStream -> DataStream

在給定key和普通window中,將2個DataStream進行Join操作

 

dataStream.join(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

Window CoGroup

DataStream, DataStream -> DataStream

在給定key和普通window中,對2個DataStream進行CoGroup操作。
 

dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});

Connect

DataStream, DataStream -> ConnectedStreams

在保留兩個DataStream的類型的情況下,將二者"連接"起來。Connect使我們可以共享兩個Stream的狀態
 

DataStream<Integer> someStream //...
DataStream<String> otherStream //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStreams -> DataStream

該操作類似於map和flatMap針對連接的Data Stream版本。Sd
 

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
  @Override
  public Boolean map1(Integer value) {
    return true;
  }

  @Override
  public Boolean map2(String value) {
    return false;
  }
});

 

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

  @Override
  public void flatMap1(Integer value, Collector<String> out) {
    out.collect(value.toString());
  }

  @Override
  public void flatMap2(String value, Collector<String> out) {
    for (String word: value.split(" ")) {
      out.collect(word);
    }
  }
});

Split

DataStream -> SplitStream

根據某些標准將Stream分割成2個或更多的stream
 

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
  @Override
  public Iterable<Stringselect(Integer value) {
    List<String> output = new ArrayList<String>();
    if (value == 0) {
      output.add("even");
    }
    else {
      output.add("odd");
    }
    return output;
  }
});

Select

SplitStream -> DataStream

從SplitStream中選擇1個或多個stream

 

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Iterate

DataStream -> IterativeStream -> DataStream

通過將一個Operator的輸出重定向到前面的某個Operator的方法,在數據流圖中創建一個“反饋”循環。這在定義持續更新模型的算法時十分有用。下面的例子從一個Stream開始,並持續應用迭代體(Iteration body)。大於0的element被送回到反饋通道,而其他的element則被轉發到下游。相關完整描述請見Iterations
 

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
  @Override
  public boolean filter(Integer value) throws Exception {
    return value 0;
  }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
  @Override
  public boolean filter(Integer value) throws Exception {
    return value <= 0;
  }
});

Extract Timestamps

DataStream -> DataStream

通過從數據中抽取時間戳來使得通過使用事件時間語義的窗口可以工作。詳情見於Event Time
 

stream.assignTimestamps (new TimeStampExtractor() {...});

 

接下來的Transformation是對Tuple類型的data stream可用的Transformation:

Transformation

描述

Project

DataStream -> DataStream

從tuple中選擇出域的子集而產生新的DataStream
 

DataStream<Tuple3<Integer, Double, String>> in // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

 

物理級分割(Physical Partitioning)

如果需要,Flink同樣提供了在進行一次transformation后針對精確stream分割的低層次的控制(low-level control),它們通過以下幾個方法實現。

 

Transformations

描述

Custom partitioning

DataStream -> DataStream

使用一個用戶自定義的Partitioner來對每個element選擇目標任務sd
 

dataStream.partitionCustom(partitioner"someKey");
dataStream.partitionCustom(partitioner0);

Random partitioning

DataStream -> DataStream

根據均勻分布來隨機分割element
 

dataStream.shuffle();

Rebalancing(輪詢分割)

DataStream -> DataStream

輪詢分割element,創建相同負荷的分割。對數據變形(data skew)時的性能優化十分有用s
 

dataStream.rebalance();

Rescaling

DataStream -> DataStream

將element輪詢分割到下游Operator子集中去。這在你想流水線並行時十分有用,例如,需要從每個並行的source實例中將數據fan out到一個有着一些mapper來分發負載,但是又不想要函數rebalance()那樣引起的完全rebalance的效果時。這就需要僅在本地傳輸數據,而不是需要從網絡傳輸,這需要依賴其他諸如TaskManager的任務槽數量等等configuration值。
上游Operation發送element的下游Operation子集同時依賴於上游和下游兩方Operation的並行度。例如,若上游Operation的並行度為2,下游Operation並行度為4,則1個上游Operation將會把它的element分發給2個下游Operation。另一方面,若下游並行度為2而上游並行度為4,則2個上游Operation將會把它們的element分發給1個下游Operation,而另外兩個上游Operation則分發給另一個下游Operation。
當一個或是多個上下游Operation的並行度不是倍數關系時,下游的Operation將擁有不同的從上游獲得的輸入的數量。
下圖是上面例子的連接模式圖:
 

 

dataStream.rescale();

Broadcasting

DataStream -> DataStream

將element廣播到每一個分割中去
 

dataStream.broadcast();

 

鏈接任務以及資源組(Task chaining & resource groups)

將兩個transformation鏈接起來意味着將它們部署在一起(co-locating),共享同一個線程來獲得更好的性能。Flink默認地盡可能地鏈接Operator(如兩個連續的map transformation)。如有需要,API還給出了細粒度的鏈接控制:

 

使用StreamExecutionEnvironment.disableOperatorChaining()來關閉整個Job的鏈接操作。下面表格中的方法則是更加細粒度的控制函數,注意,由於這些函數引用的是前一個transformation,所以它們僅僅在一個DataStream的transformation后使用才是正確的,例如someStream.map( … ).startNewChain()是正確的,而someStream.startNewChain()是錯誤的。

 

一個資源組就是Flink中的一個任務槽,如有需要,你可以人工孤立某個Operator到一個獨立的任務槽中。

Transformation

描述

startNewChain()

以當前Operator起點,開始一個新的鏈接。在下例中,兩個mapper將會被鏈接而filter則不會與第一個mapper鏈接
 

someStream.filter(...).map(...).startNewChain().map(...);

disableChaining()

下例中,將不會鏈接mapOperator。
 

someStream.map(...).disableChaining();

slotSharingGroup()

設置一個Operation的共享任務槽的分組。Flink將會把同一個任務槽共享組的Operation放到同一個任務槽中,而不在同一個任務槽共享組的Operation放到其他任務槽中。這可以用來孤立任務槽。如果所有的輸入Operation都在同一個任務槽共享組中,則該任務槽共享組會繼承下來。任務槽共享組的默認名為"default",Operation可以通過調用slotSharingGroup("default")來定義其名稱。
 

someStream.filter(...).slotSharingGroup("name");

 

三、數據源

數據源可以通過StreamExecutionEnvironment.addSource(sourceFunction)來創建數據源。你可以使用Flink提供的source方法,也可以通過實現SourceFunction來編寫自定義的非並行數據源,也可以通過實現ParallelSourceFunction接口或繼承RichParallelSourceFunction來編寫自定義並行數據源。

以下是幾個預定義的數據流源,可以通過StreamExecutionEnvironment來訪問:

1.    基於文件的:

  • readTextFile(path) / TextInputFormat - 以行讀取方式讀文件並返回字符串
  • readFile(path) / 任意輸入格式 - 按用輸入格式的描述讀取文件
  • readFileStream - 創建一個stream,在文件有改動時追加element

2.    基於Socket的:

  • socketTextStream - 從socket讀取,element可以通過分割符來分開

3.    基於Collection的:

  • fromCollection(Collection) - 從Java.util.Collection創建一個數據流。collection中所有的element都必須是同一類型的。
  • fromCollection(Iterator, Class) - 從一個迭代器中創建一個數據流。class參數明確了迭代器返回的element的類型。
  • fromElement(T …) - 從一個給定的對象序列創建一個數據流。所有對象都必須是同一類型的。
  • fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創建一個並行數據流。class參數明確了迭代器返回的element的類型。
  • generateSequence(from, to) - 從一個給定區間中生成一個並行數字序列。

4.    自定義:

  • addSource - 附上一個新的source方法。例如,通過調用addSource(new FlinkKafkaConsumer08<>(…))來從Apache Kafka讀取數據,更多信息見於connector

 

四、Data Sink

Data Sink消耗DataStream並將它們轉發到文件、socket、外部系統或打印它們。Flink自帶了許多內置的輸出格式,封裝為DataStream的operation中:

  • writeAsText() / TextOutputFormat - 以行字符串的方式寫文件,字符串通過調用每個element的toString()方法獲得。
  • writeAsCsv(…) / CsvOutputFormat - 以逗號分隔的值來講Tuple寫入文件,行和域的分隔符是可以配置的。每個域的值是通過調用object的toString()方法獲得的。
  • print() / printToErr() - 將每個element的toString()值打印在標准輸出 / 標准錯誤流中。可以提供一個前綴(msg)作為輸出的前綴,使得在不同print的調用可以互相區分。如果並行度大於1,輸出也會以task的標識符(identifier)為產生的輸出的前綴。
  • writeUsingOutputFormat() / FileOutputFormat - 自定義文件輸出所用的方法和基類,支持自定義object到byte的轉換。
  • writeToSocket - 依據SerializationSchema將element寫到socket中。
  • addSink - 調用自定義sink方法,Flink自帶連接到其他系統的connector(如Apache Kafka),這些connector都以sink方法的形式實現。

 

注意DataStream的write*()函數主要用於debug,它們不參與Flink的檢查點,這意味着這些方法通常處於“至少一次(at-least-once)“的執行語義下。flush到目標系統的數據依賴於OutputFormat的實現,這意味着不是所有發送到OutputFormat的element都會立即出現在目標系統中,此外,在失效的情況下,這些數據很可能會丟失。

 

故為了可靠性以及將stream“恰好一次(exact once)”地傳入文件系統,我們應當使用flink-connector-filesystem。此外,通過實現“.addSink(…)”的自定義內容會參加Flink的檢查點機制,故會保證“恰好一次”的執行語義。

 

五、迭代(Iterations)

迭代流程序實現了一個階段方法並將之嵌入到一個IterativeStream中。作為一個可能永遠不會結束的程序,它沒有最大迭代數,反之,你需要使用splitfilter的transformation來明確流的哪一部分會被反饋到迭代中,哪一部分則繼續轉發到下游。這里,我們使用filter作為例子,我們定義IterativeStream

IterativeStream<Integer> iteration = input.iterate();

然后,我們定義在循環中將要進行的邏輯處理,我們通過一系列transformation來實現(這里用了一個簡單的map transformation):

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

 

我們可以調用IterativeStreamcloseWith(feedbackStream)函數來關閉一個迭代並定義迭代尾。傳遞給closeWith方法的DataStream將會反饋回迭代頭。分割出用來反饋的stream的部分和向前傳播的stream部分通常的方法便是使用filter來進行分割。這些filter可以定義諸如"termination"邏輯,即element將會傳播到下游,而不是被反饋回去。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

 

默認地,反饋的那部分流將會自動設置為迭代頭的輸入,要想重載該行為,用戶需要設置closeWith函數中的一個boolean參數。例如,下面是一個持續將整數序列中的數字減1知道它們變為0的程序:

DataStream<Long> someIntegers = env.generateSequence(01000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

 

六、執行參數

StreamExecutionEnvironment包含ExecutionConfig,它可以使用戶設置job的確切運行時配置值。

請參考execution configuration來查看參數的解釋。特別的,以下這些參數僅適用於DataStream API:

enableTimestamps() / disableTimestamps():在每一個source發出的事件上附加上一個時間戳。函數areTimestampsEnabled()可以返回該狀態的當前值。

setAutoWatermarkInterval(long milliseconds):設置自動水印發布(watermark emission)區間。你可以通過調用函數getAutoWatermarkInterval()來獲取當前值。

 

6.1 容錯

文檔Fault Tolerance Documentation描述了打開並配置Flink的檢查點機制的選項和參數

 

6.2 控制執行時間

默認的,element在網絡傳輸時不是一個個單獨傳輸的(這會導致不必要的網絡流量),而是緩存后傳輸。緩存(是在設備間傳輸的實際單位)的大小可以在Flink的配置文件中設置。盡管該方法有益於優化吞吐量,他會在stream到達不夠快時導致執行時間方面的問題。為了控制吞吐量和執行時間,你可以在執行環境(或獨立的Operator)中調用env.setBufferTimeout(timeoutMillis)來設置等待裝滿buffer的最大等待時間,在這個時間過后,不管buffer是否已滿,它都會自動發出。該默認超時時間是100ms。下例是設置API的用法:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

 

要最大化吞吐量,設置setBufferTimeout(-1)來去除超時時間,則buffer僅在它滿后才會被flush。要最小化執行時間,設置timeout為一個接近0的數字(如5ms或10ms)。應當避免設置Timeout為0,因為它會造成嚴重的性能下降。

 

七、Debugging

在分布式集群上運行流程序之前,確保算法正確執行很重要。因此,實現數據分析程序通常需要遞增的檢查結果、debug、優化的過程。

Flink提供了可以顯著簡化數據分析程序的開發過程的特性,即可以在IDE中本地進行debug、注入測試數據、以及結果數據的收集等。本節對如何簡化Flink程序開發提出幾點建議。

 

7.1 本地執行環境

LocalStreamEnvironment在創建它的同一個JVM進程下創建Flink系統。如果你從IDE中啟動一個LocalEnvironment,你可以在代碼中設置斷點來簡單地debug你的程序。下例為LocalEnvironment是如何創建並使用的:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

 

7.2 Collection數據源

Flink提供基於Java collection的特殊數據源來方便測試。一旦程序測試之后,source和sink可以簡單地替代為對外部系統的讀取/寫出的source和sink。Collection數據源使用方法如下:

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(12345);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

 

注意:當前Collection數據源需要實現Serializable接口的數據類型和迭代器。此外,Collection數據源無法並行執行(並行度=1)

 

7.3 迭代器Data Sink

Flink同樣提供了一個收集測試和debug的DataStream結果的sink,它的使用方式如下:

import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

 

 

 

 

 

Flink 原理與實現:數據流上的類型和操作

Flink 為流處理和批處理分別提供了 DataStream API 和 DataSet API。正是這種高層的抽象和 flunent API 極大地便利了用戶編寫大數據應用。不過很多初學者在看到官方 Streaming 文檔中那一大坨的轉換時,常常會蒙了圈,文檔中那些只言片語也很難講清它們之間的關系。所以本文將介紹幾種關鍵的數據流類型,它們之間是如何通過轉換關聯起來的。下圖展示了 Flink 中目前支持的主要幾種流的類型,以及它們之間的轉換關系。

DataStream

DataStream 是 Flink 流處理 API 中最核心的數據結構。它代表了一個運行在多個分區上的並行流。一個 DataStream 可以從 StreamExecutionEnvironment 通過env.addSource(SourceFunction) 獲得。

DataStream 上的轉換操作都是逐條的,比如 map()flatMap()filter()。DataStream 也可以執行 rebalance(再平衡,用來減輕數據傾斜)和 broadcaseted(廣播)等分區轉換。

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = stream.rebalance()
val str3: DataStream[AnotherType] = stream.map { ... }

上述 DataStream 上的轉換在運行時會轉換成如下的執行圖:

如上圖的執行圖所示,DataStream 各個算子會並行運行,算子之間是數據流分區。如 Source 的第一個並行實例(S1)和 flatMap() 的第一個並行實例(m1)之間就是一個數據流分區。而在 flatMap() 和 map() 之間由於加了 rebalance(),它們之間的數據流分區就有3個子分區(m1的數據流向3個map()實例)。這與 Apache Kafka 是很類似的,把流想象成 Kafka Topic,而一個流分區就表示一個 Topic Partition,流的目標並行算子實例就是 Kafka Consumers。

KeyedStream

KeyedStream用來表示根據指定的key進行分組的數據流。一個KeyedStream可以通過調用DataStream.keyBy()來獲得。而在KeyedStream上進行任何transformation都將轉變回DataStream。在實現中,KeyedStream是把key的信息寫入到了transformation中。每條記錄只能訪問所屬key的狀態,其上的聚合函數可以方便地操作和保存對應key的狀態。

WindowedStream & AllWindowedStream

WindowedStream代表了根據key分組,並且基於WindowAssigner切分窗口的數據流。所以WindowedStream都是從KeyedStream衍生而來的。而在WindowedStream上進行任何transformation也都將轉變回DataStream

val stream: DataStream[MyType] = ...
val windowed: WindowedDataStream[MyType] = stream
.keyBy( "userId")
.window( TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
val result: DataStream[ResultType] = windowed.reduce(myReducer)

上述 WindowedStream 的樣例代碼在運行時會轉換成如下的執行圖:

Flink 的窗口實現中會將到達的數據緩存在對應的窗口buffer中(一個數據可能會對應多個窗口)。當到達窗口發送的條件時(由Trigger控制),Flink 會對整個窗口中的數據進行處理。Flink 在聚合類窗口有一定的優化,即不會保存窗口中的所有值,而是每到一個元素執行一次聚合函數,最終只保存一份數據即可。

在key分組的流上進行窗口切分是比較常用的場景,也能夠很好地並行化(不同的key上的窗口聚合可以分配到不同的task去處理)。不過有時候我們也需要在普通流上進行窗口的操作,這就是 AllWindowedStreamAllWindowedStream是直接在DataStream上進行windowAll(...)操作。AllWindowedStream 的實現是基於 WindowedStream 的(Flink 1.1.x 開始)。Flink 不推薦使用AllWindowedStream,因為在普通流上進行窗口操作,就勢必需要將所有分區的流都匯集到單個的Task中,而這個單個的Task很顯然就會成為整個Job的瓶頸。

JoinedStreams & CoGroupedStreams

雙流 Join 也是一個非常常見的應用場景。深入源碼你可以發現,JoinedStreams 和 CoGroupedStreams 的代碼實現有80%是一模一樣的,JoinedStreams 在底層又調用了 CoGroupedStreams 來實現 Join 功能。除了名字不一樣,一開始很難將它們區分開來,而且為什么要提供兩個功能類似的接口呢??

實際上這兩者還是很點區別的。首先 co-group 側重的是group,是對同一個key上的兩組集合進行操作,而 join 側重的是pair,是對同一個key上的每對元素進行操作。co-group 比 join 更通用一些,因為 join 只是 co-group 的一個特例,所以 join 是可以基於 co-group 來實現的(當然有優化的空間)。而在 co-group 之外又提供了 join 接口是因為用戶更熟悉 join(源於數據庫吧),而且能夠跟 DataSet API 保持一致,降低用戶的學習成本。

JoinedStreams 和 CoGroupedStreams 是基於 Window 上實現的,所以 CoGroupedStreams 最終又調用了 WindowedStream 來實現。

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
 
val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput)
.where( "userId").equalTo("id")
.window( TumblingEventTimeWindows.of(Time.seconds(3)))
.apply ( new JoinFunction () {...})

上述 JoinedStreams 的樣例代碼在運行時會轉換成如下的執行圖:

雙流上的數據在同一個key的會被分別分配到同一個window窗口的左右兩個籃子里,當window結束的時候,會對左右籃子進行笛卡爾積從而得到每一對pair,對每一對pair應用 JoinFunction。不過目前(Flink 1.1.x)JoinedStreams 只是簡單地實現了流上的join操作而已,距離真正的生產使用還是有些距離。因為目前 join 窗口的雙流數據都是被緩存在內存中的,也就是說如果某個key上的窗口數據太多就會導致 JVM OOM(然而數據傾斜是常態)。雙流join的難點也正是在這里,這也是社區后面對 join 操作的優化方向,例如可以借鑒Flink在批處理join中的優化方案,也可以用ManagedMemory來管理窗口中的數據,並當數據超過閾值時能spill到硬盤。

ConnectedStreams

在 DataStream 上有一個 union 的轉換 dataStream.union(otherStream1, otherStream2, ...),用來合並多個流,新的流會包含所有流中的數據。union 有一個限制,就是所有合並的流的類型必須是一致的。ConnectedStreams 提供了和 union 類似的功能,用來連接兩個流,但是與 union 轉換有以下幾個區別:

  1. ConnectedStreams 只能連接兩個流,而 union 可以連接多於兩個流。
  2. ConnectedStreams 連接的兩個流類型可以不一致,而 union 連接的流的類型必須一致。
  3. ConnectedStreams 會對兩個流的數據應用不同的處理方法,並且雙流之間可以共享狀態。這在第一個流的輸入會影響第二個流時, 會非常有用。

如下 ConnectedStreams 的樣例,連接 input 和 other 流,並在input流上應用map1方法,在other上應用map2方法,雙流可以共享狀態(比如計數)。

val input: DataStream[MyType] = ...
val other: DataStream[AnotherType] = ...
 
val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)
 
val result: DataStream[ResultType] =
connected.map( new CoMapFunction[MyType, AnotherType, ResultType]() {
override def map1(value: MyType): ResultType = { ... }
override def map2(value: AnotherType): ResultType = { ... }
})

當並行度為2時,其執行圖如下所示:

總結

本文介紹通過不同數據流類型的轉換圖來解釋每一種數據流的含義、轉換關系。后面的文章會深入講解 Window 機制的實現,雙流 Join 的實現等。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM