Test case code:
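import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RiskCaseBaseDictionaryServiceTest {

    @Autowired
    private XXXXDictionaryService xxxxDictionaryService;

    @Test
    public void test() {
        log.info("-------test---------");
        HttpMessageResult<?> ret = xxxxDictionaryService.getDictionaryInfo();
        // JUnit convention: expected value first, actual value second.
        Assert.assertEquals(200, ret.getCode());
    }
}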
After starting the test, I found that execution never reached the test method.
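Cause: starting a test first loads the Spring context, and that includes running every bean that implements CommandLineRunner. The test cases run only after the context has finished loading. My Kafka consumer implements CommandLineRunner, and its run method loops forever around a blocking poll call, so it never returns and startup never completes. The fix is to do the polling in a new thread, as follows: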
Before:
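import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject; // assumption: JSONObject comes from fastjson
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class XxxxKafkaConsumer implements CommandLineRunner {

    @Value("${xxx.kafka.server}")
    private String servers;

    @Value("${xxx.kafka.pay.topic}")
    private String topic;

    @Value("${xxx.kafka.group}")
    private String GROUPID;

    @Autowired
    private XxxxService xxxService;

    @Override
    public void run(String... args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            // Endless loop on the calling thread: run() never returns.
            for (;;) {
                System.out.println("==========kafkaConsumer=============");
                ConsumerRecords<String, String> msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        log.info("Consumed topic, message={}", record.value());
                        JSONObject json = JSONObject.parseObject(record.value());
                        XxxxCreateDTO xxxCreateDTO = JSONObject.toJavaObject(json, XxxxCreateDTO.class);
                        if (Objects.isNull(xxxCreateDTO)) {
                            log.error("[Payment case consumer] data received from MQ is empty!");
                            continue; // skip the empty message instead of passing null on
                        }
                        xxxService.create(xxxCreateDTO);
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            log.error("Consumer error={}", e.getMessage());
        } finally {
            consumer.close();
        }
    }
}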
After moving the loop to a new thread:
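import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject; // assumption: JSONObject comes from fastjson
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class XxxConsumer implements CommandLineRunner {

    @Value("${xxx.kafka.server}")
    private String servers;

    @Value("${xxx.kafka.pay.topic}")
    private String topic;

    @Value("${xxx.kafka.group}")
    private String GROUPID;

    @Autowired
    private XxxxService xxxService;

    @Override
    public void run(String... args) throws Exception {
        // Poll on a separate thread so that run() returns and startup can complete.
        new Thread(() -> {
            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
            try {
                for (;;) {
                    System.out.println("==========kafkaConsumer=============");
                    ConsumerRecords<String, String> msgList = consumer.poll(1000);
                    if (null != msgList && msgList.count() > 0) {
                        for (ConsumerRecord<String, String> record : msgList) {
                            log.info("Consumed topic, message={}", record.value());
                            JSONObject json = JSONObject.parseObject(record.value());
                            XxxxCreateDTO xxxxCreateDTO = JSONObject.toJavaObject(json, XxxxCreateDTO.class);
                            if (Objects.isNull(xxxxCreateDTO)) {
                                log.error("[Payment case consumer] data received from MQ is empty!");
                                continue; // skip the empty message instead of passing null on
                            }
                            xxxService.create(xxxxCreateDTO);
                        }
                    } else {
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                log.error("Consumer error={}", e.getMessage());
            } finally {
                consumer.close();
            }
        }).start();
    }
}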
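With the new thread in place, the blocking poll loop no longer holds up the main thread; once the main thread finishes initialization, the test cases can run.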
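The effect can be reproduced without Spring or Kafka. The following is a minimal sketch using only the JDK: PollingRunner is a hypothetical stand-in for the consumer, Thread.sleep stands in for the poll call, and startUp imitates the framework invoking the runner on the calling thread. The stop flag and the daemon setting are additions made for the demo so that it can shut down cleanly; they are not part of the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BackgroundRunnerDemo {

    /** Hypothetical stand-in for a runner whose work never finishes on its own. */
    static class PollingRunner {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch firstPoll = new CountDownLatch(1);
        private Thread worker;

        /** Returns immediately; the endless loop runs on a separate daemon thread. */
        void run() {
            worker = new Thread(this::pollLoop, "demo-poller");
            worker.setDaemon(true);
            worker.start();
        }

        /** Loops until stopped; the sleep stands in for a blocking poll call. */
        private void pollLoop() {
            while (running.get()) {
                firstPoll.countDown();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Waits until the loop has started, or returns false on timeout or interruption. */
        boolean awaitFirstPoll(long timeoutMs) {
            try {
                return firstPoll.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Asks the loop to exit and waits up to one second for the thread to finish. */
        void stop() {
            running.set(false);
            worker.interrupt();
            try {
                worker.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        boolean isAlive() {
            return worker != null && worker.isAlive();
        }
    }

    /** Imitates startup: calls the runner on the calling thread and returns the elapsed milliseconds. */
    static long startUp(PollingRunner runner) {
        long start = System.nanoTime();
        runner.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        PollingRunner runner = new PollingRunner();
        long elapsed = startUp(runner);
        System.out.println("startup returned after " + elapsed + " ms");
        System.out.println("poller started: " + runner.awaitFirstPoll(1000));
        runner.stop();
        System.out.println("poller alive after stop: " + runner.isAlive());
    }
}
```

If pollLoop were called directly inside run() instead of being handed to a thread, startUp would never return, which is the situation the test was stuck in.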