Flink Series 4: Custom Flink Sources and Sinks


Code repository

1. Source

Controlling Latency

By default, the elements of a stream are not sent over the network one by one (that would cause unnecessary network traffic); instead they are buffered. How long a buffer may wait before being flushed can be configured in the Flink configuration file, on the ExecutionEnvironment, or on individual operators (the default timeout is 100 ms).

  • Benefit: higher throughput
  • Drawback: increased latency
  • For maximum throughput, call setBufferTimeout(-1); this removes the timeout, so a buffer is only sent once it is full.
  • For minimum latency, set the timeout to 0; records are flushed immediately, at some cost in throughput (see the sketch after this list).
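
A minimal sketch of where these settings live, assuming a job built on a StreamExecutionEnvironment (the concrete values below only illustrate the three modes described in the list):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object BufferTimeoutExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Default behavior: buffers are flushed at least every 100 ms
    env.setBufferTimeout(100)
    // Maximum throughput: remove the timeout, a buffer is only sent once it is full
    // env.setBufferTimeout(-1)
    // Minimum latency: flush after every record, at some cost in throughput
    // env.setBufferTimeout(0)
  }
}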

 

1.1 Built-in Flink sources

1. File-based

env.readTextFile("file://path")
env.readFile(inputFormat, "file://path")
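
As a hedged illustration of what the generic readFile call looks like with a concrete format (the file path is a made-up example; TextInputFormat simply splits the file into lines):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

object FileSourceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "file:///tmp/input.txt" // hypothetical input file
    // readTextFile reads the file line by line (it uses TextInputFormat internally)
    val lines: DataStream[String] = env.readTextFile(path)
    // readFile takes the InputFormat explicitly, which also allows other formats
    val lines2: DataStream[String] = env.readFile(new TextInputFormat(new Path(path)), path)
    lines.union(lines2).print()
    env.execute("file source example")
  }
}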

2. Socket-based

env.socketTextStream("localhost", 6666, '\n')

3. Collection-based

import org.apache.flink.api.scala._
env.fromCollection(List(1,2,3))
env.fromElements(1,2,3)
env.generateSequence(0, 1000) 

1.2 Custom sources

1. Implementing SourceFunction

SourceFunction is non-parallel, so a parallelism cannot be set on it; that is, setParallelism(num) cannot be used on the resulting source.

SocketTextStreamFunction is one implementation of SourceFunction, and its source code is a detailed reference example.

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
// This implicit import is required; without it the flatMap call below will not compile
import org.apache.flink.api.scala._

// SourceFunction is non-parallel, so a parallelism cannot be set, i.e. setParallelism(num) cannot be used
class MySourceFunction extends SourceFunction[String]{
  var num: Long = 0
  var isCancel: Boolean = true

  // Called on cancel; the flag is used to stop the loop in run()
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // run() emits records downstream
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel){
      ctx.collect(s"xxd\t${num}")
      Thread.sleep(1000)
      num += 1
    }
  }

}

object SourceFunctionWordCount{
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the local Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Configure the web UI log file
    conf.setString("web.log.path", logPath)
    // Configure the TaskManager log file; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Configure the number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "3")
    // Get a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Set the buffer timeout
    env.setBufferTimeout(3000)
    // Define the source
    val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction)
    // Adding setParallelism here would throw an exception because the source is non-parallel
    // val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction).setParallelism(2)
    // Define the operators: parse the data, key by word, and aggregate with sum
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Define the sink: print to stdout
    wordCountData.print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run the job
    env.execute("Socket Window WordCount")
  }
}

2. Implementing ParallelSourceFunction

ParallelSourceFunction is a parallel source, so a parallelism can be set on it.

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
// ParallelSourceFunction is a parallel source, so its parallelism can be set
class MyParallelSource extends ParallelSourceFunction[String] {
  var num = 0
  var isCancel = true

  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel) {
      ctx.collect(s"xxd\t${num}")
      Thread.sleep(1000)
      num += 1
    }
  }
}

object ParallelSourceWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the local Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Configure the web UI log file
    conf.setString("web.log.path", logPath)
    // Configure the TaskManager log file; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Configure the number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source; since it is parallel, setParallelism(4) is allowed
    val sourceDataStream: DataStream[String] = env.addSource(new MyParallelSource).setParallelism(4)
    // Define the operators: parse the data, key by word, and aggregate with sum
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Define the sink: print to stdout
    wordCountData.print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run the job
    env.execute("Socket Window WordCount")

  }
}

3. Extending RichParallelSourceFunction

RichParallelSourceFunction not only implements ParallelSourceFunction, it also extends AbstractRichFunction.

So RichParallelSourceFunction can be parallelized and, compared with ParallelSourceFunction, additionally provides the open and close methods as well as getRuntimeContext.


import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

// RichParallelSourceFunction is not only parallel,
// it also adds open/close and getRuntimeContext compared with ParallelSourceFunction
class MyRichParallelSource extends RichParallelSourceFunction[String]{
  var num = 0
  var isCancel = true
  // Initialization: executed once when the source is opened, e.g. open a MySQL connection here
  override def open(parameters: Configuration): Unit = {
    println("open")
    num = 100
  }
  // Executed once when the source is closed,
  // e.g. return a finished MySQL connection to the pool
  override def close(): Unit = {
    // Wait until run() has finished so that resources still in use are not released too early
    while (!isMysql){
      Thread.sleep(1000)
      println("close sleep")
    }
    println("close")
    num = 0
  }

  // Called on cancel; the flag stops the loop in run().
  // cancel is triggered manually; after cancel completes, close is called as well.
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // run() emits records downstream.
  // After a manual cancel, Flink does not wait for run() to finish but forcibly calls close(),
  // which could close a connection that run() is still using.
  // A completion flag is therefore needed so close() knows when it may proceed.
  var isMysql = false
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    import scala.util.control.Breaks._
    breakable{
      while (isCancel){
        println(getRuntimeContext.getIndexOfThisSubtask) // print the index of this subtask
        ctx.collect(s"xxd\t${num}")
        Thread.sleep(2000)
        num += 1
        if (num > 1200){
          break()
        }
      }
    }
    isMysql = true
  }

}

object RichParallelSourceWordCount{
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the local Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Configure the web UI log file
    conf.setString("web.log.path", logPath)
    // Configure the TaskManager log file; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Configure the number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val sourceDataStream: DataStream[String] = env.addSource(new MyRichParallelSource).setParallelism(4)
    // Define the operators: parse the data, key by word, and aggregate with sum
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Define the sink: print to stdout
    wordCountData.slotSharingGroup("xxd").print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run the job
    env.execute("Socket Window WordCount")

  }
}

2. Sink

2.1 Built-in sinks

1. File-based

// Uses TextOutputFormat
stream.writeAsText("/path/to/file")
// Uses CsvOutputFormat
stream.writeAsCsv("/path/to/file")

2. Socket-based

stream.writeToSocket(host, port, SerializationSchema)
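
As a hedged example with a concrete schema, the snippet below serializes each record with SimpleStringSchema; the host and port are assumptions, and a socket server must already be listening there (for example nc -lk 9999):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._

object SocketSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("xxd xxd xxd")
    // Each String is serialized by SimpleStringSchema and written to the socket
    stream.writeToSocket("localhost", 9999, new SimpleStringSchema())
    env.execute("socket sink example")
  }
}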

3. Standard output / standard error

// Note: never use these in production; use sampled printing or logging instead (see the sketch below)
stream.print()
stream.printToErr()
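
As a rough illustration of the sampling idea mentioned in the note above (the 1% sample rate is an arbitrary assumption, not a recommendation from the original post):

import org.apache.flink.streaming.api.scala._
import scala.util.Random

object SampledPrintExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[Long] = env.generateSequence(0, 10000)
    // Keep roughly 1% of the records before printing instead of printing everything
    stream.filter(_ => Random.nextDouble() < 0.01).print()
    env.execute("sampled print example")
  }
}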

2.2 Custom sinks

1. Implementing SinkFunction

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MySinkFunction extends SinkFunction[(String, Int)] {
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}


object SinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the local Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Configure the web UI log file
    conf.setString("web.log.path", logPath)
    // Configure the TaskManager log file; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Configure the number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom sink
    output.addSink(new MySinkFunction)
    env.execute()

  }
}

2. Extending RichSinkFunction

package com.xxd.flink.sink
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MyRichSinkFunction extends RichSinkFunction[(String, Int)]{
  // Executed once when the sink is opened, e.g. open a MySQL connection here
  override def open(parameters: Configuration): Unit = {
    println("open")
  }

  // Executed once when the sink is closed,
  // e.g. return a finished MySQL connection to the pool
  override def close(): Unit = {
    println("close")
  }

  // invoke is called for every record to write it out
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    // In a rich function, getRuntimeContext gives access to e.g. broadcast variables and accumulators
    //getRuntimeContext.getBroadcastVariable("")
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}

object RichSinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the local Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Configure the web UI log file
    conf.setString("web.log.path", logPath)
    // Configure the TaskManager log file; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Configure the number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom sink
    output.addSink(new MyRichSinkFunction)
    env.execute()
  }
}

3. Implementing a custom OutputFormat, then writing with stream.writeUsingOutputFormat(customOutputFormat)

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

class MyOutPutFormat extends OutputFormat[(String, Int)]{
  // Configure the OutputFormat
  override def configure(parameters: Configuration): Unit = {
    println("configure")
  }
  // Executed once when the sink is opened, e.g. open a MySQL connection here
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // taskNumber: index of this task; numTasks: total number of parallel tasks
    println(s"taskNumber:${taskNumber},numTasks:${numTasks}")
  }
  // writeRecord is called for every record to write it out
  override def writeRecord(record: (String,Int)): Unit = {
    println(record)
  }
  // Executed once when the sink is closed,
  // e.g. return a finished MySQL connection to the pool
  override def close(): Unit = {
    println("close")
  }

}

object OutputFormatWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    var conf: Configuration = new Configuration()
    // Enable the local Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Configure the web UI log file
    conf.setString("web.log.path", logPath)
    // Configure the TaskManager log file; otherwise logs go to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Configure the number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment with the web UI
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_,1))
    // Use the custom OutputFormat
    output.writeUsingOutputFormat(new MyOutPutFormat)
    env.execute()
  }
}
 

