服務設計-ETL-核心框架


ETL工具開發目標是一款通用數據遷移工具,可擴充遷移的源數據類型,同時可以擴充目標端存儲類型,是一款可以不斷擴展功能的、通用的數據遷移工具。工具具有數據映射

過濾、默認值等插件可配置使用;提供業務處理插件接口,可供定制化業務處理;對大量數據進行分批遷移的功能;批量任務遷移時支持斷點續傳功能等。

3.2 ETL軟件架構

 

Figure 1 ETL軟件架構圖

 

         上圖為ETL軟件架構,主要包含以下幾個模塊:

1)      配置解析模塊

負責接收外部配置文件,按需解析配置文件內容。

2)      數據輸入適配器

結合配置解析模塊,解析輸入配置,得到輸入實例,再根據輸入配置信息,抽取源數據。

3)      數據統一格式模塊

對任意種類的數據,抽取到內存中后,需要對格式進行統一化,方面處理。在ETL中,ETL是基於Spark開發,數據抽取到內容中后,會轉化成RDD(分布式彈性數據集),RDD結合數據結構會轉化為DataSet(DataFrame)。

4)      數據映射、過濾、默認值處理模塊

ETL內部集成針對數據的映射處理、過濾處理以及默認值處理,用戶可以通過配置使用。

5)      業務數據處理插件接口

業務處理插件接口,用於復雜、需求常變動場景下,由業務人員定制處理處理業務使用。

6)      數據輸出適配器

結合配置解析模塊,解析輸出配置,得到輸出實例,再根據輸出配置信息,加載數據到目標數據庫端。

7)      自動分批模塊

大數據項目中數據量通常從幾百萬條數據到幾十上百億條數據,對於數據量大的任務,工具需要把大任務分解成一批小任務分別執行。自動分批模塊可根據輸入的配置,結合單批次設定的數據量大小,對任務進行划分。

8)      斷點續傳功能

此功能和自動分批模塊結合使用,在遷移過程中,批次任務異常中止后,重啟工具,任務可自動識別當前已經執行成功的任務,從斷點的批次繼續執行數據遷移。

9)      Rowkey生成器

當前ETL的輸出數據端主要是Hbase數據庫,rowkey的生成通常來自數據字段的值,結合一定函數組合而成。Rowkey生成器內嵌一些集成的基礎函數,可供配置生成Rowkey。

10)   通用數據處理模塊

此模塊是一些通用的數據處理實現類,是實現業務數據處理插件接口得到。可以完成一些數據轉化成輸出數據格式要求。

11)   任務調度模塊

此模塊主要用於調度執行數據遷移任務。

3.3 已有功能說明

         ETL工具目前提供功能如下圖所示:

 

Figure 2 ETL功能示意圖

         功能說明:

ETL工具主要提供數據抽取、數據轉換接口,數據輸出功能。目前ETL工具提供從DB、Hbase、Solr抽取數據,然后經過用戶實現的Process接口或者通用的業務處理類對數據進行轉換處理,最后加載到Hbase數據庫中。

 

3) 業務數據處理插件Process接口優化

         業務數據處理插件的使用方式如下:

A、 實現process接口

B、 程序通過輸入配置獲取實現類信息

C、 通過反射方式實例化實現類

D、 調用實現類處理業務數據

E、  通過實現類返回值獲取輸出數據和對應的輸出數據結構信息

老的process接口類

老版本ETL業務數據處理插件接口類定義如下:

 

addParameters用於接收外部傳入參數供process實現類使用

process方法用於實現對業務數據的處理

process輸入參數DataIterator內部結構如下:

 

 

 

 

 

 

 

 

 

 

主要包含三個部分:

         輸入數據的字段信息fieldInfos

         判斷迭代器中是否有未訪問的數據

         從迭代器中返回一條數據

 

輸出類ResultSet如下:

        

 

 

 

         仍是一個迭代器,為后續訪問數據方便,內存存儲數據使用的為list存儲。

(2)新的process接口類

         從業務發展發現,上述接口基本可以滿足業務處理要求,但一些特殊場景,用戶需要獲取輸入配置的所有信息,不單單是輸入數據的字段信息。

        

         修改后新的業務數據處理插件接口類如下:

 

 

 

InputInfo類主要內容如下:

 

 

 

新的接口類改變點如下:

1、變更原有兩個方式為三個方法

把輸入數據和對應的字段信息分別提供兩個方法實現,引導用戶更好的使用輸入字段信息並要求定義輸出字段信息

2、在輸入信息類中新增屬性configInfo,其包含所有的輸入配置信息。

3、調整類以及方法、參數的名稱,使其更容易理解

4) 代碼優化

在開發和測試etl過程中發現一些代碼冗余、已經廢棄、包含業務、效率低等問題,針對發現的問題,進行列舉說明,部分內容進行詳細說明。

(1)ArrayList的使用優化

舊版ETL中arraylist通常直接new使用,並未設置capacity,默認為10;

但是當arraylist中放入的數據量達到一定數據量后,arraylist會進行擴容,擴容的方式是拷貝現有數據新創建一個arraylist,擴容的后的數組大小為(oldCapacity + (oldCapacity >> 1))2倍多。

         缺點:如果數據量特別大,很發生發次擴容,影響數據處理效率,且對內存有影響。

         修改方式:ETL中為spark任務,spark的任務單元為partition,etl中每個partion中有多少條數據是確定的,使用時創建Arraylist時直接指定相應數據量的容量大小。

(linkList也可以解決上述問題,但linklist在隨機訪問時性能不佳,為影響存儲在里面的數據的訪問,故未使用)

 

Arraylist擴容相關代碼如下:(如進一步了解請查看arraylist源碼)

add數據時,

public boolean add(E e) {

        ensureCapacityInternal(size + 1);  // Increments modCount!!

        elementData[size++] = e;

        return true;

}

private void ensureCapacityInternal(int minCapacity) {

        ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));

    }

private void ensureExplicitCapacity(int minCapacity) {

        modCount++;

 

        // overflow-conscious code

        if (minCapacity - elementData.length > 0)

            grow(minCapacity);

  }

private void grow(int minCapacity) {

        // overflow-conscious code

        int oldCapacity = elementData.length;

        int newCapacity = oldCapacity + (oldCapacity >> 1);

        if (newCapacity - minCapacity < 0)

            newCapacity = minCapacity;

        if (newCapacity - MAX_ARRAY_SIZE > 0)

            newCapacity = hugeCapacity(minCapacity);

        // minCapacity is usually close to size, so this is a win:

        elementData = Arrays.copyOf(elementData, newCapacity);

    }

/**

     * Copies the specified array, truncating or padding with nulls (if necessary)

   **/

public static <T> T[] copyOf(T[] original, int newLength) {

        return (T[]) copyOf(original, newLength, original.getClass());

}

 

Spark兩種共享變量:廣播變量(broadcast variable)與累加器(accumulator)累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。

共享變量出現的原因:通常在向 Spark 傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。Spark 的兩個共享變量,累加器與廣播變量,分別為結果聚合與廣播這兩種常見的通信模式突破了這一限制。

 

問題描述:

在使用少量數據測試ETL功能時發現,業務數據處理插件DataProcess實現類返回的輸出數據字段信息在spark一些節點的execute中為null,定位結論為未進行初始化。

問題原因:

由於一些exectute執行了dataprocess實現類中的方法,有返回值,則輸出字段信息有賦值,所以不為空。另外一些未執行,則為null。

解決辦法:

         Spark的廣播變量可以在集群任務中共享此屬性,只要在driver節點執行dataprocess獲取輸出字段信息,然后使用廣播變量,即可在所有execute中使用此變量。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM