轉載請標明出處http://www.cnblogs.com/haozhengfei/p/22bba3b1ef90cbfaf073eb44349c0757.html
Spark_總結四
1.Spark SQL
Spark SQL 和 Hive on Spark 兩者的區別?
spark on hive:hive只是作為元數據存儲的角色,解析,優化,執行都是spark做的
hive on spark:
hive既作為存儲的角色,又作為計算角色的一部分,hive將sql解析Spark任務,
底層是Spark引擎
(
hive2.0以后推薦使用Spark引擎,轉化為Spark任務,hvie2.0以前都是轉化為MR任務)
Spark SQL 轉化的過程(底層架構)

【SQL/HQL-->解析器-->分析器-->優化器-->CostModel消耗模型(選出消耗最低的,就是效率最高的),最終將傳入的SQL轉換為RDD的計算】
須知:
若想使用SparkSQL必須創建SQLContext 必須是傳入SparkContext 不能是SparkConf

1.DataFrame與RDD的區別? || 什么是DataFrame?
區別:
Spark core是基於RDD的編程,Spark SQL是基於DataFrame的編程
,DataFrame的底層就是封裝的
RDD,只不過DataFrame底層RDD的泛型是ROW(DataFrame <==> RDD<ROW>),另外,
DataFrame中有對列的描述,但是RDD沒有對列的描述。
What is DataFrame:
DataFrame
與 RDD 類似,DataFrame 是一個分布式數據容器,更像傳統數據庫的二維表格,除了數據以外,還掌握數據的結構信息(比如對列的描述), 即 schema。同時,與 Hive 類似,DataFrame 也支持嵌套數據類型(struct、 array 和 map)。 從 API 易用性的角度上 看,DataFrameAPI 提供的是一套高層的關系操作,比函數式的 RDDAPI 要更加友好,門檻更低。
3.創建DataFrame的來源和方式 || 如何對DataFrame中封裝的數據進行操作?
3.1創建DataFrame的來源和方式

3.2如何對DataFrame中封裝的數據進行操作?
當我們的DataFrame構建好之后,里面封裝了我們的數據,需要對數據進行操作即對DataFrame進行操作,有兩種方式
3.2.1
通過方法
sqlContext.read()
返回DataFrameReader對象
sqlContext.read().json("student.json")
讀取一個json文件(這個json文件中的內容不能是嵌套的)讀進來變成DataFrame,
df.select("age").show(),如果沒有show,這個程序就不會執行,這個show就類似與Spark中Action類型的算子,觸發執行
示例代碼:

1 package com.hzf.spark.exercise; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaSparkContext; 5 import org.apache.spark.sql.DataFrame; 6 import org.apache.spark.sql.SQLContext; 7 8 public class TestSparkSQL { 9 public static void main(String[] args) { 10 SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local"); 11 12 JavaSparkContext sc = new JavaSparkContext(conf); 13 SQLContext sqlContext = new SQLContext(sc); 14 15 DataFrame df = sqlContext.read().json("people.json"); 16 17 18 /* 19 * 操作DataFrame的第一種方式 20 * */ 21 //類似 SQL的select from table; 22 df.show(); 23 //desc table 24 df.printSchema(); 25 26 //select age from table; 27 df.select("age").show(); 28 //select name from table; 29 df.select("name").show(); 30 //select name,age+10 from table; 31 df.select(df.col("name"),df.col("age").plus(10)).show(); 32 //select * from table where age > 20 33 df.filter(df.col("age").gt(20)).show(); 34 } 35 }
result:

3.2.2
通過注冊臨時表,傳入SQL語句(推薦使用)
示例代碼:

1 package com.hzf.spark.exercise; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaSparkContext; 5 import org.apache.spark.sql.DataFrame; 6 import org.apache.spark.sql.SQLContext; 7 8 public class TestSparkSQL01 { 9 public static void main(String[] args) { 10 SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local"); 11 12 JavaSparkContext sc = new JavaSparkContext(conf); 13 SQLContext sqlContext = new SQLContext(sc); 14 15 DataFrame df = sqlContext.read().json("people.json"); 16 17 //將DataFrame中封裝的數據注冊為一張臨時表,對臨時表進行sql操作 18 df.registerTempTable("people"); 19 DataFrame sql = sqlContext.sql("SELECT * FROM people WHERE age IS NOT NULL"); 20 sql.show(); 21 } 22 }
result:

3.3創建DataFrame的幾種方式,來源(json,jsonRDD,parquet,非json格式,mysql)
<1>讀取Json格式文件-->DataFrame:
Json 文件中不能有嵌套的格式
加載json格式文件-->DataFrame有兩種方式:
方式一:
DataFrame df = sqlContext.read().format("json").load("people.json");
方式二:
DataFrame df = sqlContext.read().json("people.json");
數據集:

示例代碼:

1 package com.bjsxt.java.spark.sql.json; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.SparkContext; 8 import org.apache.spark.api.java.JavaPairRDD; 9 import org.apache.spark.api.java.JavaRDD; 10 import org.apache.spark.api.java.JavaSparkContext; 11 import org.apache.spark.api.java.function.Function; 12 import org.apache.spark.api.java.function.PairFunction; 13 import org.apache.spark.sql.DataFrame; 14 import org.apache.spark.sql.Row; 15 import org.apache.spark.sql.RowFactory; 16 import org.apache.spark.sql.SQLContext; 17 import org.apache.spark.sql.types.DataTypes; 18 import org.apache.spark.sql.types.StructField; 19 import org.apache.spark.sql.types.StructType; 20 21 import scala.Tuple2; 22 23 /** 24 * JSON數據源 25 * @author Administrator 26 * 27 */ 28 public class JSONDataSource { 29 30 public static void main(String[] args) { 31 SparkConf conf = new SparkConf() 32 .setAppName("JSONDataSource") 33 // .set("spark.default.parallelism", "100") 34 .setMaster("local"); 35 JavaSparkContext sc = new JavaSparkContext(conf); 36 SQLContext sqlContext = new SQLContext(sc); 37 38 DataFrame studentScoresDF = sqlContext.read().json("student.json"); 39 40 studentScoresDF.registerTempTable("student_scores"); 41 DataFrame goodStudentScoresDF = sqlContext.sql( 42 "select name,count(score) from student_scores where score>=80 group by name"); 43 44 List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function<Row, String>() { 45 private static final long serialVersionUID = 1L; 46 47 @Override 48 public String call(Row row) throws Exception { 49 return row.getString(0); 50 } 51 52 }).collect(); 53 54 for(String str: goodStudentNames){ 55 System.out.println(str); 56 } 57 } 58 }
result:

<2>jsonRDD
-->DataFrame

<3>讀取Parquet格式文件
-->DataFrame
:
自動推測分區,合並 Schema。
經驗:將Spark中的文本轉換為Parquet以提升性能
parquet是一個基於列的存儲格式,列式存儲布局可以加速查詢,因為它只檢查所有需要的列並對它們的值執行計算,因此只讀取一個數據文件或表的小部分數據。Parquet 還支持靈活的壓縮選項,因此可以顯著減少磁盤上的存儲。
如果在 HDFS 上擁有基於文本的數據文件或表,而且正在使用 Spark SQL 對它們執行查詢,那么強烈推薦將文本數據文件轉換為 Parquet 數據文件,以實現性能和存儲收益。當然,轉換需要時間,但查詢性能的提升在某些情況下可能達到 30 倍或更高,存儲的節省可高達 75%!
parquet的壓縮比高,將一個普通的文本轉化為parquet格式,如何去轉?
val lineRDD = sc.textFile()
DF.save(parquet) //
將RDD轉化為DF
parquet操作示例
是否指定format--若存儲時,指定format為json格式,那么則生成json格式文件,否則不指定format,默認文件以parquet形式進行存儲
測試一:指定format為json格式,存儲在本地
測試數據:
top.txt

測試代碼
測試結果


測試二:不指定format,那么文件默認以parquet形式進行存儲,存儲在本地
測試數據:
people.json

測試代碼

測試結果


測試三:讀取本地parquet存儲格式的文件
測試代碼

測試結果

測試四:讀取hdfs上parquet形式的文件

測試代碼

測試結果

<4> RDD(非json格式變成DataFrame)
讀取txt 文件
-->DataFrame
:
從 txt 文件讀取,然后轉為 RDD,最后轉為 DataFrame
RDD 轉為 DataFrame 有兩種方式
(1)反射機制,
注意點:自定義的類一定要是 public,並且要實現序列化接口 Serializable,
取數據的時候,
在 JavaAPI 中會有順序問題(因為 DataFrame 轉為 RDD<Row> 的時候,會進行一次字典排序改變 Row 的位置,而Scala 的 API 則沒有這個問題)
(2)動態創建 Schema,先將 RDD 中的每一行類型變 為 RDD<Row> 類型,然后創建 DataFrame 的元數據-->構建 StructType,用於最后 DataFrame 元數據的描述,基於現有的 StructType 以及 RDD<Row> 來構造 DataFrame。(如果列的信息比較長可以存到數據庫
里)
<4.1>反射機制
數據

示例代碼:
自定義類


1 package com.bjsxt.java.spark.sql.createdf; 2 3 import java.util.List; 4 5 import org.apache.spark.SparkConf; 6 import org.apache.spark.api.java.JavaRDD; 7 import org.apache.spark.api.java.JavaSparkContext; 8 import org.apache.spark.api.java.function.Function; 9 import org.apache.spark.sql.DataFrame; 10 import org.apache.spark.sql.Row; 11 import org.apache.spark.sql.SQLContext; 12 13 /** 14 * 使用反射的方式將RDD轉換成為DataFrame 15 * 1、自定義的類必須是public 16 * 2、自定義的類必須是可序列化的 17 * 3、RDD轉成DataFrame的時候,他會根據自定義類中的字段名進行排序。 18 * @author zfg 19 * 20 */ 21 22 public class RDD2DataFrameByReflection { 23 24 public static void main(String[] args) { 25 SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection"); 26 JavaSparkContext sc = new JavaSparkContext(conf); 27 SQLContext sqlcontext = new SQLContext(sc); 28 29 JavaRDD<String> lines = sc.textFile("Peoples.txt"); 30 JavaRDD<Person> personsRdd = lines.map(new Function<String, Person>() { 31 32 private static final long serialVersionUID = 1L; 33 34 @Override 35 public Person call(String line) throws Exception { 36 String[] split = line.split(","); 37 Person p = new Person(); 38 p.setId(Integer.valueOf(split[0].trim())); 39 p.setName(split[1]); 40 p.setAge(Integer.valueOf(split[2].trim())); 41 return p; 42 } 43 }); 44 45 //傳入進去Person.class的時候,sqlContext是通過反射的方式創建DataFrame 46 //在底層通過反射的方式或得Person的所有field,結合RDD本身,就生成了DataFrame 47 DataFrame df = sqlcontext.createDataFrame(personsRdd, Person.class); 48 49 //命名table的名字為person 50 df.registerTempTable("personTable"); 51 52 DataFrame resultDataFrame = sqlcontext.sql("select * from personTable where age > 7"); 53 resultDataFrame.show(); 54 55 //將df轉成rdd 56 JavaRDD<Row> resultRDD = resultDataFrame.javaRDD(); 57 JavaRDD<Person> result = resultRDD.map(new Function<Row, Person>() { 58 59 private static final long serialVersionUID = 1L; 60 61 @Override 62 public Person call(Row row) throws Exception { 63 Person p = new Person(); 64 p.setAge(row.getInt(0)); 65 p.setId(row.getInt(1)); 66 p.setName(row.getString(2)); 67 return p; 68 } 69 }); 70 71 List<Person> personList = result.collect(); 72 73 for (Person person : personList) { 74 System.out.println(person.toString()); 75 } 76 } 77 }
result:


<4.2>動態創建Schema方式
數據

示例代碼:

1 package com.bjsxt.java.spark.sql.createdf; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaRDD; 8 import org.apache.spark.api.java.JavaSparkContext; 9 import org.apache.spark.api.java.function.Function; 10 import org.apache.spark.sql.DataFrame; 11 import org.apache.spark.sql.Row; 12 import org.apache.spark.sql.RowFactory; 13 import org.apache.spark.sql.SQLContext; 14 import org.apache.spark.sql.types.DataTypes; 15 import org.apache.spark.sql.types.StructField; 16 import org.apache.spark.sql.types.StructType; 17 18 19 public class RDD2DataFrameByProgrammatically { 20 public static void main(String[] args) { 21 SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection"); 22 JavaSparkContext sc = new JavaSparkContext(conf); 23 SQLContext sqlcontext = new SQLContext(sc); 24 /** 25 * 在RDD的基礎上創建類型為Row的RDD 26 */ 27 JavaRDD<String> lines = sc.textFile("Peoples.txt"); 28 JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() { 29 30 private static final long serialVersionUID = 1L; 31 32 @Override 33 public Row call(String line) throws Exception { 34 String[] split = line.split(","); 35 return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2])); 36 } 37 }); 38 39 /** 40 * 動態構造DataFrame的元數據,一般而言,有多少列以及每列的具體類型可能來自於Json,也可能來自於DB 41 */ 42 ArrayList<StructField> structFields = new ArrayList<StructField>(); 43 structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); 44 structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); 45 structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); 46 //構建StructType,用於最后DataFrame元數據的描述 47 StructType schema = DataTypes.createStructType(structFields); 48 49 /** 50 * 基於已有的MetaData以及RDD<Row> 來構造DataFrame 51 */ 52 DataFrame df = sqlcontext.createDataFrame(rowRDD, schema); 53 54 /** 55 *注冊成為臨時表以供后續的SQL操作查詢 56 */ 57 df.registerTempTable("persons"); 58 59 /** 60 * 進行數據的多維度分析 61 */ 62 DataFrame result = sqlcontext.sql("select * from persons where age > 7"); 63 result.show(); 64 65 /** 66 * 對結果進行處理,包括由DataFrame轉換成為RDD<Row> 67 */ 68 List<Row> listRow = result.javaRDD().collect(); 69 for (Row row : listRow) { 70 System.out.println(row); 71 } 72 } 73 }
result:


<5> 讀取MySql 中表里的數據
-->DataFrame
Spark Build-in內置支持的
json jdbc mysql,hive...如果數據庫支持jdbc連接,Spark 就可以基於這個數據庫盡行數據的處理
示例代碼:

1 package com.bjsxt.java.spark.sql.jdbc; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaSparkContext; 5 import org.apache.spark.sql.DataFrame; 6 import org.apache.spark.sql.DataFrameReader; 7 import org.apache.spark.sql.SQLContext; 8 9 /** 10 * JDBC數據源 11 * 12 * @author Administrator 13 * 14 */ 15 public class JDBCDataSource { 16 17 public static void main(String[] args) { 18 SparkConf conf = new SparkConf().setAppName("JDBCDataSource").setMaster("local"); 19 JavaSparkContext sc = new JavaSparkContext(conf); 20 SQLContext sqlContext = new SQLContext(sc); 21 22 // 方法1、分別將mysql中兩張表的數據加載為DataFrame 23 /* 24 * Map<String, String> options = new HashMap<String, String>(); 25 * options.put("url", "jdbc:mysql://hadoop1:3306/testdb"); 26 * options.put("driver", "com.mysql.jdbc.Driver"); 27 * options.put("user","spark"); 28 * options.put("password", "spark2016"); 29 30 * options.put("dbtable", "student_info"); 31 * DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load(); 32 * options.put("dbtable", "student_score"); 33 * DataFrame studentScoresDF = sqlContext.read().format("jdbc") .options(options).load(); 34 */ 35 // 方法2、分別將mysql中兩張表的數據加載為DataFrame 36 DataFrameReader reader = sqlContext.read().format("jdbc"); 37 reader.option("url", "jdbc:mysql://node4:3306/testdb"); 38 reader.option("driver", "com.mysql.jdbc.Driver"); 39 reader.option("user", "root"); 40 reader.option("password", "123"); 41 42 reader.option("dbtable", "student_info"); 43 DataFrame studentInfosDF = reader.load(); 44 reader.option("dbtable", "student_score"); 45 DataFrame studentScoresDF = reader.load(); 46 47 // 將兩個DataFrame轉換為JavaPairRDD,執行join操作 48 studentInfosDF.registerTempTable("studentInfos"); 49 studentScoresDF.registerTempTable("studentScores"); 50 51 String sql = "SELECT studentInfos.name,age,score " 52 + " FROM studentInfos JOIN studentScores" 53 + " ON (studentScores.name = studentInfos.name)" 54 + " WHERE studentScores.score > 80"; 55 56 DataFrame sql2 = sqlContext.sql(sql); 57 sql2.show(); 58 } 59 }
result:

4. 如何將DataFrame中的值寫入到外部存儲中去?
存儲模式(SaveMode.Overwrite || Ignore || Append
||
ErrorifExit)
<1> 讀取本地json格式文件,並以json形式寫入到hdfs(不指定format,默認是parquet)
測試代碼

測試結果


補充:
1.什么是下推過濾器?
在join之前過濾,而不是join之后進行過濾
2.select * from table 在SparkSQL和Hive on MR中的區別?
SparkSQL
中 select * from table 在spark中是要具體執行spark任務的,而在
Hive on MR
中 select * from table直接讀取數據,所以
SparkSQL
中執行select * from不一定比
Hive on MR
中的快
3.如何將一個DataFrame變成一個RDD?
JavaRDD<ROW> rdd = resultFrame.javaRDD()
5.整合Spark和Hive?
6.1Spark 目錄下面的
conf 下放一個配置文件
hive-site.xml 文件。
6.2在 hive 的服務端
啟動 MetaStore Server
【因為 HiveContext 會用到 metastore 服務。(
在 Spark-shell 里面使用 HiveContext 的時候,要記住導入 HiveContext)】(hive --service metastore)
6.3
啟動hdfs【因為hive的數據是存在hdfs上的
】和
Spark集群(start-all.sh spark-start-all.sh)
6.4進入
Spark shell,測試Spark 和 Hive是否整合成功
scala>import org.apache.spark.sql.hive.HiveContext
scala>val hiveContext =newHiveContext(sc)
scala>hiveContext.sql("show tables").show
6.5整合測試(詳見Spark_some配置),注意!將代碼提交到Spark集群上運行時,需要將hdfs-site.xml拷貝到SPARK_HOME/conf下
6.SqlContext和HiveContext的關系?
SQLcontext 是
HiveContext 的父類
在
集群中運行的時候用
HiveContext,可以
基於 Hive 來操作 Hive 表,對源數據進行CRUD的操作。