Elasticsearch 之(7)並發沖突解決: 剖析悲觀鎖與樂觀鎖的並發控制方案


並發沖突問題


剖析悲觀鎖與樂觀鎖兩種並發控制方案

基於_version進行樂觀鎖並發控制

(1)_version元數據
PUT /test_index/test_type/6
{
  "test_field": "test test"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "6",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
第一次創建一個document的時候,它的_version內部版本號就是1;以后,每次對這個document執行修改或者刪除操作,都會對這個_version版本號自動加1;哪怕是刪除,也會對這條數據的版本號加1
{
  "found": true,
  "_index": "test_index",
  "_type": "test_type",
  "_id": "6",
  "_version": 4,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
我們會發現,在刪除一個document之后,可以從一個側面證明,它不是立即物理刪除掉的,因為它的一些版本號等信息還是保留着的。先刪除一條document,再重新創建這條document,其實會在delete version基礎之上,再把version號加1

(1)先構造一條數據出來

PUT /test_index/test_type/7
{
  "test_field": "test test"
}
(2)模擬兩個客戶端,都獲取到了同一條數據
GET test_index/test_type/7

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test test"
  }
}
(3)其中一個客戶端,先更新了一下這個數據

同時帶上數據的版本號,確保說,es中的數據的版本號,跟客戶端中的數據的版本號是相同的,才能修改
PUT /test_index/test_type/7?version=1 
{
  "test_field": "test client 1"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}
(4)另外一個客戶端,嘗試基於version=1的數據去進行修改,同樣帶上version版本號,進行樂觀鎖的並發控制
PUT /test_index/test_type/7?version=1 
{
  "test_field": "test client 2"
}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
        "shard": "3",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "3",
    "index": "test_index"
  },
  "status": 409
}
(5)在樂觀鎖成功阻止並發問題之后,嘗試正確的完成更新
GET /test_index/test_type/7

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 2,
  "found": true,
  "_source": {
    "test_field": "test client 1"
  }
}
基於最新的數據和版本號,去進行修改,修改后,帶上最新的版本號,可能這個步驟會需要反復執行好幾次,才能成功,特別是在多線程並發更新同一條數據很頻繁的情況下
PUT /test_index/test_type/7?version=2 
{
  "test_field": "test client 2"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}


基於external version進行樂觀鎖並發控制

es提供了一個feature,就是說,你可以不用它提供的內部_version版本號來進行並發控制,可以基於你自己維護的一個版本號來進行並發控制。舉個列子,加入你的數據在mysql里也有一份,然后你的應用系統本身就維護了一個版本號,無論是什么自己生成的,程序控制的。這個時候,你進行樂觀鎖並發控制的時候,可能並不是想要用es內部的_version來進行控制,而是用你自己維護的那個version來進行控制。
?version=1
?version=1&version_type=external
version_type=external,唯一的區別在於,_version,只有當你提供的version與es中的_version一模一樣的時候,才可以進行修改,只要不一樣,就報錯;當version_type=external的時候,只有當你提供的version比es中的_version大的時候,才能完成修改

es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,比如說?version=2&version_type=external

(1)先構造一條數據
PUT /test_index/test_type/8
{
  "test_field": "test"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
(2)模擬兩個客戶端同時查詢到這條數據
GET /test_index/test_type/8

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test"
  }
}
(3)第一個客戶端先進行修改,此時客戶端程序是在自己的數據庫中獲取到了這條數據的最新版本號,比如說是2
PUT /test_index/test_type/8?version=2&version_type=external
{
  "test_field": "test client 1"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}
(4)模擬第二個客戶端,同時拿到了自己數據庫中維護的那個版本號,也是2,同時基於version=2發起了修改
PUT /test_index/test_type/8?version=2&version_type=external
{
  "test_field": "test client 2"
}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
        "shard": "1",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "1",
    "index": "test_index"
  },
  "status": 409
}
(5)在並發控制成功后,重新基於最新的版本號發起更新
GET /test_index/test_type/8

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 2,
  "found": true,
  "_source": {
    "test_field": "test client 1"
  }
}

PUT /test_index/test_type/8?version=3&version_type=external
{
  "test_field": "test client 2"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}

悲觀鎖(全局鎖
全局鎖,一次性就鎖整個index,對這個index的所有增刪改操作都會被block住,如果 上鎖解鎖的操作不是頻繁,然后每次上鎖之后,執行的操作的耗時不會太長,用這種方式,方便。
優點:操作非常簡單,非常容易使用,成本低
缺點:你直接就把整個index給上鎖了,這個時候對index中所有的doc的操作,都會被block住,導致整個系統的並發能力很低

PUT /fs/lock/global/_create
{}
fs: 你要上鎖的那個index
lock: 就是你指定的一個對這個index上全局鎖的一個type
global: 就是你上的全局鎖對應的這個doc的id
_create:強制必須是創建,如果/fs/lock/global這個doc已經存在,那么創建失敗,報錯

利用了doc來進行上鎖
{
  "_index": "fs",
  "_type": "lock",
  "_id": "global",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
另外一個線程同時嘗試上鎖
PUT /fs/lock/global/_create
{}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][global]: version conflict, document already exists (current version [1])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "2",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][global]: version conflict, document already exists (current version [1])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "2",
    "index": "fs"
  },
  "status": 409
}
釋放鎖
DELETE /fs/lock/global

{
  "found": true,
  "_index": "fs",
  "_type": "lock",
  "_id": "global",
  "_version": 2,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}


悲觀鎖( document鎖
細粒度的一個鎖,document鎖,顧名思義,每次就鎖你要操作的,你要執行增刪改的那些doc,doc鎖了,其他線程就不能對這些doc執行增刪改操作了
但是你只是鎖了部分doc,其他線程對其他的doc還是可以上鎖和執行增刪改操作的
document鎖,是用腳本進行上鎖
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
/fs/lock,是固定的,就是說fs下的lock type,專門用於進行上鎖
/fs/lock/id,比如1,id其實就是你要上鎖的那個doc的id,代表了某個doc數據對應的lock(也是一個doc)
_update + upsert:執行upsert操作

params,里面有個process_id,process_id,是你的要執行增刪改操作的進程的唯一id,比如說可以在java系統,啟動的時候,給你的每個線程都用UUID自動生成一個thread id,你的系統進程啟動的時候給整個進程也分配一個UUID。process_id + thread_id就代表了某一個進程下的某個線程的唯一標識。可以自己用UUID生成一個唯一ID

process_id很重要,會在lock中,設置對對應的doc加鎖的進程的id,這樣其他進程過來的時候,才知道,這條數據已經被別人給鎖了

assert false,不是當前進程加鎖的話,則拋出異常
ctx.op='noop',不做任何修改

如果該document之前沒有被鎖,/fs/lock/1之前不存在,也就是doc id=1沒有被別人上過鎖; upsert的語法,那么執行index操作,創建一個/fs/lock/id這條數據,而且用params中的數據作為這個lock的數據。process_id被設置為123,script不執行。這個時候象征着process_id=123的進程已經鎖了一個doc了。

如果document被鎖了,就是說/fs/lock/1已經存在了,代表doc id=1已經被某個進程給鎖了。那么執行update操作,script,此時會比對process_id,如果相同,就是說,某個進程,之前鎖了這個doc,然后這次又過來,就可以直接對這個doc執行操作,說明是該進程之前鎖的doc,則不報錯,不執行任何操作,返回success; 如果process_id比對不上,說明doc被其他doc給鎖了,此時報錯。
/fs/lock/1
{
  "process_id": 123
}
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
script:ctx._source.process_id,123
process_id:加鎖的upsert請求中帶過來額proess_id
scripts/judge-lock.groovy: if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
GET /fs/lock/1

{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "found": true,
  "_source": {
    "process_id": 123
  }
}
如果兩個process_id相同,說明是一個進程先加鎖,然后又過來嘗試加鎖,可能是要執行另外一個操作,此時就不會block,對同一個process_id是不會block,ctx.op= 'noop',什么都不做,返回一個success
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}    

如果說已經有一個進程加了鎖了,process_id: 234。process_id不相等,說明這個doc之前已經被別人上鎖了,process_id=123上鎖了; process_id=234過來再次嘗試上鎖,失敗,assert false,就會報錯
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 234 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 234
    }
  }
}
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[4onsTYV][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}
釋放鎖
POST /fs/_refresh  //刷新到內存中
PUT /fs/lock/_bulk
{ "delete": { "_id": 1}}
{
  "took": 20,
  "errors": false,
  "items": [
    {
      "delete": {
        "found": true,
        "_index": "fs",
        "_type": "lock",
        "_id": "1",
        "_version": 2,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}

共享鎖和排他鎖
共享鎖:這份數據是共享的,然后多個線程過來,都可以獲取同一個數據的共享鎖,然后對這個數據執行讀操作
排他鎖:是排他的操作,只能一個線程獲取排他鎖,然后執行增刪改操作

讀寫鎖的分離
如果只是要讀取數據的話,那么任意個線程都可以同時進來然后讀取數據,每個線程都可以上一個共享鎖
但是這個時候,如果有線程要過來修改數據,那么會嘗試上排他鎖,排他鎖會跟共享鎖互斥,也就是說,如果有人已經上了共享鎖了,那么排他鎖就不能上,就得等


如果有人在讀數據,就不允許別人來修改數據
反之,也是一樣的

如果有人在修改數據,就是加了排他鎖
那么其他線程過來要修改數據,也會嘗試加排他鎖,此時會失敗,鎖沖突,必須等待,同時只能有一個線程修改數據
如果有人過來同時要讀取數據,那么會嘗試加共享鎖,此時會失敗,因為共享鎖和排他鎖是沖突的

如果有在修改數據,就不允許別人來修改數據,也不允許別人來讀取數據

1、有人在讀數據,其他人也能過來讀數據
judge-lock-2.groovy: if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++

POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
  	"lang": "groovy",
  	"file": "judge-lock-2"
  }
}

POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
  	"lang": "groovy",
  	"file": "judge-lock-2"
  }
}
GET /fs/lock/1

{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "found": true,
  "_source": {
    "lock_type": "shared",
    "lock_count": 3
  }
}
就給大家模擬了,有人上了共享鎖,你還是要上共享鎖,直接上就行了,沒問題,只是lock_count加1
2、已經有人上了共享鎖,然后有人要上排他鎖
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
排他鎖用的不是upsert語法,create語法,要求lock必須不能存在,直接自己是第一個上鎖的人,上的是排他鎖
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [3])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [3])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}
如果已經有人上了共享鎖,明顯/fs/lock/1是存在的,create語法去上排他鎖,肯定會報錯

3、對共享鎖進行解鎖
unlock-shared.groovy: if (--ctx._source.lock_count == 0) { ctx.op = 'delete' }; 

POST /fs/lock/1/_update
{
  "script": {
  	"lang": "groovy",
  	"file": "unlock-shared"
  }
}
之前上了3次共享鎖,連續解鎖3次,此時共享鎖就徹底沒了
每次解鎖一個共享鎖,就對lock_count先減1,如果減了1之后,是0,那么說明所有的共享鎖都解鎖完了,此時就就將/fs/lock/1刪除,就徹底解鎖所有的共享鎖

4、上排他鎖,再上排他鎖
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
其他線程
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [7])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [7])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}
5、上排他鎖,上共享鎖
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
  	"lang": "groovy",
  	"file": "judge-lock-2"
  }
}
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[4onsTYV][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock-2",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}
6、解鎖排他鎖
DELETE /fs/lock/1






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM