Flink原理、實戰與性能優化讀書筆記


第一章 ApacheFlink介紹

一、Flink優勢

1. 目前唯一同時支持高吞吐、低延遲、高性能的分布式流式數據處理框架

2. 支持事件事件概念

3. 支持有狀態計算,保持了事件原本產生的時序性,避免網絡傳輸帶來的影響

4. 支持高度靈活的窗口操作,Flink將窗口分為Time、Count、Session以及Data-driven等類型的窗口操作,可以靈活的處罰條件定制化來達到對復雜的流傳輸模式的支持。

5. 基於輕量級分布式快照實現容錯,大型計算任務的流程拆解成小的計算過程,task分布到並行節點上處理。基於分布式快照技術的Checkpoints,將執行過程中的狀態信息進行持久化存儲,可以自動恢復出現異常的任務。

5. 基於JVM實現獨立的內存管理

二、Flink的應用場景

1. 實時智能推薦

2. 復雜事件處理

3. 實時欺詐檢測

4. 實時數倉與ETL

5. 流數據分析

6. 實時報表分析

三、Flink基本組件棧

1. Flink架構體系基本上分三層(自頂向下):API&Libraries層、Runtime核心層、物理部署層
    - API&Libraries層: 提供支撐流計算和批計算的接口,,同時在此基礎上抽象出不同的應用類型的組件庫。

    - Runtime核心層:Flink分布式計算框架的核心實現層,支持分布式Stream作業的執行、JobGraph到ExecutionGraph的映射轉換、任務調度等。將DataStream和DataSet轉成同意的可執行的Task Operator

    - 物理部署層:目前Flink支持本地、集群、雲、容器部署,Flink通過蓋層能夠支持不同平台的部署,用戶可以根據需要選擇使用對應的部署模式。

2. Flink基本架構
    - Client客戶端:負責將任務提交到集群,與JobManager構建Akka連接,然后將任務提交到JobManager,通過和JobManager之間進行交互獲取任務執行狀態。

    - JobManager:負責整個Flink集群任務的調度以及資源的管理

    - TaskManager:相當於整個集群的Slave節點,負責具體的任務執行和對應任務在每個節點上的資源申請與管理。

第二章 Flink環境准備

- Notes:Flink同時支持Java及Scala,但以下所有的配置以及代碼說明均以Java為例

一、運行環境要求

- JDK版本必須在1.8及以上

- Maven版本必須在3.0.4及以上

- Hadoop環境支持handoop2.4、2.6、2.7、2.8等主要版本

二、Flink項目模板

- 本地環境
C:\Users\016322500>java -version
java version "12.0.2" 2019-07-16
Java(TM) SE Runtime Environment (build 12.0.2+10)
Java HotSpot(TM) 64-Bit Server VM (build 12.0.2+10, mixed mode, sharing)
C:\Users\016322500>mvn -v
Apache Maven 3.6.1 (d66c9c0b3152b2e69ee9bac180bb8fcc8e6af555; 2019-04-05T03:00:2
9+08:00)
Maven home: C:\Program Files\apache-maven-3.6.1\bin\..
Java version: 12.0.2, vendor: Oracle Corporation, runtime: C:\Program Files\Java
\jdk-12.0.2
Default locale: zh_CN, platform encoding: GBK
OS name: "windows 7", version: "6.1", arch: "amd64", family: "windows"
- 通過Maven Archetype進行構建:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=com.test -DartifactId=flink -Dversion=1.0.0 -Dpackage=com.test -DinteractiveMode=false
- 構建成功並檢查項目
[INFO] Project created from Archetype in dir: C:\Users\016322500\Documents\Flink
\flink
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  29.929 s
[INFO] Finished at: 2019-09-02T13:52:12+08:00
[INFO] ------------------------------------------------------------------------


└─flink
    └─src
        └─main
            ├─java
            │  └─com
            │      └─test
            └─resources
  • Notes: Maveny依賴要注意scope改為compile
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>compile</scope>
</dependency>
  • Flink簡單demo - 統計單詞出現的頻率
		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

		var list = Arrays.asList(1,2,3);
		list.stream().forEach(element ->System.out.println(element));

		String inputPath = "C:\\Users\\016322500\\Documents\\Flink\\flink\\src\\main\\java\\com\\flink\\resource\\file.txt";

		DataSource<String> text = environment.readTextFile(inputPath);

		text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
				String[] values = s.toLowerCase().split(" ");
				for (String value : values) {
					if (values.length > 0) {
						collector.collect(new Tuple2<>(value, 1));
					}
				}
			}
		}).groupBy(0)
				.sum(1)
				.print();

第三章 Flink編程模型

一、Flink數據類型

- 1.原生數據類型
    * Flink通過實現BasicTypeInfo數據類型,能夠支持任意Java原生基本類型(裝箱)或String類型,例如Integer、String、Double等
DataSource<Integer> inputStream= environment.fromElements(1, 2, 3, 4, 5, 6);

DataSource<String> inputStream= environment.fromElements("1", "2", "3", "4", "5", "6");
- 2.Java Tuples類型
    * Flink通過定義TupleTypeInfo來標書Tuple類型數據
DataSource<Tuple2> inputStreamTuple = environment.fromElements(new Tuple2("fangpc", 1), new Tuple2("fangpengcheng", 2));
- 3.POJOs類型
    * Flink通過PojoTypeInfo來描述任意的POJOs,包括Java和Scala類
    * POJOs類必須是Public修飾且必須獨立定義,不能是內部類
    * POJOs類中必須含有默認構造器
    * POJOs類中所有的Fields必須是Public或者具有普Public修飾的getter和setter方法
    * POJOs類中的字段必須是Flink支持的
var personStream = environment.fromElements(new Person("fangpc", 24), new Person("fangpengcheng", 25));
- 4. Flink Value類型
    * Value數據類型實現了org.apache.flink.types.Value,其中包括read()和write()兩個方法完成序列化和反序列化操作,相對於通用的序列化工具會有着比較高效的性能。Flink提供的內建Value類型有IntValue、DoubleValue、StringValue等

- 5. 特殊數據類型
    * Scala中的List、Map、Either、Option、Try數據類型
    * Java中Either
    * Hadoop的Writable數據類型

二、TypeInfomation信息獲取

- 1.Java API類型信息
    * 由於Java泛型會出現類型擦除問題,Flink通過Java反射機制盡可能重構類型信息
    * 如果Kryo序列化工具無法對POJOs類序列化時,可以使用Avro對POJOs類進行序列化
    * 
environment.getConfig().enableForceAvro();

第四章 DataStream API介紹與使用

. DataStream接口編程中的基本操作,包括定義數據源、數據轉換、數據輸出、操作拓展
. Flink流式計算過程,對時間概念的區分和使用包括事件時間(Event Time)、注入時間(Ingestion Time)、處理時間(Process Time)
. 流式計算中常見的窗口計算類型,如滾動窗口、滑動窗口、會話窗口
. Flink任務優化

一、DataStream編程模型

- DataStream API 主要分為三個部分,DataSource模塊、Transformationmok、DataSink模塊
- DataSource模塊主要定義了數據接入功能
- Transformation模塊定義了對DataStream數據集的各種轉換操作,例如進行map、filter、windows等操作
- DataSink模塊將數據寫出到外部存儲介質,例如將數據輸出到文件或Kafka消息中間件

- Flink將數據源主要分為內置數據源、第三方數據源
    * 內置數據源包含文件、Socket網絡端口、集合類型數據
    * Flink中定義了非常多的第三方數據源連接器,例如Apache kafa Connector、Elatic Search Connector等
    * 用戶也可以自定義實現Flink中數據接入函數SourceFunction,並封裝成第三方數據源Connector

- 內置數據源
    * 文件數據源
        1. 使用readTextFile直接讀取文本文件
        2. 使用readFile方法通過指定文件的InputFormat來讀取指定類型的文件,比如CsvInputFormat,用戶可以自定義InputFormat接口類
var csvStream = environment.readFile(new CsvInputFormat<String>(new Path(inputPath)) {
    @Override
    protected String fillRecord(String reuse, Object[] parsedValues) {
        return null;
    }
}, inputPath);
- Socket數據源
    * StreamExecutionEnvironment調用socket-TextStream方法(參數為IP地址、端口號、字符串切割符delimiter、最大嘗試次數maxRetry)
var socketDataStream = streamExecutionEnvironment.socketTextStream("localhost", 8080);
- 集合數據源
    * Flink可以直接將Java集合類轉換成DataStream數據集,本質上是將本地集合中的數據分發到遠端並行執行的節點中
// 通過fromElements從元素集合中穿件創建DataStream數據集
var dataStream = environment.fromElements(new Tuple2(1L, 2L), new Tuple2(2L, 3L));

// 通過fromCollection從數組中創建DataStream數據集
var collectionStream = environment.fromCollection(Arrays.asList("fangpc", "fang"));

- 外部數據源
    * 數據源連接器
        1. 部分連接器僅支持讀取數據:如Twitter Streaming API、Netty等
        2. 既支持數據輸入也支持數據輸出:Apache Kafka、Amazon Kinesis、RabbitMQ等連接器
    * Flink內部提供了常用的序列化協議的Schema,例如TypeInfomationSerializationSchema、JsonDeserializationSchema和AvroDeserializationSchema等
    * 以Kafka為例進行數據接入

//maven 依賴
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
    <version>1.9.0</version>
</dependency>

// 創建和使用Kafka的Connector
// Properties參數定義
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> input = streamExecutionEnvironment
        .addSource(new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), properties));

二、DataStream轉換操作

* 從一個或多個DataStream生成新的DataStream的過程被稱為Transformation操作
* 將每種操作類型被定義為不同的Operator,Flink程序能夠將多個Transformation組成一個DataFlow的拓撲。
* DataStream的轉換操作可以分為單Single-DataStream、Multi-DataStream、物理分區三類類型

- Single-DataStream操作
    * (1). Map[DataStream -> DataStream],調用用戶定義的MapFunction對DataStream[T]數據進行處理,形成新的DataStream[T],其中數據格式可能會發生變化,常用作對數據集內數據的清洗和轉換。
var singleDataStream = environment.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
// 指定map計算表達式
var mapDataStream = singleDataStream.map(t -> new Tuple2(t.f0, t.f1 + 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapDataStream.print();


// 通過指定MapFunction
mapDataStream = singleDataStream.map((new MapFunction<Tuple2<String, Integer>, Tuple2>() {
    @Override
    public Tuple2 map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
        return new Tuple2(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + 1);
    }
})).returns(Types.TUPLE(Types.STRING, Types.INT));
    * (2). FlatMap[DataStream -> DataStream],該算子主要應用處理輸入一個元素產生一個或者多個元素的計算場景,例如對每一行的文本進行切割,生成單詞序列
var flatMapDataStream = environment.fromElements("fangpc fangpc fangpc aaa bbb cccc");
flatMapDataStream.flatMap((String str, Collector<String> out) -> {
    Arrays.stream(str.split(" ")).forEach(string -> out.collect(string));
}).returns(Types.STRING).print();


// 通過指定FlatMapFunction
flatMapDataStream.flatMap(new FlatMapFunction<String, String>() {

    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        Arrays.stream(s.split(" ")).forEach(strr -> collector.collect(strr));
    }
}).returns(Types.STRING).print();
    * (3). Filter[DataStream -> DataStream],該算子將按照條件對輸入數據集進行篩選操作,將符合條件的數據集輸出,將不符合條件的數據過濾掉
// 通過運算表達式
var filterDataStream = environment.fromElements(1, 2, 3, 4, 5, 6);
filterDataStream.filter(x -> x % 2 == 0).print();

// 指定FilterFunction
filterDataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
}).print();
    * KeyBy[DataStream -> KeyedStream],該算子根據指定的Key將輸入的DataStream[T]數據格式轉換為KeyStream[T],也就是在數據集中執行Partition操作,將相同的Key值得數據放置在相同的分區中。
var keyByDataStream = streamExecutionEnvironment.fromElements(new Tuple2<>(1, 2), new Tuple2<>(2, 3), new Tuple2<>(2, 4), new Tuple2<>(3, 6));
keyByDataStream.keyBy(0).print();
    * (4). Reduce[KeyedStream -> DataStream],該算子和MapReduce中Reduce原理基本一致,主要目的是將輸入的KeyedStream通過傳入的用戶自定義的ReduceFunction滾動地進行數據聚合處理,其中定義的ReduceFunction必須滿足運算結合律和交換律。
var keyByDataStream = streamExecutionEnvironment.fromElements(new Tuple2<>(1, 2), new Tuple2<>(2, 3), new Tuple2<>(2, 4), new Tuple2<>(3, 6));
keyByDataStream.keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> t1) throws Exception {
        return new Tuple2<>(integerIntegerTuple2.f0, integerIntegerTuple2.f1 + t1.f1);
    }
}).print();
    * Aggregations[KeyedStream -> DataStream],該算子提供聚合算子,根據指定的字段進行聚合操作,滾動地產生一系列數據聚合結果。其實是將Reduce算子中的函數進行了封裝,封裝的聚合操作有sum、min、minBy、max、maxBy

- Multi-DataStream操作
    * (1). Union[DataStream -> DataStream],Union算子主要講兩個或者多個輸入的數據集合並成一個數據集,需要保證兩個數據集的格式一致。
var dataStream1 = environment.fromElements(new Tuple2<>("a", 3), new Tuple2<>("d", 4), new Tuple2<>("c", 2), new Tuple2<>("c", 5), new Tuple2<>("a", 5));

var dataStream2 = environment.fromElements(new Tuple2<>("d", 1), new Tuple2<>("s", 2), new Tuple2<>("a", 4), new Tuple2<>("e", 5), new Tuple2<>("a", 6));

dataStream1.union(dataStream2).print();

// 輸出結果
(a,3)
(a,5)
(d,1)
(a,6)
(d,4)
(s,2)
(c,2)
(a,4)
(c,5)
(e,5)
    * (2). Connect, CoMap, CoFlatMap[DataStream -> DataStream],Connect算子主要是為了合並兩種或者多種不同數據類型的數據集,和並之后會保留原來數據集的數據類型。
dataStream1.connect(dataStream3).map(new CoMapFunction<Tuple2<String, Integer>, Integer, Tuple2<String, Integer>>() {

    @Override
    public Tuple2<String, Integer> map1(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
        return stringIntegerTuple2;
    }

    @Override
    public Tuple2<String, Integer> map2(Integer integer) throws Exception {
        return new Tuple2<>("default", integer);
    }
}).print();
- 物理分區操作
    * 物理分區操作的作用是根據指定的分區策略將數據重新分配到不同的節點的Task實例上執行
    * (1). 隨機分區: 通過隨機的方式將數據分配在下游算子的每個分區中,分區相對均衡,但是較容易失去原有數據的分區結構
// 通過調用DataStream API中的shuffle方法實現數據集的隨機分區
dataStream1.shuffle();
    * (2). Roundrobin:[DataStream -> DataStream]:通過循環的方式對數據集中的數據進行重分區,能夠盡可能保證每個分區的數據平衡。
// 通過調用DataStream API中reblance()方法來實現數據的重平衡分區
dataStream1.rebalance();
    * (3). Rescaling:[DataStream -> DataStream]: 和Roundrobin一樣,Rescaling也是一種通過循環的方式進行數據重平衡的分區策略。但是Rescaling與Roundrobin不同的是,使用Roundrobin時數據會全局性地通過網絡介質傳輸到其他的節點完成數據的重新平衡,而Rescaling僅僅會對上下游繼承的算子數據進行重平衡。
// 通過調用DataStream API中rescale()方法實現Rescaling Partitioning操作
dataStream1.rescale();
    * (4). Broadcasting:[DataStream -> DataStream]: 廣播策略將輸入的數據集復制到下游算子的並行的Tasks實例中,下游算子中的Tasks可以直接從本地內存中獲取廣播數據集,不再依賴於網絡傳輸,這種分區策略適合於小數據集。
// 通過調用DataStream API的broadcast()方法實現廣播分區
dataStream1.broadcast();

三、DataSinks數據輸出

* 數據經過Transformation操作后,最終形成和用戶需要的結果數據集,DataSink操作將結果數據輸出在外部存儲介質或者傳輸到下游的消息中間件內。
* Flink支持的數據輸出有Apache Kafka、Apache Cassandra、ElasticSearch、Hadoop FileSystem、RabbitMQ、NIFI等
* Flink支持Redis、Netty等第三方系統

* (1). 基本數據輸出:基本數據輸出包含了文件輸出、客戶端輸出、Socket網絡端口等
// writeAsCsv
personDataStream.writeAsCsv(outputPath, FileSystem.WriteMode.OVERWRITE);
environment.execute();

// writeAsCsv
personDataStream.writeAsText(outputPath);
environment.execute();
* (2). 第三方數據輸出:所有的數據輸出都可以基於實現SinkFunction完成定義。

四、時間概念

* Flink包含三種時間概念:事件生成時間(Event Time)、事件接入時間(Ingestion Time)、事件處理時間(Processing Time)
* 事件生成時間:是每個獨立事件在產生它的設備上發生的時間。
* 接入時間:數據進入Flink系統的時間,接入時間依賴於Source Operator所在主機系統的時間。
* 處理時間:數據在操作算子計算過程中獲取到的所在主機的時間。
* 時間概念的指定:
    - 事件生成時間:TimeCharacteristic.EventTime 
    - 事件接入時間:TimeCharacteristic.IngestionTime
    - 事件處理時間:TimeCharacteristic.ProcessingTime

五、EventTime和Watermark

* Watermark存在的目的就是為了解決亂序的數據問題,可以在時間窗口內根據事件時間來進行業務處理,對於亂序的有延遲的數據可以在一定范圍內進行等待。
* 舉例:若window設置為5s,對事件延遲的容忍度為3s,flink會以5s將每分鍾划分為連續的多個窗口,窗口是左閉右開的,如0~5s、5s~10s....55~60s,假設這個時候過來一個事件時間為13s的事件,則落入10~15s的窗口,那么什么時候進行window操作?
    - 窗口中要有數據,這個時候事件時間為13s的事件已經確定有了
    - 存在一條數據的事件時間大於等於18s,所以還要等待別的事件進入窗口

*上游:生成Watermark
    - 兩種Watermark,AssignWithPeriodWatermarks、AssignWithPunctuatedWatermarks

    - AssignWithPeriodWatermarks:每隔N秒自動向流里注入一個Watermark,時間間隔由ExecutionConfig.setAutoWatermarkInterval()決定,每次調用getCurrentWatermark方法,如果得到的Watermark不為空,並且比之前的大就注入流中。

    - AssignWithPunctuatedWatermarks:基於事件向流里注入一個Watermark,每一個元素都有機會判斷是否生成一個Watermark,如果得到的Watermark不為空且比之前的大就注入流中。

* 下游:處理Watermark
    - StatusWatermarkValve 負責將不同Channel 的Watermark 對齊,再傳給pipeline 下游,對齊的概念是當前Channel的Watermark時間大於所有Channel

六、Windows窗口計算

* Flink按照固定時間或長度將數據流切分成不同的窗口,然后對數據進行相應的聚合運算,從而得到一定時間范圍內的統計結果。

* Flink支持兩種窗口,一種是基於時間的窗口,這種窗口由起止時間戳來決定,並且是左閉右開的。另一種是基於數據的窗口,這種窗口根據固定的數據定義窗口的大小,該類型的窗口若出現數據亂序的情況會造成計算結果不確定。

* Fink的Windows編程模型主要分為:Keyed-Windows、NoKeyed-Windows,兩者的區別在於是否調用keyby()、然后再根據此調用window()或windowAll()。

* Window的生命周期:
    - 一般而言,當第一個屬於某個窗口的事件到達時,窗口被創建。當time超過窗口定義的endTimestamp時,窗口被刪除。
    - 對於用戶自定義的延遲特性,則要等待滿足條件后窗口才會被刪除
    - 觸發器

- (1). Keyed-windows& NoKeyed-windows
dataStream1.keyBy(0).window(new WindowAssigner<Tuple2<String, Integer>, Window>() {
    @Override
    public Collection<Window> assignWindows(Tuple2<String, Integer> stringIntegerTuple2, long l, WindowAssignerContext windowAssignerContext) {
        return null;
    }

    @Override
    public Trigger<Tuple2<String, Integer>, Window> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
        return null;
    }

    @Override
    public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
        return null;
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
});

dataStream1.windowAll();

- (2). Windows Assigner
    - Flink流式計算中,通過Windows Assigner將接入的數據分配到不同的窗口

    - 可以根據Windows Assigner數據分配方式的不同將Windows分為4大類,分別是滾動窗口(Tumbling Windows)、滑動窗口(Sliding Windows)、會話窗口(Session Windows)和全局窗口(Global Windows)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM