Spark: Creating a Dataset from JSON Data


package org.onepiece.bigdata.windows

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object App {

  def init(): Unit = {
    // Keep the console output readable: only ERROR-level Spark logs are printed
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.ERROR)
  }

  def main(args: Array[String]): Unit = {
    init()

    test1()
    test2()
  }

  def test1(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-rdd-test")
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._

    // Build a Dataset[String] of JSON records and parse it into a DataFrame
    val jsonList = List(
      """{"store_id":1094,"channel":"SNG","gmv":100}""",
      """{"store_id":3409,"channel":"SNG","gmv":200}""",
      """{"store_id":107,"channel":"JDDJ","gmv":300}"""
    )

    val ds = sparkSession.createDataset(jsonList)
    val df = sparkSession.read.json(ds)
    df.printSchema()
    /*
root
 |-- channel: string (nullable = true)
 |-- gmv: long (nullable = true)
 |-- store_id: long (nullable = true)
    **/

    df.createOrReplaceTempView("cte_json")
    sparkSession.sql("select * from cte_json where channel != 'JDDJ' ").show()
    /*
+-------+---+--------+
|channel|gmv|store_id|
+-------+---+--------+
|    SNG|100|    1094|
|    SNG|200|    3409|
+-------+---+--------+
    * */
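
    // Equivalent filter written with the DataFrame API instead of the temp view
    // (added sketch, not part of the original article); `=!=` is Column's not-equal operator
    df.filter($"channel" =!= "JDDJ").show()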
  }
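
  // Hypothetical variant (not in the original article): schema inference over JSON
  // strings costs an extra pass over the data, so when the layout is known the schema
  // can be supplied up front via DataFrameReader.schema(...). The field names and types
  // below are assumptions taken from the printSchema output of test1.
  def test1WithExplicitSchema(): Unit = {
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-rdd-test")
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._

    val jsonList = List(
      """{"store_id":1094,"channel":"SNG","gmv":100}""",
      """{"store_id":3409,"channel":"SNG","gmv":200}"""
    )
    val schema = StructType(Seq(
      StructField("store_id", LongType),
      StructField("channel", StringType),
      StructField("gmv", LongType)
    ))

    // With an explicit schema, read.json does not have to scan the data to infer types
    val df = sparkSession.read.schema(schema).json(sparkSession.createDataset(jsonList))
    df.printSchema()
    df.show()
  }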

  def test2(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-rdd-test")
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._

    // Build a Dataset[String] of JSON records with a nested address struct
    val jsonList = List(
      """{"store_id":1094,"channel":"SNG","gmv":100,"address":{"city":"ShenZhen","zip":"0755"}}""",
      """{"store_id":3409,"channel":"SNG","gmv":200,"address":{"city":"DongGuan","zip":"0769"}}""",
      """{"store_id":1094,"channel":"JDDJ","gmv":300,"address":{"city":"GuangZhou","zip":"020"}}"""
    )

    //val ds = sparkSession.createDataset(jsonList)
    val ds = sparkSession.createDataset(sparkSession.sparkContext.parallelize(jsonList))
    val df = sparkSession.read.json(ds)
    df.printSchema()
    /*
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- gmv: long (nullable = true)
 |-- store_id: long (nullable = true)
    */

    df.createOrReplaceTempView("cte_json")
    sparkSession.sql("select * from cte_json where channel != 'JDDJ' ").show()
    /*
+----------------+-------+---+--------+
|         address|channel|gmv|store_id|
+----------------+-------+---+--------+
|[ShenZhen, 0755]|    SNG|100|    1094|
|[DongGuan, 0769]|    SNG|200|    3409|
+----------------+-------+---+--------+
    */

    // Nested struct fields can be referenced with dot notation in SQL
    sparkSession.sql(
      """
        |select
        |  store_id, channel, gmv, address,
        |  address.city as city,
        |  address.zip as zip
        |from cte_json
        |where channel = 'SNG'
      """.stripMargin).show()
    /*
+--------+-------+---+----------------+--------+----+
|store_id|channel|gmv|         address|    city| zip|
+--------+-------+---+----------------+--------+----+
|    1094|    SNG|100|[ShenZhen, 0755]|ShenZhen|0755|
|    3409|    SNG|200|[DongGuan, 0769]|DongGuan|0769|
+--------+-------+---+----------------+--------+----+
    */
  }
}
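
The two methods above both end with an untyped DataFrame. To get a strongly typed Dataset, as the title suggests, the parsed rows can be mapped onto a case class with .as[...]. Below is a minimal, self-contained sketch; the case class name Store and its field types are assumptions derived from the schema printed in test1, not code from the original article.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class whose fields mirror the schema inferred in test1
case class Store(store_id: Long, channel: String, gmv: Long)

object TypedJsonExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-json-typed")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val jsonDs = spark.createDataset(List(
      """{"store_id":1094,"channel":"SNG","gmv":100}""",
      """{"store_id":107,"channel":"JDDJ","gmv":300}"""
    ))

    // read.json returns a DataFrame; .as[Store] resolves columns by name into a typed Dataset
    val stores: Dataset[Store] = spark.read.json(jsonDs).as[Store]
    stores.filter(_.channel != "JDDJ").show()

    spark.stop()
  }
}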

 

