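While building a wide table for payment orders, I had to join quite a few tables, and a payment can arrive long after the order itself. A Flink table join is not a good fit for that. The alternative I came up with is to consume several topics, keyBy the order number, and branch on the source topic inside the processing logic. For that to work, each incoming message should carry a topic field, and JSONKeyValueDeserializationSchema provides exactly that.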
    def getKafkaConsumer(kafkaAddr: String, topicNames: util.ArrayList[String], groupId: String): FlinkKafkaConsumer[ObjectNode] = {
      val properties = getKafkaProperties(groupId, kafkaAddr)
      val consumer = new FlinkKafkaConsumer[ObjectNode](topicNames, new JSONKeyValueDeserializationSchema(true), properties)
      consumer.setStartFromGroupOffsets() // the default behaviour
      consumer
    }
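Here, new JSONKeyValueDeserializationSchema(true) includes the record metadata (offset, topic, partition) in the output; passing false leaves it out. The source code is as follows: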
    public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
        private static final long serialVersionUID = 1509391548173891955L;
        private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class);
        private final boolean includeMetadata;
        private ObjectMapper mapper;

        public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
            this.includeMetadata = includeMetadata;
        }

        // readValue throws a checked exception, so it has to be declared here.
        public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            if (this.mapper == null) {
                this.mapper = new ObjectMapper();
            }
            ObjectNode node = this.mapper.createObjectNode();
            // Both key and value are parsed as JSON.
            if (record.key() != null) {
                node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
            }
            if (record.value() != null) {
                node.set("value", this.mapper.readValue(record.value(), JsonNode.class));
            }
            if (this.includeMetadata) {
                node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition());
            }
            return node;
        }

        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }

        public TypeInformation<ObjectNode> getProducedType() {
            return TypeExtractor.getForClass(ObjectNode.class);
        }
    }
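To make the idea from the beginning concrete, here is a minimal plain-Java sketch of branching on the topic once records have been grouped by order number. It does not use Flink: the Event class stands in for a deserialized node (its topic corresponds to metadata.topic), the in-memory map stands in for keyed state, and the topic names and field names (order_topic, payment_topic, amount, pay_time) are made up for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderWideRowSketch {

    /** Stand-in for one deserialized record: metadata.topic plus the value fields. */
    public static final class Event {
        final String topic;
        final String orderId;
        final Map<String, String> value;

        public Event(String topic, String orderId, Map<String, String> value) {
            this.topic = topic;
            this.orderId = orderId;
            this.value = value;
        }
    }

    /** One wide row per order number; a stand-in for per-key state. */
    private final Map<String, Map<String, String>> rowsByOrder = new HashMap<>();

    /** Merges one event into the wide row of its order, branching on the topic. */
    public Map<String, String> apply(Event e) {
        Map<String, String> row =
            rowsByOrder.computeIfAbsent(e.orderId, k -> new LinkedHashMap<>());
        row.put("order_id", e.orderId);
        switch (e.topic) {
            case "order_topic":   // hypothetical topic name
                row.put("amount", e.value.get("amount"));
                break;
            case "payment_topic": // hypothetical topic name
                row.put("pay_time", e.value.get("pay_time"));
                break;
            default:
                // Unknown topic: leave the row unchanged.
                break;
        }
        return row;
    }
}
```

Because the row is looked up by order number, it does not matter how late the payment event arrives relative to the order event, which is the property a plain join struggles with here.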
I thought that was the end of it, but it failed: every single message threw an error during deserialization.
    2019-11-29 19:55:15.401 flink [Source: kafkasource (1/1)] ERROR c.y.b.D.JSONKeyValueDeserializationSchema - Unrecognized token 'xxxxx': was expecting ('true', 'false' or 'null')
     at [Source: [B@2e119f0e; line: 1, column: 45]
    org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxxxxx': was expecting ('true', 'false' or 'null')
     at [Source: [B@2e119f0e; line: 1, column: 45]
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3464)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2628)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:854)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
        at com.xx.xx.DeserializationSchema.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:33)
        at com.xx.xx.DeserializationSchema.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:15)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
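Because the source code has no try/catch, there was no way to see exactly which data caused the error, so the only option was to rewrite the class myself.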
Create a new DeserializationSchema package with a JSONKeyValueDeserializationSchema class in it, then point getKafkaConsumer at our own JSONKeyValueDeserializationSchema. The log will then show which records cannot be deserialized.
    @PublicEvolving
    public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
        private static final long serialVersionUID = 1509391548173891955L;
        private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class);
        private final boolean includeMetadata;
        private ObjectMapper mapper;

        public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
            this.includeMetadata = includeMetadata;
        }

        public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) {
            if (this.mapper == null) {
                this.mapper = new ObjectMapper();
            }
            ObjectNode node = this.mapper.createObjectNode();
            try {
                if (record.key() != null) {
                    node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
                }
                if (record.value() != null) {
                    node.set("value", this.mapper.readValue(record.value(), JsonNode.class));
                }
                if (this.includeMetadata) {
                    node.putObject("metadata")
                        .put("offset", record.offset())
                        .put("topic", record.topic())
                        .put("partition", record.partition());
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                // Guard against a null key or value so that logging itself cannot throw.
                String key = record.key() == null ? null : new String(record.key());
                String value = record.value() == null ? null : new String(record.value());
                log.error("JSONKeyValueDeserializationSchema failed: " + record.toString()
                    + " ===== key: " + key + " ===== value: " + value);
            }
            // On failure the node is returned as built so far (possibly empty).
            return node;
        }

        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }

        public TypeInformation<ObjectNode> getProducedType() {
            return TypeExtractor.getForClass(ObjectNode.class);
        }
    }
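The log showed that the key is an order number string, which is not valid JSON. The data in this topic is not native canal JSON; it has been processed, so the key was most likely set by the upstream producer when it wrote the messages.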
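So let's keep modifying our JSONKeyValueDeserializationSchema. The key is not needed here, so I simply commented out the lines below; alternatively, the key could be kept as a plain String instead of being parsed as JSON.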
    if (record.key() != null) {
        node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
    }
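For the alternative of keeping the key as a string, one possible sketch is to decode the raw key bytes directly instead of handing them to the JSON parser. The helper name keyAsText is made up for illustration, and UTF-8 is an assumption about how the upstream producer encodes its keys.

```java
import java.nio.charset.StandardCharsets;

public class KeyAsTextSketch {

    /**
     * Decodes raw Kafka key bytes as text without any JSON parsing.
     * Returns null when the record has no key.
     * Assumption: the producer wrote the key as UTF-8.
     */
    public static String keyAsText(byte[] rawKey) {
        return rawKey == null ? null : new String(rawKey, StandardCharsets.UTF_8);
    }
}
```

Inside deserialize, the result could be stored with something like node.put("key", keyAsText(record.key())). Decoding the bytes sidesteps the parser entirely, which matters because the stack trace shows the failure happening while the parser reads its first token, before any target type is considered.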
We still keep the try/catch here and write any failing records to the log. The modified code is as follows:
    @PublicEvolving
    public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
        private static final long serialVersionUID = 1509391548173891955L;
        private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class);
        private final boolean includeMetadata;
        private ObjectMapper mapper;

        public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
            this.includeMetadata = includeMetadata;
        }

        public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) {
            if (this.mapper == null) {
                this.mapper = new ObjectMapper();
            }
            ObjectNode node = this.mapper.createObjectNode();
            try {
                // The key is a plain order number, not JSON, so it is no longer parsed.
                // if (record.key() != null) {
                //     node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
                // }
                if (record.value() != null) {
                    node.set("value", this.mapper.readValue(record.value(), JsonNode.class));
                }
                if (this.includeMetadata) {
                    node.putObject("metadata")
                        .put("offset", record.offset())
                        .put("topic", record.topic())
                        .put("partition", record.partition());
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                // Guard against a null key or value so that logging itself cannot throw.
                String key = record.key() == null ? null : new String(record.key());
                String value = record.value() == null ? null : new String(record.value());
                log.error("JSONKeyValueDeserializationSchema failed: " + record.toString()
                    + " ===== key: " + key + " ===== value: " + value);
            }
            // On failure the node is returned as built so far (possibly empty).
            return node;
        }

        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }

        public TypeInformation<ObjectNode> getProducedType() {
            return TypeExtractor.getForClass(ObjectNode.class);
        }
    }
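One consequence of swallowing the exception is that a record whose value fails to parse comes out as a node with no "value" field, so downstream logic probably wants to skip such nodes. The following is only a plain-Java sketch of that check, with a Map standing in for ObjectNode and a made-up helper name keepParsed; in a real job the same test would more likely be a filter on whether the node has a "value" field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SkipFailedRecordsSketch {

    /** Keeps only the records that carry a "value" field, i.e. whose payload was set. */
    public static List<Map<String, Object>> keepParsed(List<Map<String, Object>> records) {
        List<Map<String, Object>> parsed = new ArrayList<>();
        for (Map<String, Object> record : records) {
            if (record.containsKey("value")) {
                parsed.add(record);
            }
        }
        return parsed;
    }
}
```

Note that this also drops records whose Kafka value was null to begin with, since those never get a "value" field either.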
With that, the problem is solved.