spark sql的簡單操作


測試數據
sparkStu.text
zhangxs 24 chenxy
wangYr 21 teacher
wangx 26 teacher
sparksql
{
"name":"zhangxs","age":24,"job":"chengxy",
"name":"li","age":21,"job":"teacher",
"name":"tao","age":14,"job":"student"
}
 
object CreateDataFream {
//創建student對象
case class Student(name:String,age:BigInt,job:String);

def main(args: Array[String]){
//初始化sparkSession 這個sparkSession要用val關鍵字修飾
val spark = SparkSession
.builder()
.appName("Spark SQL Example")
.master("spark://服務器ip:7077")
.getOrCreate();
// runDataSetCreate(spark);
// runSarkOnFile(spark);
// applySchema(spark);
//loadParquet(spark);
//jsonFile(spark);
//銷毀sparkSession
spark.stop();
}

}
 
//對指定的列進行查詢
private def test1(spark :SparkSession){
//因為要使用變量,$符號,所以導入這個包
import spark.implicits._
//從hdfs上讀取json數據文件並創建dataFream
var dataFreamS= spark.read.json("hdfs://服務器ip:8020/tmp/dataTest/sparksql");
//顯示dataFream所有數據
dataFreamS.show();
//打印dataFrame結構
dataFreamS.printSchema();
//顯示指定列的數據
dataFreamS.select("name").show()
//查詢指定的列,並修改數據
dataFreamS.select($"name", $"age"+1).show();
//查詢年齡大於10的人
dataFreamS.select($"age" > 10).show();
//查看每個年齡段的人數
dataFreamS.groupBy("age").count();
//創建臨時視圖,如果這個視圖已經存在就覆蓋掉
dataFreamS.createOrReplaceTempView("zhangxsView");
}

 

 
//創建dataFrame並運行 
private def runDataSetCreate(spark:SparkSession){
import spark.implicits._
//創建DataSets對象 類型是Student
val dataStu = Seq(Student("Andy", 32,"baiLing")).toDS();
//顯示數據集信息
dataStu.show();
//創建數據的dataSet
var dataArr=Seq(1,2,3).toDS();
//顯示數據集的信息
dataArr.show();
//對屬性進行簡單操作
print(dataArr.map (_ +1 ).collect());
//dataFrame能夠被轉換成自定義對象類型的dataSet,
val dfStu=spark.read.json("hdfs://服務器ip:8020/tmp/dataTest/sparksql").as[Student];
dfStu.show();
//jsonFile支持嵌套表,讀入並注冊成表
spark.read.json("hdfs://服務器ip:8020/tmp/dataTest/sparksql").registerTempTable("student");
//根據sql查詢注冊的table
val temsql=spark.sqlContext.sql("select name from student");
//顯示name的value
print(temsql.show())
}

 

//從hdfs上讀取數據文件並轉為student對象進行操作
private def runSarkOnFile(spark:SparkSession){
import spark.implicits._
//讀取數據文件 並生成rdd
var rdd=spark.read.textFile("hdfs://服務器ip:8020/tmp/dataTest/sparkStu.txt");
//對獲取的rdd進行解析,並生成sutdent對象
var sturdd=rdd.map { x => x.split(" ")}.map { z => Student(z(0).toString(),z(1).toInt,z(2).toString())};
//顯示student對象
sturdd.show();
//將sutdent對象注冊成臨時表 student
sturdd.registerTempTable("student");
//查詢臨時表中的數據,並顯示
var sqlDF=spark.sql("select t.name,t.age,t.job from friend t where t.age>14 and t.age<26");
sqlDF.show();
}

 

private def applySchema(spark:SparkSession){
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
//確定schema名稱(列的名稱)
var schemaString="name,age,job";
//解析schemaString,並生成StructType對象數組
var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)})
//從hdfs上讀取數據文件
var stuDS=spark.sparkContext.textFile(path);
//使用Row對象,創建rowRdd
var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2)))
//創建schemaRDD
var rowDF=spark.createDataFrame(sDS, schemaType); // var rowDF=spark.sqlContext.applySchema(sDS, schemaType); 這種方法已經過時
 //打印schemaRDD的結構
rowDF.printSchema();
//注冊Student table
rowDF.createOrReplaceTempView("Student"); // rowDF.registerTempTable("Student"); 這種方法已經過時
//rowDF.collect().foreach {print(_) }
//var resDS=spark.sql("select * from Student where age > 24");
var resDS=spark.sql("select name from Student");
resDS.show();
}
 
//使用parquet文件的方式
private def loadParquet(spark:SparkSession){
import spark.implicits._
//確定schema 列名稱
var schemaString="name,age,job";
//解析schemaString,並生成StructType對象數組
var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)})
//創建rowRdd
var stuDS=spark.sparkContext.textFile(path);
var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2)))
//將schemaRDD保存成parquet文件
var rowDF=spark.sqlContext.applySchema(sDS, schemaType);
//將文件寫到hdfs://服務器ip:8020/tmp/dataTest/
rowDF.write.parquet("hdfs://服務器ip:8020/tmp/dataTest/student.parquet");
-------------------------------------------------------------------
//讀取parquet文件
var redParfile=spark.read.parquet("hdfs://服務器ip:8020/tmp/dataTest/student.parquet");
redParfile.createOrReplaceTempView("redParfilered");
var resultRdd=spark.sql("select * from redParfilered t where t.name='zhangxs'");
//DataFrame.rdd 可以將dataFrame轉為RDD類型
resultRdd.rdd.map { x => "name"+x(0) }.collect().foreach { print(_) }
}
 
/**
* spark可以自動的識別一個json模式並加載成數據集,
* 這種轉換可以使用SparkSession.read.json() 函數
* 這個數據集的來源可以是一個rdd,也可以是一個json文件
*
*/
private def jsonFile(spark:SparkSession){
var jsonRdd=spark.read.json("hdfs://192.168.177.124:8020/tmp/dataTest/sparksql");
jsonRdd.createOrReplaceTempView("student");
var jfRdd= spark.sql("select * from student t where t.age >24");
jfRdd.show();

 

/**
* 使用Json類型的rdd加載json
*
* 如果加:: Nil,返回是一個char類型的rdd,加上則返回的是String類型的rdd
*/
var rdd=spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil);
var rddre=spark.read.json(rdd);
rddre.show();
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM