項目實戰從0到1之Spark(10)Spark讀取HDFS寫入Hive


package com.xxxx.report.service;

import com.google.common.collect.Lists;
import com.xx.report.config.Constants;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.List;

/**
 * Batch job that, for every day in an inclusive date range, reads the text files under
 * {@code Constants.PREFIX_BICYCLE_LOG_PATH_YangXin + <yyyyMMdd> + "/*" + "/*"}, parses each line
 * into an {@link EngineLockLog}, and appends the parsed rows to the {@code dt}-partitioned table
 * {@code tableName1} through Spark SQL with Hive support enabled.
 *
 * <p>The class is {@link Serializable} because the Spark map step references the instance method
 * {@code handleLine}, so the instance itself is captured by the closure.
 *
 * @author huanghanyu
 */
public class BicycleLog2hive implements Serializable{
    // Logger
    private static final Logger LOG = LoggerFactory.getLogger(BicycleLog2hive.class);
    // Compact day format (yyyyMMdd) used by the CLI arguments and the input directory names.
    // DateTimeFormatter is immutable, unlike the SimpleDateFormat it replaces.
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
    private static final String TMP_TABLE_NAME = "tableNameTemp";
    private static final String TABLE_NAME = "tableName1";
    private static final String APP_NAME = "xxxxx@yangxin";

    /**
     * Parses one raw log line into an {@link EngineLockLog}.
     *
     * <p>Parsing failures are logged and never propagated: the (possibly partially filled)
     * bean is returned instead, and {@link #run} later drops beans whose user id is null.
     * The actual field extraction is elided in this listing (placeholder lines below).
     *
     * @param line raw text line read from the input files
     * @return the bean built from {@code line}; never null
     */
    private EngineLockLog handleLine(String line) {
        EngineLockLog engineLockLog = new EngineLockLog();
        try {
            // Per-record trace: debug level so it can be switched off for large inputs.
            LOG.debug("handleLine Function -> : {}", line);
            xxxxxxxxxxxxxxxxx
            xxxxxxxxxxxxxxx
            xxxxxxxxxxxx
        }catch (Exception error) {
            // Best effort: one bad line must not fail the whole day. Keep the stack trace.
            LOG.error("{} | {}", error.getMessage(), line, error);
        }
        return engineLockLog;
    }

    /**
     * Loads every day from {@code startTime} to {@code endTime} (both inclusive) into the
     * target table, one partition insert per day. Days without any usable record are skipped.
     *
     * @param master    currently unused by this method; the session is built without an explicit
     *                  master (presumably supplied by the launcher, e.g. spark-submit - confirm)
     * @param startTime first day, as {@code yyyyMMdd}; {@code -} and {@code _} separators are ignored
     * @param endTime   last day, same format as {@code startTime}
     * @throws IllegalArgumentException if either bound is null, is not a valid calendar date,
     *                                  or if {@code endTime} is before {@code startTime}
     */
    public void run(String master, String startTime, String endTime) {
        long startTimsStamp = System.currentTimeMillis();
        // Validate before creating the session so bad arguments fail fast and cheaply.
        List<LocalDate> days = daysBetween(parseDay("startTime", startTime), parseDay("endTime", endTime));
        SparkSession spark = SparkSession.builder().appName(APP_NAME).enableHiveSupport().getOrCreate();
        for (LocalDate day : days) {
            String compactDay = DAY_FORMAT.format(day);
            LOG.info("日期:-> " + compactDay);
            StringBuilder path = new StringBuilder();
            path.append(Constants.PREFIX_BICYCLE_LOG_PATH_YangXin).append(compactDay).append("/*/*");
            LOG.info("路徑:-> " + path);
            JavaRDD<EngineLockLog> mapRDD = spark.read().textFile(path.toString())
                    .javaRDD()
                    .map(this::handleLine)
                    // Lines that failed to parse come back without a user id; drop them.
                    .filter(parsed -> parsed.getUser_id() != null);
            if (!mapRDD.isEmpty()) {
                Dataset<Row> mapDF = spark.createDataFrame(mapRDD, EngineLockLog.class);
                mapDF.createOrReplaceTempView(TMP_TABLE_NAME);
                // The partition value comes from a validated LocalDate (yyyy-MM-dd), never from raw input.
                String dayTemp = DateTimeFormatter.ISO_LOCAL_DATE.format(day);
                String insertSQL = "insert into table " + TABLE_NAME + " partition(dt='" + dayTemp + "') " +
                        "select xxxx,xxxxx,xxxxx from " + TMP_TABLE_NAME;
                spark.sql(insertSQL);
            }
        }
        long endTimeStamp = System.currentTimeMillis();
        LOG.info("總耗時: -> {}ms", endTimeStamp - startTimsStamp);
    }

    /**
     * Converts a day argument into a {@link LocalDate}, ignoring {@code -} and {@code _}.
     *
     * @param argName name used in error messages
     * @param raw     the argument as supplied by the caller
     * @return the parsed date
     * @throws IllegalArgumentException if {@code raw} is null or not a valid {@code yyyyMMdd} date
     */
    private static LocalDate parseDay(String argName, String raw) {
        if (raw == null) {
            throw new IllegalArgumentException(argName + " must not be null");
        }
        String compact = raw.replace("-", "").replace("_", "");
        try {
            return LocalDate.parse(compact, DAY_FORMAT);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException(argName + " is not a valid yyyyMMdd date: " + raw, e);
        }
    }

    /**
     * Lists every day from {@code first} to {@code last}, both inclusive, in ascending order.
     *
     * @throws IllegalArgumentException if {@code last} is before {@code first}; the previous
     *                                  string-equality loop never terminated in that case
     */
    private static List<LocalDate> daysBetween(LocalDate first, LocalDate last) {
        if (last.isBefore(first)) {
            throw new IllegalArgumentException(
                    "endTime (" + last + ") must not be before startTime (" + first + ")");
        }
        List<LocalDate> days = Lists.newArrayList();
        for (LocalDate day = first; !day.isAfter(last); day = day.plusDays(1)) {
            days.add(day);
        }
        return days;
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code <master> <startTime> <endTime>}; see {@link #run} for the date format
     * @throws IllegalArgumentException if fewer than three arguments are given
     */
    public static void main(String[] args) {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: BicycleLog2hive <master> <startTime yyyyMMdd> <endTime yyyyMMdd>");
        }
        String master = args[0];
        String startTime = args[1];
        String endTime = args[2];
        BicycleLog2hive bicycleLog2hive = new BicycleLog2hive();
        bicycleLog2hive.run(master, startTime, endTime);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM