hbase 原子操作cas


在高並發的情況下,對數據row1  column=cf1:qual1, timestamp=1, value=val1的插入或者更新可能會導致非預期的情況,

例如:原本客戶端A需要在value=val1的條件下將value更新成val_A,高並發下客戶端B可能搶先將數據value=val1更新成value=val_B,這個時候客戶端A如果還繼續更新將不符合預期。

HBase中的CAS(compare-and-set)API用來解決這個數據一致性問題,簡單的說CAS操作可以讓你在put數據之前先經過某些條件的驗證,只有滿足條件的put才會入庫。

hbase1.0版本的API:
checkAndPut(byte.md row, byte.md family, byte.md qualifier, byte.md value, Put put)
其中,put方法的最后一個參數是你需要錄入的數據的put對象;
value是與服務端check的預期值,只有服務器端對應rowkey的數據與你預期的值相同時,你的put操作才能被提交的服務端。
但是這個API在2.0版本已經被棄用

新的API是checkAndMutate,一個流式API。checkAndMutate在后續的版本中還在持續改進中。

1、創建一個測試表demoTable,模擬一些測試數據

    //清除並插入測試數據
    private static void createDemoTable() throws IOException {
        String tableNameString = "demoTable";
        if (helper.existsTable(tableNameString))
            helper.dropTable(tableNameString);
        helper.createTable(tableNameString, 100, "cf1", "cf2");
        helper.put(tableNameString,
                new String[]{"row1"},
                new String[]{"cf1", "cf2"},
                new String[]{"qual1", "qual2", "qual3"},
                new long[]{1, 2, 3},
                new String[]{"val1", "val2", "val3"});
        System.out.println("Before check and mutate calls...");
        helper.dump("testtable", new String[]{"row1"}, null, null);
    }

結果類似這樣:

2、測試cas,checkAndMutate支持流式函數格式

2.1 如果列數據還不存在就插入預期數據

        String tableNameString = "demoTable";
        Table table = helper.getConnection().getTable(TableName.valueOf(tableNameString));

        boolean res = false;
        Put put = null;

        put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual4"), 4, Bytes.toBytes("val1"));
//        如果row1 cf1 qual4 不存在值就插入put數據
        res = table.checkAndMutate(Bytes.toBytes("row1"), Bytes.toBytes("cf1"))
                .qualifier(Bytes.toBytes("qual4"))
                .ifNotExists()
                .thenPut(put);
        System.out.println("1 result is (expected true) :" + res);

執行結果:1 result is (expected true) :true

執行第二遍將返回false

2.2 如果列值相等就更新數據

        put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual1"), 4, Bytes.toBytes("val1"));
        //如果row1 cf1 qual1 val1存在就插入put,因為這個value已經存在所以可以插入,結果返回true,時間戳變為4
        res = table.checkAndMutate(Bytes.toBytes("row1"), Bytes.toBytes("cf1"))
                .qualifier(Bytes.toBytes("qual1")).ifEquals(Bytes.toBytes("val1"))
                .thenPut(put);
        System.out.println("2 result is (expected true) :" + res);

執行結果:2 result is (expected true) :true

2.3 如果列值不等於則更新

        put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("qual1"),5,Bytes.toBytes("val2"));
        ////如果row1 cf1 qual1 不等於val2在就插入put
        res = table.checkAndMutate(Bytes.toBytes("row1"), Bytes.toBytes("cf1"))
                .qualifier(Bytes.toBytes("qual1"))
                .ifMatches(CompareOperator.NOT_EQUAL,Bytes.toBytes("val2"))
                .thenPut(put);
        System.out.println("3 result is (expected true) :" + res);

2.4 多個條件組合判斷,多個操作執行

        put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual5"),1,Bytes.toBytes("val1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual6"),1,Bytes.toBytes("val1"));

        Delete delete = new Delete(Bytes.toBytes("row1"));
        delete.addColumns(Bytes.toBytes("cf1"), Bytes.toBytes("qual4"));

        //RowMutations這個版本還沒定型
        RowMutations mutations = new RowMutations(Bytes.toBytes("row1"));
        mutations.add(put);
        mutations.add(delete);

        //row1 cf1 qual4 val1存在,row1 cf1 qual5和row1 cf1 qual6無值則插入qual5和qual6的值,並刪除qual4的值
        res = table.checkAndMutate(Bytes.toBytes("row1"),Bytes.toBytes("cf1")).qualifier(Bytes.toBytes("qual4"))
                .ifEquals(Bytes.toBytes("val1"))
                .qualifier(Bytes.toBytes("qual5")).ifNotExists()
                .qualifier(Bytes.toBytes("qual6")).ifNotExists()
                .thenMutate(mutations);
        System.out.println("1 result is (expected true) :" + res);

3、hbase的cas操作針對是同一個行鍵下的數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM