ETL工具開發目標是一款通用數據遷移工具,可擴充遷移的源數據類型,同時可以擴充目標端存儲類型,是一款可以不斷擴展功能的、通用的數據遷移工具。工具具有數據映射
過濾、默認值等插件可配置使用;提供業務處理插件接口,可供定制化業務處理;對大量數據進行分批遷移的功能;批量任務遷移時支持斷點續傳功能等。
3.2 ETL軟件架構
Figure 1 ETL軟件架構圖
上圖為ETL軟件架構,主要包含以下幾個模塊:
1) 配置解析模塊
負責接收外部配置文件,按需解析配置文件內容。
2) 數據輸入適配器
結合配置解析模塊,解析輸入配置,得到輸入實例,再根據輸入配置信息,抽取源數據。
3) 數據統一格式模塊
對任意種類的數據,抽取到內存中后,需要對格式進行統一化,方面處理。在ETL中,ETL是基於Spark開發,數據抽取到內容中后,會轉化成RDD(分布式彈性數據集),RDD結合數據結構會轉化為DataSet(DataFrame)。
4) 數據映射、過濾、默認值處理模塊
ETL內部集成針對數據的映射處理、過濾處理以及默認值處理,用戶可以通過配置使用。
5) 業務數據處理插件接口
業務處理插件接口,用於復雜、需求常變動場景下,由業務人員定制處理處理業務使用。
6) 數據輸出適配器
結合配置解析模塊,解析輸出配置,得到輸出實例,再根據輸出配置信息,加載數據到目標數據庫端。
7) 自動分批模塊
大數據項目中數據量通常從幾百萬條數據到幾十上百億條數據,對於數據量大的任務,工具需要把大任務分解成一批小任務分別執行。自動分批模塊可根據輸入的配置,結合單批次設定的數據量大小,對任務進行划分。
8) 斷點續傳功能
此功能和自動分批模塊結合使用,在遷移過程中,批次任務異常中止后,重啟工具,任務可自動識別當前已經執行成功的任務,從斷點的批次繼續執行數據遷移。
9) Rowkey生成器
當前ETL的輸出數據端主要是Hbase數據庫,rowkey的生成通常來自數據字段的值,結合一定函數組合而成。Rowkey生成器內嵌一些集成的基礎函數,可供配置生成Rowkey。
10) 通用數據處理模塊
此模塊是一些通用的數據處理實現類,是實現業務數據處理插件接口得到。可以完成一些數據轉化成輸出數據格式要求。
11) 任務調度模塊
此模塊主要用於調度執行數據遷移任務。
3.3 已有功能說明
ETL工具目前提供功能如下圖所示:

Figure 2 ETL功能示意圖
功能說明:
ETL工具主要提供數據抽取、數據轉換接口,數據輸出功能。目前ETL工具提供從DB、Hbase、Solr抽取數據,然后經過用戶實現的Process接口或者通用的業務處理類對數據進行轉換處理,最后加載到Hbase數據庫中。
3) 業務數據處理插件Process接口優化
業務數據處理插件的使用方式如下:
A、 實現process接口
B、 程序通過輸入配置獲取實現類信息
C、 通過反射方式實例化實現類
D、 調用實現類處理業務數據
E、 通過實現類返回值獲取輸出數據和對應的輸出數據結構信息
老的process接口類
老版本ETL業務數據處理插件接口類定義如下:

addParameters用於接收外部傳入參數供process實現類使用
process方法用於實現對業務數據的處理
process輸入參數DataIterator內部結構如下:

主要包含三個部分:
輸入數據的字段信息fieldInfos
判斷迭代器中是否有未訪問的數據
從迭代器中返回一條數據
輸出類ResultSet如下:

仍是一個迭代器,為后續訪問數據方便,內存存儲數據使用的為list存儲。
(2)新的process接口類
從業務發展發現,上述接口基本可以滿足業務處理要求,但一些特殊場景,用戶需要獲取輸入配置的所有信息,不單單是輸入數據的字段信息。
修改后新的業務數據處理插件接口類如下:

InputInfo類主要內容如下:

新的接口類改變點如下:
1、變更原有兩個方式為三個方法
把輸入數據和對應的字段信息分別提供兩個方法實現,引導用戶更好的使用輸入字段信息並要求定義輸出字段信息
2、在輸入信息類中新增屬性configInfo,其包含所有的輸入配置信息。
3、調整類以及方法、參數的名稱,使其更容易理解
4) 代碼優化
在開發和測試etl過程中發現一些代碼冗余、已經廢棄、包含業務、效率低等問題,針對發現的問題,進行列舉說明,部分內容進行詳細說明。
(1)ArrayList的使用優化
舊版ETL中arraylist通常直接new使用,並未設置capacity,默認為10;
但是當arraylist中放入的數據量達到一定數據量后,arraylist會進行擴容,擴容的方式是拷貝現有數據新創建一個arraylist,擴容的后的數組大小為(oldCapacity + (oldCapacity >> 1))2倍多。
缺點:如果數據量特別大,很發生發次擴容,影響數據處理效率,且對內存有影響。
修改方式:ETL中為spark任務,spark的任務單元為partition,etl中每個partion中有多少條數據是確定的,使用時創建Arraylist時直接指定相應數據量的容量大小。
(linkList也可以解決上述問題,但linklist在隨機訪問時性能不佳,為影響存儲在里面的數據的訪問,故未使用)
Arraylist擴容相關代碼如下:(如進一步了解請查看arraylist源碼)
add數據時,
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}
/**
* Copies the specified array, truncating or padding with nulls (if necessary)
**/
public static <T> T[] copyOf(T[] original, int newLength) {
return (T[]) copyOf(original, newLength, original.getClass());
}
Spark兩種共享變量:廣播變量(broadcast variable)與累加器(accumulator)累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。
共享變量出現的原因:通常在向 Spark 傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。Spark 的兩個共享變量,累加器與廣播變量,分別為結果聚合與廣播這兩種常見的通信模式突破了這一限制。
問題描述:
在使用少量數據測試ETL功能時發現,業務數據處理插件DataProcess實現類返回的輸出數據字段信息在spark一些節點的execute中為null,定位結論為未進行初始化。
問題原因:
由於一些exectute執行了dataprocess實現類中的方法,有返回值,則輸出字段信息有賦值,所以不為空。另外一些未執行,則為null。
解決辦法:
Spark的廣播變量可以在集群任務中共享此屬性,只要在driver節點執行dataprocess獲取輸出字段信息,然后使用廣播變量,即可在所有execute中使用此變量。
