flink系列-5、flink常見函數使用及自定義轉換函數


代碼地址

一、flink編程方法

  • 獲取執行環境(execution environment)
  • 加載/創建初始數據集
  • 對數據集進行各種轉換操作(生成新的數據集)
  • 指定將計算的結果放到何處去
  • 觸發APP執行

flink的計算方式和spark一樣都是惰性的

  • Flink APP都是延遲執行的 
  • 只有當execute()被顯示調用時才會真正執行 
  • 本地執行還是在集群上執行取決於執行環境的類型 
  • 好處:用戶可以根據業務構建復雜的應用,Flink可以整體進優化並生成執行計划

二、DataStream

  • DataStream 是 Flink 流處理 API 中最核心的數據結構。它代表了一個運行在多個分區上的並行流。一 個 DataStream 可以從 StreamExecutionEnvironment 通過env.addSource(SourceFunction) 獲得。 DataStream 上的轉換操作都是逐條的,比如 map(),flatMap(),filter() 
  • 下圖展示了Flink 中目前支持的主要幾種流的類型,以及它們之間的轉換關系。

2.1、自定義轉換函數

1、函數

scala函數

data.flatMap(f => f.split(" "))

java的lambda表達式

data.flatMap(f -> f.split(" "));

2、實現接口

data.flatMap(new FlatMapFunction[String,String] {
    override def flatMap(value: String, out: Collector[String]) = {
        val strings: Array[String] = value.split(" ")
            for(s <- strings){
                out.collect(s)
            }
        }
    })

3、RichFunctions

RichFunction中有非常有用的四個方法:open,close,getRuntimeContext和setRuntimecontext 這些功能在參數化函數、創建和確定本地狀態、獲取廣播變量、獲取運行時信息(例如累加器和計數器)和迭代信息時非常有幫助。

 

 以RichFlatMapFunction為例:

import java.util.Properties

import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFlatMapFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
  * @author xiandongxie
  */
class KafkaRichFlatMapFunction(topic: String,properites: Properties) extends RichFlatMapFunction[String, Collector[Int]]{

  var producer: KafkaProducer[String, String] = null

  override def open(parameters: Configuration): Unit = {
    // 創建kafka生產者
    producer = new KafkaProducer[String, String](properites)
  }

  override def close(): Unit = {
    // 關閉kafka生產者
    producer.close()
  }

  override def getRuntimeContext: RuntimeContext = super.getRuntimeContext

  override def setRuntimeContext(t: RuntimeContext): Unit = super.setRuntimeContext(t)

  override def getIterationRuntimeContext: IterationRuntimeContext = super.getIterationRuntimeContext


  override def flatMap(value: String, out: Collector[Collector[Int]]): Unit = {
    //使用RuntimeContext得到子線程ID,比如可以用於多線程寫文件
    println(getRuntimeContext.getIndexOfThisSubtask)
    //發送數據到kafka
    producer.send(new ProducerRecord[String, String](topic, value))
  }
}

2.2、operators

 1、map flatMap 與 DataStreamUtils.collect的使用

  •  map flatMap 
    • 含義:數據映射(1進1出和1進n出)
    • 轉換關系:DataStream → DataStream 
  • DataStreamUtils.collect
    • 含義:數據拉回Client
    • 轉換關系:DataStream → util.Iterator 

示例代碼:

import java.util

import org.apache.flink.api.scala._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object SocketMap {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 開啟spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否則打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,logPath)
    // 配置有多少個solor
    conf.setString("taskmanager.numberOfTaskSlots","8")
    // 獲取本地運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定義數據源
//    val dataSource: DataStream[String] = env.socketTextStream("*********",666)
    val unit: DataStream[String] = env.addSource(new SocketTextStreamFunction("******",6666,"\n",2))

    val mapData: DataStream[(String, Int)] = unit.flatMap(f => f.split(" ")).map((_,1))
    //把數據拉回到client端進行操作,比如發起一次數據連接把數據統一插入
    //使用了DataStreamUtils.collect就可以省略env.execute
    import scala.collection.convert.wrapAll._
    val value: util.Iterator[(String, Int)] = DataStreamUtils.collect(mapData.javaStream)
    for(v <- value){
      println(v)
    }
//    env.execute("SocketMap")
  }
}

2、filter 與 循環迭代流

  • 含義:數據篩選(滿足條件event的被篩選出來進行后續處理),根據FliterFunction返回的布爾值來判斷是否 保留元素,true為保留,false則丟棄
  • 轉換關系:DataStream → DataStream 

示例代碼:

import org.apache.flink.api.scala._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

//輸入一組數據,我們對他們分別進行減1運算,直到等於0為止
object IterativeFilter {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // 開啟spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否則打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,logPath)
    // 配置有多少個solor
    conf.setString("taskmanager.numberOfTaskSlots","8")
    // 獲取本地運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //設置全局並行度為1
    env.setParallelism(1)
    // 生成包含一個0到10的DStream
    val input: DataStream[Long] = env.generateSequence(0, 10)
    //流中的元素每個減1,並過濾出大於0的,然后生成新的流
//    input.filter(f => {
//      f > 5
//    }).print()
    val value: DataStream[Long] = input.iterate(
      d => (d.map(f => {
        println("map\t"+ f)
        f - 1
      }),
        d.filter(f => {
          println("filter \t" + f)
          f > 0
        })))
    value.print()
    env.execute("IterativeFilter")
  }
}

3、keyBy

  • 含義: 根據指定的key進行分組(邏輯上把DataStream分成若干不相交的分區,key一樣的event會 被划分到相同的partition,內部采用hash分區來實現)
  • 轉換關系: DataStream → KeyedStream 
  • 限制: 
    • 可能會出現數據傾斜,可根據實際情況結合物理分區來解決 

KeyedStream

  • KeyedStream用來表示根據指定的key進行分組的數據流。
  • 一個KeyedStream可以通過調用DataStream.keyBy()來獲得。 
  • 在KeyedStream上進行任何transformation都將轉變回DataStream。 
  • 在實現中,KeyedStream會把key的信息傳入到算子的函數中。 
  • 每個event只能訪問所屬key的狀態,其上的聚合函數可以方便地操作和保存對應key的狀態
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

object KeyBy {
  def main(args: Array[String]): Unit = {
    //生成配置對象
    val config = new Configuration()
    //開啟spark-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件,否則打印日志到控制台
    config.setString("web.log.path", "/tmp/flink_log")
    //配置taskManager的日志文件,否則打印日志到控制台
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/flink_log")
    //配置tm有多少個slot
    config.setString("taskmanager.numberOfTaskSlots", "8")
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    val tuple = List(
      ("xxd", "class12", "小李", 55),
      ("xxd", "class12", "小王", 50),
      ("xxd", "class11", "小張", 50),
      ("xxd", "class11", "小強", 45))
    // 定義數據源,使用集合生成
    val input = env.fromCollection(tuple)

    //對於元組類型來說數據的選擇可以使用數字(從0開始),keyBy(0,1)這種寫法代表組合key
    val keyBey: KeyedStream[(String, String, String, Int), Tuple] = input.keyBy(1)
//    val unit: KeyedStream[(String, String, String, Int), String] = input.keyBy(_._1)

    //對於key選擇來說還可以使用keySelector
    val keyBy: KeyedStream[(String, String, String, Int), String] = input.keyBy(new KeySelector[(String, String, String, Int), String] {
      override def getKey(value: (String, String, String, Int)): String = {
        value._2
      }
    })

    //對於scala的元組可以使用"_1"、對於java的元組可以使用"f0",其實也是類中屬性的名字
    val max: DataStream[(String, String, String, Int)] = keyBy.maxBy("_4")
//    max.print()

    val myList = List(
      new MyEventKey("xxd", "class12", "小李", 55),
      new MyEventKey("xxd", "class12", "小王", 50),
      new MyEventKey("xxd", "class11", "小張", 50),
      new MyEventKey("xxd", "class11", "小強", 45))
    // 定義數據源,使用集合生成
    val myInput = env.fromCollection(myList)
    //對於自定義類型來說也可以用類中的字段名稱,記住這個自定義類型必須得是樣例類
    val myKeyBy: KeyedStream[MyEventKey, Tuple] = myInput.keyBy("b")
    myKeyBy.maxBy("d").print()
//    myKeyBy.map(f => new MyEventValue(f.a,f.b,f.c,f.d)).print()
    env.execute("keyby")

  }
}

//樣例類,可以用於key,因為其默認實現了hashCode方法,可用於對象比較,當然也可用於value
case class MyEventKey(a:String,b:String,c:String,d:Int){
  override def toString: String = a + "\t" + b + "\t" + c + "\t" + d
 }
//普通類可以用於非key,只能用於value
class MyEventValue(a:String,b:String,c:String,d:Int){
  override def toString: String = a + "\t" + b + "\t" + c + "\t" + d
}

4、reduce 與 fold

  • 分組之后當然要對分組之后的數據也就是KeyedStream進行各種聚合操作啦
  • KeyedStream → DataStream
  • 對於KeyedStream的聚合操作都是滾動的(rolling,在前面的狀態基礎上繼續聚合),千萬不要理解為批處理 時的聚合操作(DataSet,其實也是滾動聚合,只不過他只把最后的結果給了我們)
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

object ReduceFoldAggregation {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    // 生成對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 定義socket 源
    //scala開發需要加一行隱式轉換,否則在調用operator的時候會報錯
    import org.apache.flink.api.scala._
    val tuple = List(
      ("xxd", "class12", "小王", 50),
      ("xxd", "class12", "小李", 55),
      ("xxd", "class11", "小張", 50),
      ("xxd", "class11", "小強", 45))
    val text = env.fromCollection(tuple)
    val map: DataStream[(String, Int)] = text.map(f => (f._2, 1))
    val keyBy: KeyedStream[(String, Int), String] = map.keyBy(_._1)
    //相同的key的數據聚合在一起使用reduce求合,使用的時候注意與spark不同的地方是key也參與運算
    val reduce: DataStream[(String, Int)] = keyBy.reduce((a,b) => (a._1,a._2 + b._2))
    reduce.print()
    //使用fold完成和reduce一樣的功能,不同的是這里的返回值類型由fold的第一個參數決定
//    val fold: DataStream[(String, Int)] = keyBy.fold(("",0))((a,b) => (b._1,a._2 + b._2))
//    fold.print()
    env.execute("ReduceFoldAggregation")
  }
} 

5、connect
 與
 union
(合並流)

  • connect之后生成ConnectedStreams,會對兩個流的數據應用不同的處理方法,並且雙流之間可以共享狀態 (比如計數)。
  • union 合並多個流,新的流包含所有流的數據。
  • union是DataStream → DataStream
  • connect只能連接兩個流,而union可以連接多於兩個流
  • connect連接的兩個流類型可以不一致,而union連接的流的類型必須一致

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}

object ConnectUnion {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    // 生成對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //scala開發需要加一行隱式轉換,否則在調用operator的時候會報錯
    import org.apache.flink.api.scala._
    val input1: DataStream[Long] = env.generateSequence(0, 10)
    val input2: DataStream[String] = env.fromCollection(List("xxd it dashuju"))
    //連接兩個流
    val connectInput: ConnectedStreams[Long, String] = input1.connect(input2)
    //使用connect連接兩個流,類型可以不一致
    val connect: DataStream[String] = connectInput.map[String](
      //處理第一個流的數據,需要返回String類型
      (a: Long) => (a + 100).toString,
      //處理第二個流的數據,需要返回String類型
      (b: String) => b + "_input2")
    connect.print()
    val input3: DataStream[Long] = env.generateSequence(11, 20)
    val input4: DataStream[Long] = env.generateSequence(21, 30)
    //使用union連接多個流,要求數據類型必須一致,且返回結果是DataStream
    val unionData: DataStream[Long] = input1.union(input3).union(input4)
    unionData.print()
    env.execute()

  }
}

6、CoMap, CoFlatMap

  • 跟map and flatMap類似,只不過作用在ConnectedStreams上
  • ConnectedStreams → DataStream
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object ConnectCoFlatMap {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"

    // 生成對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //scala開發需要加一行隱式轉換,否則在調用operator的時候會報錯
    import org.apache.flink.api.scala._

    val input1: DataStream[Long] = env.generateSequence(0,10)
    val input2: DataStream[String] = env.fromCollection(List("xxd it dashuju"))
    //連接兩個流
    val connectInput: ConnectedStreams[Long, String] = input1.connect(input2)
    //flatMap之后的泛型確定了兩個流合並之后的返回類型
    val value: DataStream[String] = connectInput.flatMap[String](
      //處理第一個流的數據,需要返回String類型
      (data:Long, out:Collector[String]) => {
        out.collect(data.toString)
      },
      //處理第二個流的數據,需要返回String類型
      (data:String, out:Collector[String]) => {
        val strings: Array[String] = data.split(" ")
        for (s <- strings) {
          out.collect(s)
        }
      }
    )
    value.print()
    env.execute()
  }
}

7、split 與 select(拆分流& SideOutPut 

  • split
    • DataStream → SplitStream
    • 按照指定標准將指定的DataStream拆分成多個流用SplitStream來表示 
  • select
    • SplitStream → DataStream
    • 跟split搭配使用,從SplitStream中選擇一個或多個流
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

import scala.collection.mutable.ListBuffer

object SplitAndSelect {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    // 生成對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val input: DataStream[Long] = env.generateSequence(0, 10)
    val splitStream: SplitStream[Long] = input.split(f => {
      val out = new ListBuffer[String]
      //返回數據的拆分標記
      if (f % 2 == 0) {
        out += "xxd"
      } else {
        out += "it"
      }
      out
    })
    //根據拆分標記選擇數據
//     splitStream.select("xxd").print()
//     splitStream.select("it").print()
    splitStream.select("xxd", "it").print()
    env.execute("SplitAndSelect")
  }
}

但是需要注意的是經過 split 拆分后的流,是不能二次拆分的,否則會報錯:

Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

在源碼中可以看到注釋,該方式已經廢棄並且建議使用最新的 SideOutPut 進行分流操作

  • SideOutPut 拆分:  
    • 定義 OutputTag
    • 調用特定函數進行數據拆分
      • ProcessFunction
      • KeyedProcessFunction
      • CoProcessFunction
      • KeyedCoProcessFunction
      • ProcessWindowFunction
      • ProcessAllWindowFunction
package com.xxd.flink.operator

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

/**
  * SideOutPut 可以多次分流
  *
  * @author xiandongxie
  */
object SideOutPutDemo {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    // 生成對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val input: DataStream[Long] = env.generateSequence(0, 10)

    /**
      * 根據奇數偶數拆分
      */

    // 定義 OutputTag
    val evenOutPutTag: OutputTag[Long] = new OutputTag[Long]("even")
    val oddOutPutTag: OutputTag[Long] = new OutputTag[Long]("odd")
    // 調用特定函數進行數據拆分
    val processStream: DataStream[Long] = input.process(new ProcessFunction[Long, Long] {
      override def processElement(value: Long, ctx: ProcessFunction[Long, Long]#Context, out: Collector[Long]): Unit = {
        if (value % 2 == 0) {
          ctx.output(evenOutPutTag, value)
        } else {
          ctx.output(oddOutPutTag, value)
        }
      }
    })

    //    processStream.getSideOutput(oddOutPutTag).printToErr()
    val eventStream: DataStream[Long] = processStream.getSideOutput(evenOutPutTag)

    // 偶數的基礎上根據是否為2拆分
    val twoOutPutTag: OutputTag[Long] = new OutputTag[Long]("two")
    val otherOutPutTag: OutputTag[Long] = new OutputTag[Long]("other")

    val processStream2: DataStream[Long] = eventStream.process(new ProcessFunction[Long, Long] {
      override def processElement(value: Long, ctx: ProcessFunction[Long, Long]#Context, out: Collector[Long]): Unit = {
        if (value == 2) {
          ctx.output(twoOutPutTag, value)
        } else {
          ctx.output(otherOutPutTag, value)
        }
      }
    })

    processStream2.getSideOutput(twoOutPutTag).printToErr()
    processStream2.getSideOutput(otherOutPutTag).print()


    env.execute("SideOutPutDemo")

  }

}

8、物理分區

  • 算子間數據傳遞模式
    • One-to-one streams 保持元素的分區和順序
    • Redistributing streams 
  • 改變流的分區策略取決於使用的算子
    • keyBy()(re-partitions by hashing the key)
    • broadcast() 
    • rebalance()(which re-partitions randomly) 
    • 都是Transformation,只是改變了分區 
    • 都是DataStream → DataStream

rebalance

  • 含義:再平衡,用來減輕數據傾斜
  • 轉換關系: DataStream → DataStream
  • 使用場景:處理數據傾斜,比如某個kafka的partition的數據比較多

示例代碼:

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer[String](...))
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = str1.rebalance()
val str3: DataStream[AnotherType] = str2.map { ... }

如上圖的執行圖所示,DataStream 各個算子會並行運行,算子之間是數據流分區。如 Source 的第一個並行實例 (S1)和 flatMap() 的第一個並行實例(m1)之間就是一個數據流分區。而在 flatMap() 和 map() 之間由於加了 rebalance(),它們之間的數據流分區就有3個子分區(m1的數據流向3個map()實例)。

rescale

  • 原理:通過輪詢調度將元素從上游的task一個子集發送到下游task的一個子集
  • 轉換關系:DataStream → DataStream
  • 使用場景:數據傳輸都在一個TaskManager內,不需要通過網絡。

如下圖所示:第一個task並行度為2,第二個task並行度為6,第三個task並行度為2。從第一個task到第二個task,Src的 子集 Src1 和 Map的子集Map1,2,3對應起來,Src1會以輪詢調度的方式分別向Map1,2,3發送記錄。 從第二個 task到第三個task,Map的子集1,2,3對應Sink的子集1,這三個流的元素只會發送到Sink1。 假設我們每個 TaskManager有三個Slot,並且我們開了 SlotSharingGroup,那么通過rescale,所有的數據傳輸都在一個 TaskManager內,不需要通過網絡。

自定義partitioner

  • 轉換關系:DataStream → DataStream
  • 使用場景:自定義數據處理負載 
  • 實現方法: 
    • 實現org.apache.flink.api.common.functions.Partitioner接口 
    • 覆蓋partition方法 
    • 設計算法返回partitionId
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CustomPartitioner {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    // 生成對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //scala開發需要加一行隱式轉換,否則在調用operator的時候會報錯
    import org.apache.flink.api.scala._
    val tuple = List(
      ("xxd", "class12", "小王", 50),
      ("xxd", "class12", "小李", 55),
      ("xxd", "class11", "小張", 50),
      ("xxd", "class11", "小強", 45))
    // 定義數據源,使用集合生成
    val input = env.fromCollection(tuple)
    //第二個參數 _._2 是指定partitioner的key是數據中的那個字段
    input.partitionCustom(new MyFlinkPartitioner, _._2).print()
    env.execute()
  }
}

//由於返回的永遠是1,所以所有的數據都跑到第2個分區
class MyFlinkPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    println(key)
    1
  }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM