原博文來自於: http://blog.csdn.net/u012297062/article/details/52207934 感謝!
使用Spark SQL中的內置函數對數據進行分析,Spark SQL API不同的是,DataFrame中的內置函數操作的結果是返回一個Column對象,而DataFrame天生就是"A distributed collection of data organized into named columns.",這就為數據的復雜分析建立了堅實的基礎並提供了極大的方便性,例如說,我們在操作DataFrame的方法中可以隨時調用內置函數進行業務需要的處理,這之於我們構建附件的業務邏輯而言是可以極大的減少不必須的時間消耗(基於上就是實際模型的映射),讓我們聚焦在數據分析上,這對於提高工程師的生產力而言是非常有價值的Spark 1.5.x開始提供了大量的內置函數,例如agg:
- def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
- groupBy().agg(aggExpr, aggExprs : _*)
- }
還有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan
總體上而言內置函數包含了五大基本類型:
1、聚合函數,例如countDistinct、sumDistinct等;
2、集合函數,例如sort_array、explode等
3、日期、時間函數,例如hour、quarter、next_day
4、數學函數,例如asin、atan、sqrt、tan、round等;
5、開窗函數,例如rowNumber等
6、字符串函數,concat、format_number、rexexp_extract
7、其它函數,isNaN、sha、randn、callUDF
第一步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,Spark程序在本地運行
- val conf = new SparkConf() //創建SparkConf對象
- conf.setAppName("SparkSQL") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱
- //conf.setMaster("spark://DaShuJu-040:7077") //此時,程序在Spark集群
- conf.setMaster("local")
第二步:創建SparkContext對象
SparkContext是Spark程序所有功能的唯一入口,無論是采用Scala、Java、Python、R等都必須有一個SparkContext
SparkContext核心作用:初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackend
同時還會負責Spark程序往Master注冊程序等
SparkContext是整個Spark應用程序中最為至關重要的一個對象
- val sc = new SparkContext(conf) //創建SparkContext對象,通過傳入SparkConf實例來定制Spark運行的具體參數和配置信息
- val sqlContext = new SQLContext(sc) //構建SQL上下文</span>
- //要使用Spark SQL的內置函數,就一定要導入SQLContext下的隱式轉換
- import sqlContext.implicits._
第三步:模擬數據,最后生成RDD
- val userData = Array(
- "2016-3-27,001,http://spark.apache.org/,1000",
- "2016-3-27,001,http://hadoop.apache.org/,1001",
- "2016-3-27,002,http://fink.apache.org/,1002",
- "2016-3-28,003,http://kafka.apache.org/,1020",
- "2016-3-28,004,http://spark.apache.org/,1010",
- "2016-3-28,002,http://hive.apache.org/,1200",
- "2016-3-28,001,http://parquet.apache.org/,1500",
- "2016-3-28,001,http://spark.apache.org/,1800"
- )</span>
- val userDataRDD = sc.parallelize(userData) //生成DD分布式集合對象
- </span>
第四步:根據業務需要對數據進行預處理生成DataFrame,要想把RDD轉換成DataFrame,需要先把RDD中的元素類型變成Row類型
於此同時要提供DataFrame中的Columns的元數據信息描述
- val userDataRDDRow = userDataRDD.map(row => {val splited = row.split(",") ;Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})
- val structTypes = StructType(Array(
- StructField("time", StringType, true),
- StructField("id", IntegerType, true),
- StructField("url", StringType, true),
- StructField("amount", IntegerType, true)
- ))
- <span style="font-family: Arial, Helvetica, sans-serif;">val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)</span>
第五步:使用Spark SQL提供的內置函數對DataFrame進行操作,特別注意:內置函數生成的Column對象且自定進行CG;
- userDataDF.groupBy("time").agg('time, countDistinct('id)).map(row=>Row(row(1),row(2))).collect.foreach(println)
- userDataDF.groupBy("time").agg('time, sum('amount)).show()