Flink的算子


Flink的算子

flink代碼分為三部分:

1、Source----數據源,讀取數據

2、Transformation----轉換,對數據進行處理,也就是算子

3、Sink----將數據發出去

Transformation:數據轉換的各種操作,有Map / FlatMap / Filter / KeyBy / Reduce / Fold /

Window / WindowAll / Union / Window join / Split / Select / Project等,操作很多,可以將數據轉換計算成你想要的數據。

1、Map

Map 算子的輸入流是 DataStream,經過 Map 算子后返回的數據格式是 SingleOutputStreamOperator 類型,獲取一個元素並生成一個元素,舉個例子:

新的一年給每個員工的工資加 5000。

SingleOutputStreamOperator<Employee> map = employeeStream.map(new MapFunction<Employee, Employee>() {
    @Override
    public Employee map(Employee employee) throws Exception {
        employee.salary = employee.salary + 5000;
        return employee;
    }
});
map.print();

Map 程序示例

package com.shujia.flink.tf

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object Demo1Map {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//讀取學生文件創建DS
    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    /**
      * map 算子有兩種格式:
      * 1、傳入一個函數 -- scala api
      * 2、傳入一個MapFunction -- java api(使用較多)
      */
      
    val mapDS: DataStream[(String, Int)] = studentDS.map(new MapFunction[String, (String, Int)] {
      /**
        * @param value : DS中的一行數據
        * @return 返回數據的類型
        */
      //重寫MapFunction的map方法
      override def map(value: String): (String, Int) = {
        val clazz: String = value.split(",")(4)
        (clazz, 1)
      }
    })

    mapDS
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()

  }
}

2、FlatMap

FlatMap 算子的輸入流是 DataStream,經過 FlatMap 算子后返回的數據格式是 SingleOutputStreamOperator 類型,獲取一個元素並生成零個、一個或多個元素,舉個例子:

將工資大於 40000 的找出來

SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new FlatMapFunction<Employee, Employee>() {
    @Override
    public void flatMap(Employee employee, Collector<Employee> out) throws Exception {
        if (employee.salary >= 40000) {
            out.collect(employee);
        }
    }
});
flatMap.print();

FlatMap 程序示例

package com.shujia.flink.tf

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo2FlatMap {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//讀取數據構建DS
    val linesDS: DataStream[String] = env.readTextFile("data/words.txt")

    //scala api
    val scalaDS: DataStream[String] = linesDS.flatMap(_.split(","))

    //java api
    val wordDS: DataStream[String] = linesDS.flatMap(new FlatMapFunction[String, String] {
      /**
        * @param value : 原始的數據類型
        * @param out   : 用於將數據發送到下游
        */
      //重寫FlatMapFunction的flatMap方法
      override def flatMap(value: String, out: Collector[String]): Unit = {
        val split: Array[String] = value.split(",")
        //將數據一條一條發送到下游
        for (word <- split) {
          //將數據發送到下游
          out.collect(word)
        }
      }
    })

    wordDS.print()

    env.execute()

  }
}

3、Filter

SingleOutputStreamOperator  filter = ds.filter(new FilterFunction<Employee>() {
    @Override
    public boolean filter(Employee employee) throws Exception {
        if (employee.salary >= 40000) {
            return true;
        }
        return false;
    }
});
filter.print();

對每個元素都進行判斷,返回為 true 的元素,如果為 false 則丟棄數據,上面找出工資大於 40000 的員工其實也可以用 Filter 來做

Filter 程序示例

package com.shujia.flink.tf

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo3Filter {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//讀書學生數據構建DS
    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    val filterDS: DataStream[String] = studentDS.filter(new FilterFunction[String] {
      /**
        * 在Flink中沒有轉換算子和操作算子的區分
        *
        * @param value : 數據
        * @return : 返回值,如果返回true 保留數據,如果返回false 過濾數據
        */
      override def filter(value: String): Boolean = {
        val gender: String = value.split(",")(3)
        "男".equals(gender) //gender.equals("男")--這樣寫可能會出現空指針異常
      }
    })

    filterDS.print()

    env.execute()
  }
}

4、KeyBy

img

KeyBy 在邏輯上是基於 key 對流進行分區,相同的 Key 會被分到一個分區(這里分區指的就是下游算子多個並行節點的其中一個)。在內部,它使用 hash 函數對流進行分區。它返回 KeyedDataStream 數據流。舉個例子:根據商品的店鋪 id 來進行分區

KeyedStream<ProductEvent, Integer> keyBy = productStream.keyBy(new KeySelector<ProductEvent, Integer>() {
    @Override
    public Integer getKey(ProductEvent product) throws Exception {
        return product.shopId;
    }
});
keyBy.print();

KeyBy 程序示例

package com.shujia.flink.tf

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object Demo4KeyBY {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)
	//讀取socket構建DS
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
	
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
	//轉成KV格式,便於查看
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
     /**
      * keyBy:將相同的key發送到同一個task中
      */
      
    //scala api
    //kvDS.keyBy(kv => kv._1).print()

    //java api
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(new KeySelector[(String, Int), String] {
      //重寫KeySelector中的getkey方法
      override def getKey(value: (String, Int)): String = {
        value._1
      }
    })

    keyByDS.print()

    env.execute()

  }
}

5、Reduce

Reduce 返回單個的結果值,並且 reduce 操作每處理一個元素總是創建一個新值。

常用的方法有 average、sum、min、max、count,使用 Reduce 方法都可實現。

舉例:上面先將數據流進行 keyby 操作,因為執行 Reduce 操作只能是 KeyedStream,然后將員工的工資做了一個求平均值的操作。

SingleOutputStreamOperator<Employee> reduce = employeeStream.keyBy(new KeySelector<Employee, Integer>() {
    @Override
    public Integer getKey(Employee employee) throws Exception {
        return employee.shopId;
    }
}).reduce(new ReduceFunction<Employee>() {
    @Override
    public Employee reduce(Employee employee1, Employee employee2) throws Exception {
        employee1.salary = (employee1.salary + employee2.salary) / 2;
        return employee1;
    }
});
reduce.print();

Reduce 程序示例

package com.shujia.flink.tf

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object Demo5Reduce {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
     /**
      * reduce: 分組之后進行聚合計算
      */

    //scala api
    //val reduceDS: DataStream[(String, Int)] = keyByDS.reduce((x, y) => (y._1, x._2 + y._2))

    //java api
    val reduceDS: DataStream[(String, Int)] = keyByDS.reduce(new ReduceFunction[(String, Int)] {
      //重寫ReduceFunction的reduce方法
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value1._2 + value2._2)
      }
    })

    reduceDS.print()

    env.execute()

  }
}

6、Aggregations

DataStream API 支持各種聚合,例如 min、max、sum 等。

這些函數可以應用於 KeyedStream 以獲得 Aggregations 聚合。

max 和 maxBy 之間的區別在於 max 返回流中的最大值,但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。

KeyedStream.sum(0) 
KeyedStream.sum("key") 
KeyedStream.min(0) 
KeyedStream.min("key") 
KeyedStream.max(0) 
KeyedStream.max("key") 
KeyedStream.minBy(0) 
KeyedStream.minBy("key") 
KeyedStream.maxBy(0) 
KeyedStream.maxBy("key")

Aggregations 程序示例

package com.shujia.flink.tf

import org.apache.flink.streaming.api.scala._

object Demo6Agg {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//讀取學生數據構建DS
    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
     /**
      * 取出每個班級年齡最大的學生
      */
    val stuDS: DataStream[(String, String, Int, String, String)] = studentDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1), split(2).toInt, split(3), split(4))
    })

    val keyByDS: KeyedStream[(String, String, Int, String, String), String] = stuDS.keyBy(_._5)	//取出班級進行分組----元組中的第五個元素 
      
//取出年齡最大的那個學生----keyByDS中的第三個元素
//val maxDS: DataStream[(String, String, Int, String, String)] = keyByDS.max(2)
      //maxDS.print()
      /**
      * 注意:使用max(),返回的是每個班級最大的年齡,但是這個最大的年齡對應的不一定是他本人
      * 	 也就是說,最大的年齡可以返回出來,但是其他幾個元素並不一定是對應的。
      *		 使用maxBy()可以將最大值返回出來,也可以將該最大值對應的其他信息返回出來
      */
      
    /**
      * sum
      * max
      * min
      * maxBy
      * minBy
      *
      * max 和 maxBy 之間的區別在於 max 返回流中的最大值,
      *	但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。
      */
    
    //maxBy() -- 參數可以傳入 列名 或 下標
    val maxDS: DataStream[(String, String, Int, String, String)] = keyByDS.maxBy(2)

    maxDS.print()

    env.execute()
  }
}

7、Window

Window 函數允許按時間或其他條件對現有 KeyedStream 進行分組。 以下是以 10 秒的時間窗口聚合:

inputStream.keyBy(0).window(Time.seconds(10));

有時候因為業務需求場景要求:聚合一分鍾、一小時的數據做統計報表使用。

Window 程序示例

這邊先看個簡單的,之后有 Window 詳解

package com.shujia.flink.tf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo7Window {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    /**
      * 每隔5秒統計單詞的數量
      * timeWindow() -- 里面可以傳入一個參數(滾動窗口)或者兩個參數(滑動窗口)
      */

    kvDS
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) //滾動窗口,本次設置5秒,每擱5秒算一次
      .sum(1)
      .print()

    env.execute()

  }
}

8、Union

img

Union 函數將兩個或多個數據流結合在一起。 這樣后面在使用的時候就只需使用一個數據流就行了。 如果我們將一個流與自身組合,那么組合后的數據流會有兩份同樣的數據。

inputStream.union(inputStream1, inputStream2, ...);

Union 程序示例

package com.shujia.flink.tf

// 當代碼中沒有用到Scala API的時候不需要導 _ (隱式轉換)
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo8Union {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val ds1: DataStream[String] = env.socketTextStream("master", 7777)
    val ds2: DataStream[String] = env.socketTextStream("master", 8888)

    /**
      * 將多個流合並成一個流,類型要一致
      * 在虛擬機中要開兩個master,一個登錄7777.一個登錄8888
      */

    val unionDS: DataStream[String] = ds1.union(ds2)

    unionDS.print()

    env.execute()
  }
}

9、Window Join

我們可以通過一些 key 將同一個 window 的兩個數據流 join 起來。

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))     
           .apply (new JoinFunction () {...});

以上示例是在 5 秒的窗口中連接兩個流,其中第一個流的第一個屬性的連接條件等於另一個流的第二個屬性。

10、Split

img

此功能根據條件將流拆分為兩個或多個流。 當你獲得混合流然后你可能希望單獨處理每個數據流時,可以使用此方法。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>(); 
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

上面就是將偶數數據流放在 even 中,將奇數數據流放在 odd 中。

11、Select

img

上面用 Split 算子將數據流拆分成兩個數據流(奇數、偶數),接下來你可能想從拆分流中選擇特定流,那么就得搭配使用 Select 算子(一般這兩者都是搭配在一起使用的),

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM