1. Initialize the Kafka-related parameters; the Kafka topics are fetched when the App class is constructed
public App() {
    try {
        kafkaParams.put("metadata.broker.list",
                ConfigUtil.getInstance().getKafkaConf().get("brokerlist"));
        kafkaParams.put("group.id", Constant.groupId);

        // KafkaCluster expects a scala.collection.immutable.Map, so the java.util.Map
        // has to be converted first
        scala.collection.mutable.Map<String, String> mutableKafkaParam =
                JavaConversions.mapAsScalaMap(kafkaParams);
        scala.collection.immutable.Map<String, String> immutableKafkaParam =
                mutableKafkaParam.toMap(
                        new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
                            public Tuple2<String, String> apply(Tuple2<String, String> v1) {
                                return v1;
                            }
                        });
        this.kafkaCluster = new KafkaCluster(immutableKafkaParam);

        // the topics to subscribe to are read from the database at startup
        this.topics = new HashSet<String>();
        List<String> topicList =
                DatasourceUtil.getInstance().getAnalyzerTopicNamesByDb(Constant.DBNAME);
        this.topics.addAll(topicList);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
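ConfigUtil, DatasourceUtil and Constant are helper classes from the project and are not listed in this post. Purely as an illustration of the shape of the configuration, a ConfigUtil.getKafkaConf() like the one used above might read a properties file along these lines (the file name and the key are assumptions, not the original code):

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the configuration helper; only the "brokerlist" key is
// shown because that is the only one the constructor above relies on.
public class ConfigUtil {
    private static final ConfigUtil INSTANCE = new ConfigUtil();
    private final Map<String, String> kafkaConf = new HashMap<String, String>();

    private ConfigUtil() {
        try {
            Properties props = new Properties();
            InputStream in = ConfigUtil.class.getResourceAsStream("/kafka.properties");
            props.load(in);
            in.close();
            // e.g. brokerlist=host1:9092,host2:9092
            kafkaConf.put("brokerlist", props.getProperty("brokerlist"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static ConfigUtil getInstance() { return INSTANCE; }

    public Map<String, String> getKafkaConf() { return kafkaConf; }
}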
2. Set the SparkConf parameters and create the SQLContext
final SparkConf sparkConf = new SparkConf();
// use Kryo serialization
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "com.nsfocus.statistics.util.KryoRegister");
// release RDDs that are no longer needed as soon as possible
sparkConf.set("spark.streaming.unpersist", "true");
// number of reduce tasks: should be roughly 2-3x the number of cores;
// too many of them produces lots of tiny tasks
final Integer cores = Integer.valueOf(
        JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("executornum"));
sparkConf.set("spark.default.parallelism", String.valueOf(2 * cores));
// consolidate intermediate shuffle files
sparkConf.set("spark.shuffle.consolidateFiles", "true");
// cap the rate at which data is received from Kafka
Integer maxRate = Integer.valueOf(
        JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("maxrate"));
Integer partitions = 4;
try {
    partitions = Integer.valueOf(
            JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("partitions"));
} catch (Exception ex) {
    ex.printStackTrace();
    logger.error(ex);
}
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", String.valueOf(maxRate / partitions));
sparkConf.setMaster(Constant.master);
sparkConf.setAppName(Constant.APPNAME);

final JavaSparkContext ctx = new JavaSparkContext(sparkConf);
final JavaStreamingContext jsctx = new JavaStreamingContext(ctx, duration);
final SQLContext sqlContext = new SQLContext(ctx);
sqlContext.setConf("spark.sql.shuffle.partitions", String.valueOf(cores));
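Two notes on this block. First, spark.streaming.kafka.maxRatePerPartition is a per-partition cap, which is why the configured total maxrate is divided by the number of Kafka partitions (e.g. maxrate = 20000 with 4 partitions caps each partition at 5000 records per second). Second, spark.kryo.registrator points at the project's own com.nsfocus.statistics.util.KryoRegister, which is not listed here; a minimal sketch of such a registrator would look like this (the registered classes are placeholders, not the original list):

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

// Sketch of a KryoRegistrator: register the classes that get shipped through the
// job so Kryo does not have to write full class names for every instance.
public class KryoRegister implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(byte[].class);            // raw Kafka payloads
        kryo.register(java.util.HashMap.class); // placeholder; real record classes go here
    }
}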
3. Read the data from Kafka with createDirectStream
// start from the latest offsets, i.e. only consume newly arriving data
JavaPairInputDStream<byte[], byte[]> messages = KafkaUtils.createDirectStream(
        jsctx,
        byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class,
        kafkaParams, topics);
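messages carries raw (key, value) byte arrays. One way to unpack the values into JSON strings before the Row conversion in step 4 is a simple map (a sketch only; in the actual job the parse can equally be done directly inside the Row-building map, as the fragment in step 4 suggests):

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

JavaDStream<String> jsonLines = messages.map(
        new Function<Tuple2<byte[], byte[]>, String>() {
            public String call(Tuple2<byte[], byte[]> tuple) throws Exception {
                // the Kafka message value holds the JSON payload
                return new String(tuple._2());
            }
        });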
4. A series of transformations such as map and filter can then be applied to messages.
Inside the map, each JSON message is parsed into a JSONObject and converted to a Row, so that a DataFrame can be created afterwards:
obj = JSONObject.parseObject(new String(bytes));
return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
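RowCreatUtil is another project helper whose source is not listed; judging from the call, it walks fieldList and pulls the matching values out of the parsed JSONObject so they can be handed to RowFactory.create. A rough sketch under that assumption (the method body is guessed, and the JSONObject is assumed to be fastjson's):

import java.util.List;
import com.alibaba.fastjson.JSONObject;

// Hypothetical reconstruction of generateRowUtil: extract the values named in
// fieldList, in order, so they line up with the schema used for the DataFrame.
public Object[] generateRowUtil(JSONObject obj, List<String> fieldList) {
    Object[] values = new Object[fieldList.size()];
    for (int i = 0; i < fieldList.size(); i++) {
        values[i] = obj.get(fieldList.get(i));
    }
    return values;
}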
Create the DataFrame, register it as a temporary table and cache it:
DataFrame df = sqlContext.createDataFrame(filterRDD, DataTypes.createStructType(inputFields));
df.registerTempTable(tempTable);
sqlContext.cacheTable(tempTable);
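The schema passed to createDataFrame is the list of StructField objects named inputFields; its order has to line up with the values placed into each Row in the previous step. A small sketch of how such a schema might be assembled (the column names here are invented examples, not the real field list):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

List<StructField> inputFields = new ArrayList<StructField>();
// example columns only; the real names and types come from the job configuration
inputFields.add(DataTypes.createStructField("device_ip", DataTypes.StringType, true));
inputFields.add(DataTypes.createStructField("event_count", DataTypes.LongType, true));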
Then call sqlContext to run queries against the registered temporary table:
DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
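The SQL itself comes from resultInfo and is not shown in the post; for illustration only, it would be an aggregation over the cached temp table along these lines (table and column names are invented):

// purely illustrative; the real statement is read from the job configuration
DataFrame staticDf = sqlContext.sql(
        "SELECT device_ip, count(*) AS cnt FROM " + tempTable + " GROUP BY device_ip");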
5. In foreachPartition, write a temporary file per partition: time is prepended as the first column, the corresponding fields are taken from each Row, and the resulting file is then loaded into the database:
DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
final int colSize = resultInfo.getCols().split(",").length;
staticDf.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    public void call(java.util.Iterator<Row> rowIterator) throws Exception {
        // one temporary file per partition
        String filePath = "/tmp/statistic_" + java.util.UUID.randomUUID().toString().replace("-", "_");
        FileWriter fileWriter = new FileWriter(filePath);
        try {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                StringBuilder sb = new StringBuilder();
                // time is the first column, followed by colSize - 1 fields from the Row
                sb.append(time);
                for (int rowIndex = 0; colSize > 1 && rowIndex < colSize - 1; rowIndex++) {
                    sb.append(",").append(row.get(rowIndex));
                }
                sb.append("\n");
                fileWriter.write(sb.toString());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (fileWriter != null) {
                fileWriter.close();
            }
        }
        // bulk-load the file into the target table
        CopyUtil.getInstance().copyIntoTableByFile(
                resultInfo.getTableName() + "(" + resultInfo.getCols() + ")",
                filePath, ip, port, database, username, password);
    }
});
copyIntoTableByFile obtains a database connection and loads the file into the table via PostgreSQL's COPY:
Connection conn = null;
Connection baseConnection = null;
CopyManager copyManager = null;
FileReader fileReader = null;
File file = new File(filepath);
try {
    fileReader = new FileReader(file);
    conn = RemoteDBUtil.getInstance(ip, port, database, username, password).getConnection();
    // CopyManager needs the underlying PostgreSQL BaseConnection
    baseConnection = conn.getMetaData().getConnection();
    copyManager = new CopyManager((BaseConnection) baseConnection);
    copyManager.copyIn(
            "copy " + tableName + " from stdin delimiter as ',' NULL as 'null'",
            fileReader);
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    if (fileReader != null) {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    if (conn != null) {
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    // remove the temporary file once the COPY is done
    file.delete();
}
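RemoteDBUtil is the project's own connection helper; judging from the call, it hands out JDBC connections to the target PostgreSQL instance keyed by ip/port/database/user. A bare-bones sketch with plain DriverManager is shown below (the real class very likely adds caching or pooling); the CopyManager.copyIn call above then streams the whole file over PostgreSQL's COPY protocol, which is far cheaper than row-by-row INSERTs for this per-batch bulk load.

import java.sql.Connection;
import java.sql.DriverManager;

// Hypothetical sketch of RemoteDBUtil: build a JDBC URL for the target PostgreSQL
// instance and open connections on demand (no pooling in this sketch).
public class RemoteDBUtil {
    private final String url;
    private final String username;
    private final String password;

    private RemoteDBUtil(String ip, int port, String database, String username, String password) {
        this.url = "jdbc:postgresql://" + ip + ":" + port + "/" + database;
        this.username = username;
        this.password = password;
    }

    public static RemoteDBUtil getInstance(String ip, int port, String database,
                                           String username, String password) {
        return new RemoteDBUtil(ip, port, database, username, password);
    }

    public Connection getConnection() throws Exception {
        Class.forName("org.postgresql.Driver");
        return DriverManager.getConnection(url, username, password);
    }
}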