package org.onepiece.bigdata.windows

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object App {

  def init(): Unit = {
    // Reduce Spark log noise so the console output below stays readable
    //Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.ERROR)
  }

  def main(args: Array[String]): Unit = {
    init()
    test1()
    test2()
  }

  def test1(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-rdd-test")
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._

    // Build a Dataset[String] from the JSON strings, then let Spark infer the schema
    val jsonList = List(
      """{"store_id":1094,"channel":"SNG","gmv":100}""",
      """{"store_id":3409,"channel":"SNG","gmv":200}""",
      """{"store_id":107,"channel":"JDDJ","gmv":300}"""
    )
    val ds = sparkSession.createDataset(jsonList)
    val df = sparkSession.read.json(ds)

    df.printSchema()
    /*
    root
     |-- channel: string (nullable = true)
     |-- gmv: long (nullable = true)
     |-- store_id: long (nullable = true)
    */

    // Register a temp view so the DataFrame can be queried with SQL
    df.createOrReplaceTempView("cte_json")
    sparkSession.sql("select * from cte_json where channel != 'JDDJ'").show()
    /*
    +-------+---+--------+
    |channel|gmv|store_id|
    +-------+---+--------+
    |    SNG|100|    1094|
    |    SNG|200|    3409|
    +-------+---+--------+
    */
  }
  def test2(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-rdd-test")
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._

    // Build a Dataset[String] from JSON strings that contain a nested "address" object
    val jsonList = List(
      """{"store_id":1094,"channel":"SNG","gmv":100,"address":{"city":"ShenZhen","zip":"0755"}}""",
      """{"store_id":3409,"channel":"SNG","gmv":200,"address":{"city":"DongGuan","zip":"0769"}}""",
      """{"store_id":1094,"channel":"JDDJ","gmv":300,"address":{"city":"GuangZhou","zip":"020"}}"""
    )
    // Equivalent to createDataset(jsonList); here the Dataset is built from an RDD via parallelize
    //val ds = sparkSession.createDataset(jsonList)
    val ds = sparkSession.createDataset(sparkSession.sparkContext.parallelize(jsonList))
    val df = sparkSession.read.json(ds)

    df.printSchema()
    /*
    root
     |-- address: struct (nullable = true)
     |    |-- city: string (nullable = true)
     |    |-- zip: string (nullable = true)
     |-- channel: string (nullable = true)
     |-- gmv: long (nullable = true)
     |-- store_id: long (nullable = true)
    */

    df.createOrReplaceTempView("cte_json")
    sparkSession.sql("select * from cte_json where channel != 'JDDJ'").show()
    /*
    +----------------+-------+---+--------+
    |         address|channel|gmv|store_id|
    +----------------+-------+---+--------+
    |[ShenZhen, 0755]|    SNG|100|    1094|
    |[DongGuan, 0769]|    SNG|200|    3409|
    +----------------+-------+---+--------+
    */

    // Nested struct fields can be projected with dot notation in SQL
    sparkSession.sql(
      """
        |select
        |  store_id, channel, gmv, address,
        |  address.city as city,
        |  address.zip as zip
        |from cte_json
        |where channel = 'SNG'
      """.stripMargin).show()
    /*
    +--------+-------+---+----------------+--------+----+
    |store_id|channel|gmv|         address|    city| zip|
    +--------+-------+---+----------------+--------+----+
    |    1094|    SNG|100|[ShenZhen, 0755]|ShenZhen|0755|
    |    3409|    SNG|200|[DongGuan, 0769]|DongGuan|0769|
    +--------+-------+---+----------------+--------+----+
    */
  }
}