【ODPS】阿里雲ODPS中帶分區的表操作


1.創建分區表:

分區表有自己的分區列,而分區表則沒有。

public static void createTableWithPartition(Odps odps, String createTableName)
			throws Exception {
		Tables tables = odps.tables();// /獲取表示ODPS全部Table的集合對象
		boolean a = tables.exists(createTableName);// 推斷指定表test_table_jyl是否存在
		if (a) {
			System.out.println("指定表存在");
			Table table = tables.get(createTableName);
			tables.delete(createTableName);//存在就刪除
		} else {
			System.out.println("指定表不存在");
		}
		System.out.println("-------------------------------------------------");
		
		/* 創建表 */
		if (tables.exists(createTableName)) {
			System.out.println("指定表存在,無法創建");
		} else {
			System.out.println("指定表不存在,能夠創建");
			/* TableSchema表示ODPS中表的定義 */
			TableSchema tableSchema = new TableSchema();
			/* 加入列 */
			Column col; // Column表示ODPS中表的列定義
			col = new Column("id", OdpsType.STRING, "ID");
			tableSchema.addColumn(col);
			col = new Column("name", OdpsType.STRING, "姓名");
			tableSchema.addColumn(col);
			col = new Column("sex", OdpsType.BIGINT, "性別");
			tableSchema.addColumn(col);
			col = new Column("birthday", OdpsType.DATETIME, "生日");
			tableSchema.addColumn(col);

			/* 加入分區列 */
			col = new Column("province ", OdpsType.STRING, "省(分區列)");
			tableSchema.addPartitionColumn(col);
			
			tables.create(createTableName, tableSchema);//創建表
			System.out.println("表【" + createTableName + "】創建成功");
		}
		System.out.println("-------------------------------------------------");

	}


2.分區表數據上傳:

分區表上傳數據必須指定分區。所以上傳數據前必須保證存在分區,不存在就創建一個,創建分區有兩種方法

/*PartitionSpec類表示一個特定分區的定義*/
		String partitionColumn="province";//表中的分區列
		/*第一種,直接調用帶參構造函數,
		 * 參數格式:分區定義字符串。比方: pt='1',ds='2'
		 */
		PartitionSpec partitionSpec1 = new PartitionSpec(partitionColumn+"='hubei'");
		
		/*另外一種,調用布帶參數構造函數,再調用隊形set方法。

*/ PartitionSpec partitionSpec2 = new PartitionSpec(); partitionSpec2.set(partitionColumn, "hubei");


TableTunnel類中有兩個創建創建上傳會話方法:

createUploadSession

public TableTunnel.UploadSession createUploadSession(String projectName,
                                                     String tableName)
                                              throws TunnelException
在非分區表上創建上傳會話

Parameters:
projectName  - Project名稱
tableName  - 表名,非視圖
Returns:
TableTunnel.UploadSession
Throws:
TunnelException

createUploadSession

public TableTunnel.UploadSession createUploadSession(String projectName,
                                                     String tableName,
                                                     PartitionSpec partitionSpec)
                                              throws TunnelException
在分區表上創建上傳會話

注: 分區必須為最末級分區,如表有兩級分區pt,ds, 則必須所有指定值, 不支持僅僅指定當中一個值

Parameters:
projectName  - Project名
tableName  - 表名,非視圖
partitionSpec  - 指定分區   PartitionSpec
Returns:
TableTunnel.UploadSession
Throws:
TunnelException


分區表必須使用帶分區的構造方法。還必須保證該分區存在,否則會報異常。

public static void uploadDataToYun(Odps odps, String project, String tableName)
			throws Exception {
		TableTunnel tunnel = new TableTunnel(odps);
		tunnel.setEndpoint(TUNNEL_URL);// 設置TunnelServer地址,沒有設置TunnelServer地址的情況下自己主動選擇
		
		/*PartitionSpec類表示一個特定分區的定義*/
		String partitionColumn="province";//表中的分區列
		PartitionSpec partitionSpec = new PartitionSpec();
		partitionSpec.set(partitionColumn, "hubei");
		
		Table table = odps.tables().get(tableName);//獲取當前表
		boolean a= table.hasPartition(partitionSpec);//推斷上述定義分區在表中是否存在
		if(a){
			System.out.println("分區已經存在,能夠直接上傳數據");
		}else{
			System.out.println("分區不存在,先創建分區再上傳數據");
			table.createPartition(partitionSpec);
		}
		
		/*在分區表上創建上傳會話*/
		TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
				project, tableName,partitionSpec);

		RecordWriter rw = uploadSession.openRecordWriter(1);
		Column[] columns = new Column[4];
		columns[0] = new Column("id", OdpsType.STRING);
		columns[1] = new Column("name", OdpsType.STRING);
		columns[2] = new Column("sex", OdpsType.BIGINT);
		columns[3] = new Column("birthday", OdpsType.DATETIME);
		Record r = new ArrayRecord(columns);
		
		r.setString("id", "3");
		r.setString("name", "name3");
    	r.setBigint("sex", (long) 2);
    	Date date = new Date();
    	r.setDatetime("birthday", date);
    	rw.write(r);
    	rw.close();//一定要close,不然無法commit

		Long[] blocks = uploadSession.getBlockList();
		uploadSession.commit(blocks);
		System.out.println("數據上傳成功");
	}


3.測試類:

	private static final String ACCESS_ID = "***********";
	private static final String ACCESS_KEY = "***************";
	private static final String PROJECT_NAME = "*************";
	private static final String TUNNEL_URL = "http://dt.odps.aliyun.com";
	private static final String ODPS_URL = "http://service.odps.aliyun.com/api";
	
	public static void main(String args[]) throws Exception {

		/* 先構建阿里雲帳號 */
		Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
		
		/* Odps類是ODPS SDK的入口 */
		Odps odps = new Odps(account);
		odps.setDefaultProject(PROJECT_NAME);// 指定默認使用的Project名稱
		odps.setEndpoint(ODPS_URL);// 設置ODPS服務的地址
		
		String tableName="test_table_jyl";
		/*創建帶分區的表*/
		createTableWithPartition(odps,tableName);
		
		/*上傳數據*/
		uploadDataToYun(odps, PROJECT_NAME, tableName);
	}




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM