1.創建分區表:
分區表有自己的分區列,而分區表則沒有。
public static void createTableWithPartition(Odps odps, String createTableName) throws Exception { Tables tables = odps.tables();// /獲取表示ODPS全部Table的集合對象 boolean a = tables.exists(createTableName);// 推斷指定表test_table_jyl是否存在 if (a) { System.out.println("指定表存在"); Table table = tables.get(createTableName); tables.delete(createTableName);//存在就刪除 } else { System.out.println("指定表不存在"); } System.out.println("-------------------------------------------------"); /* 創建表 */ if (tables.exists(createTableName)) { System.out.println("指定表存在,無法創建"); } else { System.out.println("指定表不存在,能夠創建"); /* TableSchema表示ODPS中表的定義 */ TableSchema tableSchema = new TableSchema(); /* 加入列 */ Column col; // Column表示ODPS中表的列定義 col = new Column("id", OdpsType.STRING, "ID"); tableSchema.addColumn(col); col = new Column("name", OdpsType.STRING, "姓名"); tableSchema.addColumn(col); col = new Column("sex", OdpsType.BIGINT, "性別"); tableSchema.addColumn(col); col = new Column("birthday", OdpsType.DATETIME, "生日"); tableSchema.addColumn(col); /* 加入分區列 */ col = new Column("province ", OdpsType.STRING, "省(分區列)"); tableSchema.addPartitionColumn(col); tables.create(createTableName, tableSchema);//創建表 System.out.println("表【" + createTableName + "】創建成功"); } System.out.println("-------------------------------------------------"); }
2.分區表數據上傳:
分區表上傳數據必須指定分區。所以上傳數據前必須保證存在分區,不存在就創建一個,創建分區有兩種方法
/*PartitionSpec類表示一個特定分區的定義*/ String partitionColumn="province";//表中的分區列 /*第一種,直接調用帶參構造函數, * 參數格式:分區定義字符串。比方: pt='1',ds='2' */ PartitionSpec partitionSpec1 = new PartitionSpec(partitionColumn+"='hubei'"); /*另外一種,調用布帶參數構造函數,再調用隊形set方法。*/ PartitionSpec partitionSpec2 = new PartitionSpec(); partitionSpec2.set(partitionColumn, "hubei");
TableTunnel類中有兩個創建創建上傳會話方法:
createUploadSession
public TableTunnel.UploadSession createUploadSession(String projectName, String tableName) throws TunnelException
- 在非分區表上創建上傳會話
-
- Parameters:
-
projectName
- Project名稱 -
tableName
- 表名,非視圖 - Returns:
-
TableTunnel.UploadSession
- Throws:
-
TunnelException
createUploadSession
public TableTunnel.UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException
-
在分區表上創建上傳會話
注: 分區必須為最末級分區,如表有兩級分區pt,ds, 則必須所有指定值, 不支持僅僅指定當中一個值
- Parameters:
-
projectName
- Project名 -
tableName
- 表名,非視圖 -
partitionSpec
- 指定分區PartitionSpec
- Returns:
-
TableTunnel.UploadSession
- Throws:
-
TunnelException
分區表必須使用帶分區的構造方法。還必須保證該分區存在,否則會報異常。
public static void uploadDataToYun(Odps odps, String project, String tableName) throws Exception { TableTunnel tunnel = new TableTunnel(odps); tunnel.setEndpoint(TUNNEL_URL);// 設置TunnelServer地址,沒有設置TunnelServer地址的情況下自己主動選擇 /*PartitionSpec類表示一個特定分區的定義*/ String partitionColumn="province";//表中的分區列 PartitionSpec partitionSpec = new PartitionSpec(); partitionSpec.set(partitionColumn, "hubei"); Table table = odps.tables().get(tableName);//獲取當前表 boolean a= table.hasPartition(partitionSpec);//推斷上述定義分區在表中是否存在 if(a){ System.out.println("分區已經存在,能夠直接上傳數據"); }else{ System.out.println("分區不存在,先創建分區再上傳數據"); table.createPartition(partitionSpec); } /*在分區表上創建上傳會話*/ TableTunnel.UploadSession uploadSession = tunnel.createUploadSession( project, tableName,partitionSpec); RecordWriter rw = uploadSession.openRecordWriter(1); Column[] columns = new Column[4]; columns[0] = new Column("id", OdpsType.STRING); columns[1] = new Column("name", OdpsType.STRING); columns[2] = new Column("sex", OdpsType.BIGINT); columns[3] = new Column("birthday", OdpsType.DATETIME); Record r = new ArrayRecord(columns); r.setString("id", "3"); r.setString("name", "name3"); r.setBigint("sex", (long) 2); Date date = new Date(); r.setDatetime("birthday", date); rw.write(r); rw.close();//一定要close,不然無法commit Long[] blocks = uploadSession.getBlockList(); uploadSession.commit(blocks); System.out.println("數據上傳成功"); }
3.測試類:
private static final String ACCESS_ID = "***********"; private static final String ACCESS_KEY = "***************"; private static final String PROJECT_NAME = "*************"; private static final String TUNNEL_URL = "http://dt.odps.aliyun.com"; private static final String ODPS_URL = "http://service.odps.aliyun.com/api"; public static void main(String args[]) throws Exception { /* 先構建阿里雲帳號 */ Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY); /* Odps類是ODPS SDK的入口 */ Odps odps = new Odps(account); odps.setDefaultProject(PROJECT_NAME);// 指定默認使用的Project名稱 odps.setEndpoint(ODPS_URL);// 設置ODPS服務的地址 String tableName="test_table_jyl"; /*創建帶分區的表*/ createTableWithPartition(odps,tableName); /*上傳數據*/ uploadDataToYun(odps, PROJECT_NAME, tableName); }