Spark Data Reading API
spark.read.format("json").load(path) spark.read.format("text").load(path) spark.read.format("parquet").load(path) spark.read.format("json").option("...","...").load(path)
Example
package com.imooc.bigdata.chapter05

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object DataSourceApp {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local").getOrCreate()

    text(spark)
    json(spark)
    common(spark)
    parquet(spark)
    convert(spark)
    jdbc(spark)
    jdbc2(spark)

    spark.stop()
  }

  // When packaging the code and submitting it to YARN or a Standalone cluster,
  // pay attention to how the driver is used
  def jdbc2(spark: SparkSession): Unit = {
    import spark.implicits._

    // connection settings come from application.conf on the classpath (Typesafe Config)
    val config = ConfigFactory.load()
    val url = config.getString("db.default.url")
    val user = config.getString("db.default.user")
    val password = config.getString("db.default.password")
    val driver = config.getString("db.default.driver")
    val database = config.getString("db.default.database")
    val table = config.getString("db.default.table")
    val sinkTable = config.getString("db.default.sink.table")

    val connectionProperties = new Properties()
    connectionProperties.put("user", user)
    connectionProperties.put("password", password)

    val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

    jdbcDF.filter($"game_top" > 100).show()
    //.write.jdbc(url, s"$database.$sinkTable", connectionProperties)
  }

  /**
   * Some data lives in MySQL; to process it with Spark, we first have to read it out of MySQL.
   * When the source is text/JSON, after Spark finishes processing we write the statistics back to MySQL.
   */
  def jdbc(spark: SparkSession): Unit = {
    import spark.implicits._

    // val jdbcDF = spark.read
    //   .format("jdbc")
    //   .option("url", "jdbc:mysql://10.133.3.10:3306")
    //   .option("dbtable", "spark.browser_stat")
    //   .option("user", "root")
    //   .option("password", "root")
    //   .load()
    //
    // jdbcDF.filter($"cnt" > 100).show(100)

    // the long-winded way: build the connection Properties by hand
    val url = "jdbc:mysql://10.133.3.10:3306"
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDF: DataFrame = spark.read
      .jdbc(url, "spark.taptap", connectionProperties)

    jdbcDF.filter($"game_top" > 100)
      .write.jdbc(url, "spark.taptaps", connectionProperties)
  }

  // Storage format conversion: JSON ==> Parquet
  def convert(spark: SparkSession): Unit = {
    import spark.implicits._

    val jsonDF: DataFrame = spark.read.format("json").load("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
    // jsonDF.show()
    jsonDF.filter("age>20").write.format("parquet").mode(SaveMode.Overwrite).save("out")

    spark.read.parquet("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\out").show()
  }

  // Parquet data source
  def parquet(spark: SparkSession): Unit = {
    import spark.implicits._

    val parquetDF: DataFrame = spark.read.parquet("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\users.parquet")
    parquetDF.printSchema()
    parquetDF.show()

    // parquetDF.select("name", "favorite_numbers")
    //   .write.mode("overwrite")
    //   .option("compression", "none")
    //   .parquet("out")
    // spark.read.parquet("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\out").show()
  }

  // standard API style: format(...).load(...)
  def common(spark: SparkSession): Unit = {
    import spark.implicits._

    // in front of the source code there are no secrets
    // val textDF: DataFrame = spark.read.format("text").load("file:///Users/rocky/IdeaProjects/imooc-workspace/sparksql-train/data/people.txt")
    val jsonDF: DataFrame = spark.read.format("json").load("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
    //
    // textDF.show()
    // println("~~~~~~~~")
    // jsonDF.show()

    jsonDF.write.format("json").mode("overwrite").save("out")
  }

  // JSON
  def json(spark: SparkSession): Unit = {
    import spark.implicits._

    val jsonDF: DataFrame = spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
    //jsonDF.show()

    // TODO... keep only rows with age > 20
    //jsonDF.filter("age > 20").select("name").write.mode(SaveMode.Overwrite).json("out")

    val jsonDF2: DataFrame = spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people2.json")
    jsonDF2.select($"name", $"age", $"info.work".as("work"), $"info.home".as("home"))
      .write.mode("overwrite").json("out")
  }

  // text
  def text(spark: SparkSession): Unit = {
    import spark.implicits._

    val textDF: DataFrame = spark.read.text("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")
    // textDF.show()

    val result: Dataset[String] = textDF.map(x => {
      val splits: Array[String] = x.getString(0).split(",")
      splits(0).trim //, splits(1).trim
    })

    // SaveMode.Append: keep the previous output and append; overwrite: delete the previous output first, then write this result
    result.write.mode("overwrite").text("out")

    // How can multiple columns be written with the text format? (see the sketch after this code)
  }
}
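On the question left at the end of text(): the text data source only writes a single string column, so a DataFrame with several fields has to be folded into one column first. Below is a minimal sketch under that assumption; it reuses people.txt from the example (assuming each line holds a name and an age separated by a comma, as the split in text() suggests), and concat_ws comes from org.apache.spark.sql.functions. The method name textMultiColumn and the tuple column names _1/_2 are illustrative.

import org.apache.spark.sql.functions.concat_ws
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Sketch: emit several fields through the text sink by folding them into one string column
def textMultiColumn(spark: SparkSession): Unit = {
  import spark.implicits._

  val textDF: DataFrame = spark.read.text("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")

  // this time keep both fields (name, age) as a two-column Dataset
  val result: Dataset[(String, String)] = textDF.map { x =>
    val splits = x.getString(0).split(",")
    (splits(0).trim, splits(1).trim)
  }

  // concat_ws joins the tuple columns (_1, _2) with a separator, producing the single
  // string column that the text data source requires on write
  result.select(concat_ws(",", $"_1", $"_2").as("value"))
    .write.mode("overwrite").text("out")
}

If the columns should stay separate in the output, writing with the CSV data source (for example df.write.option("sep", ",").csv(path)) avoids the manual concatenation altogether.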