Start HDFS, MySQL, Hive, Kudu, and Impala, in that order.
Log in to the Impala shell:
impala-shell
1: Start the Impala shell with the impala-shell command. By default, impala-shell tries to connect to the Impala daemon on localhost, port 21000. To connect to a different host, use the -i <host:port> option. To connect to a specific Impala database automatically, use the -d <database> option. For example, if all of your Kudu tables live in the Impala database impala_kudu, pass -d impala_kudu to use that database.
2: To exit the Impala shell, use the following command: quit;
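For example (a sketch; the host name hadoop01 is illustrative and assumes the daemon is listening on the default port 21000):

```shell
# Connect to the Impala daemon on a specific host and port
impala-shell -i hadoop01:21000

# Connect and switch to the impala_kudu database in one step
impala-shell -i hadoop01:21000 -d impala_kudu
```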
Internal tables
Internal tables are managed by Impala: when you drop one from Impala, both the data and the table itself are actually deleted. When you create a new table with Impala, it is normally an internal table.
Create an internal table with Impala:
CREATE TABLE my_first_table
(
  id BIGINT,
  name STRING,
  PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'hadoop01:7051,hadoop02:7051,hadoop03:7051',
  'kudu.table_name' = 'my_first_table'
);
In the CREATE TABLE statement, the columns that make up the primary key must be listed first.
The table created here is an internal table: when it is dropped from Impala, the underlying table stored in Kudu is deleted as well.
drop table if exists my_first_table;
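To confirm that the underlying Kudu table is gone as well, the tables registered with the Kudu masters can be listed with the kudu command-line tool (a sketch; assumes the kudu CLI is available on a cluster node):

```shell
# List all tables Kudu knows about; after the DROP above,
# my_first_table should no longer appear in the output
kudu table list hadoop01:7051,hadoop02:7051,hadoop03:7051
```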
Prepare a Kudu table and data, creating the table and inserting rows with Java.
Create the Kudu table:
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

import java.util.LinkedList;
import java.util.List;

public class CreateTable {

    private static ColumnSchema newColumn(String name, Type type, boolean isKey) {
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(isKey);
        return column.build();
    }

    public static void main(String[] args) throws KuduException {
        // Kudu master addresses
        final String masterAddr = "hadoop01,hadoop02,hadoop03";
        // Create the Kudu client connection
        KuduClient client = new KuduClient.KuduClientBuilder(masterAddr)
                .defaultSocketReadTimeoutMs(6000)
                .build();

        // Define the table schema
        List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
        columns.add(newColumn("CompanyId", Type.INT32, true));
        columns.add(newColumn("WorkId", Type.INT32, false));
        columns.add(newColumn("Name", Type.STRING, false));
        columns.add(newColumn("Gender", Type.STRING, false));
        columns.add(newColumn("Photo", Type.STRING, false));
        Schema schema = new Schema(columns);

        // Options supplied when the table is created
        CreateTableOptions options = new CreateTableOptions();
        // Partition on the CompanyId column
        List<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        // Number of replicas
        options.setNumReplicas(1);
        // Range partitioning
        options.setRangePartitionColumns(parcols);
        // Hash partitioning with 3 buckets
        options.addHashPartitions(parcols, 3);

        try {
            client.createTable("PERSON", schema, options);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        client.close();
    }
}
Insert data into Kudu:
import org.apache.kudu.client.*;
import org.apache.kudu.client.SessionConfiguration.FlushMode;

public class InsertRow {

    public static void main(String[] args) throws KuduException {
        // Kudu master addresses
        final String masterAddr = "hadoop01,hadoop02,hadoop03";
        // Create the Kudu client connection
        KuduClient client = new KuduClient.KuduClientBuilder(masterAddr).build();
        // Open the table
        KuduTable table = client.openTable("PERSON");
        // All writes in Kudu go through a session
        KuduSession session = client.newSession();
        // Buffer operations and flush them manually
        session.setFlushMode(FlushMode.MANUAL_FLUSH);
        session.setMutationBufferSpace(3000);

        for (int i = 1; i < 10; i++) {
            Insert insert = table.newInsert();
            // Set the column values
            insert.getRow().addInt("CompanyId", i);
            insert.getRow().addInt("WorkId", i);
            insert.getRow().addString("Name", "lisi" + i);
            insert.getRow().addString("Gender", "male");
            insert.getRow().addString("Photo", "person" + i);
            session.apply(insert);
        }
        // In MANUAL_FLUSH mode, buffered operations are only sent on flush;
        // flush once after all inserts have been applied
        session.flush();
        session.close();
        client.close();
    }
}
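The rows can also be read back from Java with a KuduScanner (a sketch, assuming the same cluster and the PERSON table above; requires the kudu-client dependency):

```java
import org.apache.kudu.client.*;

public class ScanRows {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("hadoop01,hadoop02,hadoop03").build();
        KuduTable table = client.openTable("PERSON");
        // Build a scanner over the whole table
        KuduScanner scanner = client.newScannerBuilder(table).build();
        while (scanner.hasMoreRows()) {
            RowResultIterator rows = scanner.nextRows();
            for (RowResult row : rows) {
                System.out.println(row.getInt("CompanyId") + "\t" + row.getString("Name"));
            }
        }
        client.close();
    }
}
```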
Map the Kudu table into Impala:
CREATE EXTERNAL TABLE kudu_PERSON
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'hadoop01:7051,hadoop02:7051,hadoop03:7051',
  'kudu.table_name' = 'PERSON'
);
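Once the mapping exists, the nine rows written by the InsertRow program can be queried with ordinary SQL from the Impala shell, for example:

```sql
SELECT * FROM kudu_PERSON ORDER BY companyid;
```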
Create a new table with CREATE TABLE ... AS SELECT:
CREATE TABLE new_table
PRIMARY KEY (companyid)
PARTITION BY HASH(companyid) PARTITIONS 8
STORED AS KUDU
AS SELECT companyid, workid, name, gender, photo FROM kudu_PERSON;
Result:
[angel1:21000] > CREATE TABLE new_table
               > PRIMARY KEY (companyid)
               > PARTITION BY HASH(companyid) PARTITIONS 8
               > STORED AS KUDU
               > AS SELECT companyid, workid, name ,gender,photo FROM kudu_PERSON;
Query: create TABLE new_table PRIMARY KEY (companyid) PARTITION BY HASH(companyid) PARTITIONS 8 STORED AS KUDU AS SELECT companyid, workid, name ,gender,photo FROM kudu_PERSON
+-------------------+
| summary           |
+-------------------+
| Inserted 9 row(s) |
+-------------------+
Fetched 1 row(s) in 1.05s
External tables (created with CREATE EXTERNAL TABLE) are not managed by Impala, and dropping such a table does not drop it from its source location (here, Kudu). It only removes the mapping between Impala and Kudu. This is the syntax Kudu provides for mapping an existing table into Impala.
Create a Kudu table with Java (the same CreateTable program shown above, which creates the PERSON table).
Create an external table with Impala, mapping the Kudu table into Impala:
CREATE EXTERNAL TABLE my_mapping_table
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'hadoop01:7051,hadoop02:7051,hadoop03:7051',
  'kudu.table_name' = 'PERSON'
);
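Because my_mapping_table is external, dropping it removes only the Impala-side mapping; the PERSON table and its data remain in Kudu and can be re-mapped at any time (a sketch illustrating that behavior):

```sql
-- Removes only the Impala-to-Kudu mapping; the Kudu table PERSON is untouched
DROP TABLE my_mapping_table;

-- The same Kudu table can then be mapped again
CREATE EXTERNAL TABLE my_mapping_table
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'hadoop01:7051,hadoop02:7051,hadoop03:7051',
  'kudu.table_name' = 'PERSON'
);
```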