Using Spark to aggregate Kafka data and bulk-insert the results into a database


1. Initialize the Kafka-related parameters: when the App class is constructed, fetch the Kafka brokers and the topics to consume.

public App() {
    try {
        // Broker list and consumer group for the direct Kafka stream
        kafkaParams.put("metadata.broker.list", ConfigUtil.getInstance().getKafkaConf().get("brokerlist"));
        kafkaParams.put("group.id", Constant.groupId);
        // KafkaCluster expects a scala.collection.immutable.Map, so convert the Java map first
        scala.collection.mutable.Map<String, String> mutableKafkaParam = JavaConversions
                .mapAsScalaMap(kafkaParams);
        scala.collection.immutable.Map<String, String> immutableKafkaParam = mutableKafkaParam
                .toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
                    public Tuple2<String, String> apply(Tuple2<String, String> v1) {
                        return v1;
                    }
                });
        this.kafkaCluster = new KafkaCluster(immutableKafkaParam);
        // Resolve the topics to consume via DatasourceUtil
        this.topics = new HashSet<String>();
        List<String> topicList = DatasourceUtil.getInstance().getAnalyzerTopicNamesByDb(Constant.DBNAME);
        this.topics.addAll(topicList);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
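
For reference, a minimal sketch of the fields this constructor assumes; the names come from the snippet above, but the actual declarations are not shown in the original post:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.spark.streaming.kafka.KafkaCluster;

public class App {
    // Raw Kafka parameters, reused later by KafkaUtils.createDirectStream
    private final Map<String, String> kafkaParams = new HashMap<String, String>();
    // Spark 1.x helper around the Kafka metadata API
    private KafkaCluster kafkaCluster;
    // Topic names to subscribe to
    private Set<String> topics;

    // ... constructor shown above ...
}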

2. Configure the SparkConf and create the SQLContext

final SparkConf sparkConf = new SparkConf();
//Use Kryo serialization
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "com.nsfocus.statistics.util.KryoRegister");
//Promptly unpersist RDDs the streaming job no longer needs
sparkConf.set("spark.streaming.unpersist", "true");
//Number of reduce tasks; roughly 2-3x the number of cores. Too many yields lots of tiny tasks.
final Integer cores = Integer.valueOf(JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("executornum"));
sparkConf.set("spark.default.parallelism", String.valueOf(2 * cores));
//Consolidate intermediate shuffle files
sparkConf.set("spark.shuffle.consolidateFiles", "true");
//Cap the rate at which data is received from Kafka
Integer maxRate = Integer.valueOf(JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("maxrate"));
Integer partitions = 4;
try {
    partitions = Integer.valueOf(JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("partitions"));
} catch (Exception ex) {
    ex.printStackTrace();
    logger.error(ex);
}
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", String.valueOf(maxRate / partitions));

sparkConf.setMaster(Constant.master);
sparkConf.setAppName(Constant.APPNAME);

final JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//duration is the streaming batch interval (e.g. built with Durations.seconds)
final JavaStreamingContext jsctx = new JavaStreamingContext(ctx, duration);
final SQLContext sqlContext = new SQLContext(ctx);
sqlContext.setConf("spark.sql.shuffle.partitions", String.valueOf(cores));
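
The custom registrator referenced above, com.nsfocus.statistics.util.KryoRegister, is not shown in the post. A minimal sketch of what such a KryoRegistrator might look like; the registered classes are only illustrative:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class KryoRegister implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        // Register the classes that actually flow through the job (illustrative choices)
        kryo.register(byte[].class);
        kryo.register(com.alibaba.fastjson.JSONObject.class);
    }
}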

3. Read the Kafka data with createDirectStream

//Always consume the latest data (start from the latest offsets)
JavaPairInputDStream<byte[],byte[]> messages = KafkaUtils.createDirectStream(
        jsctx,
        byte[].class,
        byte[].class,
        DefaultDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topics);

4. A series of transformations, such as map and filter, can then be applied to messages.

Convert each JSONObject into a Row so that a DataFrame can be created:

JSONObject obj = JSONObject.parseObject(new String(bytes));
return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
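
Putting that fragment in context, a sketch of how the value side of the Kafka pair stream might be mapped to rows and then filtered. RowCreatUtil and fieldList come from the snippet above; the surrounding functions are an assumption, and each batch's RDD of the resulting stream plays the role of filterRDD below:

// Uses org.apache.spark.api.java.function.Function, scala.Tuple2 and org.apache.spark.sql.Row
JavaDStream<Row> rowStream = messages
        .map(new Function<Tuple2<byte[], byte[]>, Row>() {
            public Row call(Tuple2<byte[], byte[]> tuple) {
                try {
                    // tuple._2() is the raw message value
                    JSONObject obj = JSONObject.parseObject(new String(tuple._2()));
                    return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
                } catch (Exception e) {
                    return null; // malformed record, dropped by the filter below
                }
            }
        })
        .filter(new Function<Row, Boolean>() {
            public Boolean call(Row row) {
                return row != null;
            }
        });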

Create the DataFrame and register it as a temporary table:

DataFrame df = sqlContext.createDataFrame(filterRDD, DataTypes.createStructType(inputFields));
df.registerTempTable(tempTable);
sqlContext.cacheTable(tempTable);  
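
The inputFields schema used above is not shown in the post; a sketch of how such a list of StructField could be built, with purely hypothetical field names and types:

// Types come from org.apache.spark.sql.types; the field names here are hypothetical
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("time", DataTypes.LongType, true));
inputFields.add(DataTypes.createStructField("src_ip", DataTypes.StringType, true));
inputFields.add(DataTypes.createStructField("dst_ip", DataTypes.StringType, true));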

Use sqlContext to run SQL against the temp table just created:

DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
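
resultInfo carries the statistic's SQL and target-table metadata. A purely illustrative example of what such a query over the temp table might look like:

// Hypothetical statistic: connection counts per source/destination IP within the batch
String sql = "SELECT src_ip, dst_ip, COUNT(*) AS cnt FROM " + tempTable + " GROUP BY src_ip, dst_ip";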

5. In foreachPartition, create a temporary file, pull the relevant fields out of each Row, and write them to the file:

DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
final int colSize = resultInfo.getCols().split(",").length;
staticDf.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    public void call(java.util.Iterator<Row> rowIterator) throws Exception {
        // One temporary CSV file per partition
        String filePath = "/tmp/statistic_" + java.util.UUID.randomUUID().toString().replace("-", "_");
        FileWriter fileWriter = new FileWriter(filePath);
        try {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                StringBuilder sb = new StringBuilder();

                // First column is the batch timestamp captured in the enclosing scope,
                // followed by the remaining statistic columns taken from the Row
                sb.append(time);
                for (int rowIndex = 0; colSize > 1 && rowIndex < colSize - 1; rowIndex++) {
                    sb.append(",").append(row.get(rowIndex));
                }
                sb.append("\n");

                fileWriter.write(sb.toString());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (fileWriter != null) {
                fileWriter.close();
            }
        }
        // Bulk-load this partition's file into the target table
        CopyUtil.getInstance().copyIntoTableByFile(resultInfo.getTableName() + "(" + resultInfo.getCols() + ")",
                filePath, ip, port, database, username, password);
    }
});

Inside copyIntoTableByFile, obtain a database connection and load the file into the table with PostgreSQL's COPY command:

Connection conn = null;
Connection baseConnection = null;
CopyManager copyManager = null;
FileReader fileReader = null;
File file = new File(filepath);

try {
    fileReader = new FileReader(file);
    conn = RemoteDBUtil.getInstance(ip, port, database, username, password).getConnection();
    // CopyManager needs the underlying PostgreSQL BaseConnection, not a pooled wrapper
    baseConnection = conn.getMetaData().getConnection();
    copyManager = new CopyManager((BaseConnection) baseConnection);
    // Stream the temporary file into the table via COPY ... FROM STDIN
    copyManager.copyIn("copy " + tableName + " from stdin delimiter as ',' NULL as 'null'",
            fileReader);
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    if (fileReader != null) {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    if (conn != null) {
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    // The temporary file is no longer needed once the COPY has finished
    file.delete();
}

