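HBase mutation operations such as delete and put must acquire a row lock before they do any work. The row lock is acquired through HRegion.getRowLockInternal(byte[] row, boolean waitForLock), so let's first skim this method's flow, shown below. The row-lock logic it relies on lives mainly in two classes, RowLock and RowLockContext, both nested classes of HRegion. Below we look at how each of them is implemented.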
protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
  HashedBytes rowKey = new HashedBytes(row);
  RowLockContext rowLockContext = new RowLockContext(rowKey);
  // loop until we acquire the row lock (unless !waitForLock)
  while (true) {
    RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
    if (existingContext == null) {
      // Row is not already locked by any thread, use newly created context.
      break;
    } else if (existingContext.ownedByCurrentThread()) {
      // Row is already locked by current thread, reuse existing context instead.
      rowLockContext = existingContext;
      break;
    } else {
      if (!waitForLock) {
        return null;
      }
      try {
        // Row is already locked by some other thread, give up or wait for it
        if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
          throw new IOException("Timed out waiting for lock for row: " + rowKey);
        }
      } catch (InterruptedException ie) {
        LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
        InterruptedIOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }
  }
  // allocate new lock for this thread
  return rowLockContext.newLock();
}
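Start with the RowLock class. Its main logic is the release method, which releases the row lock. It also has a boolean field, released, which defaults to false and records whether this lock has already been released.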
public static class RowLock {
  @VisibleForTesting final RowLockContext context;
  private boolean released = false;

  @VisibleForTesting RowLock(RowLockContext context) {
    this.context = context;
  }

  /**
   * Release the given lock. If there are no remaining locks held by the current thread
   * then unlock the row and allow other threads to acquire the lock.
   * @throws IllegalArgumentException if called by a different thread than the lock owning thread
   */
  public void release() {
    if (!released) {
      context.releaseLock();
      released = true;
    }
  }
}
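To see why the released flag matters, here is a minimal Java sketch. ReleaseOnceDemo, CountingContext and Handle are hypothetical names made up for illustration, not HBase classes: Handle copies the shape of RowLock.release, and the context merely counts how often releaseLock() reaches it.

```java
class ReleaseOnceDemo {
    // Releases the same handle twice and reports how many times the context saw a release.
    static int run() {
        CountingContext ctx = new CountingContext();
        Handle handle = new Handle(ctx);
        handle.release();
        handle.release();               // no effect: released is already true
        return ctx.releaseCalls;
    }

    public static void main(String[] args) {
        System.out.println(run());      // prints 1
    }
}

// Hypothetical stand-in for RowLockContext: it only counts releaseLock() calls.
class CountingContext {
    int releaseCalls = 0;

    void releaseLock() {
        releaseCalls++;                 // the real RowLockContext decrements lockCount here
    }
}

// Same shape as HRegion.RowLock: the released flag makes release() idempotent.
class Handle {
    private final CountingContext context;
    private boolean released = false;

    Handle(CountingContext context) {
        this.context = context;
    }

    void release() {
        if (!released) {                // only the first call reaches the context
            context.releaseLock();
            released = true;
        }
    }
}
```

Without the flag, a second release() on the same handle would decrement lockCount again in the real RowLockContext, which could unlock a row that the thread still holds through another, reentrant RowLock.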
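RowLock itself, however, holds no actual locking state. How does that work? Look closely at release: it calls into context, a field of type RowLockContext, and the constructor receives that same context object. This suggests that RowLock instances are created inside RowLockContext, so let's look at RowLockContext: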
@VisibleForTesting class RowLockContext {
  private final HashedBytes row;
  // A lock count plus a CountDownLatch implement the wait condition for this row.
  // The latch starts at 1 because it is a one-shot signal: a single countDown() releases
  // every waiting thread, however many there are. Since the latch cannot count an unknown
  // number of holds, reentrant acquisitions by the owning thread are tracked in lockCount,
  // and latch.countDown() is called only when lockCount drops back to 0. Until then, other
  // threads block in latch.await() inside getRowLockInternal.
  private final CountDownLatch latch = new CountDownLatch(1);
  private final Thread thread;
  private int lockCount = 0;

  RowLockContext(HashedBytes row) {
    this.row = row;
    this.thread = Thread.currentThread();
  }

  boolean ownedByCurrentThread() {
    return thread == Thread.currentThread();
  }

  RowLock newLock() {
    lockCount++;
    return new RowLock(this);
  }

  void releaseLock() {
    if (!ownedByCurrentThread()) {
      throw new IllegalArgumentException("Lock held by thread: " + thread
          + " cannot be released by different thread: " + Thread.currentThread());
    }
    lockCount--;
    if (lockCount == 0) {
      // no remaining locks by the thread, unlock and allow other threads to access
      RowLockContext existingContext = lockedRows.remove(row);
      if (existingContext != this) {
        throw new RuntimeException(
            "Internal row lock state inconsistent, should not happen, row: " + row);
      }
      latch.countDown();
    }
  }
}
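The comment in RowLockContext relies on one property of CountDownLatch: once its count reaches zero, every thread blocked in await() returns, and later await() calls return immediately. The following sketch checks that a single countDown() on a latch created with count 1 lets all waiters through. LatchBroadcastDemo is a hypothetical name, and the 5-second timeout is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchBroadcastDemo {
    // Starts `waiters` threads that wait on a latch created with count 1, opens it with
    // a single countDown(), and returns how many waiters observed the latch opening.
    static int run(int waiters) {
        CountDownLatch latch = new CountDownLatch(1);     // same initial count as RowLockContext.latch
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // bounded wait, like await(rowLockWaitDuration, ...) in getRowLockInternal
                    if (latch.await(5, TimeUnit.SECONDS)) {
                        woken.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.countDown();                                // one call releases every waiter
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3));                       // prints 3
    }
}
```

Because a latch can never be reset, each new owner needs a fresh RowLockContext with a fresh latch, which is what getRowLockInternal creates on every call. Waking up also does not hand the lock to a waiter; it only lets the waiter loop back and compete again through putIfAbsent.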
HRegion also has a key member field, lockedRows, which stores every row whose lock is currently held: the key is the row key and the value is its RowLockContext.
// map from a locked row to the context for that lock including:
// - CountDownLatch for threads waiting on that row
// - the thread that owns the lock (allow reentrancy)
// - reference count of (reentrant) locks held by the thread
// - the row itself
private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
    new ConcurrentHashMap<HashedBytes, RowLockContext>();
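The key type is HashedBytes rather than byte[] because Java arrays inherit identity-based equals and hashCode from Object, so two arrays holding the same row key would be two different map keys; HashedBytes wraps the array and bases equals/hashCode on its contents. The sketch below uses java.nio.ByteBuffer as a stand-in for HashedBytes. That substitution is an assumption for illustration only: ByteBuffer is mutable, and it is safe here only because the wrapped arrays are never modified. RowKeyDemo is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;

class RowKeyDemo {
    // Raw byte[] keys: equals/hashCode are identity-based, so the same row key
    // built from a different array is treated as a different key.
    static boolean rawArrayFindsRow() {
        ConcurrentHashMap<byte[], String> lockedRows = new ConcurrentHashMap<>();
        lockedRows.put("row1".getBytes(StandardCharsets.UTF_8), "ctx");
        return lockedRows.containsKey("row1".getBytes(StandardCharsets.UTF_8));
    }

    // Content-based wrapper (ByteBuffer here, HashedBytes in HBase): equal bytes, equal key.
    static boolean wrappedKeyFindsRow() {
        ConcurrentHashMap<ByteBuffer, String> lockedRows = new ConcurrentHashMap<>();
        lockedRows.put(ByteBuffer.wrap("row1".getBytes(StandardCharsets.UTF_8)), "ctx");
        return lockedRows.containsKey(ByteBuffer.wrap("row1".getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        System.out.println(rawArrayFindsRow());    // prints false
        System.out.println(wrappedKeyFindsRow());  // prints true
    }
}
```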
We have now skimmed everything involved in row locking. Let's walk through the logic once more, starting from getRowLockInternal:
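- Build a RowLockContext for the row key.
- Loop until the row lock is acquired or the wait times out:
  - Use ConcurrentMap.putIfAbsent to check whether lockedRows already holds a context for this row key.
    - If not, no other thread holds the row lock. putIfAbsent has just put the new context into lockedRows, so break out of the loop.
    - If so, the row lock is already held:
      - If the holder is the current thread, reuse the existing context (rowLockContext = existingContext) and break out of the loop. This is what makes the lock reentrant.
      - Otherwise, check waitForLock. If it is false, return null immediately. If it is true, call latch.await on the existing context with a timeout of rowLockWaitDuration milliseconds, throwing an IOException on timeout (or an InterruptedIOException if interrupted). Once the latch opens, loop back and try putIfAbsent again, since another waiter may win the race.
- Leaving the loop means the lock was acquired: call newLock(), which increments lockCount and returns a new RowLock.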
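The walk-through above can be condensed into a small, self-contained Java sketch. MiniRowLocks, Ctx and MiniRowLocksDemo are hypothetical names, not HBase code; ByteBuffer stands in for HashedBytes, and Ctx plays the roles of both RowLockContext and RowLock to keep the example short. The demo takes the same row twice on one thread (reentrancy), then shows that another thread calling lock(row, false) gets null until both holds are released.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MiniRowLocksDemo {
    // Locks row "r" twice on this thread, probing from another thread after each change.
    static String run() {
        try {
            MiniRowLocks locks = new MiniRowLocks(1000);
            byte[] row = "r".getBytes(StandardCharsets.UTF_8);
            MiniRowLocks.Ctx a = locks.lock(row, true);
            MiniRowLocks.Ctx b = locks.lock(row, true);                // reentrant: same context
            StringBuilder out = new StringBuilder();
            out.append(a == b).append(' ').append(a.lockCount);
            out.append(' ').append(otherThreadCanLock(locks, row));    // two holds
            a.release();
            out.append(' ').append(otherThreadCanLock(locks, row));    // one hold left
            b.release();
            out.append(' ').append(otherThreadCanLock(locks, row));    // row is free
            return out.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Tries lock(row, false) from a fresh thread and releases it again on success.
    static boolean otherThreadCanLock(MiniRowLocks locks, byte[] row) throws Exception {
        boolean[] got = new boolean[1];
        Exception[] error = new Exception[1];
        Thread t = new Thread(() -> {
            try {
                MiniRowLocks.Ctx c = locks.lock(row, false);
                got[0] = c != null;
                if (c != null) c.release();
            } catch (Exception e) {
                error[0] = e;
            }
        });
        t.start();
        t.join();                                   // join makes got/error visible here
        if (error[0] != null) throw error[0];
        return got[0];
    }
}

// Hypothetical, stripped-down version of the scheme above; not HBase code.
class MiniRowLocks {
    final ConcurrentHashMap<ByteBuffer, Ctx> lockedRows = new ConcurrentHashMap<>();
    final long waitMillis;
    MiniRowLocks(long waitMillis) { this.waitMillis = waitMillis; }

    final class Ctx {
        final ByteBuffer row;
        final Thread thread = Thread.currentThread();         // owner = creating thread
        final CountDownLatch latch = new CountDownLatch(1);
        int lockCount = 0;                                     // reentrant holds by the owner
        Ctx(ByteBuffer row) { this.row = row; }

        void release() {
            if (thread != Thread.currentThread()) throw new IllegalArgumentException("not owner");
            if (--lockCount == 0) {                            // last hold: unlock the row
                lockedRows.remove(row, this);
                latch.countDown();                             // waiters wake and retry putIfAbsent
            }
        }
    }

    Ctx lock(byte[] rowBytes, boolean waitForLock) throws IOException {
        ByteBuffer row = ByteBuffer.wrap(rowBytes.clone());
        Ctx ctx = new Ctx(row);
        while (true) {
            Ctx existing = lockedRows.putIfAbsent(row, ctx);
            if (existing == null) break;                       // row was free: ctx is now mapped
            if (existing.thread == Thread.currentThread()) {   // already ours: reuse it
                ctx = existing;
                break;
            }
            if (!waitForLock) return null;
            try {
                if (!existing.latch.await(waitMillis, TimeUnit.MILLISECONDS)) {
                    throw new IOException("Timed out waiting for lock for row");
                }
            } catch (InterruptedException ie) {
                throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
            }
        }
        ctx.lockCount++;                                       // plays the role of newLock()
        return ctx;
    }
}
```

MiniRowLocksDemo.run() should return "true 2 false false true". Merging RowLock into Ctx drops one safeguard of the real code: there is no per-handle released flag, so releasing the same hold twice would over-decrement lockCount.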
That is the flow for acquiring a row lock. Releasing row locks goes through HRegion.releaseRowLocks; here is the code:
/**
 * If the given list of row locks is not null, releases all locks.
 */
public void releaseRowLocks(List<RowLock> rowLocks) {
  if (rowLocks != null) {
    for (RowLock rowLock : rowLocks) {
      rowLock.release();
    }
    rowLocks.clear();
  }
}
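A natural calling pattern, assumed here for illustration, is to collect the RowLocks taken for a batch and pass the list to releaseRowLocks in a finally block, so the rows are unlocked even if the mutation fails. In the sketch below, BatchReleaseDemo, withRowLocks and FakeRowLock are hypothetical, and the fake handles do no real locking. It also shows that because releaseRowLocks clears the list, calling it a second time on the same list is a no-op.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchReleaseDemo {
    // Same shape as HRegion.releaseRowLocks: null-safe, release each lock, then clear the list.
    static void releaseRowLocks(List<FakeRowLock> rowLocks) {
        if (rowLocks != null) {
            for (FakeRowLock rowLock : rowLocks) {
                rowLock.release();
            }
            rowLocks.clear();
        }
    }

    // Assumed caller shape: collect the locks, run the mutation, release in finally.
    static void withRowLocks(List<String> rows, List<FakeRowLock> acquired, Runnable mutation) {
        try {
            for (String row : rows) {
                acquired.add(new FakeRowLock(row));   // stands in for acquiring the real row lock
            }
            mutation.run();
        } finally {
            releaseRowLocks(acquired);                // runs even when mutation throws
        }
    }

    // Returns {release calls on lock "a", release calls on lock "b", locks left in the list}.
    static int[] run() {
        List<FakeRowLock> acquired = new ArrayList<>();
        List<FakeRowLock> seen = new ArrayList<>();
        try {
            withRowLocks(Arrays.asList("a", "b"), acquired, () -> {
                seen.addAll(acquired);                // keep the handles; the list gets cleared
                throw new IllegalStateException("mutation failed");
            });
        } catch (IllegalStateException expected) {
            // the failure still reaches the caller, after the locks were released
        }
        releaseRowLocks(acquired);                    // harmless: the list is already empty
        return new int[] {seen.get(0).releaseCalls, seen.get(1).releaseCalls, acquired.size()};
    }
}

// Hypothetical handle standing in for HRegion.RowLock; it only counts release() calls.
class FakeRowLock {
    final String row;
    int releaseCalls = 0;

    FakeRowLock(String row) {
        this.row = row;
    }

    void release() {
        releaseCalls++;
    }
}
```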
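So releaseRowLocks just calls RowLock.release on each lock (its code appears above) and then clears the list. RowLock.release works as follows:
- If released is already true, do nothing, so releasing the same RowLock twice is harmless. Otherwise call context.releaseLock(), which:
  - checks that the calling thread owns the row lock, and throws IllegalArgumentException if it does not;
  - decrements lockCount;
  - if lockCount has reached 0, the thread no longer holds the row at all: it removes the row's entry from lockedRows (throwing a RuntimeException if the removed context is not this one) and calls latch.countDown(), which wakes the threads waiting in getRowLockInternal so they can retry acquiring the lock.
- Then set released to true.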
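The release steps can be exercised in isolation. In this sketch, OwnedContext is a hypothetical copy of RowLockContext.releaseLock keyed by String instead of HashedBytes, and lockCount is set to 2 directly to simulate two reentrant holds rather than going through getRowLockInternal. ReleaseDemo checks that a release from a foreign thread is rejected, and that the row stays in lockedRows, with the latch still closed, until the owner's last release.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class ReleaseDemo {
    // Returns "<foreign release outcome> <still locked after 1st release> <unlocked after 2nd>".
    static String run() {
        OwnedContext ctx = new OwnedContext("r");
        OwnedContext.lockedRows.put("r", ctx);
        ctx.lockCount = 2;                              // simulate two reentrant holds by this thread
        String[] foreign = new String[1];
        Thread other = new Thread(() -> {
            try {
                ctx.releaseLock();
                foreign[0] = "released";
            } catch (IllegalArgumentException e) {
                foreign[0] = "rejected";                // not the owning thread
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        ctx.releaseLock();
        boolean stillLocked = OwnedContext.lockedRows.containsKey("r") && ctx.latch.getCount() == 1;
        ctx.releaseLock();
        boolean unlocked = !OwnedContext.lockedRows.containsKey("r") && ctx.latch.getCount() == 0;
        return foreign[0] + " " + stillLocked + " " + unlocked;
    }
}

// Hypothetical stand-in for RowLockContext, keyed by String for brevity.
class OwnedContext {
    static final ConcurrentHashMap<String, OwnedContext> lockedRows = new ConcurrentHashMap<>();
    final String row;
    final Thread thread = Thread.currentThread();
    final CountDownLatch latch = new CountDownLatch(1);
    int lockCount = 0;

    OwnedContext(String row) {
        this.row = row;
    }

    // Same steps as RowLockContext.releaseLock: owner check, decrement, unlock at zero.
    void releaseLock() {
        if (thread != Thread.currentThread()) {
            throw new IllegalArgumentException("Lock held by thread: " + thread
                + " cannot be released by different thread: " + Thread.currentThread());
        }
        lockCount--;
        if (lockCount == 0) {
            if (lockedRows.remove(row) != this) {
                throw new RuntimeException("Internal row lock state inconsistent, row: " + row);
            }
            latch.countDown();                          // wake threads waiting on this row
        }
    }
}
```

ReleaseDemo.run() should return "rejected true true".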
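Note:
In getRowLockInternal, putIfAbsent on a ConcurrentMap is atomic, so once lockedRows.putIfAbsent(rowKey, rowLockContext) succeeds (returns null) for one thread, every other thread's putIfAbsent for the same row key returns the existing context instead, until the owner removes it in releaseLock.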
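The guarantee in this note can be checked with a small race: several threads call putIfAbsent on the same key at nearly the same moment, and exactly one of them gets null back. PutIfAbsentRaceDemo is a hypothetical name, and the start-gate latch only lines the threads up so that they actually contend.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class PutIfAbsentRaceDemo {
    // `threads` threads race to claim the same row key; returns how many saw null (won).
    static int winners(int threads) {
        ConcurrentHashMap<String, Thread> lockedRows = new ConcurrentHashMap<>();
        CountDownLatch startGate = new CountDownLatch(1);     // released once all threads exist
        AtomicInteger winners = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                try {
                    startGate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // atomic claim: only the first insert for "row1" returns null
                if (lockedRows.putIfAbsent("row1", Thread.currentThread()) == null) {
                    winners.incrementAndGet();
                }
            });
            ts[i].start();
        }
        startGate.countDown();
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(winners(8));                       // prints 1
    }
}
```

A plain check-then-act sequence (containsKey followed by put) would not give this guarantee, because two threads could both see the key missing before either one inserts it.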
