FlinkSQL - Table API Connector: Upsert Kafka


1. Introduction

The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.

As a source, the upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
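As a concrete illustration of these semantics (the key/value pairs below are hypothetical, not from the original post), a sequence of Kafka records with the same key compacts into table state as follows:

```
(key={"id":"1"}, value={"id":"1","name":"a"})  -- no prior row for key 1: INSERT (1, a)
(key={"id":"1"}, value={"id":"1","name":"b"})  -- key 1 already exists: UPDATE to (1, b)
(key={"id":"1"}, value=null)                   -- null value: DELETE the row for key 1
```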

As a sink, the upsert-kafka connector can consume a changelog stream. It writes INSERT/UPDATE_AFTER data as normal Kafka message values, and writes DELETE data as Kafka messages with null values (indicating a tombstone for the key). Flink guarantees message ordering on the primary key by partitioning data on the values of the primary key columns, so update/delete messages for the same key land in the same partition.
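The sink side can be sketched as follows (the topic name, server address, and the `page_views` source table are assumptions for illustration): an aggregation produces an updating changelog, and the upsert-kafka table materializes it as upserts and tombstones keyed by the primary key.

```sql
-- Hypothetical sink table; the PRIMARY KEY columns become the Kafka record key
CREATE TABLE pv_per_user (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'flinksql_sink',
  'properties.bootstrap.servers' = 'ip:port',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each new count for a user overwrites the previous record for that key
INSERT INTO pv_per_user
SELECT user_id, COUNT(*) AS pv
FROM page_views
GROUP BY user_id;
```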

2. Walkthrough

Maven dependency

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

When upsert-kafka backs a dimension table, only the latest version of each dimension row is joined with the fact stream.

        // Map the Kafka data to a dimension table that updates in real time
        tableEnv.executeSql(
                "CREATE TABLE dim_source (" +
                        "               id STRING," +
                        "               name STRING," +
                        "               update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
                        "               WATERMARK FOR update_time AS update_time, " +
                        "               PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'upsert-kafka'," +
                        " 'topic' = 'flinksqldim'," +
                        " 'properties.bootstrap.servers' = 'ip:port'," +
                        " 'properties.group.id' = 'flinksqlDim'," +
                        " 'key.format' = 'json'," +
                        " 'value.format' = 'json')"
        );
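Because `dim_source` declares both a primary key and a watermark, it qualifies as a versioned table, so a fact stream can look up the dimension version valid at its own event time via a temporal join. A sketch (the `fact_source` table and its columns are hypothetical, and it is assumed to declare a watermark on `order_time`):

```sql
SELECT f.order_id, f.id, d.name
FROM fact_source AS f
  JOIN dim_source FOR SYSTEM_TIME AS OF f.order_time AS d
    ON f.id = d.id;
```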

