SparkSQL (3): Developing Spark SQL Programs in IDEA


First, add the Maven dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

DataFrame

package sparksql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Demo1 {

  // case class describing the schema
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {

    // create SparkConf and SparkSession
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // implicit conversions (toDF, toDS, etc.)
    import spark.implicits._

    val raw: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))
    val df: DataFrame = raw.toDF("name", "age")
    df.show()

    // register a temporary view
    df.createOrReplaceTempView("user")

    // run a SQL statement against the view
    val selectUser: DataFrame = spark.sql("select * from user where age > 21")
    selectUser.show()

    // convert to a Dataset
    val ds: Dataset[User] = df.as[User]

    // convert back to an RDD
    val rdd: RDD[Row] = df.rdd

    // iterate over the RDD
    for (row <- rdd) {
      println(row.getString(0))
      println(row.getInt(1))
    }

    spark.stop()
  }
}
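Running Demo1 locally should print roughly the following for the two show() calls (column widths may differ slightly); the final loop then prints each name and age on its own line, in an order that depends on the RDD's partitioning:

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 21|
|    lisi| 22|
|  wangwu| 23|
+--------+---+

+------+---+
|  name|age|
+------+---+
|  lisi| 22|
|wangwu| 23|
+------+---+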

Notes:

1) A SparkSession is not created with new SparkSession. Instead, use the SparkSession companion object to obtain a builder, and create the session through that builder.

2) import spark.implicits._ does not import a package. Here spark refers to the SparkSession object in the surrounding code, so the statement imports all members of that session's implicits object (the underscore means "everything in it"). If the session variable were renamed to sparksession, the import would have to become import sparksession.implicits._.

3) The index passed to a Row object's get methods starts at 0, unlike a JDBC ResultSet, whose column index starts at 1. The sketch below illustrates these three points.
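A minimal sketch tying the three notes together, assuming the same Spark 2.1.x setup as above (the app name "notes-demo" is a placeholder, and the builder is configured directly instead of through a SparkConf):

package sparksql

import org.apache.spark.sql.{Row, SparkSession}

object NotesDemo {
  def main(args: Array[String]): Unit = {
    // 1) build the session via the companion object's builder,
    //    here without an explicit SparkConf ("notes-demo" is a made-up app name)
    val spark = SparkSession.builder()
      .appName("notes-demo")
      .master("local[*]")
      .getOrCreate()

    // 2) "spark" is the session value defined above; this import brings
    //    its implicits (toDF, toDS, $"col", ...) into scope
    import spark.implicits._

    val df = Seq(("zhangsan", 21), ("lisi", 22)).toDF("name", "age")

    // 3) Row columns are 0-based; getAs[T]("col") reads by column name instead
    val first: Row = df.head()
    println(first.getString(0))          // column 0: name
    println(first.getInt(1))             // column 1: age
    println(first.getAs[String]("name")) // access by column name

    spark.stop()
  }
}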

 

Dataset

 

package sparksql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo2 {

  // case class describing the schema
  case class People(name: String, age: Int)

  def main(args: Array[String]): Unit = {

    // create SparkConf and SparkSession
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // implicit conversions (toDS, toDF, etc.)
    import spark.implicits._

    // create a Dataset
    val peopleDataset = Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

    // convert to a DataFrame
    val peopleDataframe: DataFrame = peopleDataset.toDF()
    peopleDataframe.show()

    // convert to an RDD
    val rdd: RDD[People] = peopleDataset.rdd

    // iterate over the RDD
    for (people <- rdd) {
      println(people.name + "\t" + people.age)
    }

    spark.stop()
  }
}
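A Dataset also supports ordinary typed transformations, where each element is a case-class instance rather than a generic Row. A minimal sketch along the same lines as Demo2 (Demo3 and its app name are made up for illustration; the age threshold is arbitrary):

package sparksql

import org.apache.spark.sql.{Dataset, SparkSession}

object Demo3 {

  case class People(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dataset-typed-ops") // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val peopleDataset: Dataset[People] =
      Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

    // typed transformations: each element is a People instance,
    // so fields are accessed directly instead of by column name
    val adults: Dataset[People] = peopleDataset.filter(p => p.age > 20)
    val names: Dataset[String]  = peopleDataset.map(p => p.name)

    adults.show()
    names.show()

    spark.stop()
  }
}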

 

