ETL實踐--Spark做數據清洗
上篇博客,說的是用hive代替kettle的表關聯。是為了提高效率。
本文要說的spark就不光是為了效率的問題。
1、用spark的原因
(如果是一個sql能搞定的關聯操作,可以直接用kettle導原始數據到hive,用hive視圖做關聯直接給kylin提供數據)
(1)、場景一之前用kettle需要多個轉換、關聯才能實現數據清洗的操作。
用hive不知道如何進行,就算能進行也感覺繁瑣,同時多個步驟必然降低數據時效性。用mr的話也是同樣道理太多步驟繁瑣不堪。
(2)、一些不能用sql來處理的數據清洗邏輯。比如循環類的,或者是更復雜的處理邏輯。用hive和kettle都不方便解決。
一些其他的原因
(3)、支持的語言多,容易上手,並且之前也學習過一些。
(4)、我公司用的大數據平台上,提供了spark的支持,可以方便的安裝和維護,並且可以和現有平台很好的融合(yarn部署方式)。
(5)、效率高。
(6)、剛好公司有需要用到spark streaming。
2、下面是我學習用spark處理業務問題的一個例子。有注釋和一些方法的測試。
public class EtlSpark5sp2Demo {
//3、解碼器
//static final Encoder<EcardAccessOutTime> outTimeEncoder = Encoders.bean(EcardAccessOutTime.class);
static final Encoder<EcardAccessInout> inoutEncoder = Encoders.bean(EcardAccessInout.class);
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
runJdbcDatasetExample(spark);
spark.stop();
}
private static void runJdbcDatasetExample(SparkSession spark) {
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root123");
connectionProperties.put("password", "123");
String dbUrl="jdbc:mysql://192.168.100.4:3306/datasql?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8";
//對比java中連接mysql的字符串(標紅的是區別,分割字符串是spark識別不了的,要去掉)
// mysql.url=jdbc:mysql://192.168.100.4:3306/datasql?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
//1、查詢原始數據
final Dataset<Row> allRecord = spark.read()
.jdbc(dbUrl,
"( select a.kid,a.outid,a.ioflag,a.OpDT as 'indate',a.OpDT as 'outdate',a.school_code ,a.faculty_code,a.major_code,a.class_code,a.sex from access_record_inout_temp2 a limit 3000 ) t", connectionProperties);
final Dataset<EcardAccessInout> allRecordInout=allRecord.as(inoutEncoder);
spark提供了把數據集注冊成視圖,然后用sql的方式對數據集進行處理的功能:如下2、3所示
//2、將原始數據注冊成視圖
allRecord.createOrReplaceTempView("view_access_record_inout_temp2");
allRecord.printSchema();//打印數據集結構
//3、在上面注冊的視圖上執行sql測試:查詢出進入宿舍的記錄
final Dataset<EcardAccessInout> inRecord = spark.sql(
" select a.kid,a.outid,a.ioflag,a.school_code,a.faculty_code,a.major_code,a.class_code,a.sex,a.indate,a.outdate " +
" from view_access_record_inout_temp2 a " +
" where a.ioflag = 0 ").as(inoutEncoder);
inRecord.printSchema();//打印數據集結構
inRecord.show();
spark不光提供了針對注冊的視圖的sql查詢。也提供了通過方法來查詢數據集的方式:下面是2種方式
//4、filter用法測試:
String outid="45723107";
long kid=7516452;
//filter方法能夠正常使用
Dataset<Row> list1 = allRecord.filter(allRecord.col("outid").equalTo(outid).and(allRecord.col("kid").gt(kid))).orderBy(allRecord.col("indate"));//.take(1);//
list1.show();//打印前20條記錄
5、where用法測試
//where方法能夠正常使用
Dataset<Row> list2 = allRecord.where("outid = '"+ outid +"'").where("kid > "+ kid +"").orderBy("indate");
list2.show();//打印前20條記錄
//6、分組取topN:測試::mysql中可以group by 2個字段查詢全部的字段。實際返回值是取的分組后的第一條記錄。(對應實際業務就按照學號和時間去重,數據當中有重復數據)
6.1、mysql中的原始sql
select a.id,a.outid,g.school_code,g.faculty_code,g.major_code,g.class_code,g.sex,a.OpDT,a.ioflag
from access_record a inner join own_org_student g on a.OutId=g.outid
where a.id > ? group by a.OutId,a.OpDT
6.2、hive和spark都不支持這種操作。他們的做法是一樣的。就是通過下面這個sql,用row_number()函數分組,取第一
final Dataset<Row> topRecord = spark.sql(
" select t.kid,t.outid,t.ioflag,t.school_code,t.faculty_code,t.major_code,t.class_code,t.sex,t.indate,t.outdate from (" +
" select a.kid,a.outid,a.ioflag,a.school_code,a.faculty_code,a.major_code,a.class_code,a.sex,a.indate,a.outdate, " +
" row_number() over(partition by outid order by indate) as rowNumber " + //根據行號top
" from view_access_record_inout_temp2 a " +
" where a.ioflag = 0 " +
" ) t where rowNumber =1 ");
topRecord.show();//打印前20條記錄
//7、循環:合並進入記錄和出去的記錄
7.1、這里先進行先按照學號分區,再按照時間排序。
Dataset<EcardAccessInout> allRecordSort= allRecordInout.repartition(allRecordInout.col("outid")).sortWithinPartitions(allRecordInout.col("indate"));
7、循環:合並進入記錄和出去的記錄(因為已經排序了,本條記錄的下一條,如果是正常記錄就是出去的記錄)
Iterator<EcardAccessInout> iterator = allRecordSort.toLocalIterator();
List<EcardAccessInout> result=new ArrayList<EcardAccessInout>();
while(iterator.hasNext()){
EcardAccessInout first= iterator.next();
if("0".endsWith(first.getIoflag())){//第一條記錄是:進入宿舍的記錄
if(iterator.hasNext()){
EcardAccessInout second= iterator.next();
//取比入記錄大的最小的一條出記錄的時間,作為入記錄的出時間。(排序后,后面一條就是最小記錄)
if(first.getOutid().endsWith(second.getOutid())&&("1".endsWith(second.getIoflag()))&&first.getIndate().before(second.getIndate())){
first.setOutdate(second.getIndate());
result.add(first);
}
}
}
}
//8、處理后的數據寫入mysql。這里只是個例子,實際數據應該是寫到hdfs,變成hive表
inRecord.write().mode(SaveMode.Append)
.jdbc(dbUrl, "datacenter.access_record_inout_temp10", connectionProperties);
}
}