package com.example.demo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
public class DemoApplication {
public static void main(String[] args) {
// /*-----------------------線上調用方式--------------------------*/
// 讀入店鋪id數據
SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getOrCreate();
Dataset<Row> vender_set = spark.sql("select pop_vender_id from app.app_sjzt_payout_apply_with_order where dt = '2019-08-05' and pop_vender_id is not null");
System.out.println( "數據讀取 OK" );
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// JavaSparkContext sc = new JavaSparkContext();
SQLContext sqlContext = new SQLContext(sc);
// 將數據去重,轉換成 List<Row> 格式
vender_set = vender_set.distinct();
vender_set = vender_set.na().fill(0L);
JavaRDD<Row> vender= vender_set.toJavaRDD();
List<Row> vender_list = vender.collect();
// 遍歷商家id,調用jsf接口,創建list 保存返回數據
List<String> list_temp = new ArrayList<String>();
for(Row row:vender_list) {
String id = row.getString(0);
String result = service.venderDownAmountList(id);
System.out.println( "接口調用返回值 OK" );
// 解析json串 ,按照JSONObject 和 JSONArray 一層一層解析 並過返回濾數據
JSONObject jsonOBJ = JSON.parseObject(result);
JSONArray data = jsonOBJ.getJSONArray("data");
if (data != null) {
JSONObject data_all = data.getJSONObject(0);
double amount = data_all.getDouble("jfDownAmount");
// 將商家id 和 倒掛金額存下來
list_temp.add("{\"vender_id\":"+id+",\"amount\":"+amount+"}");
}
else {
continue;
}
System.out.println( "解析 OK" );
}
// list 轉為 RDD
JavaRDD<String> venderRDD = sc.parallelize(list_temp);
// 注冊成表
Dataset<Row> vender_table = sqlContext.read().json(venderRDD);
vender_table.registerTempTable("vender");
System.out.println( "注冊表 OK" );
// 寫入數據庫
spark.sql("insert overwrite table dev.dev_jypt_vender_dropaway_amount select vender.vender_id,vender.amount from vender");
System.out.println( "寫入數據表 OK" );
sc.stop();
System.out.println( "Hello World!" );
}
}