Importing data from Hive into MongoDB with Spark


import com.huinong.truffle.push.process.domain.common.constant.Constants;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.*;

@Slf4j
public class SynchronizeData implements Serializable {

    /**
     * Builds a Hive-enabled SparkSession that is also configured for MongoDB output.
     */
    public static SparkSession getSparkSession(String mongoUrl, String dbName,
                                               String outputDatabase, String outputCollection) {
        SparkSession spark = null;
        try {
            spark = SparkSession.builder().master("local[*]")
                    .appName("SparkHive")
                    .config("spark.sql.warehouse.dir", Constants.WAREHOUSE_DIR).enableHiveSupport()
                    .config("spark.mongodb.output.uri", mongoUrl + dbName)
                    .config("spark.mongodb.output.database", outputDatabase)
                    .config("spark.mongodb.output.collection", outputCollection)
                    .getOrCreate();
            // Quick sanity check that the Hive metastore is reachable.
            spark.sql("show databases").show();
            spark.sql("show tables").show();
        } catch (Exception e) {
            log.error("Failed to create Spark session", e);
        }
        return spark;
    }

    /**
     * Runs the given Hive SQL and writes the result set to MongoDB.
     */
    public static void sync(String sql, String mongoUrl, String dbName,
                            String outputDatabase, String outputCollection) throws Exception {
        SparkSession spark = getSparkSession(mongoUrl, dbName, outputDatabase, outputCollection);
        JavaSparkContext jc = new JavaSparkContext(spark.sparkContext());
        System.out.println("=========================== start.........." + System.currentTimeMillis());
        Dataset<Row> dataset = spark.sql(sql);
        // count() triggers a full job; it is only used here to skip empty result sets.
        if (dataset != null && dataset.count() > 0) {
            MongoSpark.save(dataset);
        }
        System.out.println("=========================== end.........." + System.currentTimeMillis());
        jc.close();
    }
}
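Note that the WriteConfig import above is never actually used; the connector also accepts per-call output settings, so a caller can override the target database/collection without rebuilding the SparkSession. A minimal sketch, assuming mongo-spark-connector 2.x (the helper class name and the "append" save mode are illustrative, not part of the original project):

import com.mongodb.spark.MongoSpark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MongoWriteHelper {

    // Appends the dataset to an explicitly named database/collection,
    // overriding the spark.mongodb.output.* session settings for this write only.
    public static void saveTo(Dataset<Row> dataset, String database, String collection) {
        MongoSpark.save(dataset.write()
                .mode("append")
                .option("database", database)
                .option("collection", collection));
    }
}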

Invocation:

private String mongoUrl = "mongodb://10.10.3.241:27017/";

public void synchronizeUserinfos() throws Exception {
    String sql = "select * from hn_application.push_userinfos";
    SynchronizeData.sync(sql, mongoUrl, "push_userinfos", "push", "push_userinfos");
}

The WAREHOUSE_DIR constant referenced in getSparkSession is defined in the Constants class:

public static final String WAREHOUSE_DIR = "/user/hive/warehouse";
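After a sync run, the rows can be read back through the same connector as a quick check. A minimal sketch, assuming mongo-spark-connector 2.x and that spark.mongodb.input.uri / database / collection are also added to the SparkSession builder (the code above only sets the output.* properties); the class name is illustrative:

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

public class SyncVerification {

    // Loads the configured input collection and prints a document count
    // plus the schema the connector infers from the documents.
    public static void verify(SparkSession spark) {
        JavaSparkContext jc = new JavaSparkContext(spark.sparkContext());
        JavaMongoRDD<Document> rdd = MongoSpark.load(jc);
        System.out.println("documents in MongoDB: " + rdd.count());
        rdd.toDF().printSchema();
    }
}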

Reference:

https://www.cnblogs.com/kaiwen1/p/9179035.html

 

The reference above says to put the cluster's three configuration files into the resources directory; in my case placing only hive-site.xml there worked fine.
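If you prefer not to ship hive-site.xml with the jar, the metastore location can also be passed to the builder directly. A minimal sketch, assuming a remote Hive metastore; the thrift host and port below are placeholders, not values from the original setup:

import org.apache.spark.sql.SparkSession;

public class HiveMetastoreConfigExample {

    // Builds a Hive-enabled session by pointing Spark at the metastore
    // explicitly instead of relying on hive-site.xml on the classpath.
    public static SparkSession build() {
        return SparkSession.builder()
                .master("local[*]")
                .appName("SparkHive")
                .config("hive.metastore.uris", "thrift://metastore-host:9083")  // placeholder host/port
                .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                .enableHiveSupport()
                .getOrCreate();
    }
}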

