Hudi-Flink: Consuming Kafka and Writing Incremental Data to Hudi in Real Time (Java)


0. Steps

Overall flow: create a Kafka topic and verify it from the Flink SQL client with the Kafka SQL connector, then run a Flink job that consumes the topic, adds the Hudi precombine (ts) and partition (partition_day) fields, and upserts the rows into a MERGE_ON_READ Hudi table on HDFS.

 

 

I. Flink SQL Integration with Kafka

 

1. Create the topic (one partition, one replica)

flink-topic
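
For reference, a one-partition, one-replica topic can be created with Kafka's bundled script; the addresses below are assumptions matching the local setup used throughout this post (older Kafka versions take --zookeeper localhost:2181 instead of --bootstrap-server):

kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic flink-topic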
 

2. Get flink-sql-connector-kafka_2.12-1.13.1.jar and place it under flink/lib

 

3. Start the SQL client, specifying the connector jar

./sql-client.sh embedded -j ../lib/flink-sql-connector-kafka_2.12-1.13.1.jar shell
Set the result display mode to tableau: set execution.result-mode=tableau;
 

4. Create a table mapped to the Kafka topic

The data in the Kafka topic is in CSV format with three fields: user_id, item_id and behavior. When consuming from Kafka, start from the latest offset.
CREATE TABLE test_kafka(
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
)
WITH(
    'connector' = 'kafka',
    'topic'='flink-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group-10001',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv'
);
 
With no data in the topic yet, the query only prints the result header:
Flink SQL> select * from test_kafka;
+----+----------------------+----------------------+--------------------------------+
| op |              user_id |              item_id |                       behavior |
+----+----------------------+----------------------+--------------------------------+

 

5. Write data to the Kafka topic

kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
1001,90001,click
1001,90001,browser
1001,90001,click
1002,90002,click
1002,90003,click
1003,90001,order
1004,90001,order
 
 
 
The data can now be queried in real time:
Flink SQL> select * from test_kafka;
+----+----------------------+----------------------+--------------------------------+
| op |              user_id |              item_id |                       behavior |
+----+----------------------+----------------------+--------------------------------+
| +I |                 1001 |                90001 |                          click |
| +I |                 1001 |                90001 |                        browser |
| +I |                 1001 |                90001 |                          click |
| +I |                 1002 |                90002 |                          click |
| +I |                 1002 |                90003 |                          click |
| +I |                 1003 |                90001 |                          order |
| +I |                 1004 |                90001 |                          order |

 

II. Code Implementation

package com.zhen.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author FengZhen
 * @Date 3/9/22 10:17 PM
 * @Description Consumes data from a Kafka topic in real time via the Flink SQL connector, transforms it, and writes it to a Hudi table in real time
 */
public class FlinkSQLHudiDemo {
    public static void main(String[] args) {

        //1.Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        //Set parallelism to 1
        env.setParallelism(1);
        //Hudi commits data when a Flink checkpoint completes, so checkpointing must be enabled for the incremental writes
        env.enableCheckpointing(5 * 1000);


        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()//streaming mode
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);


        //2.Create the input table that consumes data from the Kafka topic
        tableEnv.executeSql(
                "CREATE TABLE order_kafka_source(\n" +
                        "    `orderId` STRING,\n" +
                        "    `userId` STRING,\n" +
                        "    `orderTime` STRING,\n" +
                        "    `ip` STRING,\n" +
                        "    `orderMoney` DOUBLE,\n" +
                        "    `orderStatus` INT\n" +
                        ")\n" +
                        "WITH(\n" +
                        "    'connector' = 'kafka',\n" +
                        "    'topic'='order-topic',\n" +
                        "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                        "    'properties.group.id' = 'gid-1001',\n" +
                        "    'scan.startup.mode' = 'latest-offset',\n" +
                        "    'format' = 'json',\n" +
                        "    'json.fail-on-missing-field' = 'false',\n" +
                        "    'json.ignore-parse-errors' = 'true'\n" +
                        ")\n"
        );

        //3.Transform the data (either SQL or the Table API can be used)
        Table etlTable = tableEnv
                .from("order_kafka_source")
                //Add the ts field: the timestamp Hudi uses as the precombine (merge) field
                .addColumns(
                        $("orderId").substring(0,17).as("ts")
                )
                //Add the Hudi partition field, derived from "orderTime", e.g. 2022-03-09 22:21:13.124 -> 2022-03-09
                .addColumns(
                        $("orderTime").substring(0, 10).as("partition_day")
                );


        tableEnv.createTemporaryView("view_order", etlTable);

        //4.Create the output table mapped to the Hudi table: storage path, table type, record key, precombine field, etc.
        tableEnv.executeSql(
                "CREATE TABLE order_hudi_sink(\n" +
                        "    `orderId` STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "    `userId` STRING,\n" +
                        "    `orderTime` STRING,\n" +
                        "    `ip` STRING,\n" +
                        "    `orderMoney` DOUBLE,\n" +
                        "    `orderStatus` INT,\n" +
                        "    `ts` STRING,\n" +
                        "    `partition_day` STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day)\n" +
                        "WITH(\n" +
                        "    'connector' = 'hudi',\n" +
                        "    'path'='hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',\n" +
                        "    'table.type' = 'MERGE_ON_READ',\n" +
                        "    'write.operation' = 'upsert',\n" +
                        "    'hoodie.datasource.write.recordkey.field' = 'orderId',\n" +
                        "    'write.precombine.field' = 'ts',\n" +
                        "    'write.tasks' = '1'\n" +
                        ")\n"
        );

        //5.Write the data into the output table via INSERT ... SELECT
        tableEnv.executeSql(
                "INSERT INTO order_hudi_sink " +
                        "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM view_order"
        );

    }
}
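
As a quick sanity check, once the mock producer shown below is running and the job has completed a couple of checkpoints, the Hudi table can be read back from the Flink SQL client. This is only a sketch: it assumes the Hudi Flink bundle jar is on the SQL client's classpath, the table name order_hudi is arbitrary, and the connector options simply mirror the sink DDL above.

CREATE TABLE order_hudi(
    `orderId` STRING PRIMARY KEY NOT ENFORCED,
    `userId` STRING,
    `orderTime` STRING,
    `ip` STRING,
    `orderMoney` DOUBLE,
    `orderStatus` INT,
    `ts` STRING,
    `partition_day` STRING
)
PARTITIONED BY (partition_day)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',
    'table.type' = 'MERGE_ON_READ'
);

select * from order_hudi;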

Kafka data generator utility (Scala)

package com.zhen.hudi.streaming

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
  * Order entity (case class)
  *
  * @param orderId     order ID
  * @param userId      user ID
  * @param orderTime   order timestamp
  * @param ip          IP address the order was placed from
  * @param orderMoney  order amount
  * @param orderStatus order status
  */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)

/**
  * @Author FengZhen
  * @Date 3/3/22 9:54 PM 
  * @Description
  * Generates mock order data and sends it to a Kafka topic.
  * Each message is a String holding the order serialized as JSON.
  * Conversion: an OrderRecord instance is turned into a JSON string (using the json4s library).
  */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {

    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka producer configuration
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("acks", "1")
      props.put("retries", "3")

//      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      // 2. Create the KafkaProducer with the configuration
      producer = new KafkaProducer[String, String](props)

      // Random number generator
      val random: Random = new Random()
      // Order status: 0 = open, 1 = cancelled, 2 = closed, 3 = completed (the array below is weighted heavily towards 0)
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // Number of orders generated per iteration
        val batchNumber: Int = random.nextInt(1) + 1
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
          // 3. Build the order record
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // Serialize to a JSON string
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)
          // 4. Build the ProducerRecord
          val record = new ProducerRecord[String, String]("order-topic", orderId, orderJson)
          // 5. Send the record to the topic
          producer.send(record)
        }
        Thread.sleep(random.nextInt(500) + 5000)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** ================= Format the given timestamp ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // format the timestamp
    formatDate
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges (as signed Int bounds)
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079), //36.56.0.0-36.63.255.255
      (1038614528, 1039007743), //61.232.0.0-61.237.255.255
      (1783627776, 1784676351), //106.80.0.0-106.95.255.255
      (2035023872, 2035154943), //121.76.0.0-121.77.255.255
      (2078801920, 2079064063), //123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), //139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), //171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), //182.80.0.0-182.92.255.255
      (-770113536, -768606209), //210.25.0.0-210.47.255.255
      (-569376768, -564133889) //222.16.0.0-222.95.255.255
    )
    // Randomly pick one of the ranges
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)

    // Convert the Int IP value to dotted IPv4 notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to its string form ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // return the IPv4 address string
    buffer.mkString(".")
  }

}
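
With the formats above, each message sent to order-topic is a JSON string of roughly the following shape (the values shown here are illustrative, not actual output; field order follows the OrderRecord case class):

{"orderId":"20220309222113124000001","userId":"300000711","orderTime":"2022-03-09 22:21:13.124","ip":"121.76.1.23","orderMoney":123.45,"orderStatus":0}

The field names match the columns declared in the order_kafka_source table of the Flink job, which is why 'format' = 'json' can map the messages directly.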
 
 

