在自定義Flink1.10 Sql Sink遇到的問題


1.org.apache.flink.table.api.TableException: Table sink does not implement a table schema.

問題:在RedisTableSink中沒有重寫getTableSchema方法

解決:增加重寫getTableSchema

    @Override
    public TableSchema getTableSchema() {return tableSchema;}

2.org.apache.flink.table.api.TableException: Table sink does not implement a consumed data type.

問題:在RedisTableSink中沒有重寫getConsumedDataType方法

解決:增加重寫getConsumedDataType

    @Override
    public DataType getConsumedDataType() {return tableSchema.toRowDataType(); }  

3.org.apache.flink.api.common.InvalidProgramException: root
|-- pay_hour: STRING
|-- item_id: STRING
is not serializable. The object probably contains or references non serializable fields.

問題:在RedisTableSink的emitDataStream方法中將tableSchema傳到RedisSinkFunction方法中去,而TableSchema未實現Serializable,出現序列化的問題

解決:因為傳進去的tableSchema暫時沒有用到,所以去掉不傳,如有必要傳入需要自己擴展一個Schema 然后implements Serializable

4.org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, com.yunji.bigdata.connect.redis.RedisTableSink doesn't implement this method.

問題:使用了廢棄的emitDataStream方法,而且沒有重寫consumeDataStream,再本地單元測試跑是沒問題,on yarn跑出現以上錯誤

解決:將核心代碼寫在consumeDataStream,emitDataStream再調用consumeDataStream

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM