java通過SparkSession連接spark-sql


SparkSession配置獲取客戶端

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

/**
 * Lazily creates and shares a single {@link SparkSession} (with Hive support)
 * and its companion {@link JavaSparkContext} for the whole application.
 *
 * <p>Thread-safety: {@link #initSpark()} is {@code synchronized} so concurrent
 * callers of {@link #getJsc()} / {@link #getSession()} cannot race and create
 * duplicate contexts.</p>
 */
public class SparkTool implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkTool.class);

    /** Spark application name; kept public/static for backward compatibility with existing callers. */
    public static String appName = "root";
    private static JavaSparkContext jsc = null;
    private static SparkSession spark = null;

    /**
     * Builds the shared SparkSession and JavaSparkContext on first use.
     * Synchronized to make the lazy initialization thread-safe; the
     * null re-check inside guards against a second thread that was
     * blocked on the monitor while the first thread initialized.
     */
    private static synchronized void initSpark() {
        if (jsc == null || spark == null) {

            SparkConf sparkConf = new SparkConf();
            sparkConf.set("spark.driver.allowMultipleContexts", "true");
            sparkConf.set("spark.eventLog.enabled", "true");
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
            // Allow Hive tables whose data lives in nested subdirectories to be read.
            sparkConf.set("hive.mapred.supports.subdirectories", "true");
            sparkConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");

            spark = SparkSession.builder().appName(appName).config(sparkConf).enableHiveSupport().getOrCreate();
            jsc = new JavaSparkContext(spark.sparkContext());
            LOGGER.info("SparkSession initialized with appName={}", appName);
        }
    }

    /**
     * Returns the shared JavaSparkContext, creating it on first call.
     *
     * @return the singleton {@link JavaSparkContext}
     */
    public static JavaSparkContext getJsc() {
        if (jsc == null) {
            initSpark();
        }
        return jsc;
    }

    /**
     * Returns the shared SparkSession, creating it on first call.
     *
     * @return the singleton {@link SparkSession}
     */
    public static SparkSession getSession() {
        if (spark == null) {
            initSpark();
        }
        return spark;
    }

}

通過sparkSession執行sql

 /**
  * Executes the given SQL through the shared SparkSession and maps each
  * result row to a {@link TableInfo}.
  *
  * <p>Bug fix: the original code created ONE {@code TableInfo} before the
  * loop and added the same instance for every row, so every list element
  * ended up holding the values of the last row. A fresh instance is now
  * created per row.</p>
  *
  * @param abstractSql the SQL statement to run (e.g. a DESCRIBE-style query)
  * @return one TableInfo per result row; empty list if the query returns no rows
  */
 public List<TableInfo> selectTableInfoFromSpark(String abstractSql) {
        SparkSession spark = SparkTool.getSession();
        Dataset<Row> dataset = spark.sql(abstractSql);
        List<Row> rowList = dataset.collectAsList();
        List<TableInfo> tableInfoList = new ArrayList<TableInfo>(rowList.size());
        for (Row row : rowList) {
            // New instance per row — reusing one instance would make all
            // list entries alias the same object.
            TableInfo tableInfo = new TableInfo();
            // NOTE(review): Spark Row indices are 0-based, so column 0 is
            // skipped here — presumably the query's first column is not
            // needed; confirm against the actual result schema.
            tableInfo.setColumnName(row.getString(1));
            tableInfo.setColumnType(row.getString(2));
            tableInfo.setColumnComment(row.getString(3));
            tableInfoList.add(tableInfo);
        }
        return tableInfoList;
    }

 

      java 或者scala操作spark-sql時查詢出來的數據有RDD、DataFrame、DataSet三種。

     這三種數據結構關系以及轉換或者解析見博客:https://www.jianshu.com/p/71003b152a84


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM