1、Transform
1.1 map
val streamMap = stream.map { x => x * 2 }
1.2 flatmap
flatMap的函數簽名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
例如: flatMap(List(1,2,3))(i ⇒ List(i,i))
結果是List(1,1,2,2,3,3)
而List("a b", "c d").flatMap(line ⇒ line.split(" "))
結果是List(a, b, c, d)
val streamFlatMap = stream.flatMap{ x => x.split(" ") }
1.3 filter
val streamFilter = stream.filter{ x => x == 1 }
1.4 keyby
DataStream→ KeyedStream:邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同key的元素,在內部以hash的形式實現的。
val streamKeyby = stream.keyBy(0)
1.5 滾動聚合算子(Rolling Aggregation)
這些算子可以針對KeyedStream的每一個支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
1.6 reduce
KeyedStream → DataStream:一個分組數據流的聚合操作,合並當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val dataDS: DataStream[String] = env.readTextFile("input/data.txt") val ds: DataStream[WaterSensor] = dataDS.map( s => { val datas = s.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble) } ).keyBy(0).reduce( (s1, s2) => { println(s"${s1.vc} <==> ${s2.vc}") WaterSensor(s1.id, s1.ts, math.max(s1.vc, s2.vc)) } ) ds.print() env.execute("sensor")
1.7 split & select
Split算子
DataStream → SplitStream:根據某些特征把一個DataStream拆分成兩個或者多個DataStream。
val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } )
Select算子
SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。
val even = split select("even") val odd = split select("odd") val all = split.select("even","odd")
1.8 Connect & CoMap
DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream)
connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false )
1.9 Union
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。
dataStream.union(otherStream1, otherStream2, ...)
Connect與 Union 區別:
- Union之前兩個流的類型必須是一樣,Connect可以不一樣,在之后的coMap中再去調整成為一樣的。
- Connect只能操作兩個流,Union可以操作多個。
2、支持的數據類型
Flink 會盡力推斷有關數據類型的大量信息,這些數據會在分布式計算期間被網絡交換或存儲。 可以把它想象成一個推斷表結構的數據庫。在大多數情況下,Flink 可以依賴自身透明的推斷出所有需要的類型信息。 掌握這些類型信息可以幫助 Flink 實現很多意想不到的特性:
- 對於使用 POJOs 類型的數據,可以通過指定字段名(比如
dataSet.keyBy("username")
)進行 grouping 、joining、aggregating 操作。 類型信息可以幫助 Flink 在運行前做一些拼寫錯誤以及類型兼容方面的檢查,而不是等到運行時才暴露這些問題。 - Flink 對數據類型了解的越多,序列化和數據布局方案就越好。 這對 Flink 中的內存使用范式尤為重要(可以盡可能處理堆上或者堆外的序列化數據並且使序列化操作很廉價)。
- 最后,它還使用戶在大多數情況下免於擔心序列化框架以及類型注冊。
通常在應用運行之前的階段 (pre-flight phase),需要數據的類型信息 - 也就是在程序對 DataStream
或者 DataSet
的操作調用之后,在 execute()
、print()
、count()
、collect()
調用之前。
Flink支持Java和Scala中所有常見數據類型。使用最廣泛的類型有以下幾種。
2.0 Flink 的 TypeInformation 類
類 TypeInformation 是所有類型描述符的基類。該類表示類型的基本屬性,並且可以生成序列化器,在一些特殊情況下可以生成類型的比較器。 (請注意,Flink 中的比較器不僅僅是定義順序 - 它們是處理鍵的基礎工具)
Flink 內部對類型做了如下區分:
- 基礎類型:所有的 Java 主類型(primitive)以及他們的包裝類,再加上
void
、String
、Date
、BigDecimal
以及BigInteger
。 - 主類型數組(primitive array)以及對象數組
- 復合類型
- Flink 中的 Java 元組 (Tuples) (元組是 Flink Java API 的一部分):最多支持25個字段,null 是不支持的。
- Scala 中的 case classes (包括 Scala 元組):null 是不支持的。
- Row:具有任意數量字段的元組並且支持 null 字段。。
- POJOs: 遵循某種類似 bean 模式的類。
- 輔助類型 (Option、Either、Lists、Maps 等)
- 泛型類型:這些不是由 Flink 本身序列化的,而是由 Kryo 序列化的。
POJOs 是特別有趣的,因為他們支持復雜類型的創建以及在鍵的定義中直接使用字段名: dataSet.join(another).where("name").equalTo("personName")
它們對運行時也是透明的,並且可以由 Flink 非常高效地處理。
TypeInformation 支持以下幾種類型:
- BasicTypeInfo: 任意Java 基本類型或 String 類型
- BasicArrayTypeInfo: 任意Java基本類型數組或 String 數組
- WritableTypeInfo: 任意 Hadoop Writable 接口的實現類
- TupleTypeInfo: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實現
- CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
- PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java對象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法
- GenericTypeInfo: 任意無法匹配之前幾種類型的類
針對前六種類型數據集,Flink皆可以自動生成對應的TypeSerializer,能非常高效地對數據集進行序列化和反序列化。
2.1 基礎數據類型
Flink支持所有的Java和Scala基礎數據類型,Int, Double, Long, String, …
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L) numbers.map( n => n + 1 )
2.2 Java和Scala元組(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements( ("Adam", 17), ("Sarah", 23) ) persons.filter(p => p._2 > 18)
2.3 Scala樣例類(case classes)
case class Person(name: String, age: Int) val persons: DataStream[Person] = env.fromElements( Person("Adam", 17), Person("Sarah", 23) ) persons.filter(p => p.age > 18)
2.4 Java簡單對象(POJOs)
public class Person { public String name; public int age; public Person() {} public Person(String name, int age) { this.name = name; this.age = age; } } DataStream<Person> persons = env.fromElements( new Person("Alex", 42), new Person("Wendy", 23));
2.5 其它(Arrays, Lists, Maps, Enums, 等等)
Flink對Java和Scala中的一些特殊目的的類型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。
3、實現UDF函數----更細粒度的控制流
Flink在使用各種不同算子的同時,為了能更細粒度的控制數據和操作數據,給開發者提供了對現有函數進行擴展的能力
3.1 函數類(Function Classes)
Flink暴露了所有udf函數的接口(實現方式為接口或者抽象類)。例如MapFunction, FilterFunction, ProcessFunction等等。
自定義函數類實現MapFunction接口:
def main(args: Array[String]): Unit = { // TODO 從文件中獲取數據源 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; env.setParallelism(1) val list = List( WaterSensor("sensor_1", 150000L, 25), WaterSensor("sensor_1", 150001L, 27), WaterSensor("sensor_1", 150005L, 30), WaterSensor("sensor_1", 150007L, 40) ) val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list) // UDF函數:自定義函數進行數據的處理 //waterSensorDS.map(ws=>(ws.id, ws.vc)) // 也可以使用函數類來代替匿名函數 val mapFunctionDS: DataStream[(String, Int)] = waterSensorDS.map( new MyMapFunction ) mapFunctionDS.print("mapfun>>>") env.execute() } // 自定義UDF函數。來實現映射轉換功能 // 1. 繼承MapFunction // 2. 重寫方法 class MyMapFunction extends MapFunction[WaterSensor, (String, Int)]{ override def map(ws: WaterSensor): (String, Int) = { (ws.id, ws.vc) } }
3.2 匿名函數(Lambda Functions)
val tweets: DataStream[String] = ... val flinkTweets = tweets.filter(_.contains("flink"))
3.3 富函數(Rich Functions)
“富函數”是DataStream API提供的一個函數類的接口,所有Flink函數類都有其Rich版本。它與常規函數的不同在於,可以獲取運行環境的上下文,並擁有一些生命周期方法,所以可以實現更復雜的功能。也有意味着提供了更多的,更豐富的功能
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- ...
Rich Function有一個生命周期的概念。典型的生命周期方法有:
open()
方法是rich function的初始化方法,當一個算子例如map或者filter被調用之前open()會被調用。close()
方法是生命周期中的最后一個調用的方法,做一些清理工作。getRuntimeContext()
方法提供了函數的RuntimeContext的一些信息,例如函數執行的並行度,任務的名字,以及state狀態
def main(args: Array[String]): Unit = { // TODO 從文件中獲取數據源 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; env.setParallelism(1) val list = List( WaterSensor("sensor_1", 150000L, 25), WaterSensor("sensor_1", 150001L, 27), WaterSensor("sensor_1", 150005L, 30), WaterSensor("sensor_1", 150007L, 40) ) val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list) // UDF函數:自定義函數進行數據的處理 //waterSensorDS.map(ws=>(ws.id, ws.vc)) // 也可以使用函數類來代替匿名函數 val mapFunctionDS: DataStream[(String, Int)] = waterSensorDS.map( new MyMapRichFunction ) mapFunctionDS.print("mapfun>>>") env.execute() } // 自定義UDF 富函數。來實現映射轉換功能 // 1. 繼承RichMapFunction // 2. 重寫方法 class MyMapRichFunction extends RichMapFunction[WaterSensor, (String, Int)] { override def open(parameters: Configuration): Unit = super.open(parameters) override def map(ws: WaterSensor): (String, Int) = { //getRuntimeContext. (ws.id, getRuntimeContext.getIndexOfThisSubtask) } override def close(): Unit = super.close() }