java使用spark/spark-sql處理schema數據


1、spark是什么?

Spark是基於內存計算的大數據並行計算框架。

1.1 Spark基於內存計算

相比於MapReduce基於IO計算,提高了在大數據環境下數據處理的實時性。

1.2 高容錯性和高可伸縮性

與mapreduce框架相同,允許用戶將Spark部署在大量廉價硬件之上,形成集群。

 

2、spark編程

每一個spark應用程序都包含一個驅動程序(driver program ),他會運行用戶的main函數,並在集群上執行各種並行操作(parallel operations)

spark提供的最主要的抽象概念有兩種: 
彈性分布式數據集(resilient distributed dataset)簡稱RDD ,他是一個元素集合,被分區地分布到集群的不同節點上,可以被並行操作,RDDS可以從hdfs(或者任意其他的支持Hadoop的文件系統)上的一個文件開始創建,或者通過轉換驅動程序中已經存在的Scala集合得到,用戶也可以讓spark將一個RDD持久化到內存中,使其能再並行操作中被有效地重復使用,最后RDD能自動從節點故障中恢復

spark的第二個抽象概念是共享變量(shared variables),它可以在並行操作中使用,在默認情況下,當spark將一個函數以任務集的形式在不同的節點上並行運行時,會將該函數所使用的每個變量拷貝傳遞給每一個任務中,有時候,一個變量需要在任務之間,或者驅動程序之間進行共享,spark支持兩種共享變量: 
廣播變量(broadcast variables),它可以在所有節點的內存中緩存一個值。 
累加器(accumulators):只能用於做加法的變量,例如計算器或求和器

 

3、spark-sql

spark-sql是將hive sql跑在spark引擎上的一種方式,提供了基於schema處理數據的方式。

 

4、代碼詳解

java spark和spark-sql依賴。

pom.xml

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>

 

 

基於spark1.6創建HiveContext客戶端。在spark2.1已經開始使用sparksession了。請注意。

package com.xiaoju.dqa.fireman.driver;
import com.xiaoju.dqa.fireman.exception.SparkInitException;
import com.xiaoju.dqa.fireman.utils.PropertiesUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

import java.io.IOException;
import java.util.Properties;

public class SparkClient {
    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;

    public SparkClient() {
        initSparkConf();
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    public SQLContext getSQLContext() throws SparkInitException {
        return new SQLContext(javaSparkContext);
    }

    public HiveContext getHiveContext() throws SparkInitException {
        return new HiveContext(javaSparkContext);
    }

    private void initSparkConf() {
        try {
            PropertiesUtil propUtil = new PropertiesUtil("fireman.properties");
            Properties prop = propUtil.getProperties();
            String warehouseLocation = System.getProperty("user.dir");
            sparkConf = new SparkConf()
                    .setAppName(prop.getProperty("spark.appname"))
                    .set("spark.sql.warehouse.dir", warehouseLocation)
                    .setMaster(prop.getProperty("spark.master"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

}

 

驅動程序driver

1、這里要實現可序列化接口,否則spark並不會識別這個類。

2、這里在通過spark-sql讀取到row數據之后,將schema解析出來,並且映射為hashmap。

public class FiremanDriver implements Serializable {
    private String db;
    private String table;
private HiveContext hiveContext;public FiremanDriver(String db, String table) {
        try {
            this.db = db;
            this.table = table;
            SparkClient sparkClient = new SparkClient();
            hiveContext = sparkClient.getHiveContext();
        } catch (SparkInitException ex) {
            ex.printStackTrace();
        }
    }
  
public void check() { HashMap<String, Object> result = null; try { String query = String.format("select * from %s.%s", db ,table); System.out.println(query); DataFrame rows = hiveContext.sql(query); JavaRDD<Row> rdd = rows.toJavaRDD(); result = rdd.map(new Function<Row, HashMap<String, Object>>() { @Override public HashMap<String, Object> call(Row row) throws Exception { HashMap<String, Object> fuseResult = new HashMap<String, Object>(); HashMap<String, Object> rowMap = formatRowMap(row); // 實際map過程 return mapResult; } }).reduce(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() { @Override public HashMap<String, Object> call(HashMap<String, Object> map1, HashMap<String, Object> map2) throws Exception { // reduce merge過程
            return mergeResult; } }); } catch (Exception ex) { ex.printStackTrace(); } }   // 讀取shema,這里在通過spark-sql讀取到row數據之后,將schema解析出來,並且映射為hashmap private HashMap<String, Object> formatRowMap(Row row){ HashMap<String, Object> rowMap = new HashMap<String, Object>(); try {         for (int i=0; i<row.schema().fields().length; i++) { String colName = row.schema().fields()[i].name(); Object colValue = row.get(i); rowMap.put(colName, colValue); }catch (Exception ex) { ex.printStackTrace(); } return rowMap; } public static void main(String[] args) { String db = args[0]; String table = args[1]; FiremanDriver firemanDriver = new FiremanDriver(db, table); firemanDriver.check(); } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM