首先我們要創建SparkSession
val spark = SparkSession.builder() .appName("test") .master("local") .getOrCreate() import spark.implicits._ //將RDD轉化成為DataFrame並支持SQL操作
然后我們通過SparkSession來創建DataFrame
1.使用toDF
函數創建DataFrame
通過導入(importing)spark.implicits, 就可以將本地序列(seq), 數組或者RDD轉為DataFrame。
只要這些數據的內容能指定數據類型即可。
import spark.implicits._ val df = Seq( (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15")) ).toDF("id", "name", "created_time")
注意:如果直接用toDF()而不指定列名字,那么默認列名為"_1", "_2"
可以通過df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")進行修改列名
2.使用createDataFrame
函數創建DataFrame
通過schema + row 來創建
我們可以通俗的理解為schema為表的表頭,row為表的數據記錄
import org.apache.spark.sql.types._ //定義dataframe的結構的schema val schema = StructType(List( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("create_time", DateType, nullable = true) )) //定義dataframe內容的rdd val rdd = sc.parallelize(Seq( Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15")) )) //創建dataframe val df = spark.createDataFrame(rdd, schema)
不過,我們可以把文件結構當做參數來使用,通過rdd自動產生schema和row,不用自己手動生成。
import org.apache.spark.sql.types._ //傳入屬性參數 val schemaString = " id name create_time" //解析參數變成StructField val fields = schemaString.split(" ") .map(fieldName => StructField(fieldname, StringType, nullable = true)) //定義dataframe的結構的schema val schema = StructType(fields) //定義dataframe內容的rdd val lines = sc.textFile("file:///people.txt") val rdd = lines.spilt(_.split(",")) .map(attributes=>ROW(attributes(0),attributes(1).trim) ) //創建dataframe val df = spark.createDataFrame(rdd, schema)
3.通過反射機制創建DataFrame
首先要定義一個case class,因為只有case class才能被Spark隱式轉化為DataFrame
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder import spark.implicits._ //創建匹配類 case class Person(id:Int,name:String,age:Long) //讀取文件生成rdd val rdd = sc.textFile("file:///") //通過匹配類把rdd轉化成dataframe val df = rdd.map(_.split(",")) .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()
4.通過文件直接創建DataFrame
(1)使用parquet文件read創建
val df = spark.read.parquet("hdfs:/path/to/file")
(2)使用json文件read創建
val df = spark.read.json("examples/src/main/resources/people.json")
(3)使用csv文件load創建
val df = spark.read .format("com.databricks.spark.csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("csv/file/path")
(4)使用Hive表創建
spark.table("test.person") // 庫名.表名 的格式 .registerTempTable("person") // 注冊成臨時表 spark.sql( """ | select * | from person | limit 10 """.stripMargin).show()
記得,最后我們要調用spark.stop()來關閉SparkSession。
5.保存
(1)通過df.write.format().save("file:///")保存
write.format()支持輸出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式
,save()定義保存的位置
當我們保存成功后可以在保存位置的目錄下看到文件,但是這個文件並不是一個文件而是一個目錄。
里面的內容一般為
不用擔心,這是沒錯的。
我們讀取的時候,並不需要使用文件夾里面的part-xxxx文件,直接讀取目錄即可。
(2)通過df.rdd.saveAsTextFile("file:///")轉化成rdd再保存
我們對於不同格式的文件讀寫來說,我們一般使用兩套對應方式
val df = spark.read.格式("file:///")//讀取文件 df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//讀取文件 df.write.save("file:///")//保存文件
具體read和load方法有什么不同,我還不是很清楚,弄明白了回來補充。
6.通過JDBC創建DataFrame
我們在啟動Spark-shell或者提交任務的時候需要添加相應的jar包
spark-shell(spark-submit)
--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar
val jdbcDf = spark.read.format("jdbc") .option("driver", "com.mysql.jdbc.Driver") //驅動 .option("url", "jdbc:mysql://ip:3306") //數據庫地址 .option("dbtable", "db.user_test") //表名:數據庫名.表名 .option("user", "test") //用戶名 .option("password", "123456") //密碼 .load() jdbcDf.show()