初識Spark2.0之Spark SQL


內存計算平台Spark在今年6月份的時候正式發布了spark2.0,相比上一版本的spark1.6版本,在內存優化,數據組織,流計算等方面都做出了較大的改變,同時更加注重基於DataFrame數據組織的MLlib,更加注重機器學習整個過程的管道化。

當然,作為使用者,特別是需要運用到線上的系統,大部分廠家還是會繼續選擇已經穩定的spark1.6版本,並且在spark2.0逐漸成熟之后才會開始考慮系統組件的升級。作為開發者,還是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鑒一些spark2.0做出某些改進的思路。

首先,為了調用spark API 來完成我們的計算,需要先創建一個sparkContext:

 String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用戶的當前工作目錄
SparkConf conf = new SparkConf().setAppName("spark sql test")  
                .set("spark.sql.warehouse.dir", warehouseLocation)  
                .setMaster("local[3]");
  SparkSession spark = SparkSession  
                .builder()  
                .config(conf)  
                .getOrCreate();

上述代碼主要有三點:

    • 使用spark sql時需要指定數據庫的文件地址,這里使用了一個本地的目錄
    • spark配置,指定spark app的名稱和數據庫地址,master url為local 3核
    • 使用SparkSession,取代了原本的SQLContext與HiveContext。對於DataFrame API的用戶來說,Spark常見的混亂源頭來自於使用哪個“context”。現在你可以使用SparkSession了,它作為單個入口可以兼容兩者。注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。這是spark2.0的一個較大的改變,對用戶更加友好。

下面開始體驗spark sql:

 //===========================================1 spark SQL===================  
        //數據導入方式  
        Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json");  
        //查看表  
        df.show();  
        //查看表結構  
        df.printSchema();  
        //查看某一列 類似於MySQL: select name from people  
        df.select("name").show();  
        //查看多列並作計算 類似於MySQL: select name ,age+1 from people  
        df.select(col("name"), col("age").plus(1)).show();  
        //設置過濾條件 類似於MySQL:select * from people where age>21  
        df.filter(col("age").gt(21)).show();  
        //做聚合操作 類似於MySQL:select age,count(*) from people group by age  
        df.groupBy("age").count().show();  
        //上述多個條件進行組合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age  
        df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show();  
  
        //直接使用spark SQL進行查詢  
        //先注冊為臨時表  
        df.createOrReplaceTempView("people");  
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");  
        sqlDF.show();

主要關注以下幾點:

  • 數據來源:spark可以直接導入json格式的文件數據,people.json是我從spark安裝包下拷貝的測試數據。
  • spark sql:sparkSql語法和用法和mysql有一定的相似性,可以查看表、表結構、查詢、聚合等操作。用戶可以使用sparkSql的API接口做聚合查詢等操作或者用類SQL語句實現(但是必須將DataSet注冊為臨時表)
  • DataSet:DataSet是spark2.0i引入的一個新的特性(在spark1.6中屬於alpha版本)。DataSet結合了RDD和DataFrame的優點, 並帶來的一個新的概念Encoder當序列化數據時,,Encoder產生字節碼與off-heap進行交互,,能夠達到按需訪問數據的效果,而不用反序列化整個對象。
我們可以為自定義的對象創建DataSet,首先創建一個JavaBeans:
/** 
     * 一個描述人屬性的JavaBeans 
     * A JavaBean is a Java object that satisfies certain programming conventions: 
 
        The JavaBean class must implement either Serializable or Externalizable 
        The JavaBean class must have a no-arg constructor 
        All JavaBean properties must have public setter and getter methods 
        All JavaBean instance variables should be private 
     */  
    public static class Person implements Serializable {  
        private String name;  
        private int age;  
  
        public String getName() {  
            return name;  
        }  
  
        public void setName(String name) {  
            this.name = name;  
        }  
  
        public int getAge() {  
            return age;  
        }  
  
        public void setAge(int age) {  
            this.age = age;  
        }  
    }

接下來,就可以為該類的對象創建DataSet了,並像操作表一樣操作自定義對象的DataSet了:

   //為自定義的對象創建Dataset  
        List<Person> personpList = new ArrayList<Person>();  
        Person person1 = new Person();  
        person1.setName("Andy");  
        person1.setAge(32);  
        Person person2 = new Person();  
        person2.setName("Justin");  
        person2.setAge(19);  
        personpList.add(person1);  
        personpList.add(person2);  
        Encoder<Person> personEncoder = Encoders.bean(Person.class);  
        Dataset<Person> javaBeanDS = spark.createDataset(  
                personpList,  
                personEncoder  
        );  
        javaBeanDS.show();

同時,可以利用Java反射的特性,來從其他數據集中創建DataSet對象:

 //spark支持使用java 反射機制推斷表結構  
        //1 首先創建一個存儲person對象的RDD  
        JavaRDD<Person> peopleRDD = spark.read()  
                .textFile("..\\sparkTestData\\people.txt")  
                .javaRDD()  
                .map(new Function<String, Person>() {  
                    public Person call(String line) throws Exception {  
                        String[] parts = line.split(",");  
                        Person person = new Person();  
                        person.setName(parts[0]);  
                        person.setAge(Integer.parseInt(parts[1].trim()));  
                        return person;  
                    }  
                });  
        //2 表結構推斷  
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);  
        peopleDF.createOrReplaceTempView("people");  
  
        //3 定義map 這里對每個元素做序列化操作  
        Encoder<String> stringEncoder = Encoders.STRING();  
        Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() {  
            public String call(Row row) throws Exception {  
                return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0));  
            }  
        }, stringEncoder);  
        peopleSerDF.show();  
        //==============================================3 從RDD創建Dataset StructType對象的使用  
        JavaRDD<String> peopleRDD2 = spark.sparkContext()  
                .textFile("..\\sparkTestData\\people.txt", 1)  
                .toJavaRDD();  
  
        // 創建一個描述表結構的schema  
        String schemaString = "name age";  
        List<StructField> fields = new ArrayList<StructField>();  
        for (String fieldName : schemaString.split(" ")) {  
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);  
            fields.add(field);  
        }  
        StructType schema = DataTypes.createStructType(fields);  
  
        // Convert records of the RDD (people) to Rows  
        JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() {  
            //@Override  
            public Row call(String record) throws Exception {  
                String[] attributes = record.split(",");  
                return RowFactory.create(attributes[0], attributes[1].trim());  
            }  
        });  
  
        // Apply the schema to the RDD  
        Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);  
  
        // Creates a temporary view using the DataFrame  
        peopleDataFrame.createOrReplaceTempView("people");  
        peopleDataFrame.show();

主要關注以下幾點:

  • RDD:從普通文本文件中解析數據,並創建結構化數據結構的RDD。
  • 表結構推斷的方式創建DataSet:利用Java類反射特性將RDD轉換為DataSet。
  • 指定表結構的方式創建DataSet:我們可以使用StructType來明確定義我們的表結構,完成DataSet的創建
如何將自己的數據/文本導入spark並創建spark的數據對象,對新手來說顯得尤為關鍵,對自己的數據表達好了之后,才有機會去嘗試spark的其他API ,完成我們的目標。一般數據源在經過我們其他程序的前處理之后,存儲成行形式的文本/json格式或者本身存儲的hive/mysql數據庫中,spark對這些數據源的調用都是比較方便的。
 
介紹完了spark-sql的數據導入及數據表達后,我們來完成一個比較簡單的數據統計任務。一般在工作生活中對某些數據按一定的周期進行統計分析是一個比較常見的任務了。下面,我們就以股票統計的例子為例。我們使用spark的窗口統計功能,來對某一公司的股票在2016年6月份的各個星期的均值做統計。
 //在Spark 2.0中,window API內置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常類似。  
        Dataset<Row> stocksDF = spark.read().option("header","true").  
                option("inferSchema","true").  
                csv("..\\sparkTestData\\stocks.csv");  
  
        //stocksDF.show();  
  
        Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016").  
                filter("month(Date)==6");  
        stocks201606.show(100,false);

首先讀入了csv格式的數據文件,同時將2016年6月份的數據過濾出來,並以不截斷的方式輸出前面100條記錄,運行的結果為:

調用window接口做窗口統計:

  //window一般在group by語句中使用。window方法的第一個參數指定了時間所在的列;  
    //第二個參數指定了窗口的持續時間(duration),它的單位可以是seconds、minutes、hours、days或者weeks。  
        Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")).  
                agg(avg("Close").as("weekly_average"));  
        tumblingWindowDS.show(100,false);  
        tumblingWindowDS.sort("window.start").  
                select("window.start","window.end","weekly_average").  
                show(false);

其運行結果為:

由於沒有指定窗口的開始時間,因此統計的開始時間為2016-05-26,並且不是從0點開始的。通常情況下,這樣統計就顯得有點不對了,因此我們需要指定其開始的日期和時間,但是遺憾的是spark並沒有接口/參數讓我們明確的指定統計窗口的開始時間。好在提供了另外一種方式,指定偏移時間,上述時間(2016-05-26 08:00:00)做一個時間偏移,也可以得到我們想要的開始時間(2016-06-01 00:00:00)。

 //在前面的示例中,我們使用的是tumbling window。為了能夠指定開始時間,我們需要使用sliding window(滑動窗口)。  
    //到目前為止,沒有相關API來創建帶有開始時間的tumbling window,但是我們可以通過將窗口時間(window duration)  
    //和滑動時間(slide duration)設置成一樣來創建帶有開始時間的tumbling window。代碼如下:  
        Dataset<Row>  windowWithStartTime = stocks201606.  
                groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")).  
                agg(avg("Close").as("weekly_average"));  
        //6 days參數就是開始時間的偏移量;前兩個參數分別代表窗口時間和滑動時間,我們打印出這個窗口的內容:  
        windowWithStartTime.sort("window.start").  
                select("window.start","window.end","weekly_average").  
                show(false);

運行結果為:

這就得到了我們需要的統計結果了。

關於spark2.0的sparkSql部分,基本就介紹這么多了。

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM