一、事務的概述
1、定義
事務就是一組單元化操作,這些操作要么都執行,要么都不執行,是一個不可分割的工作單位。
2、特點
- 原子性:一個事務是一個不可再分割的工作單位,事務中的所有操作要么都發生,要么都不發生。
- 一致性:事務開始之前和事務結束以后,數據庫的完整性約束沒有被破壞。這是說數據庫事務不能破壞關系數據的完整性以及業務邏輯上的一致性。
- 隔離性:多個事務並發訪問,事務之間是隔離的,一個事務不影響其它事務運行效果。這指的是在並發環境中,當不同的事務同時操作相同的數據時,每個事務都有各自完整的數據空間。事務查看數據更新時,數據所處的狀態要么是另一事務修改它之前的狀態,要么是另一事務修改后的狀態,事務不會查看到中間狀態的數據。事務之間的相應影響,分別為:臟讀、不可重復讀、幻讀、丟失更新。
- 持久性(Durability):意味着在事務完成以后,該事務鎖對數據庫所作的更改便持久的保存在數據庫之中,並不會被回滾。
3、事務實現的原理
- 預寫日志(Write-ahead logging):保證原子性和持久性。
- 鎖(locking):保證隔離性;鎖是指在並發環境中通過讀寫鎖來保證操作的互斥性。根據隔離程度不同,鎖的運用也不同。
- 一致性,是因為一致性是應用相關的話題,它的定義一個由業務系統來定義,什么樣的狀態才是一致?而實現一致性的代碼通常在業務邏輯的代碼中得以體現。
二、使用事務的准備條件和限制
1、准備條件
a、 默認事務是關閉的,需要設置開啟。
b、需要添加如下配置
<property>
<name>hive.support.concurrency</name>
<value>
true
</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>
true
</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>
true
</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>
1
</value>
</property>
2、限制條件
- BEGIN, COMMIT, ROLLBACK 暫時不支持,所有操作自動提交
- 目前只支持ORC 的文件格式。
- 表必須支持分桶(即條件1的設置)
- 必須設置事務管理器 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ,否則事務表無法工作(條件1的設置)
- 目前支持快照級別的隔離。就是當一次數據查詢時,會提供一個數據一致性的快照
- 已有的zookeeper和內存的鎖管理和Hive的事務不沖突
- LOAD DATA. 語句目前在事務表中暫時不支持
三、Hive中實現事務的原理
1、Hive事務實現原理
HDFS是不支持文件的修改,並且當有數據追加到文件,HDFS不對讀數據的用戶提供一致性的。為了在HDFS上支持以上的特性,Hive借鑒了其他數據倉庫工具的方法。如果Hive中的表開啟transaction,數據是insert進去的,則insert進去的數據會放到delta文件夾里面,hive后台會有個進程定時去跟base里的數據合並,然后刪除delta文件。即:表和分區的數據都被存在base files。 新的記錄和更新,刪除都存在delta files。一次事務操作創建一系列的delta files,將其合並為base。在讀取的時候,將基礎文件和修改,刪除合並,最后返回給查詢。
2、幾個名詞解釋
創建一個事務表:CREATE table demo (
num string,
create_date int,
number(19,4)
) clustered by (num ) into 40 buckets stored as orc tblproperties('transactional'='true');
a、表的Base and Delta Directories 目錄:
hive> dfs -ls -R /user/hive/warehouse/demo;
drwxr-xr-x - ekoifman staff 0 2016-06-09 17:07 /user/hive/warehouse/demo/delta_0000044_0000044_0000
-rw-r--r-- 1 ekoifman staff 610 2016-06-09 17:07 /user/hive/warehouse/demo/delta_0000044_0000044_0000/bucket_00000
- Minor compaction :將已有的delta files重寫到一個單獨的delta file,每個分桶一個。
- Major compaction: 將delta文件和base 重寫到一個新的base file,每個分桶一個。 這個合並操作的代價更大。
- 所有的合並操作都是后台進行,不會影響並行的數據讀取和寫入。合並完成之后,系統會等到所以的讀操作完成再刪除舊的文件。
d、Worker
該進程檢查在 hive.txn.timeout 內沒有心跳的事務並丟棄。對於一個初始化過事務的client,如果心跳停止了,它所鎖住的資源會被釋放。
四、datax通過數據中遇到的問題
1、前提條件
當開啟了事務,但是沒有配置參數,即事務限制條件:
- 客戶端
hive.support.concurrency – true
hive.enforce.bucketing – true (Hive 2.0 默認)
hive.exec.dynamic.partition.mode – nonstrict
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager - 服務端 (Metastore)
hive.compactor.initiator.on – true
hive.compactor.worker.threads – a positive number
當直接向表格中插入數據時,HDFS中存儲的都是delta數據,datax無法讀取,會報錯:
java.lang.IllegalArgumentException: delta_0051045_0051045 does not start with base_
at org.apache.hadoop.hive.ql.io.AcidUtils.parseBase(AcidUtils.java:144)
at org.apache.hadoop.hive.ql.io.AcidUtils.parseBaseBucketFilename(AcidUtils.java:172)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.run(OrcInputFormat.java:544)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-02-12 12:58:21.367 [0-0-36-reader] ERROR HdfsReader$Job - 從orcfile文件路徑[hdfs://10.20.30.50:8020/inceptor1/user/hive/warehouse/iri.db/hive/var_scr/delta_0051045_0051045/bucket_00017]中讀取數據發生異常,請聯系系統管理員。