Flink的算子
flink代碼分為三部分:
1、Source----數據源,讀取數據
2、Transformation----轉換,對數據進行處理,也就是算子
3、Sink----將數據發出去
Transformation:數據轉換的各種操作,有Map / FlatMap / Filter / KeyBy / Reduce / Fold /
Window / WindowAll / Union / Window join / Split / Select / Project等,操作很多,可以將數據轉換計算成你想要的數據。
1、Map
Map 算子的輸入流是 DataStream,經過 Map 算子后返回的數據格式是 SingleOutputStreamOperator 類型,獲取一個元素並生成一個元素,舉個例子:
新的一年給每個員工的工資加 5000。
SingleOutputStreamOperator<Employee> map = employeeStream.map(new MapFunction<Employee, Employee>() {
@Override
public Employee map(Employee employee) throws Exception {
employee.salary = employee.salary + 5000;
return employee;
}
});
map.print();
Map 程序示例
package com.shujia.flink.tf
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
object Demo1Map {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取學生文件創建DS
val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
/**
* map 算子有兩種格式:
* 1、傳入一個函數 -- scala api
* 2、傳入一個MapFunction -- java api(使用較多)
*/
val mapDS: DataStream[(String, Int)] = studentDS.map(new MapFunction[String, (String, Int)] {
/**
* @param value : DS中的一行數據
* @return 返回數據的類型
*/
//重寫MapFunction的map方法
override def map(value: String): (String, Int) = {
val clazz: String = value.split(",")(4)
(clazz, 1)
}
})
mapDS
.keyBy(_._1)
.sum(1)
.print()
env.execute()
}
}
2、FlatMap
FlatMap 算子的輸入流是 DataStream,經過 FlatMap 算子后返回的數據格式是 SingleOutputStreamOperator 類型,獲取一個元素並生成零個、一個或多個元素,舉個例子:
將工資大於 40000 的找出來
SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new FlatMapFunction<Employee, Employee>() {
@Override
public void flatMap(Employee employee, Collector<Employee> out) throws Exception {
if (employee.salary >= 40000) {
out.collect(employee);
}
}
});
flatMap.print();
FlatMap 程序示例
package com.shujia.flink.tf
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object Demo2FlatMap {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取數據構建DS
val linesDS: DataStream[String] = env.readTextFile("data/words.txt")
//scala api
val scalaDS: DataStream[String] = linesDS.flatMap(_.split(","))
//java api
val wordDS: DataStream[String] = linesDS.flatMap(new FlatMapFunction[String, String] {
/**
* @param value : 原始的數據類型
* @param out : 用於將數據發送到下游
*/
//重寫FlatMapFunction的flatMap方法
override def flatMap(value: String, out: Collector[String]): Unit = {
val split: Array[String] = value.split(",")
//將數據一條一條發送到下游
for (word <- split) {
//將數據發送到下游
out.collect(word)
}
}
})
wordDS.print()
env.execute()
}
}
3、Filter
SingleOutputStreamOperator filter = ds.filter(new FilterFunction<Employee>() {
@Override
public boolean filter(Employee employee) throws Exception {
if (employee.salary >= 40000) {
return true;
}
return false;
}
});
filter.print();
對每個元素都進行判斷,返回為 true 的元素,如果為 false 則丟棄數據,上面找出工資大於 40000 的員工其實也可以用 Filter 來做
Filter 程序示例
package com.shujia.flink.tf
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object Demo3Filter {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀書學生數據構建DS
val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
val filterDS: DataStream[String] = studentDS.filter(new FilterFunction[String] {
/**
* 在Flink中沒有轉換算子和操作算子的區分
*
* @param value : 數據
* @return : 返回值,如果返回true 保留數據,如果返回false 過濾數據
*/
override def filter(value: String): Boolean = {
val gender: String = value.split(",")(3)
"男".equals(gender) //gender.equals("男")--這樣寫可能會出現空指針異常
}
})
filterDS.print()
env.execute()
}
}
4、KeyBy
KeyBy 在邏輯上是基於 key 對流進行分區,相同的 Key 會被分到一個分區(這里分區指的就是下游算子多個並行節點的其中一個)。在內部,它使用 hash 函數對流進行分區。它返回 KeyedDataStream 數據流。舉個例子:根據商品的店鋪 id 來進行分區
KeyedStream<ProductEvent, Integer> keyBy = productStream.keyBy(new KeySelector<ProductEvent, Integer>() {
@Override
public Integer getKey(ProductEvent product) throws Exception {
return product.shopId;
}
});
keyBy.print();
KeyBy 程序示例
package com.shujia.flink.tf
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
object Demo4KeyBY {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
//讀取socket構建DS
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
//轉成KV格式,便於查看
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
/**
* keyBy:將相同的key發送到同一個task中
*/
//scala api
//kvDS.keyBy(kv => kv._1).print()
//java api
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(new KeySelector[(String, Int), String] {
//重寫KeySelector中的getkey方法
override def getKey(value: (String, Int)): String = {
value._1
}
})
keyByDS.print()
env.execute()
}
}
5、Reduce
Reduce 返回單個的結果值,並且 reduce 操作每處理一個元素總是創建一個新值。
常用的方法有 average、sum、min、max、count,使用 Reduce 方法都可實現。
舉例:上面先將數據流進行 keyby 操作,因為執行 Reduce 操作只能是 KeyedStream,然后將員工的工資做了一個求平均值的操作。
SingleOutputStreamOperator<Employee> reduce = employeeStream.keyBy(new KeySelector<Employee, Integer>() {
@Override
public Integer getKey(Employee employee) throws Exception {
return employee.shopId;
}
}).reduce(new ReduceFunction<Employee>() {
@Override
public Employee reduce(Employee employee1, Employee employee2) throws Exception {
employee1.salary = (employee1.salary + employee2.salary) / 2;
return employee1;
}
});
reduce.print();
Reduce 程序示例
package com.shujia.flink.tf
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
object Demo5Reduce {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
/**
* reduce: 分組之后進行聚合計算
*/
//scala api
//val reduceDS: DataStream[(String, Int)] = keyByDS.reduce((x, y) => (y._1, x._2 + y._2))
//java api
val reduceDS: DataStream[(String, Int)] = keyByDS.reduce(new ReduceFunction[(String, Int)] {
//重寫ReduceFunction的reduce方法
override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
(value1._1, value1._2 + value2._2)
}
})
reduceDS.print()
env.execute()
}
}
6、Aggregations
DataStream API 支持各種聚合,例如 min、max、sum 等。
這些函數可以應用於 KeyedStream 以獲得 Aggregations 聚合。
max 和 maxBy 之間的區別在於 max 返回流中的最大值,但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。
KeyedStream.sum(0)
KeyedStream.sum("key")
KeyedStream.min(0)
KeyedStream.min("key")
KeyedStream.max(0)
KeyedStream.max("key")
KeyedStream.minBy(0)
KeyedStream.minBy("key")
KeyedStream.maxBy(0)
KeyedStream.maxBy("key")
Aggregations 程序示例
package com.shujia.flink.tf
import org.apache.flink.streaming.api.scala._
object Demo6Agg {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取學生數據構建DS
val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
/**
* 取出每個班級年齡最大的學生
*/
val stuDS: DataStream[(String, String, Int, String, String)] = studentDS.map(line => {
val split: Array[String] = line.split(",")
(split(0), split(1), split(2).toInt, split(3), split(4))
})
val keyByDS: KeyedStream[(String, String, Int, String, String), String] = stuDS.keyBy(_._5) //取出班級進行分組----元組中的第五個元素
//取出年齡最大的那個學生----keyByDS中的第三個元素
//val maxDS: DataStream[(String, String, Int, String, String)] = keyByDS.max(2)
//maxDS.print()
/**
* 注意:使用max(),返回的是每個班級最大的年齡,但是這個最大的年齡對應的不一定是他本人
* 也就是說,最大的年齡可以返回出來,但是其他幾個元素並不一定是對應的。
* 使用maxBy()可以將最大值返回出來,也可以將該最大值對應的其他信息返回出來
*/
/**
* sum
* max
* min
* maxBy
* minBy
*
* max 和 maxBy 之間的區別在於 max 返回流中的最大值,
* 但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。
*/
//maxBy() -- 參數可以傳入 列名 或 下標
val maxDS: DataStream[(String, String, Int, String, String)] = keyByDS.maxBy(2)
maxDS.print()
env.execute()
}
}
7、Window
Window 函數允許按時間或其他條件對現有 KeyedStream 進行分組。 以下是以 10 秒的時間窗口聚合:
inputStream.keyBy(0).window(Time.seconds(10));
有時候因為業務需求場景要求:聚合一分鍾、一小時的數據做統計報表使用。
Window 程序示例
這邊先看個簡單的,之后有 Window 詳解
package com.shujia.flink.tf
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo7Window {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
/**
* 每隔5秒統計單詞的數量
* timeWindow() -- 里面可以傳入一個參數(滾動窗口)或者兩個參數(滑動窗口)
*/
kvDS
.keyBy(_._1)
.timeWindow(Time.seconds(5)) //滾動窗口,本次設置5秒,每擱5秒算一次
.sum(1)
.print()
env.execute()
}
}
8、Union
Union 函數將兩個或多個數據流結合在一起。 這樣后面在使用的時候就只需使用一個數據流就行了。 如果我們將一個流與自身組合,那么組合后的數據流會有兩份同樣的數據。
inputStream.union(inputStream1, inputStream2, ...);
Union 程序示例
package com.shujia.flink.tf
// 當代碼中沒有用到Scala API的時候不需要導 _ (隱式轉換)
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object Demo8Union {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds1: DataStream[String] = env.socketTextStream("master", 7777)
val ds2: DataStream[String] = env.socketTextStream("master", 8888)
/**
* 將多個流合並成一個流,類型要一致
* 在虛擬機中要開兩個master,一個登錄7777.一個登錄8888
*/
val unionDS: DataStream[String] = ds1.union(ds2)
unionDS.print()
env.execute()
}
}
9、Window Join
我們可以通過一些 key 將同一個 window 的兩個數據流 join 起來。
inputStream.join(inputStream1)
.where(0).equalTo(1)
.window(Time.seconds(5))
.apply (new JoinFunction () {...});
以上示例是在 5 秒的窗口中連接兩個流,其中第一個流的第一個屬性的連接條件等於另一個流的第二個屬性。
10、Split
此功能根據條件將流拆分為兩個或多個流。 當你獲得混合流然后你可能希望單獨處理每個數據流時,可以使用此方法。
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
});
上面就是將偶數數據流放在 even 中,將奇數數據流放在 odd 中。
11、Select
上面用 Split 算子將數據流拆分成兩個數據流(奇數、偶數),接下來你可能想從拆分流中選擇特定流,那么就得搭配使用 Select 算子(一般這兩者都是搭配在一起使用的),
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");