內存計算平台Spark在今年6月份的時候正式發布了spark2.0,相比上一版本的spark1.6版本,在內存優化,數據組織,流計算等方面都做出了較大的改變,同時更加注重基於DataFrame數據組織的MLlib,更加注重機器學習整個過程的管道化。
當然,作為使用者,特別是需要運用到線上的系統,大部分廠家還是會繼續選擇已經穩定的spark1.6版本,並且在spark2.0逐漸成熟之后才會開始考慮系統組件的升級。作為開發者,還是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鑒一些spark2.0做出某些改進的思路。
首先,為了調用spark API 來完成我們的計算,需要先創建一個sparkContext:
String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用戶的當前工作目錄
SparkConf conf = new SparkConf().setAppName("spark sql test") .set("spark.sql.warehouse.dir", warehouseLocation) .setMaster("local[3]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
上述代碼主要有三點:
- 使用spark sql時需要指定數據庫的文件地址,這里使用了一個本地的目錄
- spark配置,指定spark app的名稱和數據庫地址,master url為local 3核
- 使用SparkSession,取代了原本的SQLContext與HiveContext。對於DataFrame API的用戶來說,Spark常見的混亂源頭來自於使用哪個“context”。現在你可以使用SparkSession了,它作為單個入口可以兼容兩者。注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。這是spark2.0的一個較大的改變,對用戶更加友好。
下面開始體驗spark sql:
//===========================================1 spark SQL=================== //數據導入方式 Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json"); //查看表 df.show(); //查看表結構 df.printSchema(); //查看某一列 類似於MySQL: select name from people df.select("name").show(); //查看多列並作計算 類似於MySQL: select name ,age+1 from people df.select(col("name"), col("age").plus(1)).show(); //設置過濾條件 類似於MySQL:select * from people where age>21 df.filter(col("age").gt(21)).show(); //做聚合操作 類似於MySQL:select age,count(*) from people group by age df.groupBy("age").count().show(); //上述多個條件進行組合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show(); //直接使用spark SQL進行查詢 //先注冊為臨時表 df.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people"); sqlDF.show();
主要關注以下幾點:
- 數據來源:spark可以直接導入json格式的文件數據,people.json是我從spark安裝包下拷貝的測試數據。
- spark sql:sparkSql語法和用法和mysql有一定的相似性,可以查看表、表結構、查詢、聚合等操作。用戶可以使用sparkSql的API接口做聚合查詢等操作或者用類SQL語句實現(但是必須將DataSet注冊為臨時表)
- DataSet:DataSet是spark2.0i引入的一個新的特性(在spark1.6中屬於alpha版本)。DataSet結合了RDD和DataFrame的優點, 並帶來的一個新的概念Encoder當序列化數據時,,Encoder產生字節碼與off-heap進行交互,,能夠達到按需訪問數據的效果,而不用反序列化整個對象。
/** * 一個描述人屬性的JavaBeans * A JavaBean is a Java object that satisfies certain programming conventions: The JavaBean class must implement either Serializable or Externalizable The JavaBean class must have a no-arg constructor All JavaBean properties must have public setter and getter methods All JavaBean instance variables should be private */ public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
接下來,就可以為該類的對象創建DataSet了,並像操作表一樣操作自定義對象的DataSet了:
//為自定義的對象創建Dataset List<Person> personpList = new ArrayList<Person>(); Person person1 = new Person(); person1.setName("Andy"); person1.setAge(32); Person person2 = new Person(); person2.setName("Justin"); person2.setAge(19); personpList.add(person1); personpList.add(person2); Encoder<Person> personEncoder = Encoders.bean(Person.class); Dataset<Person> javaBeanDS = spark.createDataset( personpList, personEncoder ); javaBeanDS.show();
同時,可以利用Java反射的特性,來從其他數據集中創建DataSet對象:
//spark支持使用java 反射機制推斷表結構 //1 首先創建一個存儲person對象的RDD JavaRDD<Person> peopleRDD = spark.read() .textFile("..\\sparkTestData\\people.txt") .javaRDD() .map(new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); //2 表結構推斷 Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class); peopleDF.createOrReplaceTempView("people"); //3 定義map 這里對每個元素做序列化操作 Encoder<String> stringEncoder = Encoders.STRING(); Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() { public String call(Row row) throws Exception { return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0)); } }, stringEncoder); peopleSerDF.show(); //==============================================3 從RDD創建Dataset StructType對象的使用 JavaRDD<String> peopleRDD2 = spark.sparkContext() .textFile("..\\sparkTestData\\people.txt", 1) .toJavaRDD(); // 創建一個描述表結構的schema String schemaString = "name age"; List<StructField> fields = new ArrayList<StructField>(); for (String fieldName : schemaString.split(" ")) { StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); fields.add(field); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() { //@Override public Row call(String record) throws Exception { String[] attributes = record.split(","); return RowFactory.create(attributes[0], attributes[1].trim()); } }); // Apply the schema to the RDD Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema); // Creates a temporary view using the DataFrame peopleDataFrame.createOrReplaceTempView("people"); peopleDataFrame.show();
主要關注以下幾點:
- RDD:從普通文本文件中解析數據,並創建結構化數據結構的RDD。
- 表結構推斷的方式創建DataSet:利用Java類反射特性將RDD轉換為DataSet。
- 指定表結構的方式創建DataSet:我們可以使用StructType來明確定義我們的表結構,完成DataSet的創建
//在Spark 2.0中,window API內置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常類似。 Dataset<Row> stocksDF = spark.read().option("header","true"). option("inferSchema","true"). csv("..\\sparkTestData\\stocks.csv"); //stocksDF.show(); Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016"). filter("month(Date)==6"); stocks201606.show(100,false);
首先讀入了csv格式的數據文件,同時將2016年6月份的數據過濾出來,並以不截斷的方式輸出前面100條記錄,運行的結果為:
調用window接口做窗口統計:
//window一般在group by語句中使用。window方法的第一個參數指定了時間所在的列; //第二個參數指定了窗口的持續時間(duration),它的單位可以是seconds、minutes、hours、days或者weeks。 Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")). agg(avg("Close").as("weekly_average")); tumblingWindowDS.show(100,false); tumblingWindowDS.sort("window.start"). select("window.start","window.end","weekly_average"). show(false);
其運行結果為:
由於沒有指定窗口的開始時間,因此統計的開始時間為2016-05-26,並且不是從0點開始的。通常情況下,這樣統計就顯得有點不對了,因此我們需要指定其開始的日期和時間,但是遺憾的是spark並沒有接口/參數讓我們明確的指定統計窗口的開始時間。好在提供了另外一種方式,指定偏移時間,上述時間(2016-05-26 08:00:00)做一個時間偏移,也可以得到我們想要的開始時間(2016-06-01 00:00:00)。
//在前面的示例中,我們使用的是tumbling window。為了能夠指定開始時間,我們需要使用sliding window(滑動窗口)。 //到目前為止,沒有相關API來創建帶有開始時間的tumbling window,但是我們可以通過將窗口時間(window duration) //和滑動時間(slide duration)設置成一樣來創建帶有開始時間的tumbling window。代碼如下: Dataset<Row> windowWithStartTime = stocks201606. groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")). agg(avg("Close").as("weekly_average")); //6 days參數就是開始時間的偏移量;前兩個參數分別代表窗口時間和滑動時間,我們打印出這個窗口的內容: windowWithStartTime.sort("window.start"). select("window.start","window.end","weekly_average"). show(false);
運行結果為:
這就得到了我們需要的統計結果了。
關於spark2.0的sparkSql部分,基本就介紹這么多了。