1. Initialize the Kafka-related parameters; the Kafka topic list is fetched when the App class is constructed:
public App() {
    try {
        // Broker list and consumer group for the direct Kafka stream
        kafkaParams.put("metadata.broker.list", ConfigUtil.getInstance().getKafkaConf().get("brokerlist"));
        kafkaParams.put("group.id", Constant.groupId);
        // KafkaCluster expects a scala.collection.immutable.Map, so convert the Java map
        scala.collection.mutable.Map<String, String> mutableKafkaParam =
                JavaConversions.mapAsScalaMap(kafkaParams);
        scala.collection.immutable.Map<String, String> immutableKafkaParam =
                mutableKafkaParam.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
                    public Tuple2<String, String> apply(Tuple2<String, String> v1) {
                        return v1;
                    }
                });
        this.kafkaCluster = new KafkaCluster(immutableKafkaParam);
        // Topics to subscribe to, read from the datasource configuration
        this.topics = new HashSet<String>();
        List<String> topicList = DatasourceUtil.getInstance().getAnalyzerTopicNamesByDb(Constant.DBNAME);
        this.topics.addAll(topicList);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
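The constructor above relies on a few fields of App whose declarations are not shown in the original; a minimal sketch of what they would look like follows (kafkaCluster is only constructed here, presumably kept around for manual offset queries or commits elsewhere):
// Assumed field declarations for the App class (sketch, not the original source)
private final Map<String, String> kafkaParams = new HashMap<String, String>();
private KafkaCluster kafkaCluster;   // org.apache.spark.streaming.kafka.KafkaCluster
private Set<String> topics;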
2. Set the SparkConf parameters and define the SQLContext:
final SparkConf sparkConf = new SparkConf();
// Use Kryo serialization
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "com.nsfocus.statistics.util.KryoRegister");
// Promptly unpersist RDDs that are no longer needed
sparkConf.set("spark.streaming.unpersist", "true");
// Number of reduce tasks: roughly 2-3x the number of cores; too many produces lots of tiny tasks
final Integer cores = Integer.valueOf(JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("executornum"));
sparkConf.set("spark.default.parallelism", String.valueOf(2 * cores));
// Consolidate intermediate shuffle files
sparkConf.set("spark.shuffle.consolidateFiles", "true");
// Cap the rate at which data is received from Kafka
Integer maxRate = Integer.valueOf(JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("maxrate"));
Integer partitions = 4;
try {
    partitions = Integer.valueOf(JobconfUtil.getInstance().getJobConf(Constant.JOBCONF).get("partitions"));
} catch (Exception ex) {
    ex.printStackTrace();
    logger.error(ex);
}
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", String.valueOf(maxRate / partitions));
sparkConf.setMaster(Constant.master);
sparkConf.setAppName(Constant.APPNAME);
final JavaSparkContext ctx = new JavaSparkContext(sparkConf);
// duration is the streaming batch interval, defined elsewhere
final JavaStreamingContext jsctx = new JavaStreamingContext(ctx, duration);
final SQLContext sqlContext = new SQLContext(ctx);
sqlContext.setConf("spark.sql.shuffle.partitions", String.valueOf(cores));
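spark.kryo.registrator points at a custom registrator class whose contents are not shown here. A minimal sketch of what such a class typically looks like; the classes registered below are assumptions, not the project's actual list:
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class KryoRegister implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the record types that get shuffled or cached (hypothetical examples)
        kryo.register(byte[].class);
        kryo.register(java.util.HashMap.class);
    }
}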
3. Read data from Kafka with createDirectStream:
// Consume the latest data in each batch
JavaPairInputDStream<byte[], byte[]> messages = KafkaUtils.createDirectStream(
        jsctx,
        byte[].class,
        byte[].class,
        DefaultDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topics);
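With this overload (no explicit fromOffsets), where consumption starts is governed by the consumer's auto.offset.reset setting, which defaults to the latest offsets. To make "take the latest data" explicit, the setting can be added to the kafkaParams built in step 1 (optional; shown here only as a sketch):
// Kafka 0.8 consumer setting: start from the newest offsets when no committed offset exists
kafkaParams.put("auto.offset.reset", "largest");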
4. A series of transformations such as map and filter can then be applied to messages.
Each JSONObject is converted into a Row so that a DataFrame can easily be created:
obj = JSONObject.parseObject(new String(bytes));
return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
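For context, a minimal sketch of how that fragment might sit inside a map over messages; fieldList and RowCreatUtil come from the original code, while the surrounding Function is an assumption (the real pipeline also applies a filter, producing the filterRDD used below):
JavaDStream<Row> rowStream = messages.map(new Function<Tuple2<byte[], byte[]>, Row>() {
    public Row call(Tuple2<byte[], byte[]> tuple) throws Exception {
        // Decode the Kafka value and parse it as JSON
        JSONObject obj = JSONObject.parseObject(new String(tuple._2()));
        // Map the JSON fields to a Row in the order given by fieldList
        return RowFactory.create(RowCreatUtil.getInstance().generateRowUtil(obj, fieldList));
    }
});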
Create the DataFrame and register it as a temporary table:
DataFrame df = sqlContext.createDataFrame(filterRDD, DataTypes.createStructType(inputFields));
df.registerTempTable(tempTable);
sqlContext.cacheTable(tempTable);
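inputFields is the schema matching the Rows built above. Its construction is not shown in the original; one way it could be assembled from fieldList, assuming every column is treated as a nullable string:
List<StructField> inputFields = new ArrayList<StructField>();
for (String fieldName : fieldList) {
    // One nullable string column per field (the type mapping is an assumption)
    inputFields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}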
Run SQL against the registered temporary table through sqlContext:
DataFrame staticDf = sqlContext.sql(resultInfo.getSql());
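resultInfo carries the statistics SQL plus the target table and column metadata. Purely for illustration (the real statement is not shown), resultInfo.getSql() might hold an aggregation such as:
// Hypothetical example of the kind of SQL resultInfo.getSql() could return
String sql = "SELECT dst_ip, count(*) AS cnt FROM " + tempTable + " GROUP BY dst_ip";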
5. In foreachPartition, create a temporary file, pull the required fields out of each Row, and write them to that file:
final int colSize = resultInfo.getCols().split(",").length;
staticDf.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    public void call(java.util.Iterator<Row> rowIterator) throws Exception {
        // One temporary CSV file per partition
        String filePath = "/tmp/statistic_" + java.util.UUID.randomUUID().toString().replace("-", "_");
        FileWriter fileWriter = new FileWriter(filePath);
        try {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                StringBuilder sb = new StringBuilder();
                // First column is the batch time, followed by the remaining result columns
                sb.append(time);
                for (int rowIndex = 0; colSize > 1 && rowIndex < colSize - 1; rowIndex++) {
                    sb.append(",").append(row.get(rowIndex));
                }
                sb.append("\n");
                fileWriter.write(sb.toString());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (fileWriter != null) {
                fileWriter.close();
            }
        }
        // Bulk-load the file into the target table
        CopyUtil.getInstance().copyIntoTableByFile(resultInfo.getTableName() + "(" + resultInfo.getCols() + ")",
                filePath, ip, port, database, username, password);
    }
});
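Writing each partition to a local file and bulk-loading it with PostgreSQL's COPY keeps the write path to a single round trip per partition, which is considerably cheaper than issuing row-by-row INSERTs.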
copyIntoTableByFile obtains a database connection and loads the file into the database:
Connection conn = null;
Connection baseConnection = null;
CopyManager copyManager = null;
FileReader fileReader = null;
File file = new File(filepath);
try {
    fileReader = new FileReader(file);
    conn = RemoteDBUtil.getInstance(ip, port, database, username, password).getConnection();
    // Unwrap the underlying PostgreSQL connection required by CopyManager
    baseConnection = conn.getMetaData().getConnection();
    copyManager = new CopyManager((BaseConnection) baseConnection);
    // Bulk-load the CSV file via COPY ... FROM STDIN
    copyManager.copyIn("copy " + tableName + " from stdin delimiter as ',' NULL as 'null'",
            fileReader);
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    if (fileReader != null) {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    if (conn != null) {
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    // Remove the temporary file once it has been loaded
    file.delete();
}
