The error occurs while the KStreamFlatMapValues processor is executing: a malformed JSON record cannot be parsed, so the mapper produces null, and the following error is thrown:
2018-04-18 19:21:04,776 ERROR [app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1] com.gw.stream.KStream103.lambda$main$1(100) | Caught exception: hello world hello world king
Exception in thread "app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1" java.lang.NullPointerException
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
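The stack trace points straight at the cause: the processor iterates over whatever the `flatMapValues` lambda returns, so a null return value means a NullPointerException the moment iteration starts. The effect can be reproduced outside Kafka Streams with a minimal sketch in plain Java (the `process` method here is a hypothetical stand-in for the processor, not Kafka code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class NullReturnDemo {
    // Stand-in for the flatMapValues processor: it iterates whatever the
    // mapper returns, so a null return triggers a NullPointerException.
    static List<String> process(Function<String, List<String>> mapper, String value) {
        List<String> forwarded = new ArrayList<>();
        for (String v : mapper.apply(value)) { // NPE here if the mapper returned null
            forwarded.add(v);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Safe mapper: returns an empty list when parsing fails
        Function<String, List<String>> safe = v -> new ArrayList<>();
        System.out.println(process(safe, "bad json").size()); // prints 0

        // Unsafe mapper: returns null on failure, as in the original bug
        Function<String, List<String>> unsafe = v -> null;
        try {
            process(unsafe, "bad json");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException"); // prints NullPointerException
        }
    }
}
```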
Solution:
- Add @JsonIgnoreProperties(ignoreUnknown = true) to the JSON bean so unknown fields are ignored during deserialization:
import java.util.List;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Bean103 {

    private List<String> key1;
    private List<List<String>> key2;

    public void setKey1(List<String> key1) { this.key1 = key1; }
    public List<String> getKey1() { return key1; }
    public void setKey2(List<List<String>> key2) { this.key2 = key2; }
    public List<List<String>> getKey2() { return key2; }
}
- Fix the NullPointerException itself: when parsing fails and the list is null, return an empty list instead of null.

return list == null ? new ArrayList<String>() : list;
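The same guard can be packaged as a small helper so every stream mapper is guaranteed a non-null list. A minimal sketch, with the helper name `parseOrEmpty` and the throwaway parser being assumptions for illustration, not part of the original code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class SafeParse {
    // Hypothetical wrapper: runs a parser and falls back to an empty list
    // when the parser throws or returns null.
    static <T> List<T> parseOrEmpty(Function<String, List<T>> parser, String raw) {
        try {
            List<T> result = parser.apply(raw);
            return result == null ? Collections.<T>emptyList() : result;
        } catch (Exception e) {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        // Parser that blows up on non-numeric input (stands in for JSON parsing)
        Function<String, List<Integer>> parser =
                raw -> new ArrayList<>(List.of(Integer.parseInt(raw)));
        System.out.println(parseOrEmpty(parser, "42"));       // prints [42]
        System.out.println(parseOrEmpty(parser, "not json")); // prints []
    }
}
```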
- The complete example code is as follows:
package com.gw.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.log4j.Logger;

import com.alibaba.fastjson.JSONObject;

public class KStream103 {

    private static Logger log = Logger.getLogger(KStream103.class);

    public static void main(String[] args) {
        if (args.length < 6) {
            log.error("Error: wrong number of arguments [application_id bootstrap_server groupid source_topic target_topic auto_offset_reset]");
            return;
        }
        String application_id = args[0];
        String bootstrap_server = args[1];
        String groupid = args[2];
        String source_topic = args[3];
        String target_topic = args[4];
        String auto_offset_reset = args[5];

        Properties props = new Properties();
        // Application ID; a directory named after it (containing a .lock file)
        // is created under the state dir
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, application_id);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
        // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        // Workaround for timestamp-related errors
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

        final String splitChar = "\001";

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(source_topic); // consume the source topic
        textLines.flatMapValues(value -> {
            Bean103 bean103 = null;
            List<String> list = null;
            try {
                // the business logic for value goes here... it ultimately produces the list
            } catch (Exception e) {
                log.error("Caught exception: " + value);
                log.error("error message: " + e.getMessage());
            }
            // Never return null: an empty list means "emit nothing" for this record
            return list == null ? new ArrayList<String>() : list;
        }).filter((k, v) -> v != null)
          .map((k, v) -> new KeyValue<>(k, v))
          .to(target_topic, Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}