Flink sql 流式去重


 

在大數據的處理過程中會出現很多匯總類指標的計算,比如計算當日的每個類目下的用戶的訂單信息,就需要按類目分組,對用戶做去重。Flink sql 提供了 “去重” 功能,可以在流模式的任務中做去重操作。

官網文檔 去重 

官網鏈接: [去重](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D)

** 注意 僅 Blink planner 支持去重。

去重是指對在列的集合內重復的行進行刪除,只保留第一行或最后一行數據。 在某些情況下,上游的 ETL 作業不能實現精確一次的端到端,這將可能導致在故障恢復 時,sink 中有重復的記錄。 由於重復的記錄將影響下游分析作業的正確性(例如,SUM、COUNT), 所以在進一步分析之前需要進行數據去重。

與 Top-N 查詢相似,Flink 使用 ROW_NUMBER() 去除重復的記錄。理論上來說,去重是一個特殊的 Top-N 查詢,其中 N 是 1 ,記錄則是以處理時間或事件事件進行排序的。

以下代碼展示了去重語句的語法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

參數說明:

ROW_NUMBER(): 從第一行開始,依次為每一行分配一個唯一且連續的號碼。
PARTITION BY col1[, col2...]: 指定分區的列,例如去重的鍵。
ORDER BY time_attr [asc|desc]: 指定排序的列。所指定的列必須為 時間屬性, 目前 Flink 支持 處理時間屬性 和 事件時間屬性 。升序( ASC )排列指只保留第一行,而降序排列( DESC )則指保留最后一行。
WHERE rownum = 1: Flink 需要 rownum = 1 以確定該查詢是否為去重查詢。
以下的例子描述了如何指定 SQL 查詢以在一個流計算表中進行去重操作。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 從外部數據源讀取 DataStream
val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
// 注冊名為 “Orders” 的 DataStream
tableEnv.createTemporaryView("Orders", ds, $"order_id", $"user", $"product", $"number", $"proctime".proctime)

// 由於不應該出現兩個訂單有同一個order_id,所以根據 order_id 去除重復的行,並保留第一行
val result1 = tableEnv.sqlQuery(
    """
      |SELECT order_id, user, product, number
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num
      |   FROM Orders)
      |WHERE row_num = 1
    """.stripMargin)

** 注以上內容來自官網

demo

默認模式和 mini-batch 模式下都支持去重,但是默認模型是基於全局的,mini-batch 是基於 mini-batch 配置的
默認模式 下,去重是基於全局的,要么只輸出第一條數據( asc 升序,第一條數據 rownum=1),要么全部輸出(desc 降序,依次來的每條數據 rownum 都是 1)
mini-batch 模式下,數據是基於批次生效

去重 sql 如下:

--- 去重查詢
-- kafka source
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

---sink table
CREATE TABLE user_log_sink (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,ts TIMESTAMP(3)
  ,num BIGINT
  ,primary key (user_id) not enforced
) WITH (
'connector' = 'upsert-kafka'
  ,'topic' = 'user_behavior_sink'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'key.format' = 'json'
  ,'key.json.ignore-parse-errors' = 'true'
  ,'value.format' = 'json'
  ,'value.json.fail-on-missing-field' = 'false'
  ,'value.fields-include' = 'ALL'
);

-- insert
insert into user_log_sink(user_id, item_id, category_id,behavior,ts,num)
SELECT user_id, item_id, category_id,behavior,ts,rownum
FROM (
   SELECT user_id, item_id, category_id,behavior,ts,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY process_time desc) AS rownum -- desc use the latest one,
   FROM user_log)
WHERE rownum=1-- 只能使用 rownum=1,如果寫 rownum=2(或<10),每個分區只會輸出一條數據(小於是多條)rownum=2的,看起來基於全局去重了

mini-batch 配置參數

configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");

 

去重是基於參數 "table.exec.mini-batch.allow-latency" 和 "table.exec.mini-batch.size" 生效,滿足一個就算一個“批次”,每個批次每個 category_id 只輸出一次 (可以控制數據源的數據,只輸出一個用戶一個 category_id,測試這兩個參數)

 rownum=1 的問題

看了下源碼,去重部分結構是這樣的

大致分成兩部分,分別以類: DeduplicateFunctionBase 和 MiniBatchDeduplicateFunctionBase 為基類,分別實現了 ProcTimeDeduplicateKeepFirstRowFunction 、ProcTimeDeduplicateKeepLastRowFunction 、RowTimeDeduplicateFunction 和 ProcTimeMiniBatchDeduplicateKeepFirstRowFunction、ProcTimeMiniBatchDeduplicateKeepLastRowFunction、RowTimeMiniBatchDeduplicateFunction 兩個分支的數據
簡單的區別就是: DeduplicateFunctionBase 接收的數據是單條的,mini-batch 的接收的數據都是集合

RowTimeDeduplicateFunction

@Override
public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
    deduplicateOnRowTime(
            state,
            input,
            out,
            generateUpdateBefore,
            generateInsert,
            rowtimeIndex,
            keepLastRow);
}

RowTimeMiniBatchDeduplicateFunction

@Override
public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out) throws Exception {
    for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
        RowData currentKey = entry.getKey();
        List<RowData> bufferedRows = entry.getValue();
        ctx.setCurrentKey(currentKey);
        miniBatchDeduplicateOnRowTime(
                state,
                bufferedRows,
                out,
                generateUpdateBefore,
                generateInsert,
                rowtimeIndex,
                keepLastRow);
    }
}

所以就不需要問為什么 rownum 只能等於 1 了。

新的問題又來了: 為什么沒看 mini-batch 的時候沒有輸出沒有去重呢?

答: PARTITION BY category_id ORDER BY process_time desc 在 desc 的情況下,每次進來的數據 process_time 都是大於之前的數據,新數據的 rownum=1,所以 每條數據都輸出了

完整代碼參見:https://github.com/springMoon/sqlSubmit 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM