Recently, the Hudi community merged the basic implementation of the Flink engine (HUDI-1327), which means Hudi now supports Flink. At the moment, the Flink version of Hudi only supports reading data from Kafka and sinking it into COPY_ON_WRITE (COW) Hudi tables; other features are still being completed. This post briefly walks through reading data from Kafka and writing it out to a Hudi table.
1. Environment
- hadoop-2.7.2
- kafka_2.11-2.1.0
- flink-1.11.2
- hudi-0.7.0
2. Building the Official Source
Official site: https://hudi.apache.org/ GitHub: https://github.com/apache/hudi
2.1 Download the source
```bash
git clone https://github.com/apache/hudi.git && cd hudi
```
2.2 Build the source
Modify hudi/pom.xml as follows:
1. Check out the release-0.7.0 branch and change the hadoop version in pom.xml to match your cluster.
2. On Windows, comment out `<module>hudi-integ-test</module>` and `<module>packaging/hudi-integ-test-bundle</module>`.
3. flink-1.11.2 ships with parquet 1.11.1; you can change pom.xml to that version (if other versions already sit under flink/lib, stay consistent with them). This test keeps the default 1.10.1.

Then build:
```bash
mvn clean package -DskipTests
```
3. Creating a Test Case
3.1 Create the hudi-conf.properties config file
```properties
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=ts
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
hoodie.embed.timeline.server=false
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///tmpdir/hudi/test/config/flink/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///tmpdir/hudi/test/config/flink/schema.avsc
```
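With this configuration, the record key is taken from `uuid` and the partition path is derived from the epoch-millisecond `ts` field, formatted as `yyyy/MM/dd`. The sketch below only illustrates that mapping so you can predict where records will land; it is not the actual `TimestampBasedAvroKeyGenerator` code, and the UTC time zone is an assumption:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustration only: how an EPOCHMILLISECONDS "ts" value turns into a
// yyyy/MM/dd partition path under the settings above.
public class PartitionPathSketch {
    public static void main(String[] args) {
        long ts = 1608361070161L; // "ts" from the test record used later
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC
        System.out.println(fmt.format(new Date(ts))); // prints 2020/12/19
    }
}
```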
3.2 Create the schema.avsc file
{ "type":"record", "name":"stock_ticks", "fields":[{ "name": "uuid", "type": "string" }, { "name": "ts", "type": "long" }, { "name": "symbol", "type": "string" },{ "name": "year", "type": "int" },{ "name": "month", "type": "int" },{ "name": "high", "type": "double" },{ "name": "low", "type": "double" },{ "name": "key", "type": "string" },{ "name": "close", "type": "double" }, { "name": "open", "type": "double" }, { "name": "day", "type":"string" } ]}
3.3 Upload to HDFS
```bash
sudo -u hdfs hadoop fs -mkdir -p /tmpdir/hudi/test/config/flink
hadoop fs -put schema.avsc /tmpdir/hudi/test/config/flink/
hadoop fs -put hudi-conf.properties /tmpdir/hudi/test/config/flink/
```
3.4 Create the Kafka test topic
```bash
# Create the topic
/opt/apps/kafka/bin/kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka --replication-factor 2 --partitions 3 --topic mytest
# List topics
/opt/apps/kafka/bin/kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka
# Console producer (for testing)
/opt/apps/kafka/bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic mytest
```
Test data:
{"close":0.27172637588467297,"day":"2","high":0.4493211149337879,"key":"840ef1","low":0.030714155934507215,"month":11,"open":0.7762668153935262,"symbol":"77c-40d6-8412-6859d4757727","ts":1608361070161,"uuid":"840ef1ff-b77c-40d6-8412-6859d4757727","year":120}
3.5 Submit command and parameters
```bash
/opt/flink-1.11.2/bin/flink run -c org.apache.hudi.HoodieFlinkStreamer -m yarn-cluster -d -yjm 2048 -ytm 3096 -ys 4 -ynm hudi_on_flink_test \
  -p 1 -yD env.java.opts=" -XX:+TraceClassPaths -XX:+TraceClassLoading" hudi-flink-bundle_2.11-0.7.0.jar --kafka-topic mytest --kafka-group-id hudi_on_flink \
  --kafka-bootstrap-servers hadoop01:9092,hadoop02:9092,hadoop03:9092 --table-type COPY_ON_WRITE --target-base-path hdfs:///tmpdir/hudi/test/data/hudi_on_flink \
  --target-table hudi_on_flink --props hdfs:///tmpdir/hudi/test/config/flink/hudi-conf.properties --checkpoint-interval 3000 \
  --flink-checkpoint-path hdfs:///flink/hudi/hudi_on_flink_cp
```
HoodieFlinkStreamer parameters:

| Parameter | Description |
| --- | --- |
| --kafka-topic | Required. Kafka source topic. |
| --kafka-group-id | Required. Kafka consumer group. |
| --kafka-bootstrap-servers | Required. Kafka bootstrap.servers, e.g. node1:9092,node2:9092,node3:9092 |
| --flink-checkpoint-path | Optional. Flink checkpoint path. |
| --flink-block-retry-times | Optional, default 10. Number of retries when the latest hoodie instant has not completed. |
| --flink-block-retry-interval | Optional, default 1. Seconds between two attempts when the latest hoodie instant has not completed. |
| --target-base-path | Required. Base path of the target hoodie table (created if it does not exist). |
| --target-table | Required. Name of the target table in Hive. |
| --table-type | Required. Type of the table: COPY_ON_WRITE or MERGE_ON_READ. |
| --props | Optional. Path to a properties file (local or hdfs) with configs for the hoodie client, schema provider, key generator and data source. |
| --hoodie-conf | Optional. Any config that can be set in the properties file (via --props) can also be passed on the command line with this flag. |
| --source-ordering-field | Optional. Field used to break ties between input records with the same key. Default: 'ts', which holds the record's unix timestamp. |
| --payload-class | Optional. Subclass of HoodieRecordPayload that works on a GenericRecord. Implement your own if you want to do something other than overwriting existing values. |
| --op | Optional. One of: UPSERT (default), or INSERT (use when the input is purely new data/inserts, for speed). |
| --filter-dupes | Optional, default false. Whether duplicate records in the source should be dropped/filtered out before insert/bulk-insert. |
| --commit-on-errors | Optional, default false. Commit even if some records fail to write. |
| --checkpoint-interval | Optional, default 5000 ms. Flink checkpoint interval. |
4. Hitting a Problem
The freshly built test environment had never had its jars integrated for compatibility. The job starts normally, but once the write_process operator consumes messages, it throws the following exception:
```
java.io.IOException: Could not perform checkpoint 6 for operator write_process (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:137)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6 for operator write_process (1/1). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    ... 13 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210125212048
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
    at org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor.execute(FlinkUpsertCommitActionExecutor.java:47)
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:66)
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:58)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:110)
    at org.apache.hudi.operator.KeyedWriteProcessFunction.snapshotState(KeyedWriteProcessFunction.java:121)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.hudi.operator.KeyedWriteProcessOperator.snapshotState(KeyedWriteProcessOperator.java:58)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 more
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.lambda$execute$0(BaseFlinkCommitActionExecutor.java:120)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:118)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:68)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55)
    ... 33 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:73)
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:38)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 39 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:69)
    ... 41 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 42 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:59)
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:47)
    at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:85)
    at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:66)
    at org.apache.hudi.io.CreateHandleFactory.create(CreateHandleFactory.java:34)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:83)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:40)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
```
5. Root-Causing the Problem
5.1 Inspect the built jar
The class is present in the bundle we built, so we suspected a conflict with a jar on the cluster.
5.2 Is there a conflict with hudi-related jars on the cluster? (the class lives in the hudi-common package)
We searched with `find /opt -name "*hudi*.jar"` and found no related jars (switching to hadoop clusters of different versions produced the identical exception).
Conclusion: not caused by a jar conflict.
5.3 Enable class-loading tracing
After adding `-yD env.java.opts="-XX:+TraceClassLoading -XX:+TraceClassPaths"`, the trace showed that HoodieAvroWriteSupport was never loaded. We then set out to locate the failing code, guessing that the class had already blown up while its static initializer ran, and pulled the `new AvroSchemaConverter().convert(schema)` call out on its own to pinpoint the concrete missing class.
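A standalone probe along these lines reproduces the failure outside Flink (a sketch of the trick described above; the class name `ConvertProbe` is ours):

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;

// Call the conversion directly, outside Hudi's static initializer, so the JVM
// reports the first class it actually fails to load.
public class ConvertProbe {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
        System.out.println(new AvroSchemaConverter().convert(schema));
    }
}
```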

This surfaced the real missing class:
```
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.schema.Type
```
5.4 Inspect the hudi-flink-bundle jar: apart from parquet-avro, none of the other parquet dependencies were bundled
We pulled the relevant parquet jars from the local maven repository used for the build and copied them into flink/lib to try to fix it. The job then failed with:

```
Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetInputFormat
    at org.apache.parquet.HadoopReadOptions$Builder.<init>(HadoopReadOptions.java:87)
```
So the `NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport` was ultimately caused by missing parquet-related classes.
5.5 Judging from the HadoopReadOptions class-loading info
The classloadtrace output showed that HadoopReadOptions had been loaded, confirming the parquet-hadoop jar itself was on the classpath. The failure of ParquetInputFormat (also from parquet-hadoop) therefore had to come from some other missing class, so we went through the classes HadoopReadOptions pulls in and the parent classes of ParquetInputFormat. It turned out that ParquetInputFormat's parent class org.apache.hadoop.mapreduce.lib.input.FileInputFormat (from hadoop-mapreduce-client-core-x.x.x.jar) had never been loaded.
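The same check can be scripted: try to load the suspect classes directly and print which jar each one comes from (a sketch; `ClasspathProbe` is our own name):

```java
// Probe whether the suspect classes are loadable and from which jar.
public class ClasspathProbe {
    public static void main(String[] args) {
        String[] names = {
            "org.apache.parquet.hadoop.ParquetInputFormat",
            "org.apache.hadoop.mapreduce.lib.input.FileInputFormat"
        };
        for (String name : names) {
            try {
                Class<?> c = Class.forName(name);
                System.out.println(name + " -> " + c.getProtectionDomain().getCodeSource());
            } catch (Throwable t) { // ClassNotFoundException or NoClassDefFoundError
                System.out.println(name + " -> NOT FOUND: " + t);
            }
        }
    }
}
```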
5.6 Fix: copy hadoop-mapreduce-client-core-x.x.x.jar into flink/lib
Resubmit the job. If you then hit the exception below, delete the files under the table's HDFS base path and resubmit (they were corrupted by the earlier failed attempts).
```
java.io.IOException: Could not perform checkpoint 3 for operator instant_generator (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:137)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: Last instant costs more than 10 second, stop task now
    at org.apache.hudi.operator.InstantGenerateOperator.doCheck(InstantGenerateOperator.java:199)
    at org.apache.hudi.operator.InstantGenerateOperator.prepareSnapshotPreBarrier(InstantGenerateOperator.java:119)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:266)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:249)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    ... 13 more
```
5.7 Problem resolved
Summary: the problems above all boiled down to jars missing from the cluster. The root-causing approach shown here will resolve most NoClassDefFoundError and ClassNotFoundException issues of this kind.