Spark-Dataframe操作


准備代碼
// 屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession
.builder()
.appName("SparkSessionT")
.master("local[1]")
.getOrCreate()

val commodityDF = spark.read.format("jdbc") //利用jdbc讀取MySQL數據庫的數據
.option("url", "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC") //連接的URL
.option("driver", "com.mysql.jdbc.Driver") //連接的驅動
.option("dbtable", "commodityPrice") //獲取的 數據表
.option("user", "root") //用戶名
.option("password", "123456") //密碼
.load() //登錄
使用SQL語句查詢
// 轉換為虛擬表 "commodityTable"使用 SQL語句查詢
commodityDF.registerTempTable("commodityTable")
spark.sql("select * from commodityTable").show(10)
查看數據
// 默認只顯示20條
commodityDF.show()
// 是否最多只顯示20個字符,默認為true
commodityDF.show(false)
// 完整查看10條數據
commodityDF.show(10, false)
// 取前n行數據, 和take與head不同的是,limit方法不是Action操作。
commodityDF.limit(5).show(false)
加載數據到數組
// 將數據加載到集合里面
commodityDF.collect().foreach(println)
// 和collect類似,只不過轉換成list
commodityDF.collectAsList()
獲取指定字段的統計信息
// 獲取指定字段的統計信息count, mean, stddev, min, max 返回的還是Dataframe
commodityDF.describe("price", "yprice").show(false)
// 遍歷每個統計信息
commodityDF.describe("price", "yprice").collect().foreach(println)
// 取0行的 "count" 數據
println(commodityDF.describe("price", "yprice").collect()(0))
獲取n行數據
// 獲取第一行數據
println(commodityDF.first())
println(commodityDF.head())
// 獲取前5條數據
commodityDF.head(5).foreach(println)
commodityDF.take(5).foreach(println)
// 以行數據list返回
commodityDF.takeAsList(5) 
條件查詢
// where和filter方法和SQL的where后面的語句一樣
commodityDF.where("price>100 or yprice<200").show()
commodityDF.filter("price>100 or yprice<200").show()
選取字段
// 選取 name ,price字段
commodityDF.select("name", "price").show(5, false)
// 對price字段的數據都+100
commodityDF.select(commodityDF("name"), commodityDF("price") + 100).show(5)
// 對指定字段進行特殊處理; price字段重名名為 p, 對price取四舍五入
commodityDF.selectExpr("name", "price as p", "round(price)").show(10)
// 只獲取單個字段
val name = commodityDF.col("name")
val parice = commodityDF.apply("price")
刪除指定字段
// 刪除指定字段 price
val c1 = commodityDF.drop("price")
val c2 = c1.drop(c1("yprice"))
c2.show(5)
排序
// 降序排序
commodityDF.orderBy(-commodityDF("price")).show(5)
commodityDF.orderBy(commodityDF("price").desc).show(5)
// 升序排序
commodityDF.orderBy("price").show(5)
commodityDF.orderBy(commodityDF("price")).show(5)
分組
// 對字段數據分組, 再對分組后的數據處理 count max mean sum agg
commodityDF.groupBy("degree").count().show(5)
commodityDF.groupBy(commodityDF("degree")).count().show(5)
commodityDF.groupBy(commodityDF("degree")).max("price", "yprice").show(5)
agg聚合
// 聚合agg一般和group by一起使用
commodityDF.agg("price" -> "max", "yprice" -> "sum").show()
// 對degree分組然后取 price字段的最大值和downNum的平均值
commodityDF.groupBy("degree").agg("price" -> "max", "downNum" -> "mean").show()
去除重復數據
// 注意自增ID不同的問題
val df = commodityDF.drop("id")
println(df.count())
// 去除一行數據完全相同的
println(df.distinct().count())
// 刪除指定字段存在相同的數據
println(df.dropDuplicates(Seq("price", "yprice")).count())
同字段數據組合(unionAll)
// 同字段組合
commodityDF.limit(5).unionAll(commodityDF.limit(5)).show()
同字段數據行組合(join)
// 組合數據DF
val df1 = commodityDF.limit(5)
val df2 = commodityDF.limit(10)
val df3 = commodityDF.filter("id>5 and id<11")
// 只有id字段相同的才會橫向組合 inner
df1.join(df2, "id").show()
df1.join(df2, df1("id") === df2("id")).show()
df1.join(df2, df1("id") === df2("id"), "inner").show()
// 根據 id和name兩個字段join
df2.join(df3, Seq("id", "name")).show()
Dataframe的邏輯操作
// 計算出兩個DataFrame的交集數據
df1.intersect(df2).show()
// 獲取df2中df3沒有的記錄
df2.except(df3).show()
重命名字段名
// 重命名字段名,如果指定的字段名不存在,不進行任何操作
commodityDF.limit(5).show()
commodityDF.withColumnRenamed("name", "reName").limit(5).show()
添加字段
// 往當前DataFrame中新增一列
commodityDF.withColumn("newCol", commodityDF("name")).show()
拆分字段數據為行數據(字段本身並不刪除)
// 將列字段數據拆分為行數據
commodityDF.limit(1).explode("name", "name_") { time: String => time.split(" ") }.show()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM