數據湖
數據湖的產生是為了存儲各種各樣原始數據的大型倉庫。這些數據根據需求,進行存取、處理、分析等。對於存儲部分來說,開源版本常見的就是 hdfs。而各大雲廠商也提供了各自的存儲服務,如 Amazon S3,Azure Blob 等。
而由於數據湖中存儲的數據全部為原始數據,一般需要對數據做ETL(Extract-Transform-Load)。對於大型數據集,常用的框架是 Spark、pyspark。在數據做完 ETL 后,再次將清洗后的數據存儲到存儲系統中(如hdfs、s3)。基於這部分清洗后的數據,數據分析師或是機器學習工程師等,可可以基於這些數據進行數據分析或是訓練模型。在這些過程中,還有非常重要的一點是:如何對數據進行元數據管理?
在 AWS 中,Glue 服務不僅提供了 ETL 服務,還提供的元數據的管理。下面我們會使用 S3+Glue +EMR 來展示一個數據湖+ETL+數據分析的一個簡單過程。
准備數據
此次使用的是GDELT數據,地址為:
https://registry.opendata.aws/gdelt/
此數據集中,每個文件名均顯示了此文件的日期。作為原始數據,我們首先將2015年的數據放在一個year=2015 的s3目錄下:
aws s3 cp s3://xxx/data/20151231.export.csv s3://xxxx/gdelt/year=2015/20151231.export.csv
使用Glue爬取數據定義
通過glue 創建一個爬網程序,爬取此文件中的數據格式,指定的數據源路徑為s3://xxxx/gdelt/ 。
此部分功能及具體介紹可參考aws 官方文檔:
https://docs.aws.amazon.com/zh_cn/glue/latest/dg/console-crawlers.html
爬網程序結束后,在Glue 的數據目錄中,即可看到新創建的 gdelt 表:
原數據為csv格式,由於沒有header,所以列名分別為col0、col1…、col57。其中由於s3下的目錄結構為year=2015,所以爬網程序自動將year 識別為分區列。
至此,這部分原數據的元數據即保存在了Glue。在做ETL 之前,我們可以使用AWS EMR 先驗證一下它對元數據的管理。
AWS EMR
AWS EMR 是 AWS 提供的大數據集群,可以一鍵啟動帶Hive、HBase、Presto、Spark 等常用框架的集群。
啟動AWS EMR,勾選 Hive、Spark,並使用Glue作為它們表的元數據。EMR 啟動后,登錄到主節點,啟動Hive:
> show tables;
gdelt
Time taken: 0.154 seconds, Fetched: 1 row(s)
可以看到在 hive 中已經可以看到此表,執行查詢:
> select * from gdelt where year=2015 limit 3; OK 498318487 20060102 200601 2006 2006.0055 CVL COMMUNITY CVL 1 53 53 5 1 3.8 3 1 3 -2.42718446601942 1 United States US US 38.0 -97.0 US 0 NULL NULL 1 United States US US 38.0 -97.0 US 20151231 http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896 2015 498318488 20060102 200601 2006 2006.0055 CVL COMMUNITY CVL USA UNITED STATES USA 1 51 51 5 1 3.4 3 1 3 -2.42718446601942 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 20151231 http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896 2015 498318489 20060102 200601 2006 2006.0055 CVL COMMUNITY CVL USA UNITED STATES USA 1 53 53 5 1 3.8 3 1 3 -2.42718446601942 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 20151231 http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896 2015
可以看到原始數據的列非常多,假設我們所需要的僅有4列:事件ID、國家代碼、日期、以及網址,並基於這些數據做分析。那我們下一步就是做ETL。
GLUE ETL
Glue 服務也提供了 ETL 的工具,可以編寫基於spark 或是 python 的腳本,提交給 glue etl 執行。在這個例子中,我們會抽取col0、col52、col56、col57、以及year這些列,並給它們重命名。然后從中抽取僅包含“UK”的記錄,最終以date=current_day 的格式寫入到最終s3 目錄,存儲格式為parquet。可以通過 python 或是 scala 語言調用 GLUE 編程接口,在本文中使用的是 scala:
import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ import java.text.SimpleDateFormat import java.util.Date object Gdelt_etl { def main(sysArgs: Array[String]) { val sc: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(sc) val spark = glueContext.getSparkSession // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // db and table val dbName = "default" val tblName = "gdelt" // s3 location for output val format = new SimpleDateFormat("yyyy-MM-dd") val curdate = format.format(new Date()) val outputDir = "s3://xxx-xxx-xxx/cleaned-gdelt/cur_date=" + curdate + "/" // Read data into DynamicFrame val raw_data = glueContext.getCatalogSource(database=dbName, tableName=tblName).getDynamicFrame() // Re-Mapping Data val cleanedDyF = raw_data.applyMapping(Seq(("col0", "long", "EventID", "string"), ("col52", "string", "CountryCode", "string"), ("col56", "long", "Date", "String"), ("col57", "string", "url", "string"), ("year", "string", "year", "string"))) // Spark SQL on a Spark DataFrame val cleanedDF = cleanedDyF.toDF() cleanedDF.createOrReplaceTempView("gdlttable") // Get Only UK data val only_uk_sqlDF = spark.sql("select * from gdlttable where CountryCode = 'UK'") val cleanedSQLDyF = DynamicFrame(only_uk_sqlDF, glueContext).withName("only_uk_sqlDF") // Write it out in Parquet glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> outputDir)), format = "parquet").writeDynamicFrame(cleanedSQLDyF) Job.commit() } }
將此腳本保存為gdelt.scala 文件,並提交給 GLUE ETL作業執行。等待執行完畢后,我們可以在s3看到生成了輸出文件:
> aws s3 ls s3://xxxx-xxx-xxx/cleaned-gdelt/ date=2020-04-12/
part-00000-d25201b8-2d9c-49a0-95c8-f5e8cbb52b5b-c000.snappy.parquet
然后我們再對此/cleaned-gdelt/目錄執行一個新的 GLUE 網爬程序:
執行完成后,可以在GLUE 看到生成了新表,此表結構為:
可以看到輸入輸出格式均為parquet,分區鍵為cur_date,且僅包含了我們所需的列。
再次進入到 EMR Hive 中,可以看到新表已出現:
hive> describe cleaned_gdelt; OK eventid string countrycode string date string url string year string date string # Partition Information # col_name data_type comment cur_date string
查詢此表:
hive> select * from cleaned_gdelt limit 10; OK SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 498318821 UK 20151231 http://wmpoweruser.com/microsoft-denies-lumia-950-xl-withdrawn-due-issues-says-stock-due-strong-demand/ 2015 498319466 UK 20151231 http://www.princegeorgecitizen.com/news/police-say-woman-man-mauled-by-2-dogs-in-home-in-british-columbia-1.2142296 2015 498319777 UK 20151231 http://www.catchnews.com/life-society-news/happy-women-do-not-live-any-longer-than-sad-women-1451420391.html 2015 498319915 UK 20151231 http://www.nationalinterest.org/feature/the-perils-eu-army-14770 2015 … Time taken: 0.394 seconds, Fetched: 10 row(s)
可以看到出現的結果均的 CountryCode 均為 UK,達到我們的目標。
自動化
下面是將 GLUE 網爬 + ETL 進行自動化。在GLUE ETL 的工作流程中,創建一個工作流,創建完后如下所示:
如圖所示,此工作流的過程為:
- 每晚11點40開始觸發工作流
- 觸發 gdelt 的網爬作業,爬取原始數據的元數據
- 觸發gdelt的ETL作業
- 觸發gdelt-cleaned 網爬程序,爬取清洗后的數據的元數據
下面我們添加一個新文件到原始文件目錄,此新數據為 year=2016 的數據:
aws s3 cp s3://xxx-xxxx/data/20160101.export.csv s3://xxx-xxx-xxx/gdelt/year=2016/20160101.export.csv
然后執行此工作流。
期間我們可以看到ETL job 在raw_crawler_done 之后,被正常觸發:
作業完成后,在Hive 中即可查詢到 2016 年的數據:
select * from cleaned_gdelt where year=2016 limit 10; OK 498554334 UK 20160101 http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/ 2016 498554336 UK 20160101 http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/ 2016 …