估計Phoenix中支持Joins,對很多使用Hbase的朋友來說,還是比較好的。下面我們就來演示一下。
首先看一下幾張表的數據:
Orders表:
OrderID |
CustomerID |
ItemID |
Quantity |
Date |
1630781 |
C004 |
I001 |
650 |
09-01-2013 |
1630782 |
C003 |
I006 |
2500 |
09-02-2013 |
1630783 |
C002 |
I002 |
340 |
09-03-2013 |
1630784 |
C004 |
I006 |
1260 |
09-04-2013 |
1630785 |
C005 |
I003 |
1500 |
09-05-2013 |
數據保存到Orders.csv,內容格式為:
1630781,C004,I001,650,09-01-2013
1630782,C003,I006,2500,09-02-2013
1630783,C002,I002,340,09-03-2013
1630784,C004,I006,1260,09-04-2013
1630785,C005,I003,1500,09-05-2013
Customers表:
CustomerID |
CustomerName |
Country |
C001 |
Telefunken |
Germany |
C002 |
Logica |
Belgium |
C003 |
Salora Oy |
Finland |
C004 |
Alps Nordic AB |
Sweden |
C005 |
Deister Electronics |
Germany |
C006 |
Thales Nederland |
Netherlands |
數據保存到Customers.csv,內容格式為:
C001,Telefunken,Germany
C002,Logica,Belgium
C003,Salora Oy,Finland
C004,Alps Nordic AB,Sweden
C005,Deister Electronics,Germany
C006,Thales Nederland,Netherlands
Items表:
ItemID |
ItemName |
Price |
I001 |
BX016 |
15.96 |
I002 |
MU947 |
20.35 |
I003 |
MU3508 |
9.6 |
I004 |
XC7732 |
55.24 |
I005 |
XT0019 |
12.65 |
I006 |
XT2217 |
12.35 |
數據保存到Items.csv,內容格式為:
I001,BX016,15.96
I002,MU947,20.35
I003,MU3508,9.6
I004,XC7732,55.24
I005,XT0019,12.65
I006,XT2217,12.35
創建表的語句為:
Orders.sql文件內容為:
create table IF NOT EXISTS Orders (
OrderID Integer,
CustomerID Char(4),
ItemID Char(4),
Quantity Integer,
Date Char(10)
constraint pk Primary key(OrderID)
);
Customers.sql文件內容為:
create table IF NOT EXISTS Customers (
CustomerID Char(4),
CustomerName Varchar(50),
Country Varchar(50)
constraint pk Primary key(CustomerID)
);
Items.sql文件內容為:
create table IF NOT EXISTS Items (
ItemID Char(4),
ItemName Char(10),
Price Decimal(25,2)
constraint pk Primary key(ItemID)
);
我們將上面的數據導入到創建的表中:
bin/psql.py gpmaster:2181:/hbaseforkylin Orders.sql Orders.csv
bin/psql.py gpmaster:2181:/hbaseforkylin Customers.sql Customers.csv
bin/psql.py gpmaster:2181:/hbaseforkylin Items.sql Items.sql
查看數據是否已經導入:
可以看到數據都全部導入了。
下面來執行一些關於Join的相關操作:
SELECT O.OrderID,C.CustomerName,C.Country, O.Date
FROM Orders AS O
INNER JOIN Customers AS C
ON O.CustomerID = C.CustomerID;
執行過程以及結果如下:
基於索引的Join操作
當執行join查詢操作時,二級索引能夠自動地被利用。如果我們在Orders和Items上分別創建索引,如下:
CREATE INDEX iOrders ON Orders (ItemID) INCLUDE (CustomerID, Quantity);
CREATE INDEX i2Orders ON Orders(CustomerID) INCLUDE (ItemID, Quantity);
CREATE INDEX iItems ON Items (ItemName) INCLUDE (Price);
如果你創建可變的二級索引,出現如下的錯誤:
Error: ERROR 1029 (42Y88): Mutable secondary indexes must have the hbase.regionserver.wal.codec property set to org.apache.Hadoop.hbase.regionserver.wal.IndexedWALEditCodec in thehbase-sites.xml of every region server. tableName=IORDERS(state=42Y88,code=1029)
那么需要在RegionServer每個節點的hbase-site.xml中配置參數:
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
然后重啟HBase集群。
執行查詢:
SELECT ItemName,sum(Price * Quantity) AS OrderValue
FROM Items
JOIN Orders
ON Items.ItemID = Orders.ItemID
WHERE Orders.CustomerID > 'C002'
GROUP BY ItemName;
查詢結果為:
通過explain查詢執行計划:
在這個案例中,可以看到索引iItems和索引i2Orders都有使用。
GroupedJoins and Derived Tables
Phoenix也支持復雜的Join語法,比如 grouped joins(或子查詢),以及derived-tables(派生表)的join操作。
對於 grouped joins來說,如下:
SELECT O.OrderID,C.CustomerID, I.ItemID
FROM Customers AS C
INNER JOIN
(Items AS I
INNER JOIN OrdersAS O
ON I.ItemID = O.ItemID)
ON C.CustomerID = O.CustomerID;
通過使用一個子查詢(derived table)替換 sub join,得到相等的查詢:
SELECT S.OrderID,C.CustomerID, S.ItemID
FROM Customers AS C
INNER JOIN
( select O.CustomerID,
O.OrderID,
I.ItemID
from Items AS I
INNER JOIN Orders AS O
ON I.ItemID = O.ItemID) S
ON C.CustomerID = S.CustomerID;
上面兩個查詢結果都為:
Hash Join vs. Sort-Merge Join
基本的Hash Join通常比其他類型的join算法更好,但是它也有一些限制,其中最典型的一個特性是關系中的一張表要小到能夠加載到內存中。Phoenix同時支持Hash Join和Sort-Merge Join去實現快速的join操作以及兩張大表之間的join操作。
Phoenix目前盡可能地使用Hash Join算法,因為通常更快。但是我們可以使用“USE_SORT_MERGE_JOIN”的hint在查詢中使用Sort-Merge Join。對於這兩種join的算法的選擇將來會根據表的統計信息自動選擇。
Foreign Key to Primary Key Join Optimization
通常情況下,一個join發生在一個child 表到一個parent 表,通過child表的外鍵映射到一個parent表的主鍵。因此代替對parent表的全表掃描,Phoenix將基於child表的外鍵值對parent表進行skip-scan或range-scan掃描。
下面我們舉個例子,parent表為“Employee”,child表為“Patent”。
CREATE TABLE Employee (
Region VARCHAR NOT NULL,
LocalID VARCHAR NOT NULL,
Name VARCHAR,
StartDate DATE,
CONSTRAINT pk PRIMARY KEY (Region, LocalID));
CREATE TABLE Patent (
PatentID VARCHAR NOT NULL,
Region VARCHAR,
LocalID VARCHAR,
Title VARCHAR,
Category VARCHAR,
FileDate DATE,
CONSTRAINT pk PRIMARY KEY (PatentID));
SELECT E.Name, E.Region, P.PCount
FROM Employee AS E
JOIN
(SELECT Region, LocalID, count(*) AS PCount
FROM Patent
WHERE FileDate >= to_date('2000-01-01')
GROUP BY Region, LocalID) AS P
ON E.Region = P.Region AND E.LocalID =P.LocalID;
上面的查詢語句通過Region和LocalID兩個join的key對表“Employee”使用skip-scan掃描。下面是查詢使用和不使用這個優化的執行時間(“Employee”大概5000000 行,“Patent”大約1000 行記錄):
W/O(沒有) Optimization |
W/(有) Optimization |
8.1s |
0.4s |
然而,當考慮到child外鍵的值完全在parent表的主鍵空間中,因此使用skip-scan只會慢不會快的。你可以總是通過指定“NO_CHILD_PARENT_OPTIMIZATION”的hint關閉這個優化。將來,通過表的統計信息智能地選擇這兩種模式。
Configuration
前面我們提到,使用Hash Join時,前提條件是將關聯中的一張表加載到內存中,以便廣播到所有的服務器,因此需要考慮RegionServer服務器的堆內存足夠大以便容納較小的表,我們也需要關注一下使用Hash Join的幾個關鍵性的配置參數。
服務器端緩存用於存放哈希表,緩存的大小和生存時間由下面的幾個參數控制。
1. phoenix.query.maxServerCacheBytes
一個relation在被壓縮和發送到RegionServer前的最大原始數據大小(bytes)。
如果嘗試去序列化一個relation原始數據大小超過這個值的話,會導致MaxServerCacheSizeExceededException錯誤。
默認值為:104857600
2. phoenix.query.maxGlobalMemoryPercentage
所有threads使用的堆內存百分比(Runtime.getRuntime().maxMemory())。
默認值為:15
3. phoenix.coprocessor.maxServerCacheTimeToLiveMs
服務器端緩存的最大生存時間。
當在服務器端出現IO異常(“Could not find hash cache for joinId”),就考慮調整這個參數了。
如果獲取到“Earlier hash cache(s) might have expired on servers”告警日志時,可以提升這個參數的值。
默認值為:30000(30s)
盡管有時可以通過修改參數來解決上面提到的一些異常問題,但是強烈建議首先要考慮優化join的查詢,大家可以學習下面的優化部分內容。
OptimizingYour Query
下面是默認join的順序(沒有表統計信息的存在),查詢的一邊作為“smaller”relation並且將被加載到服務器的內存中:
1. lhs INNER JOIN rhs
rhs將在服務器的內存中建立hash表
2. lhs LEFT OUTER JOIN rhs
rhs將在服務器的內存中建立hash表
3. lhs RIGHT OUTER JOIN rhs
lhs將在服務器的內存中建立hash表
對於多個join查詢來說,join的順序是比較復雜的,你可以使用explain來查詢真正的執行計划。對於multiple-inner-join查詢來說,Phoenix默認應用star-join優化,意味着join所有右手邊的表的同時,leading的表(即左手邊的表)將僅僅被掃描一次。當所有右手邊的表的大小超過內存限制時,你可以通過使用“NO_STAR_JOIN”的hint來關閉這個優化。
下面我們來看一下之前的查詢示例:
SELECT O.OrderID, C.CustomerName, I.ItemName, I.Price,O.Quantity
FROM Orders AS O
INNER JOIN Customers AS C
ON O.CustomerID = C.CustomerID
INNER JOIN Items AS I
ON O.ItemID = I.ItemID;
默認的Join順序為(使用star-join優化):
1. SCAN Customers --> BUILD HASH[0]
SCAN Items --> BUILD HASH[1]
2. SCAN Orders JOIN HASH[0], HASH[1] --> Final Resultset
另外,如果我們使用“NO_STAR_JOIN”的hint,如下:
SELECT /*+ NO_STAR_JOIN*/ O.OrderID, C.CustomerName, I.ItemName, I.Price, O.Quantity
FROM Orders AS O
INNER JOIN Customers AS C
ON O.CustomerID = C.CustomerID
INNER JOIN Items AS I
ON O.ItemID = I.ItemID;
這次的Join順序為:
1. SCAN Customers --> BUILD HASH[0]
2. SCAN Orders JOIN HASH[0]; CLOSE HASH[0] --> BUILD HASH[1]
3. SCAN Items JOIN HASH[1] --> Final Resultset
這里需要說明的是,並不是表的整個數據集都計算到內存占用的,而是只有查詢使用的列數據並且過濾后的記錄才會在服務器端建立Hash表。