Spark_總結四


轉載請標明出處http://www.cnblogs.com/haozhengfei/p/22bba3b1ef90cbfaf073eb44349c0757.html 


Spark_總結四

1.Spark SQL  

    Spark SQL 和 Hive on Spark 兩者的區別?
        spark on hive:hive只是作為元數據存儲的角色,解析,優化,執行都是spark做的     
           hive on spark:   hive既作為存儲的角色,又作為計算角色的一部分,hive將sql解析Spark任務, 底層是Spark引擎 hive2.0以后推薦使用Spark引擎,轉化為Spark任務,hvie2.0以前都是轉化為MR任務)
            
     Spark SQL 轉化的過程(底層架構)
 
【SQL/HQL-->解析器-->分析器-->優化器-->CostModel消耗模型(選出消耗最低的,就是效率最高的),最終將傳入的SQL轉換為RDD的計算】
 
須知:
         若想使用SparkSQL必須創建SQLContext 必須是傳入SparkContext 不能是SparkConf
 
1.DataFrame與RDD的區別?   ||   什么是DataFrame?

區別:
        Spark core是基於RDD的編程,Spark SQL是基於DataFrame的編程 ,DataFrame的底層就是封裝的 RDD,只不過DataFrame底層RDD的泛型是ROW(DataFrame <==> RDD<ROW>),另外, DataFrame中有對列的描述,但是RDD沒有對列的描述。        
What is DataFrame:
        DataFrame   RDD 類似,DataFrame 是一個分布式數據容器,更像傳統數據庫的二維表格,除了數據以外,還掌握數據的結構信息(比如對列的描述), 即 schema。同時,與 Hive 類似,DataFrame 也支持嵌套數據類型(struct、 array 和 map)。 從 API 易用性的角度上 看,DataFrameAPI 提供的是一套高層的關系操作函數式的 RDDAPI 更加友好,門檻更低。 

3.創建DataFrame的來源和方   ||   如何對DataFrame中封裝的數據進行操作?

3.1創建DataFrame的來源和方
 

3.2如何對DataFrame中封裝的數據進行操作?

   當我們的DataFrame構建好之后,里面封裝了我們的數據,需要對數據進行操作即對DataFrame進行操作,有兩種方式
3.2.1    通過方法
           sqlContext.read()      返回DataFrameReader對象
          sqlContext.read().json("student.json")     讀取一個json文件(這個json文件中的內容不能是嵌套的)讀進來變成DataFrame,
         df.select("age").show(),如果沒有show,這個程序就不會執行,這個show就類似與Spark中Action類型的算子,觸發執行
 
示例代碼:
 1 package com.hzf.spark.exercise;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.sql.DataFrame;
 6 import org.apache.spark.sql.SQLContext;
 7 
 8 public class TestSparkSQL {
 9     public static void main(String[] args) {
10         SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
11         
12         JavaSparkContext sc = new JavaSparkContext(conf);
13         SQLContext sqlContext = new SQLContext(sc);
14         
15         DataFrame df = sqlContext.read().json("people.json");
16         
17         
18         /*
19          * 操作DataFrame的第一種方式
20          * */
21         //類似 SQL的select from table;
22         df.show();
23         //desc table
24         df.printSchema();
25         
26         //select age from table;
27         df.select("age").show();
28         //select name from table;
29         df.select("name").show();
30         //select name,age+10 from table;
31         df.select(df.col("name"),df.col("age").plus(10)).show();
32         //select * from table where age > 20
33         df.filter(df.col("age").gt(20)).show();
34     }
35 }
View Code
 
result:
 
3.2.2    通過注冊臨時表,傳入SQL語句(推薦使用)
 
示例代碼:
 1 package com.hzf.spark.exercise;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.sql.DataFrame;
 6 import org.apache.spark.sql.SQLContext;
 7 
 8 public class TestSparkSQL01 {
 9     public static void main(String[] args) {
10         SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
11         
12         JavaSparkContext sc = new JavaSparkContext(conf);
13         SQLContext sqlContext = new SQLContext(sc);
14         
15         DataFrame df = sqlContext.read().json("people.json");
16         
17         //將DataFrame中封裝的數據注冊為一張臨時表,對臨時表進行sql操作
18         df.registerTempTable("people");
19         DataFrame sql = sqlContext.sql("SELECT * FROM people WHERE age IS NOT NULL");
20         sql.show(); 
21     }
22 }
View Code
 
result:
 
3.3創建DataFrame的幾種方式,來源(json,jsonRDD,parquet,非json格式,mysql)
 
<1>讀取Json格式文件-->DataFrame: Json 文件中不能有嵌套的格式
      加載json格式文件-->DataFrame有兩種方式
            方式一: DataFrame df = sqlContext.read().format("json").load("people.json");
           方式二: DataFrame df = sqlContext.read().json("people.json");
 
數據集:
 
示例代碼:
 1 package com.bjsxt.java.spark.sql.json;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.SparkContext;
 8 import org.apache.spark.api.java.JavaPairRDD;
 9 import org.apache.spark.api.java.JavaRDD;
10 import org.apache.spark.api.java.JavaSparkContext;
11 import org.apache.spark.api.java.function.Function;
12 import org.apache.spark.api.java.function.PairFunction;
13 import org.apache.spark.sql.DataFrame;
14 import org.apache.spark.sql.Row;
15 import org.apache.spark.sql.RowFactory;
16 import org.apache.spark.sql.SQLContext;
17 import org.apache.spark.sql.types.DataTypes;
18 import org.apache.spark.sql.types.StructField;
19 import org.apache.spark.sql.types.StructType;
20 
21 import scala.Tuple2;
22 
23 /**
24  * JSON數據源
25  * @author Administrator
26  *
27  */
28 public class JSONDataSource {
29 
30     public static void main(String[] args) {
31         SparkConf conf = new SparkConf()
32                 .setAppName("JSONDataSource")
33 //                .set("spark.default.parallelism", "100")
34                 .setMaster("local");  
35         JavaSparkContext sc = new JavaSparkContext(conf);
36         SQLContext sqlContext = new SQLContext(sc);
37         
38         DataFrame studentScoresDF = sqlContext.read().json("student.json");  
39         
40         studentScoresDF.registerTempTable("student_scores");
41         DataFrame goodStudentScoresDF = sqlContext.sql(
42                 "select name,count(score) from student_scores where score>=80 group by name");
43         
44         List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function<Row, String>() {
45                     private static final long serialVersionUID = 1L;
46                     
47                     @Override
48                     public String call(Row row) throws Exception {
49                         return row.getString(0);
50                     }
51                     
52         }).collect();
53         
54         for(String str: goodStudentNames){
55             System.out.println(str);
56         }
57     }
58 }
View Code
 
result:
 
<2>jsonRDD -->DataFrame
 
<3>讀取Parquet格式文件 -->DataFrame 自動推測分區,合並 Schema。
經驗:將Spark中的文本轉換為Parquet以提升性能

    parquet是一個基於列的存儲格式,列式存儲布局可以加速查詢,因為它只檢查所有需要的列並對它們的值執行計算,因此只讀取一個數據文件或表的小部分數據。Parquet 還支持靈活的壓縮選項,因此可以顯著減少磁盤上的存儲

   如果在 HDFS 上擁有基於文本的數據文件或表,而且正在使用 Spark SQL 對它們執行查詢,那么強烈推薦將文本數據文件轉換為 Parquet 數據文件,以實現性能和存儲收益。當然,轉換需要時間,但查詢性能的提升在某些情況下可能達到 30 倍或更高,存儲的節省可高達 75%!

parquet的壓縮比高,將一個普通的文本轉化為parquet格式,如何去轉?
       val lineRDD = sc.textFile()
       DF.save(parquet) // 將RDD轉化為DF
parquet操作示例
    是否指定format--若存儲時,指定format為json格式,那么則生成json格式文件,否則不指定format,默認文件以parquet形式進行存儲 
測試一:指定format為json格式,存儲在本地
測試數據:     top.txt
測試代碼

測試結果
 
 
測試二:不指定format,那么文件默認以parquet形式進行存儲,存儲在本地
 
測試數據:    people.json
測試代碼
測試結果
 
測試三:讀取本地parquet存儲格式的文件
測試代碼

測試結果
 
測試四:讀取hdfs上parquet形式的文件
測試代碼
測試結果
 
<4> RDD(非json格式變成DataFrame)
讀取txt 文件 -->DataFrame 從 txt 文件讀取,然后轉為 RDD,最后轉為 DataFrame
                                                    RDD 轉為 DataFrame 有兩種方式
                                (1)反射機制
                                        注意點:自定義的類一定要是 public,並且要實現序列化接口 Serializable
                                                           取數據的時候, 在 JavaAPI 中會有順序問題(因為 DataFrame 轉為 RDD<Row> 的時候,會進行一次字典排序改變 Row 的位置,而Scala 的 API 則沒有這個問題)
                                (2)動態創建 Schema,先將 RDD 中的每一行類型變 為 RDD<Row> 類型,然后創建 DataFrame 的元數據-->構建 StructType,用於最后 DataFrame 元數據的描述,基於現有的 StructType 以及 RDD<Row> 來構造 DataFrame。(如果列的信息比較長可以存到數據庫 里) 
<4.1>反射機制
數據
示例代碼:
自定義類
 1 package com.bjsxt.java.spark.sql.createdf;
 2 
 3 import java.util.List;
 4 
 5 import org.apache.spark.SparkConf;
 6 import org.apache.spark.api.java.JavaRDD;
 7 import org.apache.spark.api.java.JavaSparkContext;
 8 import org.apache.spark.api.java.function.Function;
 9 import org.apache.spark.sql.DataFrame;
10 import org.apache.spark.sql.Row;
11 import org.apache.spark.sql.SQLContext;
12 
13 /**
14  * 使用反射的方式將RDD轉換成為DataFrame
15  * 1、自定義的類必須是public
16  * 2、自定義的類必須是可序列化的
17  * 3、RDD轉成DataFrame的時候,他會根據自定義類中的字段名進行排序。
18  * @author zfg
19  *
20  */
21 
22 public class RDD2DataFrameByReflection {
23 
24     public static void main(String[] args) {
25         SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
26         JavaSparkContext sc = new JavaSparkContext(conf);
27         SQLContext sqlcontext = new SQLContext(sc);
28         
29         JavaRDD<String> lines = sc.textFile("Peoples.txt");
30         JavaRDD<Person> personsRdd = lines.map(new Function<String, Person>() {
31             
32             private static final long serialVersionUID = 1L;
33 
34             @Override
35             public Person call(String line) throws Exception {
36                 String[] split = line.split(",");
37                 Person p = new Person();
38                 p.setId(Integer.valueOf(split[0].trim()));
39                 p.setName(split[1]);
40                 p.setAge(Integer.valueOf(split[2].trim()));
41                 return p;
42             }
43         });
44         
45         //傳入進去Person.class的時候,sqlContext是通過反射的方式創建DataFrame
46         //在底層通過反射的方式或得Person的所有field,結合RDD本身,就生成了DataFrame
47         DataFrame df = sqlcontext.createDataFrame(personsRdd, Person.class);
48      
49         //命名table的名字為person
50         df.registerTempTable("personTable");
51         
52         DataFrame resultDataFrame = sqlcontext.sql("select * from personTable where age > 7");
53         resultDataFrame.show();
54         
55          //將df轉成rdd
56         JavaRDD<Row> resultRDD = resultDataFrame.javaRDD();
57         JavaRDD<Person> result = resultRDD.map(new Function<Row, Person>() {
58             
59             private static final long serialVersionUID = 1L;
60 
61             @Override
62             public Person call(Row row) throws Exception {
63                  Person p = new Person();
64                  p.setAge(row.getInt(0));
65                  p.setId(row.getInt(1));
66                  p.setName(row.getString(2));
67                 return p;
68             }
69         });
70         
71          List<Person> personList = result.collect();
72          
73          for (Person person : personList) {
74             System.out.println(person.toString());
75         } 
76     }
77 }
View Code
 
 
result:

<4.2>動態創建Schema方式
數據
 
示例代碼:
 1 package com.bjsxt.java.spark.sql.createdf;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 import org.apache.spark.api.java.function.Function;
10 import org.apache.spark.sql.DataFrame;
11 import org.apache.spark.sql.Row;
12 import org.apache.spark.sql.RowFactory;
13 import org.apache.spark.sql.SQLContext;
14 import org.apache.spark.sql.types.DataTypes;
15 import org.apache.spark.sql.types.StructField;
16 import org.apache.spark.sql.types.StructType;
17 
18 
19 public class RDD2DataFrameByProgrammatically {
20     public static void main(String[] args) {
21         SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
22         JavaSparkContext sc = new JavaSparkContext(conf);
23         SQLContext sqlcontext = new SQLContext(sc);
24         /**
25          * 在RDD的基礎上創建類型為Row的RDD
26          */
27         JavaRDD<String> lines = sc.textFile("Peoples.txt");
28         JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {
29             
30             private static final long serialVersionUID = 1L;
31 
32             @Override
33             public Row call(String line) throws Exception {
34                 String[] split = line.split(",");
35                 return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2]));
36             }
37         });
38         
39         /**
40          * 動態構造DataFrame的元數據,一般而言,有多少列以及每列的具體類型可能來自於Json,也可能來自於DB
41          */
42         ArrayList<StructField> structFields = new ArrayList<StructField>();
43         structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
44         structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
45         structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
46         //構建StructType,用於最后DataFrame元數據的描述
47         StructType schema = DataTypes.createStructType(structFields);
48         
49         /**
50          * 基於已有的MetaData以及RDD<Row> 來構造DataFrame
51          */
52         DataFrame df = sqlcontext.createDataFrame(rowRDD, schema);
53         
54         /**
55          *注冊成為臨時表以供后續的SQL操作查詢
56          */
57         df.registerTempTable("persons");
58         
59         /**
60          * 進行數據的多維度分析
61          */
62         DataFrame result = sqlcontext.sql("select * from persons where age > 7");
63         result.show();
64 
65         /**
66          * 對結果進行處理,包括由DataFrame轉換成為RDD<Row>
67          */
68          List<Row> listRow = result.javaRDD().collect();
69          for (Row row : listRow) {
70             System.out.println(row);
71         }
72     }
73 }
View Code
 
result:

<5> 讀取MySql 中表里的數據 -->DataFrame
        Spark Build-in內置支持的 json jdbc mysql,hive...如果數據庫支持jdbc連接,Spark 就可以基於這個數據庫盡行數據的處理
 
示例代碼:
 1 package com.bjsxt.java.spark.sql.jdbc;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.sql.DataFrame;
 6 import org.apache.spark.sql.DataFrameReader;
 7 import org.apache.spark.sql.SQLContext;
 8 
 9 /**
10  * JDBC數據源
11  * 
12  * @author Administrator
13  *
14  */
15 public class JDBCDataSource {
16 
17     public static void main(String[] args) {
18         SparkConf conf = new SparkConf().setAppName("JDBCDataSource").setMaster("local");
19         JavaSparkContext sc = new JavaSparkContext(conf);
20         SQLContext sqlContext = new SQLContext(sc);
21         
22         // 方法1、分別將mysql中兩張表的數據加載為DataFrame
23         /*
24          * Map<String, String> options = new HashMap<String, String>();
25          * options.put("url", "jdbc:mysql://hadoop1:3306/testdb");
26          * options.put("driver", "com.mysql.jdbc.Driver"); 
27          * options.put("user","spark");
28          * options.put("password", "spark2016");
29          
30          * options.put("dbtable", "student_info"); 
31          * DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();
32          * options.put("dbtable", "student_score"); 
33          * DataFrame studentScoresDF = sqlContext.read().format("jdbc") .options(options).load();
34          */
35         // 方法2、分別將mysql中兩張表的數據加載為DataFrame
36         DataFrameReader reader = sqlContext.read().format("jdbc");
37         reader.option("url", "jdbc:mysql://node4:3306/testdb");
38         reader.option("driver", "com.mysql.jdbc.Driver");
39         reader.option("user", "root");
40         reader.option("password", "123");
41         
42         reader.option("dbtable", "student_info");
43         DataFrame studentInfosDF = reader.load();
44         reader.option("dbtable", "student_score");
45         DataFrame studentScoresDF = reader.load();
46         
47         // 將兩個DataFrame轉換為JavaPairRDD,執行join操作
48         studentInfosDF.registerTempTable("studentInfos");
49         studentScoresDF.registerTempTable("studentScores");
50         
51         String sql = "SELECT studentInfos.name,age,score "
52                 + "        FROM studentInfos JOIN studentScores"
53                 + "         ON (studentScores.name = studentInfos.name)"
54                 + "     WHERE studentScores.score > 80";
55         
56         DataFrame sql2 = sqlContext.sql(sql);
57         sql2.show();
58     }
59 }
View Code

 

result:

4. 如何將DataFrame中的值寫入到外部存儲中去?

存儲模式SaveMode.Overwrite || Ignore || Append    ||    ErrorifExit)
<1> 讀取本地json格式文件,並以json形式寫入到hdfs(不指定format,默認是parquet)
測試代碼

測試結果
 
補充:
1.什么是下推過濾器?
       在join之前過濾,而不是join之后進行過濾
 
2.select * from table 在SparkSQL和Hive on MR中的區別?
        SparkSQL  中 select * from table 在spark中是要具體執行spark任務的,而在   Hive on MR   中 select * from table直接讀取數據,所以 SparkSQL  中執行select * from不一定比 Hive on MR 中的快
 
3.如何將一個DataFrame變成一個RDD?
    JavaRDD<ROW> rdd = resultFrame.javaRDD()
 

5.整合Spark和Hive?

    6.1Spark 目錄下面的 conf 下放一個配置文件 hive-site.xml 文件。
    6.2在 hive 的服務端 啟動 MetaStore Server 因為 HiveContext 會用到 metastore 服務。( 在 Spark-shell 里面使用 HiveContext 的時候,要記住導入 HiveContext)】(hive --service metastore)
    6.3 啟動hdfs【因為hive的數據是存在hdfs上的 Spark集群(start-all.sh spark-start-all.sh)
    6.4進入 Spark shell,測試Spark 和 Hive是否整合成功
  1. scala>import org.apache.spark.sql.hive.HiveContext
  2. scala>val hiveContext =newHiveContext(sc)
  3. scala>hiveContext.sql("show tables").show
    6.5整合測試(詳見Spark_some配置)注意!將代碼提交到Spark集群上運行時,需要將hdfs-site.xml拷貝到SPARK_HOME/conf下

6.SqlContext和HiveContext的關系?

        SQLcontext 是  HiveContext 的父類
       在 集群中運行的時候用  HiveContext,可以 基於 Hive 來操作 Hive 表,對源數據進行CRUD的操作。 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM