Kudu基本操作及概念


Kudu:
    針對 Apache Hadoop 平台而開發的列式存儲管理器。

使用場景:
    適用於那些既有隨機訪問,也有批量數據掃描的復合場景。
    高計算量的場景。
    使用了高性能的存儲設備,包括使用更多的內存。
    支持數據更新,避免數據反復遷移。
    支持跨地域的實時數據備份和查詢。
    
kudu的關鍵機制:
1.模仿數據庫,以二維表的形式組織數據,創建表的時候需要指定schema。所以只支持結構化數據。

2.每個表指定一個或多個主鍵。

3.支持insert/update/delete,這些修改操作全部要指定主鍵。

4.read操作,只支持scan原語。

5.一致性模型,默認支持snapshot ,這個可以保證scan和單個客戶端 read-you-writes一致性保證。更強的一致性保證,提供manually propagate timestamps between clients或者commit-wait。

6.cluster類似hbase簡單的M-S結構,master支持備份。

7.單個表支持水平分割,partitions叫tablets,單行一定在一個tablets里面,支持范圍,以及list等更靈活的分區鍵。

8.使用Raft 協議,可以根據SLA指定備份塊數量。

9.列式存儲

10.delta flushes,數據先更新到內存中,最后在合並到最終存儲中,有專門到后台進程負責。

11.Lazy Materialization ,對一些選擇性謂詞,可以幫助跳過很多不必要的數據。

12.支持和MR/SPARK/IMPALA等集成,支持Locality ,Columnar Projection ,Predicate pushdown 等。


注意:
1、建表的時候要求所有的tserver節點都活着
2、根據raft機制,允許(replication的副本數-)/ 2宕掉,集群還會正常運行,否則會報錯找不到ip:7050(7050是rpc的通信端口號),需要注意一個問題,第一次運行的時候要保證集群處於正常狀態下,也就是所有的服務都啟動,如果運行過程中,允許(replication的副本數-)/ 2宕掉
3、讀操作,只要有一台活着的情況下,就可以運行



maven 依賴:

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>1.7.0</version>
        </dependency>

 





Java 代碼:

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @Author:Xavier
 * @Data:2019-02-22 09:25
 **/


public class KuduOption {
    // master地址
    private static final String KUDU_MASTER = "nn02:7051";

    private static String tableName = "KuduTest";

    //創建表
    @Test
    public void CreateTab() {
        // 創建kudu的數據庫鏈接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            // 設置表的schema(模式)
            List<ColumnSchema> columns = new ArrayList(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
            Schema schema = new Schema(columns);

            //創建表時提供的所有選項
            CreateTableOptions options = new CreateTableOptions();

            // 設置表的replica備份和分區規則
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");

            // 一個replica
            options.setNumReplicas(1);
            // 用列rangeKeys做為分區的參照
            options.setRangePartitionColumns(rangeKeys);
            client.createTable(tableName, schema, options);

            // 添加key的hash分區
            //options.addHashPartitions(parcols, 3);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //向表內插入新數據
    @Test
    public void InsertData() {
        // 創建kudu的數據庫鏈接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            // 打開表
            KuduTable table = client.openTable(tableName);
            // 創建寫session,kudu必須通過session寫入
            KuduSession session = client.newSession();

            // 采取Flush方式 手動刷新
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.setMutationBufferSpace(3000);

            System.out.println("-------start--------" + System.currentTimeMillis());

            for (int i = 1; i < 6100; i++) {
                Insert insert = table.newInsert();
                // 設置字段內容
                PartialRow row = insert.getRow();
                row.addString("key", i+"");
                row.addString(1, "value"+i);
                session.flush();
                session.apply(insert);
            }
            System.out.println("-------end--------" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //更新數據
    @Test
    public void kuduUpdateTest() {
        // 創建kudu的數據庫鏈接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();

            Update update = table.newUpdate();
            PartialRow row = update.getRow();

            //
            row.addString("key", 998 + "");
            row.addString("value", "updata Data " + 10);
            session.flush();
            session.apply(update);

//            System.out.print(operationResponse.getRowError());

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    //根據主鍵刪除數據
    @Test
    public void deleteData(){
        KuduClient client=new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table=client.openTable(tableName);
            KuduSession session=client.newSession();

            Delete delete=table.newDelete();
            PartialRow row=delete.getRow();
            row.addString("key","992");

            session.apply(delete);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    //掃描數據
    @Test
    public void SearchData() {
        // 創建kudu的數據庫鏈接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            KuduTable table = client.openTable(tableName);

            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //條件掃描數據
    @Test
    public void searchDataByCondition(){
        KuduClient client =new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            KuduTable table=client.openTable(tableName);

            KuduScanner.KuduScannerBuilder scannerBuilder=client.newScannerBuilder(table);

            //設置搜索的條件
            KuduPredicate predicate=KuduPredicate.
                    newComparisonPredicate(
                            table.getSchema().getColumn("key"),//設置要值的謂詞(字段)
                            KuduPredicate.ComparisonOp.EQUAL,//設置搜索邏輯
                            "991");//設置搜索條件值
            scannerBuilder.addPredicate(predicate);

            // 開始掃描
            KuduScanner scanner=scannerBuilder.build();
            while(scanner.hasMoreRows()){
                RowResultIterator iterator=scanner.nextRows();
                while(iterator.hasNext()){
                    RowResult result=iterator.next();
                    System.out.println("輸出: "+result.getString(0)+"--"+result.getString("value"));
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    //刪除表
    @Test
    public void DelTab() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    //
    @Test
    public void searchBysparkSql() {
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset ds = sparkSession.read().format("org.apache.kudu.spark.kudu").
                schema(schema).option("kudu.master", "nn02:7051").option("kudu.table", "KuduTest").load();
        ds.registerTempTable("abc");
        sparkSession.sql("select * from abc").show();
    }

    @Test
    public void checkTableExistByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("172.19.224.213:7051", sparkSession.sparkContext());
        System.out.println(tableName + " is exist = " + context.tableExists(tableName));
    }

    public SparkSession getSparkSession() {
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");

        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        SparkSession sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate();
        return sparkSession;
    }
}

 






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM