Flink's transformation operations convert one or more DataStreams into new DataStreams as needed. They fall into three main categories:
- DataStream Transformations: transformations applied to the data stream itself;
- Physical partitioning: low-level APIs provided by Flink that let users define how the data is partitioned;
- Task chaining and resource groups: fine-grained control over task chaining and resource groups.
Map [DataStream → DataStream]
Applies a given transformation to every element of a DataStream:
DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Integer>) value -> value * 2).print();
// Output: 2,4,6,8,10
val data = 1 to 10
import org.apache.flink.api.scala._
env.fromCollection(data).map(x => x + 1).print()
env.fromCollection(data).map(_ + 1).print()
In FlatMap, a single input element can be mapped to one or more output elements, for example:
String str = "one one one two two";
String str1 = "third third third four";
DataSource<String> stringDataSource = env.fromElements(str, str1);
stringDataSource.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        for (String word : s.split(" ")) {
            collector.collect(word);
        }
    }
}).print();
The following example compares map, flatMap and mapPartition on the same input:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    List<String> list = new ArrayList<String>();
    list.add("I love Beijing");
    list.add("I love China");
    list.add("Beijing is the capital of China");
    DataSet<String> source = env.fromCollection(list);

    // map: each input line is mapped to exactly one output element (here, a list of words)
    source.map(new MapFunction<String, List<String>>() {
        @Override
        public List<String> map(String line) throws Exception {
            String[] words = line.split(" ");
            List<String> wds = new ArrayList<String>();
            for (String word : words) {
                wds.add(word);
            }
            return wds;
        }
    }).print();
    System.out.println("*************************************");

    // flatMap: each input line is mapped to zero or more output elements (one per word)
    source.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> collector) throws Exception {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(word);
            }
        }
    }).print();
    System.out.println("*************************************");

    // mapPartition: invoked once per partition rather than once per element
    source.mapPartition(new MapPartitionFunction<String, String>() {
        Integer index = 0;

        @Override
        public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
            Iterator<String> iterator = iterable.iterator();
            while (iterator.hasNext()) {
                String line = iterator.next();
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect("partition " + index + ", word: " + word);
                }
            }
            index++;
        }
    }).print();
}
[I, love, Beijing]
[I, love, China]
[Beijing, is, the, capital, of, China]
*************************************
I
love
Beijing
I
love
China
Beijing
is
the
capital
of
China
*************************************
partition 0, word: I
partition 0, word: love
partition 0, word: Beijing
partition 0, word: I
partition 0, word: love
partition 0, word: China
partition 0, word: Beijing
partition 0, word: is
partition 0, word: the
partition 0, word: capital
partition 0, word: of
partition 0, word: China
Combining flatMap with map, groupBy and sum gives a word count:

source.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> collector) throws Exception {
        String[] words = line.split(" ");
        for (String word : words) {
            collector.collect(word);
        }
    }
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String s) throws Exception {
        return new Tuple2<>(s, 1);
    }
}).groupBy(0).sum(1).print();
System.out.println("*************************************");

Output:

(China,2)
(is,1)
(love,2)
(the,1)
(of,1)
(Beijing,2)
(I,2)
(capital,1)
Scala implementation
def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = new ListBuffer[String]()
  info.append("hadoop,flink")
  info.append("java,hadoop")
  info.append("flink,java")
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
}
Filter: keeps only the records you need.
StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserAction> source = envStream.fromCollection(Arrays.asList(
        new UserAction("user1", 10001, "click", "iphone", BigDecimal.valueOf(10)),
        new UserAction("user2", 10002, "browse", "pc", BigDecimal.valueOf(12)),
        new UserAction("user1", 10001, "click", "mac", BigDecimal.valueOf(10))));
SingleOutputStreamOperator<UserAction> result = source.filter(new FilterFunction<UserAction>() {
    @Override
    public boolean filter(UserAction userAction) throws Exception {
        // keep only the records belonging to user1
        return userAction.getUserId().equalsIgnoreCase("user1");
    }
});
result.print();
envStream.execute();
- KeyBy [DataStream → KeyedStream]: assigns records with the same key to the same partition;
The partitioning result is strongly tied to the parallelism of the operator downstream of KeyBy: if the downstream operator has a parallelism of 1, every record ends up in the same partition no matter how the stream is keyed.
- For POJO types, KeyBy can partition by a field specified with keyBy(fieldName).
- For Tuple types, KeyBy can partition by a field specified with keyBy(fieldPosition).
- For other types, such as the UserAction class used above, KeyBy partitions via a key selector: keyBy(new KeySelector {...}).
StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserAction> source = envStream.fromCollection(Arrays.asList(
        new UserAction("user1", 10001, "click", "iphone", BigDecimal.valueOf(10)),
        new UserAction("user2", 10002, "browse", "pc", BigDecimal.valueOf(12)),
        new UserAction("user1", 10001, "click", "mac", BigDecimal.valueOf(10))));
KeyedStream<UserAction, String> result = source.keyBy(new KeySelector<UserAction, String>() {
    @Override
    public String getKey(UserAction userAction) throws Exception {
        return userAction.getUserId();
    }
});
result.print().setParallelism(3);
envStream.execute();
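The example above extracts the key with a KeySelector. For reference, here is a minimal sketch of the field-name and field-position variants; the data and class name are only illustrative, and the UserAction line is commented out because the POJO is not defined in this snippet.

```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByVariantsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // POJO: key by field name (assumes a UserAction stream whose class exposes userId as a bean property)
        // KeyedStream<UserAction, Tuple> byName = userActionStream.keyBy("userId");

        // Tuple: key by field position
        DataStreamSource<Tuple2<String, Integer>> tuples = env.fromElements(
                new Tuple2<>("user1", 1), new Tuple2<>("user2", 2), new Tuple2<>("user1", 3));
        KeyedStream<Tuple2<String, Integer>, Tuple> byPosition = tuples.keyBy(0);

        byPosition.sum(1).print();
        env.execute();
    }
}
```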
- Reduce [KeyedStream → DataStream]: performs rolling aggregation on a keyed stream.
Reduce applies a ReduceFunction as a rolling aggregation and sends the result of each aggregation step downstream. Note that Reduce emits every intermediate result, not only the final one.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Input: user actions. A user clicked or browsed a product at a point in time, with the product's price.
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
        new UserAction("user1", 1293984000, "click", "productID1", BigDecimal.valueOf(12)),
        new UserAction("user2", 1293984001, "browse", "productID2", BigDecimal.valueOf(12)),
        new UserAction("user2", 1293984002, "browse", "productID2", BigDecimal.valueOf(12)),
        new UserAction("user2", 1293984003, "browse", "productID2", BigDecimal.valueOf(12)),
        new UserAction("user1", 1293984002, "click", "productID1", BigDecimal.valueOf(12)),
        new UserAction("user1", 1293984003, "click", "productID3", BigDecimal.valueOf(12)),
        new UserAction("user1", 1293984004, "click", "productID1", BigDecimal.valueOf(12))
));
KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
    @Override
    public String getKey(UserAction value) throws Exception {
        return value.getUserId();
    }
});
// Rolling sum of the price per user
SingleOutputStreamOperator<UserAction> result = keyedStream.reduce(new ReduceFunction<UserAction>() {
    @Override
    public UserAction reduce(UserAction userAction, UserAction t1) throws Exception {
        BigDecimal price = userAction.getPrice().add(t1.getPrice());
        return new UserAction(userAction.getUserId(), userAction.getUserNo(),
                userAction.getAction(), userAction.getProduct(), price);
    }
});
result.print();
env.execute();
Aggregate performs a rolling aggregation on a KeyedStream over a specified field and emits the result of every aggregation step. The built-in aggregation functions are sum, min, minBy, max, and maxBy.

The difference between max(field) and maxBy(field): maxBy returns the record whose field value is the largest, whereas max takes the first record, writes the largest field value into it, and returns that first record. The same distinction applies to min and minBy. Like Reduce, the aggregation operators emit the result of every rolling aggregation.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Input: user actions. A user clicked or browsed a product at a point in time, with the product's price.
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
        new UserAction("user1", 1293984000, "click", "productID1", BigDecimal.valueOf(30)),
        new UserAction("user2", 1293984001, "browse", "productID2", BigDecimal.valueOf(12)),
        new UserAction("user2", 1293984002, "browse11", "productID2", BigDecimal.valueOf(25)),
        new UserAction("user2", 1293984003, "browse", "productID2", BigDecimal.valueOf(12)),
        new UserAction("user1", 1293984002, "click11", "productID1", BigDecimal.valueOf(40)),
        new UserAction("user1", 1293984003, "click", "productID3", BigDecimal.valueOf(12)),
        new UserAction("user1", 1293984004, "click", "productID1", BigDecimal.valueOf(12))
));
KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
    @Override
    public String getKey(UserAction value) throws Exception {
        return value.getUserId();
    }
});
keyedStream.maxBy("price").print();
keyedStream.max("price").print();
env.execute();
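To make the max/maxBy difference concrete, here is a minimal sketch on a Tuple3 stream; the data is hypothetical and the outputs in the comments simply restate the behaviour described above.

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxVsMaxBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // (key, tag, value) records, all with the same key "a"
        DataStreamSource<Tuple3<String, String, Integer>> source = env.fromElements(
                new Tuple3<>("a", "x", 1),
                new Tuple3<>("a", "y", 3),
                new Tuple3<>("a", "z", 2));

        // maxBy(2): emits the whole record that currently holds the maximum of field 2,
        // e.g. (a,x,1), (a,y,3), (a,y,3)
        source.keyBy(0).maxBy(2).print();

        // max(2): keeps the first record and only updates field 2 to the running maximum,
        // e.g. (a,x,1), (a,x,3), (a,x,3)
        source.keyBy(0).max(2).print();

        env.execute();
    }
}
```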
The Connect operation joins two DataStreams, possibly of different types, and returns a ConnectedStreams; the connected streams can then share state with each other. Because the underlying data types differ, any further computation requires turning the ConnectedStreams back into a DataStream with CoMap or CoFlatMap:
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(
        new Tuple2<>("a", 2),
        new Tuple2<>("b", 5));
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(2, 3, 6);
ConnectedStreams<Tuple2<String, Integer>, Integer> connect = tuple2DataStreamSource.connect(integerDataStreamSource);
connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
    @Override
    public Integer map1(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }

    @Override
    public Integer map2(Integer value) throws Exception {
        return value;
    }
}).map(x -> x * 100).print();
env.execute();
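CoFlatMap works the same way, except that each side may emit zero or more elements through a Collector. A minimal sketch, reusing the connect variable from the example above:

```java
// Sketch only: a CoFlatMapFunction applied to the ConnectedStreams built above.
connect.flatMap(new CoFlatMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
    @Override
    public void flatMap1(Tuple2<String, Integer> value, Collector<Integer> out) throws Exception {
        out.collect(value.f1);      // first stream: emit the integer field once
    }

    @Override
    public void flatMap2(Integer value, Collector<Integer> out) throws Exception {
        out.collect(value);         // second stream: emit the value itself...
        out.collect(value * 10);    // ...and a second, derived element
    }
}).print();
```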
- Split [DataStream → SplitStream]: splits a DataStream into several streams according to a given rule. Note that the split is logical: Split only tags each record with a label, and what is returned is still a single SplitStream;
- Select [SplitStream → DataStream]: to obtain the actual DataStreams of the different types from the logically split SplitStream, use the Select operator, as shown below:
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
// Tag each element
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        output.add(value % 2 == 0 ? "even" : "odd");
        return output;
    }
});
// Select the even elements
split.select("even").print(); // Output: 2,4,6,8
Physical partitioning is a set of low-level APIs provided by Flink that let users partition the data with built-in or custom partitioning rules, for example to avoid heavy skew on some partitions. The commonly used partitioning rules are as follows:
Random partitioning [DataStream → DataStream]
Random partitioning distributes the data randomly across all downstream partitions and is applied with the shuffle method:
dataStream.shuffle();
Rebalancing [DataStream → DataStream]
Rebalancing partitions the data in a round-robin fashion and is well suited to skewed inputs; it is applied with the rebalance method:
dataStream.rebalance();
Rebalancing performs a global rebalance: data is sent over the network to other nodes so that the partitions end up evenly loaded. Rescaling is a lighter-weight variant of rebalance: it avoids the extra cross-node network transfer and only rebalances between directly connected upstream and downstream operators; it is applied with the rescale method:
dataStream.rescale();
The word "rescale" is meant literally: if the upstream operator has a parallelism of 2 and the downstream operator a parallelism of 6, one upstream instance distributes its elements to 3 downstream instances and the other upstream instance distributes its elements to the other 3. Conversely, if the upstream parallelism is 6 and the downstream parallelism is 2, 3 upstream instances send their elements to one downstream instance and the other 3 upstream instances send theirs to the other downstream instance.
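A minimal sketch of the 2 → 6 case described above; the parallelism values are chosen purely for illustration.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .map(x -> x * 2).setParallelism(2)  // upstream: 2 parallel instances
                .rescale()                          // each upstream instance feeds only 3 of the 6 downstream instances
                .print().setParallelism(6);         // downstream: 6 parallel instances

        env.execute();
    }
}
```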
Broadcasting [DataStream → DataStream]
Broadcasting sends the data to every partition. It is typically used when a small dataset has to be joined with a large one: broadcasting the small dataset to all partitions avoids repeated cross-partition lookups. It is applied with the broadcast method:
dataStream.broadcast();
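A minimal sketch of the effect, using hypothetical elements: with the downstream map set to a parallelism of 3, every broadcast element should be processed (and printed) once per parallel instance, i.e. three times.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BroadcastSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b")
                .broadcast()                        // every element is sent to every downstream instance
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return "got " + value;
                    }
                }).setParallelism(3)
                .print();

        env.execute();
    }
}
```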
Custom partitioning [DataStream → DataStream]
Flink also allows users to partition with their own rules; this requires implementing the Partitioner interface and specifying the partition key, for example:
DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(
        new Tuple2<>("Hadoop", 1),
        new Tuple2<>("Spark", 1),
        new Tuple2<>("Flink-streaming", 2),
        new Tuple2<>("Flink-batch", 4),
        new Tuple2<>("Storm", 4),
        new Tuple2<>("HBase", 3));
streamSource.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        // Assign every Tuple2 whose first field contains "flink" to the same partition
        return key.toLowerCase().contains("flink") ? 0 : 1;
    }
}, 0).print();

// Output:
// 1> (Flink-streaming,2)
// 1> (Flink-batch,4)
// 2> (Hadoop,1)
// 2> (Spark,1)
// 2> (Storm,4)
// 2> (HBase,3)
Task chaining and resource groups
Task chaining and resource groups are likewise low-level Flink APIs, used to control task chaining and resource allocation. By default, when the operations allow it (for example two adjacent map operations), Flink tries to run them in the same thread, which yields better performance. Flink also lets users control this behaviour themselves through the task chaining and resource group APIs:
startNewChain
startNewChain starts a new task chain beginning with the current operation. In the example below, the new chain starts at the first map, so the first map and the second map end up in the same new chain, while the filter belongs to a different chain:
someStream.filter(...).map(...).startNewChain().map(...);
disableChaining
disableChaining prevents other operations from being chained with the current operation, for example:
someStream.map(...).disableChaining();
slotSharingGroup
A slot is a fixed subset of the resources owned by a TaskManager. Every subtask of an operation needs a slot in order to run, but different operations need different amounts of resources. To use resources more efficiently, Flink allows subtasks of different operations to be deployed into the same slot. slotSharingGroup sets the slot sharing group of an operation: Flink places operations with the same slot sharing group into the same slot. Example:
someStream.filter(...).slotSharingGroup("slotSharingGroupName");
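As a minimal sketch, a resource-hungry operator can be isolated in its own sharing group; someStream and ExpensiveMapFunction are hypothetical, and downstream operations inherit the sharing group of their input unless they set one themselves.

```java
// Sketch only: put a heavy map into its own slot sharing group "heavy",
// so it no longer shares slots with operators in the "default" group.
someStream
        .filter(value -> value != null)        // stays in the "default" group
        .map(new ExpensiveMapFunction())       // hypothetical, resource-hungry function
        .slotSharingGroup("heavy")             // isolate this operator (and its downstream) in group "heavy"
        .print();
```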
The first function
first(n) returns the first n elements of a DataSet; combined with groupBy it returns the first n elements of each group, and with sortGroup the first n elements of each group in the given order.
def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int, String)]()
  info.append((1, "hadoop"))
  info.append((1, "spark"))
  info.append((1, "flink"))
  info.append((2, "java"))
  info.append((2, "httpclient"))
  info.append((3, "linux"))
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)
  data.first(3).print()
  data.groupBy(0).first(2).print()
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
}

-------------------------------Results--------------------
(1,hadoop)
(1,spark)
(1,flink)
(3,linux)
(1,hadoop)
(1,spark)
(2,java)
(2,httpclient)
(3,linux)
(1,flink)
(1,hadoop)
(2,httpclient)
(2,java)
JOIN
Join combines two DataSets on equal keys: where(...) selects the key of the first dataset and equalTo(...) the key of the second.
Scala implementation
def joinFunction(env: ExecutionEnvironment): Unit = {
  val groupO = ListBuffer[(Int, String)]()
  groupO.append((1, "小明"))
  groupO.append((2, "王二"))
  groupO.append((3, "趙三"))
  groupO.append((4, "李四"))
  val groupT = ListBuffer[(Int, String)]()
  groupT.append((1, "上海"))
  groupT.append((2, "北京"))
  groupT.append((3, "武漢"))
  groupT.append((5, "廈門"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(groupO)
  val data2 = env.fromCollection(groupT)
  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}
------------------Results-------------------
(3,趙三,武漢)
(1,小明,上海)
(2,王二,北京)
Java implementation
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    List<Tuple2<Integer, String>> groupOne = new ArrayList<>();
    groupOne.add(new Tuple2<>(1, "小明"));
    groupOne.add(new Tuple2<>(2, "小王"));
    groupOne.add(new Tuple2<>(3, "小張"));
    groupOne.add(new Tuple2<>(4, "小趙"));
    List<Tuple2<Integer, String>> groupTwo = new ArrayList<>();
    groupTwo.add(new Tuple2<>(1, "北京"));
    groupTwo.add(new Tuple2<>(2, "上海"));
    groupTwo.add(new Tuple2<>(3, "武漢"));
    groupTwo.add(new Tuple2<>(5, "華盛頓"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(groupOne);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(groupTwo);
    data1.join(data2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                @Override
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                            Tuple2<Integer, String> second) throws Exception {
                    return new Tuple3<>(first.f0, first.f1, second.f1);
                }
            }).print();
}
leftOuterJoin()
leftOuterJoin keeps every record of the left dataset; when a left record has no match on the right, the right side passed to the join function is null.
def leftOuterJoinFunction(env: ExecutionEnvironment): Unit = {
  val groupO = ListBuffer[(Int, String)]()
  groupO.append((1, "小明"))
  groupO.append((2, "王二"))
  groupO.append((3, "趙三"))
  groupO.append((4, "李四"))
  val groupT = ListBuffer[(Int, String)]()
  groupT.append((1, "上海"))
  groupT.append((2, "北京"))
  groupT.append((3, "武漢"))
  groupT.append((5, "廈門"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(groupO)
  val data2 = env.fromCollection(groupT)
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
    if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print()
}
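A Java sketch of the same left outer join, assuming the data1/data2 DataSources from the Java join example above; the right side passed to the JoinFunction is null when there is no match.

```java
// Sketch only: left outer join on the data1 / data2 DataSources used in the Java join above.
data1.leftOuterJoin(data2).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                        Tuple2<Integer, String> second) throws Exception {
                // second is null when the left record has no match on the right
                return new Tuple3<>(first.f0, first.f1, second == null ? "-" : second.f1);
            }
        }).print();
```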
Split and Select
object SplitAndSelectApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var counter = 0L
    val data = env.addSource(new SourceFunction[Long] {
      @volatile var running = true

      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (running && counter < 100) {
          ctx.collect(counter)
          counter += 1
          Thread.sleep(1000)
        }
      }

      // cancel() must stop the source; returning a value here has no effect,
      // so a volatile flag is used instead.
      override def cancel(): Unit = {
        running = false
      }
    })
    val splits = data.split(new OutputSelector[Long] {
      override def select(value: Long): lang.Iterable[String] = {
        val list = new util.ArrayList[String]()
        if (value % 2 == 0) {
          list.add("even")
        } else {
          list.add("odd")
        }
        list
      }
    })
    splits.select("even").print().setParallelism(1)
    env.execute("splitAndSelect")
  }
}