spark dataframe 處理數據 增刪改查


1、配置文件

package config
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
// Shared Spark entry points for the whole job. The member names
// (`confs`, `sc`, `spark_session`) are imported elsewhere
// (e.g. sparkDataMange.irisDataFrame), so they must stay as-is.
case object conf {
   // "local[*]" runs Spark locally using all available cores.
   private val master = "local[*]"
   val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")

   // Build the session first and derive the SparkContext from it, instead of
   // constructing `new SparkContext(confs)` and a SparkSession independently:
   // the original relied on getOrCreate() silently reusing the already-created
   // context, which is fragile double-initialization.
   val spark_session: SparkSession = SparkSession.builder()
     .appName("jobs")
     .config(confs)
     // Explicitly allow cartesian products (required by Spark 2.x for cross joins).
     .config("spark.sql.crossJoin.enabled", "true")
     .getOrCreate()

   val sc: SparkContext = spark_session.sparkContext
   // Keep console output readable during local runs.
   sc.setLogLevel("ERROR")
}

  

2、處理腳本

package sparkDataMange
import config.conf.{sc, spark_session}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode}
import config.conf.spark_session.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

object irisDataFrame {

  /** Loads the iris CSV, encodes the species label as a numeric code,
    * casts every column to Double, and prints the resulting frame and schema.
    *
    * Expected input rows (headerless CSV, so columns are named _c0 .. _c4):
    *   5.1,3.5,1.4,0.2,Iris-setosa
    *   5.0,2.0,3.5,1.0,Iris-versicolor
    *   6.2,3.4,5.4,2.3,Iris-virginica
    */
  def main(args: Array[String]): Unit = {

    val st: Long = System.currentTimeMillis()

    val path: String = "data/iris.data"

    // Encode the species name (_c4) as "1" / "2" / "3". otherwise("3") also
    // absorbs any unexpected label, matching the original behaviour.
    val speciesCode: Column = when(col("_c4").equalTo("Iris-setosa"), "1")
      .when(col("_c4").equalTo("Iris-versicolor"), "2")
      .otherwise("3")

    // Immutable transformation pipeline instead of reassigning a `var df`.
    val raw: DataFrame = spark_session.read.csv(path)
    val labelled: DataFrame = raw.withColumn("_c4", speciesCode)
    // Cast every column (features and the encoded label) to Double, e.g.:
    //   5.1,3.5,1.4,0.2,1
    val numeric: DataFrame =
      labelled.select(labelled.columns.map(c => labelled(c).cast(DoubleType)): _*)

    numeric.show()
    numeric.printSchema()

    spark_session.stop()

    // Elapsed wall-clock time in seconds (integer ms divided by a Double).
    println(s"執行時間為:${(System.currentTimeMillis() - st) / 1000.0}s")
  }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM