SparkSQL(三)——idea開發SparkSQL程序


首先導入maven依賴

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

 

 

dataframe

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


object Demo1 {

  // Case class backing the DataFrame -> Dataset[User] conversion below.
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {

    // SparkConf must be imported explicitly; the original snippet omitted it.
    import org.apache.spark.SparkConf

    // Create the SparkConf and the SparkSession. SparkSession has no public
    // constructor — it must be built through its companion-object builder.
    val conf = new SparkConf().setAppName("sparlsql").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Implicit conversions (toDF / toDS / as[T]) live on the session instance,
    // so this import refers to the `spark` value above, not a package.
    import spark.implicits._

    // Build a DataFrame from an RDD of (name, age) tuples.
    val raw: RDD[(String, Int)] =
      spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))
    val df: DataFrame = raw.toDF("name", "age")
    df.show()

    // Register a temporary view and query it with SQL.
    df.createOrReplaceTempView("user")
    val selectUser: DataFrame = spark.sql("select * from user where age > 21")
    selectUser.show()

    // DataFrame -> strongly-typed Dataset (kept for demonstration purposes).
    val ds: Dataset[User] = df.as[User]

    // DataFrame -> RDD[Row]. Collect to the driver before printing: iterating
    // the RDD directly runs the println on the executors, where the output is
    // invisible outside local mode.
    val rdd: RDD[Row] = df.rdd
    for (row <- rdd.collect()) {
      println(row.getString(0)) // Row indices start at 0 (unlike JDBC ResultSet, which starts at 1)
      println(row.getInt(1))
    }

    spark.stop()
  }
}

注意:

1)sparksession的創建不能用new SparkSession的方式,而是利用伴生對象SparkSession來創建builder,通過builder來創建sparksession。

2)隱式轉換import spark.implicits._不是引入了一個包,spark指的是程序上下文環境中的sparksession對象,所以這里引入了該對象的implicits成員,_指代其中的全部隱式定義。如果該對象改名為sparksession,相應的隱式轉換語句變為import sparksession.implicits._

3)row對象的get方法的下標索引是從0開始,而不像jdbc的resultset下標索引從1開始

 

DataSet

 

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object Demo2 {

  // Case class backing the strongly-typed Dataset.
  case class People(name: String, age: Int)

  def main(args: Array[String]): Unit = {

    // SparkConf must be imported explicitly; the original snippet omitted it.
    import org.apache.spark.SparkConf

    // Create the SparkConf and the SparkSession via the companion-object builder.
    val conf = new SparkConf().setAppName("sparlsql").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // toDS() on a local Seq is provided by the session's implicits.
    import spark.implicits._

    // Create a Dataset directly from case-class instances.
    val peopleDataset =
      Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

    // Dataset -> untyped DataFrame.
    val peopleDataframe: DataFrame = peopleDataset.toDF()
    peopleDataframe.show()

    // Dataset -> RDD[People]. Collect to the driver before printing: iterating
    // the RDD directly runs the println on the executors, where the output is
    // invisible outside local mode.
    val rdd: RDD[People] = peopleDataset.rdd
    for (people <- rdd.collect()) {
      println(people.name + "\t" + people.age)
    }

    spark.stop()
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM