spark java dataset api沒有提供迭代器


spark  java dataset api沒有提供迭代器

所以處理一些列表內部數據關聯轉換,而不是只處理單條數據的轉換得換成javaRdd api。

 

下面是一個簡單例子。通過進入宿舍的時間 找到后面的第一條離開宿舍的記錄。並合並成一條完整的宿舍的 進出記錄。

這種業務,直接獲得列表的當前對象,不能獲得整個列表是很難實現的。

 
static final Encoder<AccessInout> inoutEncoder = Encoders.bean(AccessInout.class);

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.getOrCreate();

runJdbcDatasetExample(spark);

//spark.stop();
}



private static void runJdbcDatasetExample(SparkSession spark) {


Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "0000000");

String dbUrl="jdbc:mysql://192.168.100.4:3306/datacenter?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8";


//1、查詢原始數據
final Dataset<Row> allRecord = spark.read()
.jdbc(dbUrl,
"( select a.kid,a.outid,a.ioflag,a.OpDT as 'indate',a.OpDT as 'outdate',a.school_code ,a.faculty_code,a.major_code,a.class_code,a.sex from access_record_inout_temp2 a " +
// " where a.outid like'1622%'" +//測試代碼,只查詢16屆學生數據
" ) t", connectionProperties);

final Dataset<AccessInout> allRecordInout=allRecord.as(inoutEncoder);
//2、將原始數據注冊成視圖
allRecord.createOrReplaceTempView("view_access_record_inout_temp2");

allRecord.printSchema();


//3、按照學號分區。 並且分組內 按照進入宿舍的時間排序。
Dataset<AccessInout> allRecordSort= allRecordInout.repartition(allRecordInout.col("outid"))
.sortWithinPartitions(allRecordInout.col("indate"));

4、由於Dataset接口沒有提供Iterator,無法實現相關邏輯,這里換成rdd來實現。(scala接口是提供了迭代器的哦)
     JavaRDD<AccessInout> rdd= allRecordSort.toJavaRDD();


JavaRDD<AccessInout> resultRdd= rdd.mapPartitions((FlatMapFunction<Iterator<AccessInout>,AccessInout>)(Iterator<AccessInout> in)->{
List<AccessInout> result=new ArrayList<AccessInout>();
while(in.hasNext()){
AccessInout first= in.next();
if("0".endsWith(first.getIoflag())){//第一條記錄是:進入宿舍的記錄
if(in.hasNext()){
AccessInout second= in.next();
//取比入記錄大的最小的一條出記錄的時間,作為入記錄的出時間。(排序后,后面一條就是最小記錄)
if(first.getOutid().endsWith(second.getOutid())&&("1".endsWith(second.getIoflag()))&&first.getIndate().before(second.getIndate())){
first.setOutdate(second.getIndate());
result.add(first);
}
}
}
}
return result.iterator();
});
List<AccessInout> list= resultRdd.top(100);
for(AccessInout temp :list){
System.out.println(temp.getOutid()+"||"+temp.getIoflag()+"||"+temp.getIndate()+"||"+temp.getOutdate());
}

resultRdd.saveAsTextFile("D:\\2018\\AccessInout.txt");

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM