深入淺出Sqoop之遷移過程源碼分析


【摘要】Sqoop是一種用於在Apache Hadoop和結構化數據存儲(如關系數據庫)之間高效傳輸批量數據的工具 。本文將簡單介紹Sqoop作業執行時相關的類及方法,並將該過程與MapReduce的執行結合,分析數據如何從源端遷移到目的端。

Sqoop作業執行過程

拋開MR的執行過程,Sqoop執行時用到的關鍵類總共有5個,Initializer、Partitioner、Extractor、Loader、Destroyer。執行流程如下圖所示
  • Initializer:初始化階段,源數據校驗,參數初始化等工作;
  • Partitioner:源數據分片,根據作業並發數來決定源數據要切分多少片;
  • Extractor:開啟extractor線程,根據用戶配置從內存中構造數據寫入隊列;
  • Loader:開啟loader線程,從隊列中讀取數據並拋出;
  • Destroyer:資源回收,斷開sqoop與數據源的連接,並釋放資源;
因此,每次新建一個連接器都要實現上述5個類。

Initializer

Initializer是在sqoop任務提交到MR之前被調用,主要是做遷移前的准備,例如連接數據源,創建臨時表,添加依賴的jar包等。它是sqoop作業生命周期的第一步,主要API如下
public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration); public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration){ return new LinkedList<String>(); } public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration) { return new NullSchema(); }

 

其中getSchema()方法被From或者To端的connector在提取或者載入數據時用來匹配數據。例如,一個GenericJdbcConnector會調用它獲取源端Mysql的數據庫名,表名,表中的字段信息等。

Destroyer

Destroyer 是在作業執行結束后被實例化,這是Sqoop作業的最后一步。清理任務,刪除臨時表,關閉連接器等。
public abstract void destroy(DestroyerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);

Partitioner

Partitioner創建分區Partition,Sqoop默認創建10個分片,主要API如下
public abstract List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration);

 

Partition類中實現了readFields()方法和write()方法,方便讀寫
public abstract class Partition { public abstract void readFields(DataInput in) throws IOException; public abstract void write(DataOutput out) throws IOException; public abstract String toString(); }

Extractor

Extractor類根據分片partition和配置信息從源端提取數據,寫入SqoopMapDataWriter中,SqoopMapDataWriter是SqoopMapper的內部類它繼承了DataWriter類。此外它打包了SqoopWritable類,以中間數據格式保存從源端讀取到的數據。
public abstract void extract(ExtractorContext context, LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration, SqoopPartition partition);
該方法內部核心代碼如下
while (resultSet.next()) { ... context.getDataWriter().writeArrayRecord(array); ... }

Loader

loader從源端接受數據,並將其載入目的端,它必須實現如下接口
public abstract void load(LoaderContext context, ConnectionConfiguration connectionConfiguration, JobConfiguration jobConfiguration) throws Exception;
load方法從SqoopOutputFormatDataReader中讀取,它讀取“中間數據格式表示形式” _中的數據並將其加載到數據源。此外Loader必須迭代的調用DataReader()直到它讀完。
while ((array = context.getDataReader().readArrayRecord()) != null) { ... }

MapReduce執行過程

上一節避開MR執行過程,僅僅從Extractor和Loader過程描述遷移過程。下面將結合MR的執行過程詳細的介紹一個Sqoop遷移作業流程。
初始化
1)作業初始化階段,SqoopInputFormat讀取給源端數據分片的過程
  • SqoopInputFormat的getSplits方法會調用Partitioner類的getPartitions方法
  • 將返回的Partition列表包裝到SqoopSplit中;
  • 默認分片個數為10
這里每個Partition分片會交給一個Mapper執行。每個Mapper分別啟動一個extractor線程和Loader線程遷移數據。
Mapper
2)作業執行階段的Mapper過程
  • SqoopMapper包含了一個SqoopMapDataWriter類,
  • Mapper的run()調用Extractor.extract方法,該方法迭代的獲取源端數據再調用DataWriter寫入Context
private Class SqoopMapDataWriter extends DataWriter { ... private void writeContent() { ... context.wirte(writable, NullWritable.get()); // 這里的writable 是SqoopWritable的一個對象 ... } ... }
注意:這里的Context中存的是KV對,K是SqoopWritable,而V僅是一個空的Writable對象。SqoopWritable中實現了write和readField,用於序列化和反序列化。
Reducer
3)作業執行階段的Reduce過程,
  • SqoopOutputFormatLoadExecutor包裝了SqoopOuputFormatDataReader,SqoopRecordWriter, ConsumerThread三個內部類;
  • SqoopNullOutputFormat調用getRecordWriter時創建一個線程:ConsumerThread,代碼如下
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { executorService = Executors.newSingleThreadExecutor(...); consumerFuture = executorService.submit(new ConsumerThread(context)); return writer; }
  • ConsumerThread集成了Runnable接口,線程內部調用Loader.load(...)方法,該方法用DataReader迭代的從Context中讀取出SqoopWritable,並將其寫入一個中間數據格式再寫入目的端數據庫中。
private class ConsumerThread implements Runnable { ... public void run() { ... Loader.load(loaderContext, connectorLinkConfig, ConnectorToJobConfig); ... } ... }
注意:
  • 再本地模式下,Sqoop提交任務時沒有設置SqoopReducer.class,MR會調用一個默認的reducer.class。
  • setContent就是SqoopRecordWriter.write(...),它將SqoopWritable反序列化后存入中間存儲格式中,即IntermediateDataFormat。與之對應,getContent就是從該中間存儲格式中讀取數據。
  • Sqoop定義了一個可插拔的中間數據格式抽象類,IntermediateDataFormat類,SqoopWritable打包了這個抽象類用來保存中間數據。
以上即為Sqoop作業執行時相關的類及方法內容,希望對大家在進行數據遷移過程中有所幫助。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM