在做支付訂單寬表的場景,需要關聯的表比較多而且支付有可能要延遲很久,這種情況下不太適合使用Flink的表Join,想到的另外一種解決方案是消費多個Topic的數據,再根據訂單號進行keyBy,再在邏輯中根據不同Topic處理,所以在接收到的消息中最好能夠有topic字段,JSONKeyValueDeserializationSchema就完美的解決了這個問題。
def getKafkaConsumer(kafkaAddr: String, topicNames: util.ArrayList[String], groupId: String): FlinkKafkaConsumer[ObjectNode] = { val properties = getKafkaProperties(groupId, kafkaAddr) val consumer = new FlinkKafkaConsumer[ObjectNode](topicNames, new JSONKeyValueDeserializationSchema(true), properties) consumer.setStartFromGroupOffsets() // the default behaviour
consumer }
在這里new JSONKeyValueDeserializationSchema(true)是需要帶上元數據信息,false則不帶上,源碼如下
public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> { private static final long serialVersionUID = 1509391548173891955L; private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class); private final boolean includeMetadata; private ObjectMapper mapper; public JSONKeyValueDeserializationSchema(boolean includeMetadata) { this.includeMetadata = includeMetadata; } public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) { if (this.mapper == null) { this.mapper = new ObjectMapper(); } ObjectNode node = this.mapper.createObjectNode(); if (record.key() != null) { node.set("key", this.mapper.readValue(record.key(), JsonNode.class)); }
if (record.value() != null) { node.set("value", this.mapper.readValue(record.value(), JsonNode.class)); } if (this.includeMetadata) { node.putObject("metadata").put("offset", record.offset()).put("topic", record.topic()).put("partition", record.partition()); }return node; } public boolean isEndOfStream(ObjectNode nextElement) { return false; } public TypeInformation<ObjectNode> getProducedType() { return TypeExtractor.getForClass(ObjectNode.class); } }
本來以為到這里就大功告成了,誰不想居然報錯了。。每條消息反序列化的都報錯。
2019-11-29 19:55:15.401 flink [Source: kafkasource (1/1)] ERROR c.y.b.D.JSONKeyValueDeserializationSchema - Unrecognized token 'xxxxx': was expecting ('true', 'false' or 'null') at [Source: [B@2e119f0e; line: 1, column: 45]org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxxxxx': was expecting ('true', 'false' or 'null') at [Source: [B@2e119f0e; line: 1, column: 45] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3464) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2628) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:854) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890) at com.xx.xx.DeserializationSchema.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:33) at com.xx.xx.DeserializationSchema.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:15) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745)
因為源碼是沒有try catch的,無法獲取到報錯的具體數據,只能直接重寫這個方法了
新建一個DeserializationSchema包,再創建JSONKeyValueDeserializationSchema類,然后在getKafkaConsumer重新引用我們自己的JSONKeyValueDeserializationSchema類,再在日志中我們就可以知道是哪些數據無法反序列化
@PublicEvolving public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> { private static final long serialVersionUID = 1509391548173891955L; private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class); private final boolean includeMetadata; private ObjectMapper mapper; public JSONKeyValueDeserializationSchema(boolean includeMetadata) { this.includeMetadata = includeMetadata; } public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) { if (this.mapper == null) { this.mapper = new ObjectMapper(); } ObjectNode node = this.mapper.createObjectNode(); try { if (record.key() != null) { node.set("key", this.mapper.readValue(record.key(), JsonNode.class)); } if (record.value() != null) { node.set("value", this.mapper.readValue(record.value(), JsonNode.class)); } if (this.includeMetadata) { node.putObject("metadata").put("offset", record.offset()).put("topic", record.topic()).put("partition", record.partition()); } } catch (Exception e) { log.error(e.getMessage(), e); log.error("JSONKeyValueDeserializationSchema 出錯:" + record.toString() + "=====key為" + new String(record.key()) + "=====數據為" + new String(record.value())); } return node; } public boolean isEndOfStream(ObjectNode nextElement) { return false; } public TypeInformation<ObjectNode> getProducedType() { return TypeExtractor.getForClass(ObjectNode.class); } }
發現key為一串訂單號,因為topic數據不是原生canal json數據,是被加工過的,那應該是上游生產的時候指定的key
那繼續修改我們的JSONKeyValueDeserializationSchema代碼,因為key用不到,所以直接注釋掉,當然也可以將class指定為String
if (record.key() != null) { node.set("key", this.mapper.readValue(record.key(), JsonNode.class)); }
try catch在這里我們還是保留並將出錯的數據打到日志,修改后的代碼如下
@PublicEvolving public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> { private static final long serialVersionUID = 1509391548173891955L; private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class); private final boolean includeMetadata; private ObjectMapper mapper; public JSONKeyValueDeserializationSchema(boolean includeMetadata) { this.includeMetadata = includeMetadata; } public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) { if (this.mapper == null) { this.mapper = new ObjectMapper(); } ObjectNode node = this.mapper.createObjectNode(); try { // if (record.key() != null) { // node.set("key", this.mapper.readValue(record.key(), JsonNode.class)); // }
if (record.value() != null) { node.set("value", this.mapper.readValue(record.value(), JsonNode.class)); } if (this.includeMetadata) { node.putObject("metadata").put("offset", record.offset()).put("topic", record.topic()).put("partition", record.partition()); } } catch (Exception e) { log.error(e.getMessage(), e); log.error("JSONKeyValueDeserializationSchema 出錯:" + record.toString() + "=====key為" + new String(record.key()) + "=====數據為" + new String(record.value())); } return node; } public boolean isEndOfStream(ObjectNode nextElement) { return false; } public TypeInformation<ObjectNode> getProducedType() { return TypeExtractor.getForClass(ObjectNode.class); } }
至此,問題解決。