1.創建分區表:
分區表有自己的分區列,而分區表則沒有。
public static void createTableWithPartition(Odps odps, String createTableName)
throws Exception {
Tables tables = odps.tables();// /獲取表示ODPS全部Table的集合對象
boolean a = tables.exists(createTableName);// 推斷指定表test_table_jyl是否存在
if (a) {
System.out.println("指定表存在");
Table table = tables.get(createTableName);
tables.delete(createTableName);//存在就刪除
} else {
System.out.println("指定表不存在");
}
System.out.println("-------------------------------------------------");
/* 創建表 */
if (tables.exists(createTableName)) {
System.out.println("指定表存在,無法創建");
} else {
System.out.println("指定表不存在,能夠創建");
/* TableSchema表示ODPS中表的定義 */
TableSchema tableSchema = new TableSchema();
/* 加入列 */
Column col; // Column表示ODPS中表的列定義
col = new Column("id", OdpsType.STRING, "ID");
tableSchema.addColumn(col);
col = new Column("name", OdpsType.STRING, "姓名");
tableSchema.addColumn(col);
col = new Column("sex", OdpsType.BIGINT, "性別");
tableSchema.addColumn(col);
col = new Column("birthday", OdpsType.DATETIME, "生日");
tableSchema.addColumn(col);
/* 加入分區列 */
col = new Column("province ", OdpsType.STRING, "省(分區列)");
tableSchema.addPartitionColumn(col);
tables.create(createTableName, tableSchema);//創建表
System.out.println("表【" + createTableName + "】創建成功");
}
System.out.println("-------------------------------------------------");
}
2.分區表數據上傳:
分區表上傳數據必須指定分區。所以上傳數據前必須保證存在分區,不存在就創建一個,創建分區有兩種方法
/*PartitionSpec類表示一個特定分區的定義*/ String partitionColumn="province";//表中的分區列 /*第一種,直接調用帶參構造函數, * 參數格式:分區定義字符串。比方: pt='1',ds='2' */ PartitionSpec partitionSpec1 = new PartitionSpec(partitionColumn+"='hubei'"); /*另外一種,調用布帶參數構造函數,再調用隊形set方法。*/ PartitionSpec partitionSpec2 = new PartitionSpec(); partitionSpec2.set(partitionColumn, "hubei");
TableTunnel類中有兩個創建創建上傳會話方法:
createUploadSession
public TableTunnel.UploadSession createUploadSession(String projectName, String tableName) throws TunnelException
- 在非分區表上創建上傳會話
-
- Parameters:
-
projectName- Project名稱 -
tableName- 表名,非視圖 - Returns:
-
TableTunnel.UploadSession - Throws:
-
TunnelException
createUploadSession
public TableTunnel.UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException
-
在分區表上創建上傳會話
注: 分區必須為最末級分區,如表有兩級分區pt,ds, 則必須所有指定值, 不支持僅僅指定當中一個值
- Parameters:
-
projectName- Project名 -
tableName- 表名,非視圖 -
partitionSpec- 指定分區PartitionSpec - Returns:
-
TableTunnel.UploadSession - Throws:
-
TunnelException
分區表必須使用帶分區的構造方法。還必須保證該分區存在,否則會報異常。
public static void uploadDataToYun(Odps odps, String project, String tableName)
throws Exception {
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(TUNNEL_URL);// 設置TunnelServer地址,沒有設置TunnelServer地址的情況下自己主動選擇
/*PartitionSpec類表示一個特定分區的定義*/
String partitionColumn="province";//表中的分區列
PartitionSpec partitionSpec = new PartitionSpec();
partitionSpec.set(partitionColumn, "hubei");
Table table = odps.tables().get(tableName);//獲取當前表
boolean a= table.hasPartition(partitionSpec);//推斷上述定義分區在表中是否存在
if(a){
System.out.println("分區已經存在,能夠直接上傳數據");
}else{
System.out.println("分區不存在,先創建分區再上傳數據");
table.createPartition(partitionSpec);
}
/*在分區表上創建上傳會話*/
TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
project, tableName,partitionSpec);
RecordWriter rw = uploadSession.openRecordWriter(1);
Column[] columns = new Column[4];
columns[0] = new Column("id", OdpsType.STRING);
columns[1] = new Column("name", OdpsType.STRING);
columns[2] = new Column("sex", OdpsType.BIGINT);
columns[3] = new Column("birthday", OdpsType.DATETIME);
Record r = new ArrayRecord(columns);
r.setString("id", "3");
r.setString("name", "name3");
r.setBigint("sex", (long) 2);
Date date = new Date();
r.setDatetime("birthday", date);
rw.write(r);
rw.close();//一定要close,不然無法commit
Long[] blocks = uploadSession.getBlockList();
uploadSession.commit(blocks);
System.out.println("數據上傳成功");
}
3.測試類:
private static final String ACCESS_ID = "***********";
private static final String ACCESS_KEY = "***************";
private static final String PROJECT_NAME = "*************";
private static final String TUNNEL_URL = "http://dt.odps.aliyun.com";
private static final String ODPS_URL = "http://service.odps.aliyun.com/api";
public static void main(String args[]) throws Exception {
/* 先構建阿里雲帳號 */
Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
/* Odps類是ODPS SDK的入口 */
Odps odps = new Odps(account);
odps.setDefaultProject(PROJECT_NAME);// 指定默認使用的Project名稱
odps.setEndpoint(ODPS_URL);// 設置ODPS服務的地址
String tableName="test_table_jyl";
/*創建帶分區的表*/
createTableWithPartition(odps,tableName);
/*上傳數據*/
uploadDataToYun(odps, PROJECT_NAME, tableName);
}
