首先我們要創建SparkSession
val spark = SparkSession.builder()
.appName("test")
.master("local")
.getOrCreate()
import spark.implicits._ //將RDD轉化成為DataFrame並支持SQL操作
然后我們通過SparkSession來創建DataFrame
1.使用toDF函數創建DataFrame
通過導入(importing)spark.implicits, 就可以將本地序列(seq), 數組或者RDD轉為DataFrame。
只要這些數據的內容能指定數據類型即可。
import spark.implicits._
val df = Seq(
(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
).toDF("id", "name", "created_time")

注意:如果直接用toDF()而不指定列名字,那么默認列名為"_1", "_2"
可以通過df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")進行修改列名
2.使用createDataFrame函數創建DataFrame
通過schema + row 來創建
我們可以通俗的理解為schema為表的表頭,row為表的數據記錄
import org.apache.spark.sql.types._
//定義dataframe的結構的schema
val schema = StructType(List(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("create_time", DateType, nullable = true)
))
//定義dataframe內容的rdd
val rdd = sc.parallelize(Seq(
Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
))
//創建dataframe
val df = spark.createDataFrame(rdd, schema)

不過,我們可以把文件結構當做參數來使用,通過rdd自動產生schema和row,不用自己手動生成。
import org.apache.spark.sql.types._
//傳入屬性參數
val schemaString = " id name create_time"
//解析參數變成StructField
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldname, StringType, nullable = true))
//定義dataframe的結構的schema
val schema = StructType(fields)
//定義dataframe內容的rdd
val lines = sc.textFile("file:///people.txt")
val rdd = lines.spilt(_.split(","))
.map(attributes=>ROW(attributes(0),attributes(1).trim) )
//創建dataframe
val df = spark.createDataFrame(rdd, schema)
3.通過反射機制創建DataFrame
首先要定義一個case class,因為只有case class才能被Spark隱式轉化為DataFrame
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._
//創建匹配類
case class Person(id:Int,name:String,age:Long)
//讀取文件生成rdd
val rdd = sc.textFile("file:///")
//通過匹配類把rdd轉化成dataframe
val df = rdd.map(_.split(","))
.map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()
4.通過文件直接創建DataFrame
(1)使用parquet文件read創建
val df = spark.read.parquet("hdfs:/path/to/file")
(2)使用json文件read創建
val df = spark.read.json("examples/src/main/resources/people.json")
(3)使用csv文件load創建
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("csv/file/path")
(4)使用Hive表創建
spark.table("test.person") // 庫名.表名 的格式
.registerTempTable("person") // 注冊成臨時表
spark.sql(
"""
| select *
| from person
| limit 10
""".stripMargin).show()
記得,最后我們要調用spark.stop()來關閉SparkSession。
5.保存
(1)通過df.write.format().save("file:///")保存
write.format()支持輸出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式
,save()定義保存的位置
當我們保存成功后可以在保存位置的目錄下看到文件,但是這個文件並不是一個文件而是一個目錄。
里面的內容一般為

不用擔心,這是沒錯的。
我們讀取的時候,並不需要使用文件夾里面的part-xxxx文件,直接讀取目錄即可。
(2)通過df.rdd.saveAsTextFile("file:///")轉化成rdd再保存
我們對於不同格式的文件讀寫來說,我們一般使用兩套對應方式
val df = spark.read.格式("file:///")//讀取文件
df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//讀取文件
df.write.save("file:///")//保存文件
具體read和load方法有什么不同,我還不是很清楚,弄明白了回來補充。
6.通過JDBC創建DataFrame
我們在啟動Spark-shell或者提交任務的時候需要添加相應的jar包
spark-shell(spark-submit)
--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar
val jdbcDf = spark.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驅動
.option("url", "jdbc:mysql://ip:3306") //數據庫地址
.option("dbtable", "db.user_test") //表名:數據庫名.表名
.option("user", "test") //用戶名
.option("password", "123456") //密碼
.load()
jdbcDf.show()
