Delta Lake
特性
- 支持ACID事務
- 可擴展的元數據處理
- 統一的流、批處理API接口
- 更新、刪除數據,實時讀寫(讀是讀當前的最新快照)
- 數據版本控制,根據需要查看歷史數據快照,可回滾數據
- 自動處理schema變化,可修改表結構
maven依賴
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.5.0</version>
</dependency>
因為要方便跑 demo,我這里選擇用 spark-shell 來交互式探索一下 Delta Lake 的功能。
按照文檔介紹,Delta Lake 是需要 Spark 2.4.2 或以上版本的,所以大家最好去官網下載一個預先編譯的 Spark 包。
按照上圖,輸入命令 bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0 就可以啟動加載了 Delta Lake 的 spark shell 了。
關於 --packages 的用法,是因為 Spark 有個專門解析選項參數的工具叫做 SparkSubmitOptionParser,他可以解析到依賴,並且先在本地倉庫找,沒有的話就會根據你的 Maven 配置到遠程拉取,這里 Spark
內部做了一些事情。
如何沒有配置maven倉庫的話,可以先下載編譯好的jar包放到服務器目錄然后通過--jars命令加載此jar包即可
bin/spark-shell --jars ./jars/delta-core_2.11-0.5.0-SNAPSHOT.jar
基礎表操作
//創建 delta 表和分區表
val data = spark.range(0, 5) data.write.format("delta").save("/tmp/delta-table") data.write.format("delta").partitionBy("date").save("/tmp/delta-table") //讀delta表 //第一種方式
val df = spark.read.format("delta").load("/tmp/delta-table") //第二種方式
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table") //覆蓋delta表數據(mode換成append就是插入數據) //注意:delta lake會記憶表的schema,默認情況下,overwrite只會更改表數據,不會更改表結構 //注意:可通過.option("mergeSchema", "true"),將df中有而schema沒有的字段添加到schema中,也就是add column //注意:在overwrite時,可通過df.write.option("overwriteSchema", "true")來替換原有的schema
val data = spark.range(5, 10) data.write.format("delta").mode("overwrite").save("/tmp/delta-table") //更新delta表數據
deltaTable.update( condition = expr("id % 2 == 0"), set = Map("id" -> expr("id + 100")) ) // Upsert (merge) delta表數據
val newData = spark.range(0, 20).toDF deltaTable.as("oldData") .merge( newData.as("newData"), "oldData.id = newData.id") .whenMatched(col("date") > "2019-01-01") .update(Map("id" -> col("newData.id"))) .whenMatched .delete() .whenNotMatched .insert(Map("id" -> col("newData.id"))) .execute() //讀寫流式數據到delta表(可以邊寫邊讀) //有append和complete兩種輸出模式,complete和overwrite意思一樣
val streamingDf = spark.readStream.format("delta").load("/delta/events") val stream = streamingDf.select("value").as("id").writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table") val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.outputMode("complete").format("console").start() //刪除delta表數據(只是添加墓碑標記,不是物理刪除)
deltaTable.delete("id % 2 == 0") deltaTable.delete(condition = expr("id % 2 == 0")) import org.apache.spark.sql.functions._ import spark.implicits._ deltaTable.delete(col("date") < "2017-01-01") //根據時間戳或者版本號查看歷史數據(time travel)
val timestamp_string = "2019-01-01" val version = "0" val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events") val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events") //查看數據的歷史版本
deltaTable.history().show() spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show() //只保留最新的數據(vacuum是用來清理磁盤上的歷史數據)
deltaTable.vacuum(0) spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 24 HOURS”)
merge操作
merge是delta lake的重要操作,它實現了upsert和delete功能。(示例詳見基礎表操作)
- merge最多有兩個whenMatched和一個whenNotMatched,且至少有一個when;
- 如果有兩個whenMatched,則第一個whenMatched必須有條件,否則會報錯。whenNotMatched可有條件,也可沒有;
- whenMatched最多有一個更新操作和一個刪除操作,whenNotMatched最多有一個刪除操作,相同操作在一個when里只能有一個。merge的實現就是inner join、left anti join以及full join(詳見源碼分析)。
- match了才能刪除,不支持刪除沒有match的數據;
- match了肯定只能update,沒match也沒法update,只能insert;
- merge的實現就是inner join、left anti join以及full join(詳見源碼分析)。
delta lake更改現有數據的具體過程
delta lake以增量寫文件的方式支持數據的更新和刪除。
- 匹配數據,定位需要刪除的行和涉及的文件;
- 將這些文件中需要保留的數據重寫到新的文件,然后給舊文件打上墓碑標記。
- 刪除、更新、合並(merge)都是這個流程。
delta表schema
delta lake對schema的驗證很嚴格,但同時也支持schema更改。
- delta表的schema以json格式保存在事務日志中;(schema屬於元數據里的Metadata,存放在commit log的json里,詳見源碼分析)
- delta表在寫數據的時候,如果dataframe包含了表schema中沒有的字段,那么會報錯;
- delta表在寫數據的時候,dataframe中的數據類型要和schema中的一致,否則會報錯;
- delta表在寫數據的時候,如果dataframe中沒有表schema中對應的字段,那么在寫數據時,對應字段默認為空值;
- delta表的schema中,字段名的小寫不能相同,如'My'和'my'不能作為兩個不同的字段名;(因為雖然delta lake區分大小寫,但保存時不敏感,而parquet保存時是大小寫敏感的,所以加了這個限制)
- 可通過.option("mergeSchema", "true"),將df中有而schema沒有的字段添加到schema中,也就是add column;
- 在overwrite時,可通過df.write.option("overwriteSchema", "true")來替換原有的schema。(上面的“基礎表操作”里有示例)
事務日志
事務日志是delta lake的核心,它記錄了delta表相關的所有commit操作。
- delta表是一個目錄,表的根目錄除了表數據外,有一個_delta_log目錄,用來存放事務日志;
- 事務日志記錄了從最初的delta表開始的所有commit事件,每個commit形成一個json文件,文件名是嚴格遞增的,文件名就是版本號。
- (默認)每10個json合並成一個parquet格式的checkpoint文件,記錄之前所有的commit。
- 事務日志有一個最新checkpoint的文件(_delta_log/_last_checkpoint),spark讀的時候會自動跳到最新的checkpoint,然后再讀之后的json。
- delta lake 使用樂觀的並發控制,當多個用戶同時寫數據時,(讀數據是讀當前最新版本的快照,互不影響),都是生成一個新版本的數據文件(文件名不重復),在提交commit時生成下一個版本的日志文件,因為日志版本號是連續遞增的,如果檢測到了同名的文件已存在,則說明有其他用戶執行了新的commit,此時進行沖突檢測,如果檢測通過,則更新當前的snapshot,然后繼續提交commit,如果未通過沖突檢測,則報錯。
- 因為事務日志的存在,可以找到歷史版本的數據,這也是時間穿梭的實現原理,delta lake可以根據commit記錄生成歷史版本的數據。
- 新版本的數據生成后,舊版本的數據不會立刻從磁盤刪除,可以使用 VACUUM 命令來刪除磁盤上的歷史版本數據。
delta表文件目錄
delta_table_path/
|-- _delta_log/
|--|-- 00000000000000000000.json
|--|-- 00000000000000000001.json
|--|-- 00000000000000000002.json
|--|-- 00000000000000000003.json
|--|-- 00000000000000000004.json
|--|-- 00000000000000000005.json
|--|-- 00000000000000000006.json
|--|-- 00000000000000000007.json
|--|-- 00000000000000000008.json
|--|-- 00000000000000000009.json
|--|-- 00000000000000000010.json
|--|-- 00000000000000000010.checkpoint.parquet
|--|-- 00000000000000000011.json
|--|-- 00000000000000000012.json
|--|-- _last_checkpoint
|-- part-00000-1339ec93-7d47-4ef7-b167-1e5aaa8cd75d-c000.snappy.parquet
|-- part-00000-10a95e81-d64c-40ff-9143-25e998aadcc5-c000.snappy.parquet
|-- part-00000-22f8124e-d2dd-4804-9037-b7a780f70a08-c000.snappy.parquet
|-- part-00000-7866ec4b-b955-4d86-b08c-58cfc71bc1ea-c000.snappy.parquet
|-- part-00000-d6431884-390d-4837-865c-f6e52f0e2cf5-c000.snappy.parquet
事務日志的一些疑問
Q: 誰來合並json日志?
A: OptimisticTransaction里commit時會調用postCommit()函數,這里會檢查日志的版本是否能整除checkpointInterval,如果能則調用deltaLog.checkpoint()函數生成新的checkpoint文件。(詳見源碼分析)
Q: 生成歷史版本的快照是從事務日志里一點點計算得來的?
A: 是的,但無需計算所有日志,只需要計算checkpoint和json文件。可以通過_last_checkpoint直接定位到最新的checkpoint,checkpoint只是單純的合並日志信息,減少讀取文件的數量,並不改變內容。
Q: 最新版本的數據不會刪除,刪除歷史版本后再想用時間穿梭,就得根據commit日志重新計算?
A: 舊版本的數據刪除后,就不能用時間穿梭了,時間穿梭只能用於已存在的版本數據。
關於delta lake的事務日志,可以看這篇博客,講解的很詳細
https://mp.weixin.qq.com/s?__biz=MzA5MTc0NTMwNQ==&mid=2650717784&idx=2&sn=53174b4dd05642d0d8746b10555ddcf2&chksm=887da32ebf0a2a38ec4b0e159994c915ee460d554eb84feea993ab0a2f72efc4eb715a07d145&scene=21#wechat_redirect
需要避免的操作
- 手動更改delta表文件。delta lake是使用事務日志來管理表的信息,即使手動添加了文件,因為事務日志里沒有此文件的信息,也讀取不了;
而如果手動刪除了文件,事務日志中該文件的指針依然存在,但無法讀取。 - 使用其他文件讀取方式。delta lake的數據是按照parquet格式存儲的,可以使用各種工具讀取,但是使用其他讀取方式,數據會存在安全隱患(官檔說的,其實只要別改文件內容,讀應該沒啥影響)。
delta lake目前的不足
- 更新操作很重,更新一條數據和更新一批數據的成本可能是一樣的,所以不適合一條條的更新數據
- 更新數據的方式是新增文件,會造成文件數量過多,需要清理歷史版本的數據,version最好不要保存太多
- 樂觀鎖在多用戶同時更新時並發能力較差,更適合寫少讀多的場景(或者only append寫多更新少場景)