ETL實踐--Spark做數據清洗


 

ETL實踐--Spark做數據清洗

 

 

上篇博客,說的是用hive代替kettle的表關聯。是為了提高效率。

本文要說的spark就不光是為了效率的問題。

 

1、用spark的原因

(如果是一個sql能搞定的關聯操作,可以直接用kettle導原始數據到hive,用hive視圖做關聯直接給kylin提供數據)

(1)、場景一之前用kettle需要多個轉換、關聯才能實現數據清洗的操作。

        用hive不知道如何進行,就算能進行也感覺繁瑣,同時多個步驟必然降低數據時效性。用mr的話也是同樣道理太多步驟繁瑣不堪。

(2)、一些不能用sql來處理的數據清洗邏輯。比如循環類的,或者是更復雜的處理邏輯。用hive和kettle都不方便解決。

一些其他的原因

(3)、支持的語言多,容易上手,並且之前也學習過一些。

(4)、我公司用的大數據平台上,提供了spark的支持,可以方便的安裝和維護,並且可以和現有平台很好的融合(yarn部署方式)。

(5)、效率高。

(6)、剛好公司有需要用到spark streaming。

 

 

2、下面是我學習用spark處理業務問題的一個例子。有注釋和一些方法的測試。

 

public class EtlSpark5sp2Demo {

//3、解碼器
//static final Encoder<EcardAccessOutTime> outTimeEncoder = Encoders.bean(EcardAccessOutTime.class);
static final Encoder<EcardAccessInout> inoutEncoder = Encoders.bean(EcardAccessInout.class);

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.getOrCreate();

runJdbcDatasetExample(spark);
spark.stop();
}

private static void runJdbcDatasetExample(SparkSession spark) {

Properties connectionProperties = new Properties();
connectionProperties.put("user", "root123");
connectionProperties.put("password", "123");
String dbUrl="jdbc:mysql://192.168.100.4:3306/datasql?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8";

//對比java中連接mysql的字符串(標紅的是區別,分割字符串是spark識別不了的,要去掉)
//  mysql.url=jdbc:mysql://192.168.100.4:3306/datasql?autoReconnect=true&amp;useUnicode=true&amp;characterEncoding=UTF-8

//1、查詢原始數據
final Dataset<Row> allRecord = spark.read()
.jdbc(dbUrl,
"( select a.kid,a.outid,a.ioflag,a.OpDT as 'indate',a.OpDT as 'outdate',a.school_code ,a.faculty_code,a.major_code,a.class_code,a.sex from access_record_inout_temp2 a limit 3000 ) t", connectionProperties);

final Dataset<EcardAccessInout> allRecordInout=allRecord.as(inoutEncoder);
 spark提供了把數據集注冊成視圖,然后用sql的方式對數據集進行處理的功能:如下2、3所示

//2、將原始數據注冊成視圖
allRecord.createOrReplaceTempView("view_access_record_inout_temp2");
allRecord.printSchema();//打印數據集結構

        //3、在上面注冊的視圖上執行sql測試:查詢出進入宿舍的記錄
        final   Dataset<EcardAccessInout> inRecord  = spark.sql(
" select a.kid,a.outid,a.ioflag,a.school_code,a.faculty_code,a.major_code,a.class_code,a.sex,a.indate,a.outdate " +
" from view_access_record_inout_temp2 a " +
" where a.ioflag = 0 ").as(inoutEncoder);
inRecord.printSchema();
//打印數據集結構
        inRecord.show();
 
        
 spark不光提供了針對注冊的視圖的sql查詢。也提供了通過方法來查詢數據集的方式:下面是2種方式


//4、filter用法測試:
 String outid="45723107";
long kid=7516452;
//filter方法能夠正常使用
Dataset<Row> list1 = allRecord.filter(allRecord.col("outid").equalTo(outid).and(allRecord.col("kid").gt(kid))).orderBy(allRecord.col("indate"));//.take(1);//
list1.show();
//打印前20條記錄


        5、where用法測試

//where方法能夠正常使用
 Dataset<Row> list2 = allRecord.where("outid = '"+ outid +"'").where("kid > "+ kid +"").orderBy("indate");
list2.show();
//打印前20條記錄




//6、分組取topN:測試::mysql中可以group by 2個字段查詢全部的字段。實際返回值是取的分組后的第一條記錄。
(對應實際業務就按照學號和時間去重,數據當中有重復數據)
            6.1、mysql中的原始sql
                            select a.id,a.outid,g.school_code,g.faculty_code,g.major_code,g.class_code,g.sex,a.OpDT,a.ioflag

                            from access_record a inner join own_org_student g on a.OutId=g.outid
                               where a.id > ?   group by a.OutId,a.OpDT

 
 6.2、hive和spark都不支持這種操作。他們的做法是一樣的。就是通過下面這個sql,用row_number()函數分組,取第一
 final Dataset<Row> topRecord = spark.sql(
" select t.kid,t.outid,t.ioflag,t.school_code,t.faculty_code,t.major_code,t.class_code,t.sex,t.indate,t.outdate from (" +
" select a.kid,a.outid,a.ioflag,a.school_code,a.faculty_code,a.major_code,a.class_code,a.sex,a.indate,a.outdate, " +
" row_number() over(partition by outid order by indate) as rowNumber " + //根據行號top
" from view_access_record_inout_temp2 a " +
" where a.ioflag = 0 " +
" ) t where rowNumber =1 ");

topRecord.show();//打印前20條記錄




//7、循環:合並進入記錄和出去的記錄
7.1、這里先進行先按照學號分區,再按照時間排序。
Dataset<EcardAccessInout> allRecordSort= allRecordInout.repartition(allRecordInout.col("outid")).sortWithinPartitions(allRecordInout.col("indate"));
 7、循環:合並進入記錄和出去的記錄(因為已經排序了,本條記錄的下一條,如果是正常記錄就是出去的記錄)
 Iterator<EcardAccessInout> iterator = allRecordSort.toLocalIterator();
List<EcardAccessInout> result=new ArrayList<EcardAccessInout>();
while(iterator.hasNext()){
EcardAccessInout first= iterator.next();
if("0".endsWith(first.getIoflag())){//第一條記錄是:進入宿舍的記錄
if(iterator.hasNext()){
EcardAccessInout second= iterator.next();
//取比入記錄大的最小的一條出記錄的時間,作為入記錄的出時間。(排序后,后面一條就是最小記錄)
if(first.getOutid().endsWith(second.getOutid())&&("1".endsWith(second.getIoflag()))&&first.getIndate().before(second.getIndate())){
first.setOutdate(second.getIndate());
result.add(first);
}
}
}
}

//8、處理后的數據寫入mysql。這里只是個例子,實際數據應該是寫到hdfs,變成hive表
inRecord.write().mode(SaveMode.Append)
.jdbc(dbUrl, "datacenter.access_record_inout_temp10", connectionProperties);

}

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM