DStream轉為DF的兩種方式(突破map時元組22的限制)


在進行Spark Streaming的開發時,我們常常需要將DStream轉為DataFrame來進行進一步的處理,
共有兩種方式,方式一:

val spark = SparkSession.builder()
  .appName("Test")
  .getOrCreate()
import spark.implicits._
dStream.foreachRDD{ rdd =>
  val df = rdd.map(_.split(" "))
    .map(t => (t(1),t(2),t(3)))
    .toDF("col1","col2","col3")
  // 業務邏輯
}

利用map算子和tuple來完成,一般的場景下采用這種方式即可。

但是有的時候我們會遇到列數大於22的情況,這個時候會受到scala的tuple數不能超過22的影響。這時可以采用方式二:

val spark = SparkSession.builder()
  .appName("Test")
  .getOrCreate()
dStream.foreachRDD{ rdd =>
  val res:RDD[Row] = rdd.map{ row =>
    val buffer = ArrayBuffer.empty[Any]
    val fields: Array[String] = row.split("\\|~\\|")
    buffer.append(fields(0))
    buffer.append(fields(1))
    buffer.append(fields(2))
    // 省略
    buffer.append(fields(25))
    Row.fromSeq(buffer)
  } 
  val schema = StructType(Seq(
    StructField("col1", StringType, false),
    StructField("col2", StringType, false),
    StructField("col3", StringType, false),
    // 省略
    StructField("col26", StringType, false)
  ))
  val df: DataFrame = spark.createDataFrame(result, schema)
  // 業務邏輯
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM