Hive從0.14版本開始支持事務和行級更新,但缺省是不支持的,需要一些附加的配置。要想支持行級insert、update、delete,需要配置Hive支持事務。
一、Hive具有ACID語義事務的使用場景
1. 流式接收數據。
許多用戶使用諸如Apache Flume、Apache Storm或Apache Kafka這樣的工具將流數據灌入Hadoop集群。當這些工具以每秒數百行的頻率寫入時,Hive也許只能每15分鍾到1小時添加一個分區,因為過於頻繁地添加分區很快就會使一個表中的分區數量難以維護。而且這些工具還可能向已存在的分區中寫數據,但是這樣將會產生臟讀(可能讀到查詢開始時間點以后寫入的數據),還在這些分區的所在目錄中遺留大量小文件,進而給NameNode造成壓力。在這個使用場景下,事務支持可以獲得數據的一致性視圖同時避免產生過多的文件。
2. 緩慢變化維。
在一個典型的星型模式數據倉庫中,維度表隨時間的變化很緩慢。例如,一個零售商開了一家新商店,需要將新店數據加到商店表,或者一個已有商店的營業面積或其它需要跟蹤的特性改變了。這些改變會導致插入或修改個別記錄。從0.14版本開始,Hive支持行級更新。
3. 數據重述。
有時發現數據集合有錯誤並需要更正。或者當前數據只是個近似值(如只有全部數據的90%,得到全部數據會滯后)。或者業務業務規則可能需要根據后續事務重述特定事務(打個比方,一個客戶購買了一些商品后又購買了一個會員資格,此時可以享受折扣價格,包括先前購買的商品)。或者一個客戶可能按照合同在終止了合作關系后要求刪除他們的客戶數據。從Hive 0.14開始,這些使用場景可以通過INSERT、UPDATE和DELETE支持。
二、配置Hive支持事務(Hive 2.0版)
1. 在hive-site.xml文件中添加如下配置項
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
<
property
>
<
name
>hive.support.concurrency</
name
>
<
value
>true</
value
>
</
property
>
<
property
>
<
name
>hive.exec.dynamic.partition.mode</
name
>
<
value
>nonstrict</
value
>
</
property
>
<
property
>
<
name
>hive.txn.manager</
name
>
<
value
>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</
value
>
</
property
>
<
property
>
<
name
>hive.compactor.initiator.on</
name
>
<
value
>true</
value
>
</
property
>
<
property
>
<
name
>hive.compactor.worker.threads</
name
>
<
value
>1</
value
>
</
property
>
|
2. 添加Hive元數據(使用mysql存儲)
1
2
3
4
|
INSERT
INTO
NEXT_LOCK_ID
VALUES
(1);
INSERT
INTO
NEXT_COMPACTION_QUEUE_ID
VALUES
(1);
INSERT
INTO
NEXT_TXN_ID
VALUES
(1);
COMMIT
;
|
說明:初始時這三個表沒有數據,如果不添加數據,會報以下錯誤:
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager FAILED: Error in acquiring locks: Error communicating with the metastore
三、測試
1. 啟動hadoop集群和mysql
1
2
3
|
$HADOOP_HOME
/sbin/start-dfs
.sh
$HADOOP_HOME
/sbin/start-yarn
.sh
~
/mysql/bin/mysqld
&
|
2. 建立測試表
1
2
3
4
|
use
test
;
create table t1(
id
int, name string)
clustered by (
id
) into 8 buckets
stored as orc TBLPROPERTIES (
'transactional'
=
'true'
);
|
說明:建表語句必須帶有into buckets子句和stored as orc TBLPROPERTIES ('transactional'='true')子句,並且不能帶有sorted by子句。
3. 測試insert、update、delete
1
2
3
4
|
insert
into
t1
values
(1,
'aaa'
);
insert
into
t1
values
(2,
'bbb'
);
update
t1
set
name
=
'ccc'
where
id=1;
delete
from
t1
where
id=2;
|
執行結果分別如圖1-3所示。
<ignore_js_op>
圖1
<ignore_js_op>
圖2
<ignore_js_op>
圖3
說明:不能修改bucket列的值,否則會報以下錯誤:
FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. Column id.
4. 已有非ORC表的轉換
-- 在本地文件/home/grid/a.txt中寫入以下4行數據
1,張三,US,CA
2,李四,US,CB
3,王五,CA,BB
4,趙六,CA,BC
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
-- 建立非分區表並加載數據
CREATE
TABLE
t1 (id
INT
,
name
STRING, cty STRING, st STRING) ROW FORMAT DELIMITED FIELDS TERMINATED
BY
','
;
LOAD
DATA
LOCAL
INPATH
'/home/grid/a.txt'
INTO
TABLE
t1;
SELECT
*
FROM
t1;
-- 建立外部分區事務表並加載數據
CREATE
EXTERNAL
TABLE
t2 (id
INT
,
name
STRING) PARTITIONED
BY
(country STRING, state STRING)
CLUSTERED
BY
(id)
INTO
8 BUCKETS
STORED
AS
ORC TBLPROPERTIES (
'transactional'
=
'true'
);
INSERT
INTO
T2 PARTITION (country, state)
SELECT
*
FROM
T1;
SELECT
*
FROM
t2;
-- 修改數據
INSERT
INTO
TABLE
t2 PARTITION (country, state)
VALUES
(5,
'劉'
,
'DD'
,
'DD'
);
UPDATE
t2
SET
name
=
'張'
WHERE
id=1;
DELETE
FROM
t2
WHERE
name
=
'李四'
;
SELECT
*
FROM
t2;
|
修改前和修改后的數據分別如圖4、圖5所示。
<ignore_js_op>
圖4
<ignore_js_op>
圖5
說明:不能update分區鍵,否則會報以下錯誤:
FAILED: SemanticException [Error 10292]: Updating values of partition columns is not supported