0. Steps
I. Flink SQL Integration with Kafka

1. Create the topic (one partition, one replica)
flink-topic
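A minimal sketch of creating it with the Kafka CLI (assumes a local broker; newer Kafka releases take --bootstrap-server, older ones --zookeeper):
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic flink-topic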
2. Prepare flink-sql-connector-kafka_2.12-1.13.1.jar and place it under flink/lib.
3. Start the SQL client, specifying the connector jar
./sql-client.sh embedded -j ../lib/flink-sql-connector-kafka_2.12-1.13.1.jar shell
Set the result display mode to tableau: set execution.result-mode=tableau;
4. Create a table mapped to the Kafka topic
The data in the Kafka topic is CSV-formatted with three fields: user_id, item_id and behavior. When consuming from Kafka, start from the latest offset.
CREATE TABLE test_kafka(
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH(
  'connector' = 'kafka',
  'topic' = 'flink-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test-group-10001',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);

Flink SQL> select * from test_kafka;
+----+----------------------+----------------------+--------------------------------+
| op |              user_id |              item_id |                       behavior |
+----+----------------------+----------------------+--------------------------------+
5. Write data into Kafka
kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
1001,90001,click
1001,90001,browser
1001,90001,click
1002,90002,click
1002,90003,click
1003,90001,order
1004,90001,order
MacBook-Pro:bin FengZhen$ kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>1001,90001,click
>1001,90001,browser
>1001,90001,click
>1002,90002,click
>1002,90003,click
>1003,90001,order
>1004,90001,order
>

The data can be queried and processed in real time:

Flink SQL> select * from test_kafka;
+----+----------------------+----------------------+--------------------------------+
| op |              user_id |              item_id |                       behavior |
+----+----------------------+----------------------+--------------------------------+
| +I |                 1001 |                90001 |                          click |
| +I |                 1001 |                90001 |                        browser |
| +I |                 1001 |                90001 |                          click |
| +I |                 1002 |                90002 |                          click |
| +I |                 1002 |                90003 |                          click |
| +I |                 1003 |                90001 |                          order |
| +I |                 1004 |                90001 |                          order |
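Because test_kafka is an unbounded streaming table, the SQL client can also run continuous queries over it. As a hedged example (this query is illustrative and not part of the original walkthrough), counting behaviors per user produces a result that keeps updating as new records arrive:

SELECT user_id, behavior, COUNT(*) AS cnt
FROM test_kafka
GROUP BY user_id, behavior;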
II. Code Implementation
package com.zhen.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author FengZhen
 * @Date 3/9/22 10:17 PM
 * @Description Based on the Flink SQL connectors: consume data from the topic in real time,
 *              transform it, and store it into the Hudi table in real time.
 */
public class FlinkSQLHudiDemo {

    public static void main(String[] args) {
        // 1. Get the table execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Set parallelism to 1
        env.setParallelism(1);
        // TODO: data is written to the Hudi table incrementally, so Flink checkpointing must be enabled
        env.enableCheckpointing(5 * 1000);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 2. Create the input table, TODO: consume data from Kafka
        tableEnv.executeSql(
                "CREATE TABLE order_kafka_source(\n" +
                        " `orderId` STRING,\n" +
                        " `userId` STRING,\n" +
                        " `orderTime` STRING,\n" +
                        " `ip` STRING,\n" +
                        " `orderMoney` DOUBLE,\n" +
                        " `orderStatus` INT\n" +
                        ")\n" +
                        "WITH(\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'order-topic',\n" +
                        " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                        " 'properties.group.id' = 'gid-1001',\n" +
                        " 'scan.startup.mode' = 'latest-offset',\n" +
                        " 'format' = 'json',\n" +
                        " 'json.fail-on-missing-field' = 'false',\n" +
                        " 'json.ignore-parse-errors' = 'true'\n" +
                        ")\n"
        );

        // 3. Transform the data, either with SQL or with the Table API
        Table etlTable = tableEnv
                .from("order_kafka_source")
                // Add column: the field Hudi uses to merge (precombine) records, a timestamp
                .addColumns(
                        $("orderId").substring(0, 17).as("ts")
                )
                // Add column: the Hudi partition field, "orderTime": 2022-03-09 22:21:13.124
                .addColumns(
                        $("orderTime").substring(0, 10).as("partition_day")
                );
        tableEnv.createTemporaryView("view_order", etlTable);

        // 4. Create the output table, TODO: map it to the Hudi table, specifying the table name, storage path, fields, etc.
        tableEnv.executeSql(
                "CREATE TABLE order_hudi_sink(\n" +
                        " `orderId` STRING PRIMARY KEY NOT ENFORCED,\n" +
                        " `userId` STRING,\n" +
                        " `orderTime` STRING,\n" +
                        " `ip` STRING,\n" +
                        " `orderMoney` DOUBLE,\n" +
                        " `orderStatus` INT,\n" +
                        " `ts` STRING,\n" +
                        " `partition_day` STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day)\n" +
                        "WITH(\n" +
                        " 'connector' = 'hudi',\n" +
                        " 'path' = 'hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',\n" +
                        " 'table.type' = 'MERGE_ON_READ',\n" +
                        " 'write.operation' = 'upsert',\n" +
                        " 'hoodie.datasource.write.recordkey.field' = 'orderId',\n" +
                        " 'write.precombine.field' = 'ts',\n" +
                        " 'write.tasks' = '1'\n" +
                        ")\n"
        );

        // 5. Write the data into the output table via a sub-query
        tableEnv.executeSql(
                "INSERT INTO order_hudi_sink " +
                        "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM view_order"
        );
    }
}
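To check what the job has written, the same Hudi path can be mapped back into the SQL client and queried. A minimal sketch, assuming the Hudi Flink bundle jar is on the SQL client's classpath and reusing the path and table type from the sink DDL above (the table name order_hudi is arbitrary):

CREATE TABLE order_hudi(
  `orderId` STRING PRIMARY KEY NOT ENFORCED,
  `userId` STRING,
  `orderTime` STRING,
  `ip` STRING,
  `orderMoney` DOUBLE,
  `orderStatus` INT,
  `ts` STRING,
  `partition_day` STRING
)
PARTITIONED BY (partition_day)
WITH(
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',
  'table.type' = 'MERGE_ON_READ'
);

SELECT * FROM order_hudi;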
Kafka data generator utility:
package com.zhen.hudi.streaming

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Order entity (case class)
 *
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)

/**
 * @Author FengZhen
 * @Date 3/3/22 9:54 PM
 * @Description TODO
 *              Simulates order data and sends it to a Kafka topic.
 *              Each message in the topic is a String in JSON format.
 *              Conversion: OrderRecord instances are serialized to JSON strings (using the json4s library).
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {

    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka producer configuration
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("acks", "1")
      props.put("retries", "3")
      // props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      // props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      // 2. Create the KafkaProducer with the configuration
      producer = new KafkaProducer[String, String](props)

      // Random number generator
      val random: Random = new Random()
      // Order status: 0 open, 1 cancelled, 2 closed, 3 completed
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // Number of orders generated per loop iteration
        val batchNumber: Int = random.nextInt(1) + 1
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))

          // 3. Build the order record
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // Serialize to JSON
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)

          // 4. Build the ProducerRecord
          val record = new ProducerRecord[String, String]("order-topic", orderId, orderJson)
          // 5. Send the data to the topic: def send(messages: KeyedMessage[K,V]*)
          producer.send(record)
        }
        Thread.sleep(random.nextInt(500) + 5000)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** ================= Get the current time ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // format the date
    formatDate
  }

  /** ================= Get a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0-36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0-61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0-106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0-121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0-182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0-210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0-222.95.255.255
    )
    // Random index into the IP ranges
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    // Convert the Int IP to IPv4 dotted notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to a string ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // Return the IPv4 address
    buffer.mkString(".")
  }
}
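For reference, each message produced by MockOrderProducer is a JSON string shaped like the record below (the concrete values are illustrative, derived from the generator logic above): the first 17 characters of orderId are the event timestamp that FlinkSQLHudiDemo extracts as ts, and the first 10 characters of orderTime become the partition_day partition value.

{"orderId":"20220309222113124000001","userId":"300000088","orderTime":"2022-03-09 22:21:13.124","ip":"222.16.32.118","orderMoney":156.32,"orderStatus":0}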