flink批處理中的source以及sink介紹


 

一、flink在批處理中常見的source

  flink在批處理中常見的source主要有兩大類:  

    1.基於本地集合的source(Collection-based-source)   

    2.基於文件的source(File-based-source)

 

 1.基於本地集合的source

      在flink最常見的創建DataSet方式有三種。   

1.使用env.fromElements(),這種方式也支持Tuple,自定義對象等復合形式。   

2.使用env.fromCollection(),這種方式支持多種Collection的具體類型   

3.使用env.generateSequence()方法創建基於Sequence的DataSet

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import scala.collection.immutable.{Queue, Stack}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object DataSource001 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //0.用element創建DataSet(fromElements)
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()

    //1.用Tuple創建DataSet(fromElements)
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()

    //2.用Array創建DataSet
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()

    //3.用ArrayBuffer創建DataSet
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()

    //4.用List創建DataSet
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()

    //5.用List創建DataSet
    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()

    //6.用Vector創建DataSet
    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
    ds6.print()

    //7.用Queue創建DataSet
    val ds7: DataSet[String] = env.fromCollection(Queue("spark", "flink"))
    ds7.print()

    //8.用Stack創建DataSet
    val ds8: DataSet[String] = env.fromCollection(Stack("spark", "flink"))
    ds8.print()

    //9.用Stream創建DataSet(Stream相當於lazy List,避免在中間過程中生成不必要的集合)
    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
    ds9.print()

    //10.用Seq創建DataSet
    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
    ds10.print()

    //11.用Set創建DataSet
    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
    ds11.print()

    //12.用Iterable創建DataSet
    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    ds12.print()

    //13.用ArraySeq創建DataSet
    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()

    //14.用ArrayStack創建DataSet
    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()

    //15.用Map創建DataSet
    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    ds15.print()

    //16.用Range創建DataSet
    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
    ds16.print()

    //17.用fromElements創建DataSet
    val ds17: DataSet[Long] =  env.generateSequence(1,9)
    ds17.print()
  }
}

2.基於文件的source(File-based-source)

flink支持多種存儲設備上的文件,包括本地文件,hdfs文件,alluxio文件等。
flink支持多種文件的存儲格式,包括text文件,CSV文件等。
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment,_}

object DataSource002 {
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    //1.讀取本地文本文件,本地文件以file://開頭
    val ds1: DataSet[String] = env.readTextFile("file:///Applications/flink-1.1.3/README.txt")
    ds1.print()

    //2.讀取hdfs文本文件,hdfs文件以hdfs://開頭,不指定master的短URL
    val ds2: DataSet[String] = env.readTextFile("hdfs:///input/flink/README.txt")
    ds2.print()

    //3.讀取hdfs CSV文件,轉化為tuple
    val path = "hdfs://qingcheng11:9000/input/flink/sales.csv"
    val ds3 = env.readCsvFile[(String, Int, Int, Double)](
      filePath = path,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3))
    ds3.print()

    //4.讀取hdfs CSV文件,轉化為case class
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds4 = env.readCsvFile[Sales](
      filePath = path,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
    )
    ds4.print()
  }
}

3.基於文件的source(遍歷目錄)

flink支持對一個文件目錄內的所有文件,包括所有子目錄中的所有文件的遍歷訪問方式。
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * 遞歸讀取hdfs目錄中的所有文件,會遍歷各級子目錄
  */
object DataSource003 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // create a configuration object
    val parameters = new Configuration
    // set the recursive enumeration parameter
    parameters.setBoolean("recursive.file.enumeration", true)
    // pass the configuration to the data source
    val ds1 = env.readTextFile("hdfs:///input/flink").withParameters(parameters)
    ds1.print()
  }
}
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM