1. 通過Hive view
-- Staging table: reads the JSON files under LOCATION through the HCatalog
-- JsonSerDe. Each record exposes retCode, retMsg and a `data` array of structs.
CREATE EXTERNAL TABLE IF NOT EXISTS finance.json_serde_optd_table (
    retCode string,
    retMsg  string,
    data    array<struct<
        secID:string,
        tradeDate:date,
        optID:string,
        ticker:string,
        secShortName:string,
        exchangeCD:string,
        preSettlePrice:double,
        preClosePrice:double,
        openPrice:double,
        highestPrice:double,
        lowestPrice:double,
        closePrice:double,
        settlPrice:double,
        turnoverVol:double,
        turnoverValue:double,
        openInt:int
    >>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/';

-- Flattened table: one row per element of the `data` array.
CREATE TABLE IF NOT EXISTS finance.tb_optd AS
SELECT
    b.data.secID,
    b.data.tradeDate,
    b.data.optID,
    b.data.ticker,
    b.data.secShortName,
    b.data.exchangeCD,
    b.data.preSettlePrice,
    b.data.preClosePrice,
    b.data.openPrice,
    b.data.highestPrice,
    b.data.lowestPrice,
    b.data.closePrice,
    b.data.settlPrice,
    b.data.turnoverVol,
    b.data.turnoverValue,
    b.data.openInt
FROM finance.json_serde_optd_table
LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
2. 通過Zeppelin
%dep
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");
// Describes one model to import into Hive.
//   database  - Hive database that receives the tables
//   modelName - model key; loadDataToHive appends it to hdfsPath and uses it
//               to name the tables json_serde_<modelName>_table and tb_<modelName>
//   hdfsPath  - base HDFS directory (with trailing slash) holding the model folders
//   schema    - column definitions placed inside the staging CREATE TABLE (...)
//   schema_tb - select list used to build the flattened tb_<modelName> table
case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String)

// Models queued for import; each model section appends one entry.
var hiveConfigList = List[HiveConfig]()
// equd model: column definitions of the JSON staging table.
// NOTE(review): the struct was rebuilt from a corrupted copy. Field names follow
// the equd select list; the types of fields other than secID, tradeDate,
// preClosePrice, dealAmount and isOpen are inferred. Confirm against the API.
val schema_json_equd_serde =""" retCode string,
retMsg string,
data array<struct<
secID:string,
tradeDate:date,
ticker:string,
secShortName:string,
exchangeCD:string,
preClosePrice:double,
actPreClosePrice:double,
openPrice:double,
highestPrice:double,
lowestPrice:double,
closePrice:double,
turnoverVol:double,
turnoverValue:double,
dealAmount:int,
turnoverRate:double,
accumAdjFactor:double,
negMarketValue:double,
marketValue:double,
PE:double,
PE1:double,
PB:double,
isOpen:int
>>""";
// Select list for the flattened equd table: every field is read from the
// exploded `data` struct (alias b), one "b.data.<field>" entry per line.
var schema_equd = List(
  "secID",
  "ticker",
  "secShortName",
  "exchangeCD",
  "tradeDate",
  "preClosePrice",
  "actPreClosePrice",
  "openPrice",
  "highestPrice",
  "lowestPrice",
  "closePrice",
  "turnoverVol",
  "turnoverValue",
  "dealAmount",
  "turnoverRate",
  "accumAdjFactor",
  "negMarketValue",
  "marketValue",
  "PE",
  "PE1",
  "PB",
  "isOpen"
).map(field => "b.data." + field).mkString(",\n");
// Queue the equd model for import.
hiveConfigList :+= HiveConfig(
  database = "finance",
  modelName = "equd",
  hdfsPath = "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/",
  schema = schema_json_equd_serde,
  schema_tb = schema_equd
);
// idxd model: column definitions of the JSON staging table.
// The struct describes one element of the `data` array.
val schema_json_idxd_serde =""" retCode string,
retMsg string,
data array<struct<
indexID:string,
tradeDate:date,
ticker:string,
porgFullName:string,
secShortName:string,
exchangeCD:string,
preCloseIndex:double,
openIndex:double,
lowestIndex:double,
highestIndex:double,
closeIndex:double,
turnoverVol:double,
turnoverValue:double,
CHG:double,
CHGPct:double
>>""";
// Select list for the flattened idxd table: every field is read from the
// exploded `data` struct (alias b), one "b.data.<field>" entry per line.
var schema_idxd = List(
  "indexID",
  "tradeDate",
  "ticker",
  "porgFullName",
  "secShortName",
  "exchangeCD",
  "preCloseIndex",
  "openIndex",
  "lowestIndex",
  "highestIndex",
  "closeIndex",
  "turnoverVol",
  "turnoverValue",
  "CHG",
  "CHGPct"
).map(field => "b.data." + field).mkString(",\n");
// Queue the idxd model for import.
hiveConfigList :+= HiveConfig(
  database = "finance",
  modelName = "idxd",
  hdfsPath = "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/",
  schema = schema_json_idxd_serde,
  schema_tb = schema_idxd
);
// Creates the Hive objects for one model:
//   1. the database, if missing;
//   2. a JsonSerDe staging table json_serde_<modelName>_table over
//      <hdfsPath><modelName>/;
//   3. the flattened table tb_<modelName>, built by exploding the staging
//      table's `data` array and selecting args.schema_tb.
// Does nothing when args.database or args.schema is empty.
// Relies on the SparkContext `sc` being in scope.
def loadDataToHive(args: HiveConfig): Unit = {
  if (args.database != "" && args.schema != "") {
    val loadPath = args.hdfsPath + args.modelName
    val stagingTable = "json_serde_" + args.modelName + "_table"
    val targetTable = "tb_" + args.modelName
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    print("正在創建項目..." + args.modelName)
    hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database)

    print("正在構造擴展模型...")
    // EXTERNAL keeps the source files under loadPath when the staging table is
    // dropped; a managed table at this LOCATION would delete them.
    hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS " + args.database + "." + stagingTable + "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION " + "'" + loadPath + "/'")

    // Build the CTAS statement once so the logged text is exactly what runs.
    val createTargetSql = "CREATE TABLE IF NOT EXISTS " + args.database + "." + targetTable + " as select " + args.schema_tb + " from " + args.database + "." + stagingTable + " LATERAL VIEW explode(" + stagingTable + ".data) b AS data"
    println(createTargetSql)
    hiveContext.sql(createTargetSql)

    println(args.modelName + " 擴展模型加載已完成!")
  }
}
// Show how many models are queued, then load them one by one in list order.
hiveConfigList.size
hiveConfigList.foreach(loadDataToHive)
3. 第二種取法
由於 data 是 JSON 數據裡的一個數組,所以上面的轉換複雜了一點。下面這種方法是先把 JSON 裡的 data 數組取出來放到 HDFS,然後直接用下面的語句建立 Hive 外部表:
用 NiFi 的 SplitJson 處理器來提取、分隔 data 數組,使每個數組元素成為一條獨立的 JSON 記錄。

-- External table over JSON records stored under LOCATION, read through the
-- HCatalog JsonSerDe. The fields are top-level columns here, so no explode
-- step is needed.
CREATE EXTERNAL TABLE IF NOT EXISTS finance.awen_optd (
    secid          string,
    tradedate      date,
    optid          string,
    ticker         string,
    secshortname   string,
    exchangecd     string,
    presettleprice double,
    precloseprice  double,
    openprice      double,
    highestprice   double,
    lowestprice    double,
    closeprice     double,
    settlprice     double,
    turnovervol    double,
    turnovervalue  double,
    openint        int
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/';
NIFI 中國社區 QQ群:595034369
