場景描述
有這樣一種場景,用戶在自建服務器上存有一定數量級的CSV格式業務數據,某一天用戶了解到阿里雲的OSS服務存儲性價比高(嘿嘿,顏值高),於是想將CSV數據遷移到雲上OSS中,並且未來還想對這些數據做一些離線分析,挖掘其中存在價值,因此需要將OSS中文件再通過一種方式同步到ODPS數加平台上,面對這樣需求,小編我經過參考文檔,實踐,調試並修復Bug,實現出以下一種解決方案。
實現目標
通過OSS的Java SDK以及批量數據通道tunnel SDK實現以下兩個功能:
(1)將本地CSV文件上傳到OSS;
(2)將OSS中文件同步到ODPS;
准備工作
在具體實操之前,有必要對OSS有個了解,OSS是個什么東東,為什么要選用OSS呢,OSS控制台限制條件,需要注意事項?
OSS是個什么東東?
阿里雲對象存儲(Object Storage Service,簡稱OSS),是阿里雲對外提供的海量,安全,低成本,高可靠的雲存儲服務。通過網絡隨時存儲和調用包括文本、圖片、音頻、和視頻在內的各種結構化或非結構化數據文件。
為什么選用雲產品OSS服務呢?
是什么原因致使用戶放棄使用自建服務器存儲數據,而轉向雲產品OSS呢?
這方面我深有感觸,我以前在上海一家公司工作,原公司所有數據都是存放在自建的五六台服務器上,從規划,采購到部署,這其間過程復雜,人力部署也不簡單,而且服務器價格昂貴,開發維護成本高,數據可靠性還低,總之耗時、耗力最重要是影響業務進展。接觸了解到OSS后才發現,之前的自建服務器存儲真是太out啦,呵呵,OSS顏值高額,這里顏值具體有以下幾個方面:
可靠性高:數據自動多重冗余備份,規模自動擴展,不影響對外服務;
安全:提供企業級、用戶級多層次安全保護,授權機制及白名單、防盜鏈、主子賬號功能;
成本:省去人工擴容硬盤以及運維成本;
數據處理能力:提供豐富的數據處理服務,比如圖片處理、視頻轉碼、CDN內容加速分發。
OSS控制台限制條件?
通過 OSS 控制台可以上傳
小於 500 MB
文件。如要上傳的文件大於 500 MB,控制台會給出超過大小限制警告,並且在任務管理列表,
失敗並嘗試上傳請求三次。異常警告如下圖所示:
解決方法:可以通過 OSS的SDK 進行上傳。
需要注意幾點
(1) 在OSS中,用戶操作基本數據單元是object,單個對象大小限制為48.8TB,一個存儲空間中可以有無
限量對象。
(2) 新建Bucket,輸入存儲空間名稱,創建后不支持更改存儲空間名稱,上傳到OSS后不能移動文件存儲位
置;
(3) 在所屬地域框中,下拉選擇該存儲空間的數據中心。訂購后不支持更換地域。
(4) 刪除存儲空間之前請確保尚未完成的分片上傳文件產生的碎片文件全部清空,否則無法刪除存儲空間。
(5) 通過web控制台上傳文件,一刷新頁面,任務管理中顯示的上傳任務就會消失不見,所以在上傳過程中
不要刷新頁面。
本地大文件分片上傳到OSS
因為使用單次HTTP請求,Object過大會導致上傳時間長。在這段時間出現網絡原因造成超時或者鏈接斷開錯誤的時候,上傳容易失敗,可以考慮斷點續傳上傳(分片上傳)。當Object大於5GB,這種情況下只能使用斷點續傳上傳(分片上傳),具體參考斷點續傳上傳
,下面代碼實現上傳本地路徑下ratings.csv文件到OSS object管理中:
見附件中 源代碼.rar 壓縮文件中的 MultipartUploadDemo 類實現
單線程實現將OSS文件上傳至ODPS(OSS java-SDK與tunnel SDK結合)
下面代碼實現目標:將OSS中bucket名為qf-test,object對象為ratings.csv文件數據導入到ODPS平台中項目名為dtstack_dev,表名為ratings,分區字段為ds=20160612中。
見附件中 源代碼.rar 壓縮文件中的 OSSToODPS_Upload 類實現
多線程實現將OSS文件上傳至ODPS(OSS java-SDK與tunnel SDK結合)
下面代碼實現目標:將OSS中bucket名為qf-test,object對象為data_test/movies.csv
文件數據導入到ODPS平台中項目名為dtstack_dev,表名為movies_odps2中。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;
class UploadThread implements Callable<Boolean> {
private long id;
private TableSchema schema = null;
private RecordWriter recordWriter = null;
private Record record = null;
private BufferedReader reader = null;
public UploadThread(long id, RecordWriter recordWriter, Record record,
TableSchema schema,BufferedReader reader) {
this.id = id;
this.recordWriter = recordWriter;
this.record = record;
this.schema = schema;
this.reader = reader;
}
public Boolean call() throws Exception {
while (true) {
String line = reader.readLine();
if (line == null) break;
if(id == 0){ //第一行是字段名,忽略掉
id++;
continue;
}
System.out.println(line);
String[] s = line.split(",");
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, Long.valueOf(s[i]));
break;
// case BOOLEAN:
// record.setBoolean(i, str);
// break;
// case DATETIME:
// record.setDatetime(i, str);
// break;
case DOUBLE:
record.setDouble(i, Double.valueOf(s[i]));
break;
case STRING:
record.setString(i,s[i]);
break;
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
}
recordWriter.write(record);
}
recordWriter.close();
return true;
}
}
public class OSSToODPS_UploadThread {
private static String accessKeyId = "UQV2yoSSWNgquhhe";
private static String accessKeySecret = "bG8xSLwhmKYRmtBoE3HbhOBYXvknG6";
private static String endpoint = "http://oss-cn-hangzhou.aliyuncs.com";
private static String bucketName = "qf-test";
private static String key = "data_test/movies.csv";
private static String tunnelUrl = "http://dt.odps.aliyun.com";
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String project = "dtstack_dev";
private static String table = "movies_odps2";
//private static String partition = "ds=20160612";
private static int threadNum = 10;
public static void main(String args[]) {
/*
* Constructs a client instance with your account for accessing OSS
*/
OSSClient client = new OSSClient(endpoint, accessKeyId, accessKeySecret);
System.out.println("Downloading an object");
OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));
BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));
Account account = new AliyunAccount(accessKeyId, accessKeySecret);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
try {
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);
//PartitionSpec partitionSpec = new PartitionSpec(partition);
UploadSession uploadSession = tunnel.createUploadSession(project,table);
// UploadSession uploadSession = tunnel.createUploadSession(project,
// table, partitionSpec); //分區
System.out.println("Session Status is : "
+ uploadSession.getStatus().toString());
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < threadNum; i++) {
RecordWriter recordWriter = uploadSession.openRecordWriter(i);
Record record = uploadSession.newRecord();
callers.add(new UploadThread(i, recordWriter, record,
uploadSession.getSchema(),reader));
}
pool.invokeAll(callers);
pool.shutdown();
Long[] blockList = new Long[threadNum];
for (int i = 0; i < threadNum; i++)
blockList[i] = Long.valueOf(i);
uploadSession.commit(blockList);
reader.close();
System.out.println("upload success!");
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
編程實現中遇到Bug
Apache httpclient包沖突
Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE

at org.apache.http.conn.ssl.SSLConnectionSocketFactory.<clinit>(SSLConnectionSocketFactory.java:144)
at com.aliyun.oss.common.comm.DefaultServiceClient.createHttpClientConnectionManager(DefaultServiceClient.java:232)
at com.aliyun.oss.common.comm.DefaultServiceClient.<init>(DefaultServiceClient.java:78)
at com.aliyun.oss.OSSClient.<init>(OSSClient.java:273)
at com.aliyun.oss.OSSClient.<init>(OSSClient.java:194)
at UploadToODPS.main(UploadToODPS.java:53)
工程里可能有包沖突。原因是OSS Java SDK使用了Apache httpclient 4.4.1,而個人工程使用了與Apache httpclient 4.4.1沖突的Apache httpclient。如上述發生錯誤的工程里,使用了Apache httpclient 4.1.2:
使用統一版本。如果個人工程里使用與Apache httpclient 4.4.1沖突版本,請也使用4.4.1版本。去掉其它版本的Apache httpclient依賴。

recordWriter.write(record) 寫入位置不正確
在單線程編碼實現從OSS傳數據到ODPS
代碼中 recordWriter.write(record) 寫入位置不正確,如下代碼顯示:

多線程上
傳任務無故中斷,如下是異常截圖

for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, Long.valueOf(s[i]));
break;
case DOUBLE:
record.setDouble(i, Double.valueOf(s[i]));
break;
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
recordWriter.write(record); //寫入位置不正確
}
}
// recordWriter.write(record); //放到for循環外,寫入位置正確
recordWriter.write(record)寫入位置不對,將recordWriter.write(record)放置到for循環內,會出現以下奇怪異常:

正確位置是:將recordWriter.write(record)放置到for循環外,結果如下表顯示:
上傳代碼中 partition="20160612" 字符串寫法不對
需要注意,指定分區字符串在程序中正確寫法:
private static String partition = "ds=20160612"; (必須加上分區字段名)
PartitionSpec partitionSpec = new PartitionSpec(partition);
不正確寫法如下:
private static String partition = "20160612";(缺少分區字段名)

通過多線程將OSS中文件同步到ODPS表中時,實現多任務的並發執行,在編碼實現時要注意
reader.close()位置要放正確:
UploadSession uploadSession = tunnel.createUploadSession(project,table, partitionSpec);
OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));
BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));
Long[] blockList = new Long[threadNum];
uploadSession.commit(blockList);
將reader.close()放到Callable接口中call()方法里是不對滴,call方法是線程異步執行地方,開啟的所有線程不斷地異步從OSS的緩沖字符輸入流reader中讀取OSS中數據,如果在call()方法中就將reader關閉,也就是說將輸入數據源關閉,直接導致線程讀取失敗。因此reader.close()應該放在線程外部,即uploadSession.commit()位置后邊,如下。
uploadSession.commit(blockList);
reader.close(); //正確位置
System.out.println("upload success!");
