Duplicate Records in a Sqoop Data Transfer Job


Problem Description

Sqoop job: SQL Server -> HDFS
Run time: 2019-05-23 00:05:30 ~ 00:37:03
The Sqoop job completed successfully, but the roughly 3 million rows transferred from SQL Server to HDFS contained 829 duplicate records.

Problem Impact

Downstream hotel report data was inaccurate, and the affected tasks had to be rerun.

Immediate Workaround

After rerunning the Sqoop job, the data contained no duplicates.
To prevent a recurrence, a DISTINCT was added to the job's downstream Base-layer ETL.

Root Cause Analysis

The Sqoop job was configured roughly as follows:

sqoop import -D mapreduce.job.name={JOB_NAME} --connect '{db_info=232}' --delete-target-dir --query "SELECT id,star_out,hotel_type,hotel_economic,hotel_apartment,IsMultiSupply,InventoryUseType,IsSendVouchFax,auditingType,replace(replace(replace(replace(SubcityID,char(10),''),char(13),''),char(1),''),char(0),'') as SubcityID,isshadow,AgreementEntityID FROM Hotel_Product.dbo.hotel(nolock) where \$CONDITIONS" --where '1=1' --split-by id --null-string '\\N' --null-non-string '\\N' --fields-terminated-by '\001' -m 8 --target-dir /data/BaseData/elong/dshprdt_hotel

Here --split-by id together with -m 8 tells Sqoop to split the work by the id column into 8 map tasks.
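
For context, here is a minimal sketch (not Sqoop's actual code) of how this data-driven split works: before launching the maps, Sqoop queries SELECT MIN(id), MAX(id) on the source and divides that range into 8 contiguous sub-ranges, one WHERE clause per map. The max value below is hypothetical, chosen so that the first range reproduces the one seen in this job's log.

SplitSketch.java (illustrative only)
public class SplitSketch {
  public static void main(String[] args) {
    // Hypothetical MIN(id)/MAX(id); Sqoop obtains the real values with a
    // SELECT MIN(id), MAX(id) query before planning the splits.
    long min = 1L, max = 3_716_489L;
    int numMaps = 8; // from -m 8
    long step = (max - min) / numMaps;
    long lo = min;
    for (int i = 0; i < numMaps; i++) {
      long hi = (i == numMaps - 1) ? max + 1 : lo + step;
      // Each map scans a half-open id range; with these values the first
      // map prints ( id >= 1 ) AND ( id < 464562 ), matching the log.
      System.out.printf("map %d: ( id >= %d ) AND ( id < %d )%n", i, lo, hi);
      lo = hi;
    }
  }
}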

The map running on node hadoop-070-126.bigdata.ly was the first map (task m_000000, per the task log below).
Its assigned range was ( id >= 1 ) AND ( id < 464562 ).
At 2019-05-23 00:36:27,823 its database connection failed; the stack trace follows:

2019-05-23 00:36:27,823 ERROR [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Top level exception: 
com.microsoft.sqlserver.jdbc.SQLServerException: Connection reset
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1352)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1339)
        at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1694)
        at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:3734)
        at com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:3687)
        at com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:3663)
        at com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:3979)
        at com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:4001)
        at com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:3942)
        at com.microsoft.sqlserver.jdbc.TDSReader.readUnsignedInt(IOBuffer.java:3959)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.readBytesInternal(PLPInputStream.java:313)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.getBytes(PLPInputStream.java:129)
        at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:438)
        at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:2441)
        at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:176)
        at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:113)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1981)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1966)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2291)
        at org.apache.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:71)
        at com.cloudera.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:61)
        at QueryResult.readFields(QueryResult.java:1670)
        at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:244)
        at org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader.nextKeyValue(SQLServerDBRecordReader.java:148)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1701)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

This connection failure caused the map task to re-establish its database connection, recover, and continue transferring data; the map ultimately completed successfully. The recovery is visible in the log below:

2019-05-23 00:36:27,862 WARN [main] org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader: Trying to recover from DB read failure: 
java.io.IOException: SQLException in nextKeyValue
        at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:277)
        at org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader.nextKeyValue(SQLServerDBRecordReader.java:148)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1701)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Connection reset
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1352)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1339)
        at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1694)
        at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:3734)
        at com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:3687)
        at com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:3663)
        at com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:3979)
        at com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:4001)
        at com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:3942)
        at com.microsoft.sqlserver.jdbc.TDSReader.readUnsignedInt(IOBuffer.java:3959)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.readBytesInternal(PLPInputStream.java:313)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.getBytes(PLPInputStream.java:129)
        at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:438)
        at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:2441)
        at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:176)
        at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:113)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1981)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1966)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2291)
        at org.apache.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:71)
        at com.cloudera.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:61)
        at QueryResult.readFields(QueryResult.java:1670)
        at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:244)
        ... 13 more
2019-05-23 00:36:28,130 INFO [main] org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler: Session context is: NULL
2019-05-23 00:36:28,131 INFO [main] org.apache.sqoop.mapreduce.db.BasicRetrySQLFailureHandler: A new connection has been established
2019-05-23 00:36:28,186 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Working on split: id >= 1 AND id < 464562
2019-05-23 00:36:28,198 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Executing query:  SELECT id,CorpGroupID,HotelBrandID,AgreementEntityID FROM Hotel_Product.dbo.hotel(nolock) where ( id > 458735 ) AND ( id < 464562 )
2019-05-23 00:36:28,804 WARN [ResponseProcessor for block BP-894016253-10.12.180.10-1463057953660:blk_15531850664_15126642971] org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took 51708ms (threshold=30000ms); ack: seqno: 21070 status: SUCCESS status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 20470492, targets: [172.1.0.126:50010, 172.1.0.72:50010, 172.1.0.78:50010]
2019-05-23 00:36:42,628 INFO [Thread-13] org.apache.sqoop.mapreduce.AutoProgressMapper: Auto-progress thread is finished. keepGoing=false
2019-05-23 00:36:42,693 INFO [main] org.apache.hadoop.mapred.Task: Task:attempt_1555615894016_1958403_m_000000_0 is done. And is in the process of committing
2019-05-23 00:36:42,742 INFO [main] org.apache.hadoop.mapred.Task: Task attempt_1555615894016_1958403_m_000000_0 is allowed to commit now
2019-05-23 00:36:42,751 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1555615894016_1958403_m_000000_0' to viewfs://dcfs/data/BaseData/elong/dshprdt_hotel/_temporary/1/task_1555615894016_1958403_m_000000
2019-05-23 00:36:42,816 INFO [main] org.apache.hadoop.mapred.Task: Task 'attempt_1555615894016_1958403_m_000000_0' done.

Note that in the log above, after the connection was re-established, the query's id range became
where ( id > 458735 ) AND ( id < 464562 )
Why does the recovered task query only ids greater than 458735?
Let's first look at Sqoop's recovery code.

SQLServerDBRecordReader.java
  public boolean nextKeyValue() throws IOException {
    boolean valueReceived = false;
    int retryCount = RETRY_MAX;
    boolean doRetry = true;

    do {
      try {
        // Try to get the next key/value pairs
        valueReceived = super.nextKeyValue();
        doRetry = false;
      } catch (IOException ioEx) {
        LOG.warn("Trying to recover from DB read failure: ", ioEx);
        Throwable cause = ioEx.getCause();

        // Use configured connection handler to recover from the connection
        // failure and use the newly constructed connection.
        // If the failure cannot be recovered, an exception is thrown
        if (failureHandler.canHandleFailure(cause)) {
          // Recover from connection failure
          Connection conn = failureHandler.recover();

          // Configure the new connection before using it
          configureConnection(conn);
          setConnection(conn);

          --retryCount;
          doRetry = (retryCount >= 0);
        } else {
          // Cannot recovered using configured handler, re-throw
          throw new IOException("Cannection handler cannot recover failure: ",
              ioEx);
        }
      }
    } while (doRetry);

    // Rethrow the exception if all retry attempts are consumed
    if (retryCount < 0) {
      throw new IOException("Failed to read from database after "
        + RETRY_MAX + " retries.");
    }

    return valueReceived;
  }

As the code above shows, when nextKeyValue() hits a connection-related exception while fetching data, it retries up to 3 times (RETRY_MAX).
Each retry calls super.nextKeyValue() again. Because the connection is now a new one, the data query has to be executed once more (see the code below), and the new query SQL is built by the getSelectQuery() method.

DBRecordReader.java  
  @Override
  public boolean nextKeyValue() throws IOException {
    try {
      if (key == null) {
        key = new LongWritable();
      }
      if (value == null) {
        value = createValue();
      }
      if (null == this.results) {
        // First time into this method, run the query.
        LOG.info("Working on split: " + split);
        this.results = executeQuery(getSelectQuery());
      }
      if (!results.next()) {
        return false;
      }
      ...
    }

Now look closely at how getSelectQuery() builds the new statement (code below). When constructing the new query, it appends a lowerClause, a dynamically assembled lower bound: ( id > 458735 ).
The upper bound ( id < 464562 ) is fixed: it is the maximum of the range this map was assigned when the input was split.
What we care about most is where this dynamically generated lower bound comes from.

SQLServerDBRecordReader.java  
  protected String getSelectQuery() {
    // Last seen record key is only expected to be unavailable if no reads
    // ever happened
    String selectQuery;
    if (lastRecordKey == null) {
      selectQuery = super.getSelectQuery();
    } else {
      // If last record key is available, construct the select query to start
      // from
      DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
          (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
      StringBuilder lowerClause = new StringBuilder();
      lowerClause.append(getDBConf().getInputOrderBy());
      lowerClause.append(" > ");
      lowerClause.append(lastRecordKey.toString());

      // Get the select query with the lowerClause, and split upper clause
      selectQuery = getSelectQuery(lowerClause.toString(),
          dataSplit.getUpperClause());
    }

    return selectQuery;
  }

The lower-bound value is lastRecordKey: the splitColumn value (--split-by id) of the last record this map successfully processed before the failure. With lastRecordKey = 458735 and the split's fixed upper clause, getSelectQuery() yields exactly the statement seen in the log: WHERE ( id > 458735 ) AND ( id < 464562 ). The code that tracks lastRecordKey:

SQLServerDBRecordReader.java  
  public T getCurrentValue() {
    T val = super.getCurrentValue();
    // Lookup the key of the last read record to use for recovering
    // As documented, the map may not be null, though it may be empty.
    Object lastRecordSplitCol = val.getFieldMap().get(splitColumn);
    lastRecordKey = (lastRecordSplitCol == null) ? null
        : lastRecordSplitCol.toString();
    return val;
  }
    
QueryResult.java
  public Map<String, Object> getFieldMap() {
    Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
    __sqoop$field_map.put("id", this.id);
    ...
 }

Because the id column we split on is not the primary key (the primary key is hotel_id) and its order is not guaranteed, the rows Sqoop reads back through the database cursor can arrive like this:

id  hotel_name
1   Hotel A
3   Hotel B
4   Hotel C
5   Hotel D
6   Hotel E
7   Hotel F
8   Hotel G
2   Hotel H
9   Hotel I
10  Hotel J
11  Hotel K

Suppose that just as the map reaches the hotel with id 2 and flushes that row to HDFS, the database connection fails and the query is interrupted. At this point the following hotel rows have in fact already been transferred:

id  hotel_name
1   Hotel A
3   Hotel B
4   Hotel C
5   Hotel D
6   Hotel E
7   Hotel F
8   Hotel G
2   Hotel H

Sqoop then starts recovery. lastRecordKey holds the last successfully transferred row, Hotel H (id 2), so the new query is select * from table where (id > 2) AND (id <= 11). Only 3 hotels actually remained to be transferred, but the recovered connection re-reads:

id  hotel_name
3   Hotel B
4   Hotel C
5   Hotel D
6   Hotel E
7   Hotel F
8   Hotel G
9   Hotel I
10  Hotel J
11  Hotel K

Six hotel records were transferred twice!
Any row from the first pass whose id is greater than lastRecordKey gets transferred again.
There is an even worse case: if the last row processed before the failure had been id 8,
the recovered range would be id > 8 AND id <= 11,
which skips the row with id 2 entirely and loses data! (See the simulation sketched below.)
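
To make both failure modes concrete, here is a toy simulation, a sketch under the assumptions of the example above: the cursor returns ids in the unordered sequence shown, the split's upper bound is id <= 11, and the connection drops right after a given row.

RecoverySimulation.java (illustrative only)
import java.util.*;

public class RecoverySimulation {
  // Simulates one failure: the rows written before the crash, then the rows
  // the recovery query (id > lastRecordKey AND id <= 11) re-reads.
  static void simulate(List<Integer> cursorOrder, int failAfter) {
    List<Integer> written = cursorOrder.subList(0, failAfter + 1);
    int lastRecordKey = written.get(written.size() - 1);
    List<Integer> reread = new ArrayList<>();
    for (int id : cursorOrder) {
      if (id > lastRecordKey && id <= 11) reread.add(id);
    }
    Set<Integer> duplicated = new TreeSet<>(written);
    duplicated.retainAll(reread);                 // written twice
    Set<Integer> lost = new TreeSet<>(cursorOrder);
    lost.removeAll(written);
    lost.removeAll(reread);                       // never written at all
    System.out.println("lastRecordKey=" + lastRecordKey
        + " duplicated=" + duplicated + " lost=" + lost);
  }

  public static void main(String[] args) {
    List<Integer> cursorOrder = List.of(1, 3, 4, 5, 6, 7, 8, 2, 9, 10, 11);
    simulate(cursorOrder, 7); // crash after id=2: duplicated=[3..8], lost=[]
    simulate(cursorOrder, 6); // crash after id=8: duplicated=[], lost=[2]
  }
}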

Verification Against Production Data

Using the following statement, we exported to CSV the full 456,629 rows that map 1 should have transferred:

select id, hotel_id from Hotel_Product.dbo.hotel(nolock) where id < 464562 

The last row processed when the exception occurred had id 458735.
Searching the CSV data read before the exception for rows with id greater than 458735 turns up 829 rows, exactly the 829 duplicates observed.

Conclusion

Trigger: this incident was triggered by a database connection failure.
When configuring a Sqoop import from SQL Server, the --split-by column must be a long-integer, ordered, unique column. The table involved in this transfer did not meet that standard, so when the connection was recovered after the failure, the resumed transfer produced duplicate rows.

Improvements

  1. Tables that business teams add from now on must follow the database standard: every table defines a primary key, by default ID, long-integer and auto-incrementing, and the auto-increment ID must not be used for business purposes.
  2. When Sqoop transfers a SQL Server table, point --split-by at a long-integer, ordered, unique primary-key column (see the adjusted invocation sketched after this list);
  3. or point --split-by at a column with a uniqueness constraint and add an ORDER BY on that column to the query to deal with the lack of ordering;
  4. or patch Sqoop to re-throw SQL Server connection exceptions instead of retrying, letting the MR task fail so that the scheduler simply reruns the whole Sqoop job.
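
As a concrete illustration of points 2 and 3, here is a sketch of the adjusted invocation, assuming hotel_id is the table's long-integer primary key as noted earlier (the column list is abbreviated; everything else mirrors the original job):

sqoop import -D mapreduce.job.name={JOB_NAME} --connect '{db_info=232}' --delete-target-dir --query "SELECT id,hotel_id,star_out,hotel_type,... FROM Hotel_Product.dbo.hotel(nolock) where \$CONDITIONS" --split-by hotel_id --null-string '\\N' --null-non-string '\\N' --fields-terminated-by '\001' -m 8 --target-dir /data/BaseData/elong/dshprdt_hotel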

