SparkSQL Series (10): Hive map with nested struct, struct with nested array, array with nested struct


Hive's simple data types behave like primitive types and are easy to work with.
But Hive also has complex data types such as struct, map, and array, which are trickier to handle. This post is a brief walkthrough of processing Hive complex data types with Spark.

The struct type

Readers familiar with C will recognize this one: in C, most non-trivial types are built from structs. A struct can contain both basic and complex types, and it is one of the most commonly used Hive data types.

Hive table DDL

drop table appopendetail;
create table if not exists appopendetail
(
username String,
appname String,
opencount INT
)
row format delimited fields terminated by '|'
location '/hive/table/appopendetail';

drop table appopentable;
create table if not exists appopentable
(
username String,
appopen struct<appname:String,opencount:INT>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
location '/hive/table/appopen';

Generating a struct with SQL

insert into appopentable
select username,named_struct('appname',appname,'opencount',opencount) as appopen from appopendetail;

Parsing a struct with SQL

select appopen.appname as appname,appopen.opencount as opencount from appopentable;

Generating a struct with Spark

import java.util
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sparkSession = SparkSession.builder().master("local").appName("AppName").enableHiveSupport().getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)

val nameRDD = javasc.parallelize(util.Arrays.asList(
  "{'username':'wangwu','opencount':'18','appname':'weixin'}",
  "{'username':'sunliu','opencount':'19','appname':'qq'}",
  "{'username':'zhangsan','opencount':'20','appname':'brower'}"))
val namedf = sparkSession.read.json(nameRDD)

// Join the struct fields with ':' (the COLLECTION ITEMS delimiter), then write
// '|'-delimited text files that the Hive table can read directly.
namedf.select(col("username"), concat_ws(":", col("appname"), col("opencount")))
  .write.option("sep", "|").csv("/software/java/data/mm")

In short: we concatenate the struct fields into one column, joined by ':' (for example, the first row becomes wangwu|weixin:18).

SparkSQL does have a struct function, but this Hive table's underlying files are plain text, and SparkSQL cannot save a struct column in a text format such as CSV.
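
If you want a genuine struct column instead of hand-built delimiters, one workaround is to write through the Hive metastore, whose SerDe can serialize complex types even for text tables. A minimal sketch, reusing namedf from the snippet above and assuming the appopentable created earlier already exists and the session is Hive-enabled:

// Sketch: build a real struct column and let insertInto serialize it via the
// Hive SerDe (which does handle complex types in text tables).
// Assumes appopentable from the DDL above exists; the cast mirrors its INT field.
val withStruct = namedf.select(
  col("username"),
  struct(col("appname").as("appname"), col("opencount").cast("int").as("opencount")).as("appopen"))
withStruct.write.insertInto("appopentable")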

Parsing a struct with Spark

1. spark.sql("select appopen.appname as appname,appopen.opencount as opencount from appopentable")
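
The same query can be written with the DataFrame API; a minimal sketch, assuming spark is the Hive-enabled SparkSession used above:

// Struct fields are addressed with dot notation on the column name.
spark.table("appopentable")
  .select(col("appopen.appname").as("appname"), col("appopen.opencount").as("opencount"))
  .show()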

The map type

It is essentially quite similar to the struct type.

Hive table DDL

drop table appopendetail;
create table if not exists appopendetail
(
username String,
appname String,
opencount INT
)
row format delimited fields terminated by '|'
location '/hive/table/appopendetail';

drop table appopentable;
create table if not exists appopentable
(
username String,
appopen map<String,INT>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopen';

Generating a map with SQL

insert into appopentable select username,map('appname',appname,'opencount',opencount) as appopen from appopendetail;

Parsing a map with SQL

select appopen["appname"],appopen["opencount"] from appopentable ;

Generating a map with Spark

import java.util
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val sparkSession = SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)

val nameRDD = javasc.parallelize(util.Arrays.asList(
  "{'username':'wangwu','opencount':'18','appname':'weixin'}",
  "{'username':'sunliu','opencount':'19','appname':'qq'}",
  "{'username':'zhangsan','opencount':'20','appname':'brower'}"))
val namedf = sparkSession.read.json(nameRDD)
  .select(col("appname"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey", col("username"))

// Each map entry is "key\tvalue" (MAP KEYS delimiter); entries are joined
// with ':' (COLLECTION ITEMS delimiter).
namedf.select(col("username"), concat_ws(":",
    concat_ws("\t", col("appnamekey"), col("appname")),
    concat_ws("\t", col("opencountkey"), col("opencount"))))
  .write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/mm")

SparkSQL also has a map function, but since the Hive table's files are plain text, SparkSQL cannot save a map column in a text format either.
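
As with struct, a hedged workaround is to build a real map column and insert it through the Hive SerDe rather than writing CSV. A minimal sketch, reusing namedf and assuming the map-typed appopentable above exists in a Hive-enabled session:

// Sketch: a genuine map column written via insertInto. Note that, like the
// SQL insert above, the 'appname' entry puts a string value into an
// int-valued map, so that value will be cast (and may come back null).
val withMap = namedf.select(
  col("username"),
  map(lit("appname"), col("appname"), lit("opencount"), col("opencount")).as("appopen"))
withMap.write.insertInto("appopentable")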

Parsing a map with Spark

1. spark.sql("select appopen[\"appname\"],appopen[\"opencount\"] from appopentable")
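
Or with the DataFrame API, a minimal sketch:

// Map values are fetched by key with getItem.
spark.table("appopentable")
  .select(col("appopen").getItem("appname"), col("appopen").getItem("opencount"))
  .show()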

The array type

Hive table DDL

drop table appopendetail;
create table if not exists appopendetail
(
username String,
appname String,
opencount INT
)
row format delimited fields terminated by '|'
location '/hive/table/appopendetail';

drop table appopentable;
create table if not exists appopentable
(
username String,
appopen array<String>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopen';

Generating an array with SQL

insert into appopentable select username,collect_list(appname) as appopen from appopendetail group by username;

Parsing an array with SQL

select appopen[0] from appopentable;

Generating an array with Spark

Method 1:

import java.util
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val sparkSession = SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)

val nameRDD = javasc.parallelize(util.Arrays.asList(
  "{'username':'wangwu','opencount':'18','appname':'weixin'}",
  "{'username':'wangwu','opencount':'19','appname':'qq'}",
  "{'username':'wangwu','opencount':'20','appname':'brower'}"))

// Collect each user's appnames into an array, then join the elements with ':'
// (the COLLECTION ITEMS delimiter).
sparkSession.read.json(nameRDD).select("username", "appname")
  .groupBy("username")
  .agg(collect_list("appname") as "appname")
  .select(col("username"), concat_ws(":", col("appname")))
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/mm")

In short: we concatenate the collected values into one column, joined by ':'.

Parsing an array with Spark

1. spark.sql("select appopen[0] from appopentable")
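
Or with the DataFrame API, a minimal sketch:

// Array elements are fetched by index with getItem.
spark.table("appopentable")
  .select(col("appopen").getItem(0))
  .show()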

struct combined with map and array

Hive table DDL

drop table appopendetail;
create table if not exists appopendetail
(
username String,
appname String,
opencount INT
)
row format delimited fields terminated by '|'
location '/hive/table/appopendetail';

create table if not exists appopentablestruct_map
(
struct_map struct<appname:String,opencount:map<String,String>>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopentablestruct_map';

create table if not exists appopentablestruct_array
(
struct_array struct<appname:String,opencount:array<INT>>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopentablestruct_array';

Generating a nested struct with SQL

insert into appopentablestruct_map select named_struct('appname',appname,'opencount',map('appname',appname,'opencount',opencount)) as struct_map from appopendetail;
insert into appopentablestruct_array select named_struct('appname',appname,'opencount',collect_list(opencount)) as struct_array from appopendetail group by appname;

Parsing a nested struct with SQL

select struct_map.appname,struct_map.opencount,struct_map.opencount["appname"],struct_map.opencount["opencount"] from appopentablestruct_map;
select struct_array.appname,struct_array.opencount,struct_array.opencount[0] from appopentablestruct_array;

Generating a nested struct with Spark

Method 1:

import java.util
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val sparkSession = SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)

val nameRDD = javasc.parallelize(util.Arrays.asList(
  "{'username':'wangwu','opencount':'18','appname':'weixin'}",
  "{'username':'wangwu','opencount':'19','appname':'qq'}",
  "{'username':'wangwu','opencount':'20','appname':'brower'}"))
val namedf = sparkSession.read.json(nameRDD)

// struct<appname, map<...>>: struct fields are joined with ':', map entries
// with '\t', and each key/value pair with '\u0004' (the level-3 default delimiter).
namedf.select(col("appname"), col("username"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey")
  .select(concat_ws(":", col("appname"),
    concat_ws("\t",
      concat_ws("\u0004", col("appnamekey"), col("appname")),
      concat_ws("\u0004", col("opencountkey"), col("opencount")))))
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/appopentablestruct_map")

// struct<appname, array<...>>: struct fields joined with ':', array elements with '\t'.
namedf.select(col("appname"), col("username"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey")
  .groupBy("appname")
  .agg(concat_ws(":", col("appname"), concat_ws("\t", collect_list("opencount") as "opencount")) as "opencount")
  .select("opencount")
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/appopentablestruct_array")

Again, we simply concatenate the columns, using the delimiter expected at each nesting level.

Parsing a nested struct with Spark

1. spark.sql("select struct_map.appname,struct_map.opencount,struct_map.opencount[\"appname\"],struct_map.opencount[\"opencount\"] from appopentablestruct_map")

2. spark.sql("select struct_array.appname,struct_array.opencount,struct_array.opencount[0] from appopentablestruct_array")
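
The DataFrame API simply chains the accessors; a minimal sketch:

// Dot notation reaches into the struct; getItem indexes the inner map or array.
spark.table("appopentablestruct_map")
  .select(col("struct_map.appname"), col("struct_map.opencount").getItem("appname"))
  .show()
spark.table("appopentablestruct_array")
  .select(col("struct_array.appname"), col("struct_array.opencount").getItem(0))
  .show()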

map combined with struct and array

Hive table DDL

drop table appopendetail;
create table if not exists appopendetail
(
username String,
appname String,
opencount INT
)
row format delimited fields terminated by '|'
location '/hive/table/appopendetail';

drop table appopentablemap_struct;
drop table appopentablestruct_array;
create table if not exists appopentablemap_struct
(
map_struct map<String,struct<appname:String,opencount:INT>>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopentablemap_struct';

create table if not exists appopentablestruct_array
(
map_array map<String,array<INT>>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopentablestruct_array';

Generating a nested map with SQL

insert into appopentablemap_struct
select map('map_struct',map_struct) from (select named_struct('appname',appname,'opencount',opencount) as map_struct from appopendetail)t;

insert into appopentablestruct_array
select map('map_array',appopen) from (select username,collect_list(opencount) as appopen from appopendetail group by username)t;

Parsing a nested map with SQL

select map_struct["map_struct"].appname,map_struct["map_struct"].opencount from appopentablemap_struct;
select map_array["map_array"][0],map_array["map_array"][1] from appopentablestruct_array;

Generating a nested map with Spark

Method 1:

import java.util
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val sparkSession = SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)

val nameRDD = javasc.parallelize(util.Arrays.asList(
  "{'username':'wangwu','opencount':'18','appname':'weixin'}",
  "{'username':'wangwu','opencount':'19','appname':'qq'}",
  "{'username':'wangwu','opencount':'20','appname':'brower'}"))
val namedf = sparkSession.read.json(nameRDD)

// map<string, struct<...>>: the map key and its value are joined with '\t'
// (MAP KEYS delimiter), and the struct fields inside the value with '\u0004'.
namedf.select(col("appname"), col("username"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey")
  .select(concat_ws("\t", col("appnamekey"),
    concat_ws("\u0004", col("appname"), col("opencount"))))
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/appopentablemap_struct")

// map<string, array<...>>: key and value joined with '\t', array elements with '\u0004'.
// The output path below is illustrative.
namedf.select(col("appname"), col("username"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey")
  .groupBy("appname")
  .agg(concat_ws("\t", col("appname"), concat_ws("\u0004", collect_list("opencount") as "opencount")) as "opencount")
  .select("opencount")
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/appopentablestruct_array")

Again, concatenate the columns using the delimiter for each nesting level ('\t' and '\u0004' here).

Parsing a nested map with Spark

1. spark.sql("select map_struct[\"map_struct\"].appname,map_struct[\"map_struct\"].opencount from appopentablemap_struct")

2. spark.sql("select map_array[\"map_array\"][0],map_array[\"map_array\"][1] from appopentablestruct_array")
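
The DataFrame equivalent chains getItem and getField; a minimal sketch:

// The first getItem picks the map key; getField/getItem then reach into the value.
spark.table("appopentablemap_struct")
  .select(col("map_struct").getItem("map_struct").getField("appname"))
  .show()
spark.table("appopentablestruct_array")
  .select(col("map_array").getItem("map_array").getItem(0))
  .show()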

array combined with struct and map

Hive table DDL

drop table appopendetail;
create table if not exists appopendetail
(
username String,
appname String,
opencount INT
)
row format delimited fields terminated by '|'
location '/hive/table/appopendetail';

drop table appopentablearray_map;
create table if not exists appopentablearray_map
(
array_map array<map<String,String>>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopentablearray_map';
drop table appopentablearray_struct;
create table if not exists appopentablearray_struct
(
array_struct array<struct<appname:String,opencount:String>>
)
row format delimited fields terminated by '|'
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '\t'
location '/hive/table/appopentablearray_struct';

Generating a nested array with SQL

insert into appopentablearray_map
select collect_list(openmap) as openmap from
(select appname,map('appname',appname,'opencount',opencount) as openmap from appopendetail)t;

insert into appopentablearray_struct
select collect_list(openmap) as openmap from
(select appname,named_struct('appname',appname,'opencount',cast(opencount as string)) as openmap from appopendetail)t;

Parsing a nested array with SQL

select array_map[0]["appname"],array_map[0]["opencount"] from appopentablearray_map;
select array_struct[0].appname,array_struct[0].opencount from appopentablearray_struct;

Generating a nested array with Spark

Method 1:

import java.util
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val sparkSession = SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)

val nameRDD = javasc.parallelize(util.Arrays.asList(
  "{'username':'wangwu','opencount':'18','appname':'weixin'}",
  "{'username':'wangwu','opencount':'19','appname':'qq'}",
  "{'username':'wangwu','opencount':'20','appname':'brower'}"))
val namedf = sparkSession.read.json(nameRDD)

// array<map<...>>: array elements joined with ':', map entries with '\t',
// key/value pairs with '\u0004'. The output path below is illustrative.
namedf.select(col("appname"), col("username"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey")
  .agg(concat_ws(":", collect_list(
    concat_ws("\t",
      concat_ws("\u0004", col("appnamekey"), col("appname")),
      concat_ws("\u0004", col("opencountkey"), col("opencount"))))) as "openmap")
  .select("openmap")
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/appopentablearray_map")

// array<struct<...>>: array elements joined with ':', struct fields with '\t'.
// The output path below is illustrative.
namedf.select(col("appname"), col("username"), lit("appname") as "appnamekey",
    col("opencount"), lit("opencount") as "opencountkey")
  .agg(concat_ws(":", collect_list(
    concat_ws("\t", col("appname"), col("opencount")))) as "openmap")
  .select("openmap")
  .repartition(1).write.mode(SaveMode.Overwrite).option("sep", "|").csv("/software/java/data/appopentablearray_struct")

Parsing a nested array with Spark

1. spark.sql("select array_map[0][\"appname\"],array_map[0][\"opencount\"] from appopentablearray_map")

2. spark.sql("select array_struct[0].appname,array_struct[0].opencount from appopentablearray_struct")
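
When you need every element rather than a fixed index, exploding the array is usually more convenient than indexing. A minimal sketch, assuming org.apache.spark.sql.functions._ is imported as above:

// explode turns each array element into its own row; the struct fields can
// then be addressed with dot notation.
spark.table("appopentablearray_struct")
  .select(explode(col("array_struct")).as("app"))
  .select(col("app.appname"), col("app.opencount"))
  .show()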

