概要
Elasticsearch在文檔更新時默認使用的是樂觀鎖方案,而Elasticsearch利用文檔的一些create限制條件,也能達到悲觀鎖的效果,我們一起來看一看。
樂觀鎖與悲觀鎖
樂觀鎖
ES默認實現樂觀鎖,所有的數據更新默認使用樂觀鎖機制。document更新時,必須要帶上currenct version,更新時與document的version進行比較,如果相同進行更新操作,不相同表示已經被別的線程更新過了,此時更新失敗,並且重新獲取新的version再嘗試更新。
悲觀鎖
我們舉一個這樣的例子:Elasticsearch存儲文件系統的目錄、文件名信息,有多個線程需要對/home/workspace/ReadMe.txt進行追加修改,而且是並發執行的,有先后順序之分,跟之前的庫存更新案例有點不一樣,此時單純使用樂觀鎖,可能會出現亂序的問題。
這種場景就需要使用悲觀鎖控制,保證線程的執行順序,有一個線程在修改,其他的線程只能掛起等待。悲觀鎖通過/index/lock/實現,只有一個線程能做修改操作,其他線程block掉。
悲觀鎖有三種,分別對應三種粒度,由粗到細可為分:
- 全局鎖:最粗的鎖,直接鎖整個索引
- document鎖:指定id加鎖,只鎖一條數據,類似於數據庫的行鎖
- 共享鎖和排他鎖:也叫讀寫鎖,針對一條數據分讀和寫兩種操作,一般共享鎖允許多個線程對同一條數據進行加鎖,排他鎖只允許一個線程對數據加鎖,並且排他鎖和共享鎖互斥。
鎖的基本操作步驟
我們使用鎖的基本步驟都是一樣的,無論是關系型數據庫、Redis/Memcache/Zookeeper分布式鎖,還是今天介紹的Elasticsearch實現的鎖機制,都有如下三步:
- 上鎖
- 執行事務方法
- 解鎖
全局鎖
假定有兩個線程,線程1和線程2
- 線程1上鎖命令:
PUT /files/file/global/_create
{}
- files表示索引名稱。
- file為type,6.3.1一個索引只允許有一個type,選用file作用type名稱。
- global:即document的id,固定寫為global表示全局鎖,或者使用專門的索引進行加鎖操作。
- _create: 強制必須是創建,如果已經存在,那么創建失敗,報錯。
- 線程1執行事務方法:更新文件名
POST /files/file/global/_update
{
"doc": {
"name":"ReadMe.txt"
}
}
- 線程2嘗試加鎖,失敗,此時程序進行重試階段,直到線程1釋放鎖
# 請求:
PUT /files/file/global/_create
{}
# 響應:
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[file][global]: version conflict, document already exists (current version [1])",
"index_uuid": "_6E1d7BLQmy9-7gJptVp7A",
"shard": "2",
"index": "files"
}
],
"type": "version_conflict_engine_exception",
"reason": "[file][global]: version conflict, document already exists (current version [1])",
"index_uuid": "_6E1d7BLQmy9-7gJptVp7A",
"shard": "2",
"index": "files"
},
"status": 409
}
- 線程1釋放鎖
DELETE files/file/global
- 線程2加鎖
PUT /files/file/global/_create
{}
響應
{
"_index": "files",
"_type": "file",
"_id": "global",
"_version": 3,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1
}
- 加鎖成功,然后執行事務方法。
優缺點
全局鎖本質上是所有線程都用_create語法來創建id為global的文檔,利用Elasticsearch對_create語法的校驗來實現鎖的目的。
-
優點:操作簡單,容易使用,成本低。
-
缺點:直接鎖住整個索引,除了加鎖的那個線程,其他所有對此索引的線程都block住了,並發量較低。
-
適用場景:讀多寫少的數據,並且加解鎖的時間非常短,類似於數據庫的表鎖。
注意事項:加鎖解鎖的控制必須嚴格在程序里定義,因為單純基於doc的鎖控制,如果id固定使用global,在有鎖的情況,任何線程執行delete操作都是可以成功的,因為大家都知道id。
document level級別的鎖
document level級別的鎖是更細粒度的鎖,以文檔為單位進行鎖控制。
我們新建一個索引專門用於加鎖操作:
PUT /files-lock/_mapping/lock
{
"properties": {
}
}
我們先創建一個script腳本,ES6.0以后默認使用painless腳本:
POST _scripts/document-lock
{
"script": {
"lang": "painless",
"source": "if ( ctx._source.process_id != params.process_id ) { Debug.explain('already locked by other thread'); } ctx.op = 'noop';"
}
}
Debug.explain表示拋出一個異常,內容為already locked by other thread。
ctx.op = 'noop'表示不執行更新。
- 線程1增加行鎖,此時傳入的process_id為181ab3ee-28cc-4339-ba35-69802e06fe42
POST /files-lock/lock/1/_update
{
"upsert": { "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42" },
"script": {
"id": "document-lock",
"params": {
"process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42"
}
}
}
響應結果:
{
"_index": "files-lock",
"_type": "lock",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
- 線程1、線程2查詢鎖信息
{
"_index": "files-lock",
"_type": "lock",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42"
}
}
- 線程2傳入的process_id為181ab3ee-28cc-4339-ba35-69802e06fe42,嘗試加鎖,失敗,此時應該啟動重試機制
POST /files-lock/lock/1/_update
{
"upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" },
"script": {
"id": "document-lock",
"params": {
"process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
}
}
}
提示該文檔已經被別的線程(線程1)鎖住了,你不能更新了,響應報文如下:
{
"error": {
"root_cause": [
{
"type": "remote_transport_exception",
"reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]"
}
],
"type": "illegal_argument_exception",
"reason": "failed to execute script",
"caused_by": {
"type": "script_exception",
"reason": "runtime error",
"painless_class": "java.lang.String",
"to_string": "already locked by other thread",
"java_class": "java.lang.String",
"script_stack": [
"Debug.explain('already locked by other thread'); } ",
" ^---- HERE"
],
"script": "judge-lock",
"lang": "painless",
"caused_by": {
"type": "painless_explain_error",
"reason": null
}
}
},
"status": 400
}
- 線程1執行事務方法
POST /files/file/1/_update
{
"doc": {
"name":"README1.txt"
}
}
- 線程1的事務方法執行完成,並通過刪除id為1的文檔,相當於釋放鎖
DELETE /files-lock/lock/1
- 線程2在線程1執行事務的期間,一直在模擬掛起,重試的操作,直到線程1完成釋放鎖,然后線程2加鎖成功
POST /files-lock/lock/1/_update
{
"upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" },
"script": {
"id": "document-lock",
"params": {
"process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
}
}
}
結果:
{
"_index": "files-lock",
"_type": "lock",
"_id": "1",
"_version": 3,
"found": true,
"_source": {
"process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
}
}
此時鎖的process_id變成線程2傳入的"a6d13529-86c0-4422-b95a-aa0a453625d5"
{
"_index": "files-lock",
"_type": "lock",
"_id": "1",
"_version": 3,
"found": true,
"_source": {
"process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
}
}
這樣基於ES的行鎖操作控制過程就完成了。
腳本解釋
update+upsert操作,如果該記錄沒加鎖(此時document為空),執行upsert操作,設置process_id,如果已加鎖,執行script
script內的邏輯是:判斷傳入參數與當前doc的process_id,如果不相等,說明有別的線程嘗試對有鎖的doc進行加鎖操作,Debug.explain表示拋出一個異常。
process_id可以由Java應用系統里生成,如UUID。
如果兩個process_id相同,說明當前執行的線程與加鎖的線程是同一個,ctx.op = 'noop'表示什么都不做,返回成功的響應,Java客戶端拿到成功響應的報文,就可以繼續下一步的操作,一般這里的下一步就是執行事務方法。
點評
文檔級別的鎖顆粒度小,並發性高,吞吐量大,類似於數據庫的行鎖。
共享鎖與排他鎖
概念
共享鎖:允許多個線程獲取同一條數據的共享鎖進行讀操作
排他鎖:同一條數據只能有一個線程獲取排他鎖,然后進行增刪改操作
互斥性:共享鎖與排他鎖是互斥的,如果這條數據有共享鎖存在,那么排他鎖無法加上,必須得共享鎖釋放完了,排他鎖才能加上。
反之也成立,如果這條數據當前被排他鎖鎖信,那么其他的排他鎖不能加,共享鎖也加不上。必須等這個排他鎖釋放完了,其他鎖才加得上。
有人在改數據,就不允許別人來改,也不讓別人來讀。
讀寫鎖的分離
如果只是讀數據,每個線程都可以加一把共享鎖,此時該數據的共享鎖數量一直遞增,如果這時有寫數據的請求(寫請求是排他鎖),由於互斥性,必須等共享鎖全部釋放完,寫鎖才加得上。
有人在讀數據,就不允許別人來改。
案例實驗
我們先創建一個共享鎖的腳本:
# 讀操作加鎖腳本
POST _scripts/rw-lock
{
"script": {
"lang": "painless",
"source": "if (ctx._source.lock_type == 'exclusive') { Debug.explain('one thread is writing data, the lock is exclusive now'); } ctx._source.lock_count++"
}
}
# 讀操作完畢釋放鎖腳本
POST _scripts/rw-unlock
{
"script": {
"lang": "painless",
"source": "if ( --ctx._source.lock_count == 0) { ctx.op = 'delete' }"
}
}
- 每次有一個線程讀數據時,執行一次加鎖操作
POST /files-lock/lock/1/_update
{
"upsert": {
"lock_type": "shared",
"lock_count": 1
},
"script": {
"id": "rw-lock"
}
}
在多個頁面上嘗試,可以看到lock_count在逐一遞增,模擬多個線程同時讀一個文檔的操作。
- 在有線程讀文檔,還未釋放的情況下,嘗試對該文檔加一個排他鎖
PUT /files-lock/lock/1/_create
{ "lock_type": "exclusive" }
結果肯定會報錯:
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[lock][1]: version conflict, document already exists (current version [8])",
"index_uuid": "XD7LFToWSKe_6f1EvLNoFw",
"shard": "3",
"index": "files-lock"
}
],
"type": "version_conflict_engine_exception",
"reason": "[lock][1]: version conflict, document already exists (current version [8])",
"index_uuid": "XD7LFToWSKe_6f1EvLNoFw",
"shard": "3",
"index": "files-lock"
},
"status": 409
}
- 線程讀數據完成后,對共享鎖進行釋放,執行釋放鎖的腳本
POST /files-lock/lock/1/_update
{
"script": {
"id": "rw-unlock"
}
}
釋放1次lock_count減1,減到0時,說明所有的共享鎖已經釋放完畢,就把這個doc刪除掉
- 所有共享鎖釋放完畢,嘗試加排他鎖
PUT /files-lock/lock/1/_create
{ "lock_type": "exclusive" }
此時能夠加鎖成功,響應報文:
{
"_index": "files-lock",
"_type": "lock",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"lock_type": "exclusive"
}
}
- 有排他鎖的情況,嘗試加一個共享鎖,失敗信息如下:
{
"error": {
"root_cause": [
{
"type": "remote_transport_exception",
"reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]"
}
],
"type": "illegal_argument_exception",
"reason": "failed to execute script",
"caused_by": {
"type": "script_exception",
"reason": "runtime error",
"painless_class": "java.lang.String",
"to_string": "one thread is writing data, the lock is exclusive now",
"java_class": "java.lang.String",
"script_stack": [
"Debug.explain('one thread is writing data, the lock is exclusive now'); } ",
" ^---- HERE"
],
"script": "rw-lock",
"lang": "painless",
"caused_by": {
"type": "painless_explain_error",
"reason": null
}
}
},
"status": 400
}
- 排他鎖事務執行完成時,刪除文檔即可對鎖進行釋放
DELETE /files-lock/lock/1
腳本解釋
讀鎖的加鎖腳本和釋放鎖腳本,成對出現,用來統計線程的數量。
寫鎖利用_create
語法來實現,如果有線程對某一文檔有讀取操作,那么對這個文檔執行_create操作肯定報錯。
小結
利用Elasticsearch一些語法的特性,加上painless腳本的配合,也能完整的復現全局鎖、行鎖、讀寫鎖的特性,實現的思路還是挺有意思的,跟使用redis、zookeeper實現分布式鎖有異曲同工之處,只是生產案例上用redis實現分布式鎖是比較成功的實踐,Elasticsearch的對這種分布式鎖的實現方式可能不是最佳實踐,但也可以了解一下。
專注Java高並發、分布式架構,更多技術干貨分享與心得,請關注公眾號:Java架構社區
可以掃左邊二維碼添加好友,邀請你加入Java架構社區微信群共同探討技術