1. Initialize the Kafka-related parameters. The Kafka topic list is fetched when the App class is constructed:
public App() {
    try {
        kafkaParams.put("metadata.broker.list",
                ConfigUtil.getInstance().getKafkaConf().get("brokerlist"));
        kafkaParams.put("group.id", Constant.groupId);

        // Convert the java.util.Map into the scala.collection.immutable.Map expected by KafkaCluster
        scala.collection.mutable.Map<String, String> mutableKafkaParam =
                JavaConversions.mapAsScalaMap(kafkaParams);
        scala.collection.immutable.Map<String, String> immutableKafkaParam =
                mutableKafkaParam.toMap(
                        new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
                            public Tuple2<String, String> apply(Tuple2<String, String> v1) {
                                return v1;
                            }
                        });
        this.kafkaCluster = new KafkaCluster(immutableKafkaParam);

        // Collect the topics this job should subscribe to
        this.topics = new HashSet<String>();
        List<String> topicList = DatasourceUtil.getInstance().getAnalyzerTopicNamesByDb(Constant.DBNAME);
        for (int i = 0; i < topicList.size(); i++) {
            this.topics.add(topicList.get(i));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
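The constructor above uses a few fields on the App class that the original snippet does not show. A minimal sketch of what they might look like (the names follow the snippet, the exact types and modifiers are assumptions):

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.streaming.kafka.KafkaCluster;

// Assumed field declarations: kafkaParams must exist before the constructor runs,
// kafkaCluster and topics are filled in by it.
private final Map<String, String> kafkaParams = new HashMap<String, String>();
private KafkaCluster kafkaCluster;
private Set<String> topics;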
2. Configure the SparkConf parameters and create the SQLContext:
final SparkConf sparkConf = new SparkConf();
// Use Kryo serialization
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "com.nsfocus.statistics.util.KryoRegister");
// Unpersist stale RDDs promptly
sparkConf.set("spark.streaming.unpersist", "true");
// Number of reduce tasks: roughly 2-3x the number of cores; too many produces lots of tiny tasks
final Integer cores = Integer.valueOf(
        JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("executornum"));
sparkConf.set("spark.default.parallelism", String.valueOf(2 * cores));
// Consolidate intermediate shuffle files
sparkConf.set("spark.shuffle.consolidateFiles", "true");
// Cap the rate at which data is received from each Kafka partition
Integer maxRate = Integer.valueOf(
        JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("maxrate"));
Integer partitions = 4;
try {
    partitions = Integer.valueOf(
            JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("partitions"));
} catch (Exception ex) {
    ex.printStackTrace();
    logger.error(ex);
}
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", String.valueOf(maxRate / partitions));
sparkConf.setMaster(Constant.master);
sparkConf.setAppName(Constant.APPNAME);

final JavaSparkContext ctx = new JavaSparkContext(sparkConf);
final JavaStreamingContext jsctx = new JavaStreamingContext(ctx, duration);
final SQLContext sqlContext = new SQLContext(ctx);
sqlContext.setConf("spark.sql.shuffle.partitions", String.valueOf(cores));
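The snippet refers to a batch interval named duration that is defined elsewhere. A minimal sketch, assuming a 60-second interval (the value is illustrative), plus the usual calls that start the streaming job once the pipeline in the following steps has been wired up:

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;

// Assumed batch interval; must be defined before the JavaStreamingContext is created.
final Duration duration = Durations.seconds(60);

// After the DStream transformations of the later steps are registered:
jsctx.start();
jsctx.awaitTermination();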
3. Read the data from Kafka with createDirectStream:
// Each batch reads the newest data
JavaPairInputDStream<byte[], byte[]> messages = KafkaUtils.createDirectStream(
        jsctx,
        byte[].class,
        byte[].class,
        DefaultDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topics);
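The direct approach does not store offsets in ZooKeeper, so if the KafkaCluster built in step 1 is meant to track progress, the per-batch offset ranges have to be read off the stream explicitly. A minimal sketch of capturing them with the standard HasOffsetRanges interface (committing them back via kafkaCluster is omitted here):

import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Hold the offset ranges of the current batch for later use (e.g. committing them back).
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

JavaPairDStream<byte[], byte[]> withOffsets = messages.transformToPair(
        new Function<JavaPairRDD<byte[], byte[]>, JavaPairRDD<byte[], byte[]>>() {
            public JavaPairRDD<byte[], byte[]> call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
                // The underlying RDD of a direct stream implements HasOffsetRanges
                offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                return rdd;
            }
        });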
4. A series of transformations, such as map and filter, can then be applied to messages.
Convert each JSONObject into a Row so that a DataFrame can be created:
obj = JSONObject.parseObject(new String(bytes));
return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
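For context, this fragment would typically sit inside a map over the raw Kafka values of one batch. A minimal sketch, assuming rdd is that batch's JavaPairRDD<byte[], byte[]> and fieldList is the column list expected by RowCreatUtil:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;

// Sketch: map the raw Kafka values (byte[]) of one batch to Rows.
// JSONObject.parseObject is used exactly as in the original (fastjson-style API assumed).
JavaRDD<Row> rowRDD = rdd.map(new Function<Tuple2<byte[], byte[]>, Row>() {
    public Row call(Tuple2<byte[], byte[]> tuple) throws Exception {
        JSONObject obj = JSONObject.parseObject(new String(tuple._2()));
        return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
    }
});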
Create the DataFrame:
DataFrame df = sqlContext.createDataFrame(filterRDD, DataTypes.createStructType(inputFields));
df.registerTempTable(tempTable);
sqlContext.cacheTable(tempTable);
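The schema passed to createDataFrame (inputFields) is not shown in the original. A minimal sketch of how it might be built, assuming every column is treated as a string and fieldList is the same column list used when building the Rows:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

// Assumed schema construction; the column types are illustrative (all StringType here).
List<StructField> inputFields = new ArrayList<StructField>();
for (String fieldName : fieldList) {
    inputFields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}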
Call sqlContext to run SQL against the temporary table registered above:
DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
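resultInfo.getSql() is expected to return an aggregation over the temporary table registered above; a purely illustrative example of the kind of statement it might hold (the column names are made up):

// Illustrative only: an aggregation over the cached temp table.
String sql = "select src_ip, count(*) as total from " + tempTable + " group by src_ip";
DataFrame staticDf = sqlContext.sql(sql);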
5. In foreachPartition, create a temporary file, pull the corresponding fields out of each Row, and write them to the file:
DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
final int colSize = resultInfo.getCols().split(",").length;
staticDf.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    public void call(java.util.Iterator<Row> rowIterator) throws Exception {
        String filePath = "/tmp/statistic_" + java.util.UUID.randomUUID().toString().replace("-", "_");
        FileWriter fileWriter = new FileWriter(filePath);
        try {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                StringBuilder sb = new StringBuilder();
                sb.append(time);
                for (int rowIndex = 0; colSize > 1 && rowIndex < colSize - 1; rowIndex++) {
                    sb.append(",").append(row.get(rowIndex));
                }
                sb.append("\n");
                fileWriter.write(sb.toString());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (fileWriter != null) {
                fileWriter.close();
            }
        }
        CopyUtil.getInstance().copyIntoTableByFile(
                resultInfo.getTableName() + "(" + resultInfo.getCols() + ")",
                filePath, ip, port, database, username, password);
    }
});
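Once every result SQL of the batch has been written out this way, the table cached in step 4 can be released so the next batch does not keep accumulating cached data. A minimal sketch using the standard SQLContext calls (this cleanup step is an assumption, not shown in the original):

// Assumed per-batch cleanup of the cached temp table.
sqlContext.uncacheTable(tempTable);
sqlContext.dropTempTable(tempTable);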
In copyIntoTableByFile, obtain a database connection and load the file into the database via PostgreSQL's CopyManager:
Connection conn = null;
Connection baseConnection = null;
CopyManager copyManager = null;
FileReader fileReader = null;
File file = new File(filepath);
try {
    fileReader = new FileReader(file);
    conn = RemoteDBUtil.getInstance(ip, port, database, username, password).getConnection();
    baseConnection = conn.getMetaData().getConnection();
    copyManager = new CopyManager((BaseConnection) baseConnection);
    copyManager.copyIn("copy " + tableName + " from stdin delimiter as ',' NULL as 'null'", fileReader);
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    if (fileReader != null) {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    if (conn != null) {
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    file.delete();
}
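Note that the COPY statement relies on the file layout produced in step 5: each line must contain the values of resultInfo.getCols() in the same order, separated by commas, with missing values rendered as the literal text null. With an illustrative column list of (time, src_ip, total), a line would look like 2016-01-01 00:00:00,192.168.1.1,42 (values made up for illustration).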