es之文檔更新過程中並發沖突問題


1:樂觀鎖控制

ES是分布式的,也是異步並發的,我們的復制請求是並行發送的;這就意味着請求到達目的地的順序是不可控制的,是亂序的;

如果是亂序的方式,很有可能出現這樣的一個問題,新version的文檔被舊version的文檔覆蓋掉—-數據丟失,或者直接拋異常;

TransportClient client = null;

@Before
public void testConn(){

   try {
       Settings settings = Settings.builder()
              .put("cluster.name", "cluster_es").build();
       client = new PreBuiltTransportClient(settings)
              .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hadoop01"), 9300));
       System.out.println("========連接成功=============");
  } catch (UnknownHostException e) {
       e.printStackTrace();
  }
}


/**
* upsert
* */
@Test
public void upsertDocument2() throws InterruptedException {


   ExecutorService executorService = Executors.newFixedThreadPool(3);
   for (int i = 0; i < 10; i++){
       executorService.execute(new Thread1());

  }
   Thread.sleep(10000);
   executorService.shutdown();

}

class Thread1 implements Runnable {

   public void run() {
       System.out.println("*************" + Thread.currentThread().getName() + " *************");
       // 設置查詢條件, 查找不到則添加
       IndexRequest indexRequest = null;
       try {
           indexRequest = new IndexRequest("website", "blog", "1")
                  .source(XContentFactory.jsonBuilder()
                          .startObject()
                          .field("id", "1")
                          .endObject());
           // 設置更新, 查找到更新下面的設置
           UpdateRequest upsert = new UpdateRequest("website", "blog", "1")
                  .doc(XContentFactory.jsonBuilder()
                          .startObject()
                          .field("process_id", Thread.currentThread().getId())
                          .endObject())
                  .upsert(indexRequest);

           client.update(upsert).get();
      } catch (Exception e) {
           e.printStackTrace();
      }

  }
}


@After
public void close(){
   client.close();
}

 

所以在分布式異步並發場景中,需要一種方式:新版本的文檔不會被舊版本的文檔覆蓋——【樂觀鎖】

Elasticsearch使用這個 _version 號來確保變更以正確順序得到執行。如果舊版本的文檔在新版本之后到達,它可以被簡單的忽略。

我們可以利用 _version 號來確保 應用中相互沖突的變更不會導致數據丟失。我們通過指定想要修改文檔的 version 號來達到這個目的。 如果該版本不是當前版本號,我們的請求將會失敗。

新建一個文檔,這個時候我們可以看到新文檔的版本號_version=1:

PUT /website/blog/1/_create
{
 "title" : "this is title" ,
 "txt" : "just do it"
}

現在嘗試通過重建文檔索引來保存修改數據:

請求成功,並且響應體告訴我們 _version 已經遞增到 2

PUT /website/blog/1?version=1
{
 "title" : "this is test" ,
 "txt" : "just do it"
}

 

然而,如果我們重新運行相同的索引請求,仍然指定 version=1 , Elasticsearch 返回 409 ConflictHTTP 響應碼,和一個如下所示的響應體:

 

以上通過version的控制,可以讓es在並行情況下操作而不出現丟失數據的現象,這種樂觀鎖的操作是比較常用的;

 

2:通過外部系統進行版本控制

上面我們講到的是基於version進行版本的控制。在分布式環境下,只要version不同,那么修改就會報錯;

而通過外部系統進行控制:version_type=external,只有當你提供的version比es中的_version大的時候,才能完成修改

_version version_type=external
只有_versioin相同,才會執行修改 只有當你提供的version比es中的_version大的時候,才能完成修改

例如,要創建一個新的具有外部版本號 5 的博客文章,我們可以按以下方法進行:

PUT /website/blog/2?version=5&version_type=external
{
 "title": "My first external blog entry",
 "text":  "Starting to get the hang of this..."
}

 

現在我們更新這個文檔,指定一個新的 version 號是 10 :

PUT /website/blog/2?version=10&version_type=external
{
 "title": "My first external blog entry",
 "text":  "This is a piece of cake..."
}

 

version_type=external能夠修改的條件就是:提供的版本號必須比_version大

如果此時插入版本號比現在的_version小的,就會報錯:

 

3:重復提交retry_on_conflict

elasticsearch設計的目的就是多用戶的海量數據操作;

那么可能存在這樣場景:A進程接收到請求嘗試去檢索(retrieve)和重建索引(reindex)某個文檔C,B進程也接收到請求檢索(retrieve)和重建索引(reindex)文檔C;

那么這個時候就會出現:其中一個進程提前修改了文檔C,然后另一個進程在做檢索的時候,因為_version改變了,所以匹配不到文檔C,操作就會失敗,然后數據丟失

這就是在並發操作的時候經常出現的現象;

解決:

對於多用戶的更新操作,文檔被修改了並不要緊,如果出現了匹配不到的現象,我們只要重新在操作一遍就可以了;所以需要使用關鍵字retry_on_conflict(默認0)

POST /website/pageviews/1/_update?retry_on_conflict=5
{
  "script" : "ctx._source.views+=1",
  "upsert": {
      "views": 0
  }
}

retry_on_conflict=5 代表如果出現失敗,最大可以重復五次的update操作

5.7.6:悲觀鎖控制【無用】

類似傳統數據庫————mysql,在處理並發的時候,為了防止出現沖突的問題,就會使用悲觀鎖;

這種方法被關系型數據庫廣泛使用,它假定有變更沖突可能發生,因此阻塞訪問資源以防止沖突。

一個典型的例子是讀取一行數據之前先將其鎖住,確保只有放置鎖的線程能夠對這行數據進行修改(想想java中的synchronize)。

5.7.6.1:全局鎖(無用)

只允許一個線程進行執行更新操作,這樣能夠避免並發性問題,在es中,全局鎖是將一份文檔是否存在作為依據

獲取一個全局鎖:

PUT website/blog/1/_create
{}

這樣就上鎖了,然后使用java的多線程做測試,在里面修改數據

TransportClient client = null;

@Before
public void testConn(){

   try {
       Settings settings = Settings.builder()
              .put("cluster.name", "cluster").build();
       client = new PreBuiltTransportClient(settings)
              .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hadoop01"), 9300));
       System.out.println("========連接成功=============");
  } catch (UnknownHostException e) {
       e.printStackTrace();
  }
}


/**
* upsert
* */
@Test
public void upsertDocument2() throws InterruptedException {


   ExecutorService executorService = Executors.newFixedThreadPool(1);//線程數為1是全局鎖
   for (int i = 0; i < 10; i++){
       executorService.execute(new Thread1());

  }
   Thread.sleep(10000);
   executorService.shutdown();

}

class Thread1 implements Runnable {

   public void run() {
       System.out.println("*************" + Thread.currentThread().getName() + " *************");
       // 設置查詢條件, 查找不到則添加
       IndexRequest indexRequest = null;
       try {
           indexRequest = new IndexRequest("website", "blog", "1")
                  .source(XContentFactory.jsonBuilder()
                          .startObject()
                          .field("id", "1")
                          .endObject());
           // 設置更新, 查找到更新下面的設置
           UpdateRequest upsert = new UpdateRequest("website", "blog", "1")
                  .doc(XContentFactory.jsonBuilder()
                          .startObject()
                          .field("process_id", Thread.currentThread().getId())
                          .endObject())
                  .upsert(indexRequest);

           client.update(upsert).get();
      } catch (Exception e) {
           e.printStackTrace();
      }

  }
}


@After
public void close(){
   client.close();
}

如果另一個進行想同時在創建一個website/blog/1 就會拋異常

釋放全局鎖:

全局鎖必須通過刪除來釋放:

DELETE website/blog/1

優點:操作非常簡單,非常容易使用,成本低 缺點:你直接就把整個index給上鎖了,這個時候對index中所有的doc的操作,都會被block住,導致整個系統的並發能力很低

5.7.6.2:document文檔鎖(無用)

這種鎖比全局鎖的粒度小,因為全局鎖是鎖定整個index,那么文檔所就是針對單個文檔完成鎖定

上鎖的方式依賴groovy腳本:/config/scripts

vim documentLock.groovy 【腳本需要上傳到所有節點】

if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

腳本的意思:

如果當前傳入的process_id和設定的process_id不一致,就拋異常assert false

如果一致的,返回'noop'

插入一個文檔:

PUT website/blog/1
{
 "id" : 1,
 "process_id" : 234
}

對當前文檔上文檔鎖:

POST /website/blog/1/_update
{
 "upsert": { "process_id": 234 },
 "script": {
   "lang": "groovy",
   "file": "documentLock",
   "params": {
   "process_id": 234
}
}
}

注意,當前設定的"process_id": 234,如果此時換一個"process_id" : 123,那么就會拋異常:assert false

比如:

POST /website/blog/1/_update
{
 "upsert": { "process_id": 123 },
 "script": {
   "lang": "groovy",
   "file": "documentLock",
   "params": {
   "process_id": 123
}
}
}

注意:如果傳入的是"process_id": 234,傳入正確參數,直接返回ctx.op = 'noop'

POST /website/blog/1/_update
{
 "upsert": { "process_id": 234 },
 "script": {
   "lang": "groovy",
   "file": "documentLock",
   "params": {
   "process_id": 234
}
}
}

如何釋放悲觀鎖 , 刪除對應的process_id數據即可:

DELETE website/blog/1
{
 "query": {
   "term": {
     "process_id": 234
  }
}
}

文檔級鎖可以實現細粒度的訪問控制,但是當文檔數量達到百分甚至上千萬的時候,這種方式開銷是比較昂貴的

5.7.6.3:共享鎖和排它鎖(無用)

共享鎖:數據是共享的,多個線程可以獲取同一個數據的共享鎖,然后對這個數據執行讀操作 排它鎖:只能有一個線程獲取排它鎖,然后執行更新操作

在config/scripts下 vim gongxiang_paita.groovy

if (ctx._source.lock_type == 'exclusive') {
      assert false
} else {
      ctx._source.lock_count++
}

腳本意思:

如果其他線程共享:ctx._source.lock_count++

POST /website/blog/1/_update 
{
 "upsert": {
   "lock_type":  "shared",
   "lock_count": 1
},
 "script": {
   "lang": "groovy",
   "file": "gongxiang_paita"
}
}

如果其他線程添加排他鎖'exclusive',那么拋異常:

(1):將共享share標記修改成排他exclusive標記
POST /website/blog/1/_update
{
  "doc" : {
    "lock_type": "exclusive"
  }
}

(2):修改成排他標記后,在嘗試共享修改操作,報錯
POST /website/blog/1/_update
{
 "upsert": {
   "lock_type":  "shared",
   "lock_count": 1
},
 "script": {
   "lang": "groovy",
   "file": "gongxiang_paita"
}
}

如何釋放鎖:

Vim unlock.groovy

if (ctx._source.lock_type == "shared") {ctx._source.lock_count --};
if (ctx._source.lock_count == 0) { ctx.op = 'delete' };

腳本意思:

ctx._source.lock_type == "shared" 則lock_count—

當lock_count == 0,那么刪除/website/blog/1

(1):GET website/blog/1 查看一下,當前是共享鎖還是排它鎖;
(2): 如果是排他鎖,需要修改會共享鎖
POST /website/blog/1/_update
{
  "doc" : {
    "lock_type": "shared"
  }
}
(3):釋放共享鎖
POST /website/blog/1/_update
{
 "upsert": {
   "lock_type":  "shared",
   "lock_count": 1
},
 "script": {
   "lang": "groovy",
   "file": "unlock"
}
}

這樣就釋放了共享鎖;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM