一、KafkaListener消费
/** * 手动提交监听. * * @param record 消息记录 * @param ack 确认实例 */ @Override @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning") public void ackListener(ConsumerRecord record, Acknowledgment ack) { if (LOG.isInfoEnabled()) { LOG.info("###################预警ackListener接收到消息###################"); } boolean ackFlag = true; long beginTime = System.currentTimeMillis(); try { WarningInfo warningInfo = parseConsumerRecord(record); if (null == warningInfo) { dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, new Object[]{record.topic(), record.value()})); } else { warningBusinessHandle.doHandle(record, warningInfo); } // } catch (BusinessException ex) { // LOG.error(record.topic() + "消费失败:" + ex.getMessage(), ex); // // 业务处理失败(目前暂无此场景),把消息发送至重试主题 // this.sendRetryTopic(record, this.interceptErrMessage(ex.getMessage())); } catch (Exception e) { LOG.error("[" + record.topic() + "]消费发生运行时异常:" + e.getMessage(), e); ackFlag = false; consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING); dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, new Object[]{record.topic()})); } finally { if (ackFlag) { // 手动提交offset ack.acknowledge(); } LOG.info("###################预警ackListener处理完消息,耗时" + (System.currentTimeMillis()-beginTime) + "ms ###################"); } }
二、使用KafkaListenerEndpointRegistry实现启动和停止功能
下面参数里面的listenerId值,必须是消费时@KafkaListener注解中指定的id值:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING
package com.macaupass.kafka.consumer.service.impl; import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Service; /** * Kafka消费监听服务实现类. * * @author weixiong.cao * @date 2019/7/2 */ @Service public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService { /** * LOG. */ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class); /** * registry. */ @Autowired private KafkaListenerEndpointRegistry registry; /** * 开启监听. * * @param listenerId 监听ID */ @Override public void startListener(String listenerId) { //判断监听容器是否启动,未启动则将其启动 if (!registry.getListenerContainer(listenerId).isRunning()) { registry.getListenerContainer(listenerId).start(); } //项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思 registry.getListenerContainer(listenerId).resume(); LOG.info(listenerId + "开启监听成功。"); } /** * 停止监听. * * @param listenerId 监听ID */ @Override public void stopListener(String listenerId) { registry.getListenerContainer(listenerId).stop(); LOG.info(listenerId + "停止监听成功。"); } }
三、Controller
package com.macaupass.kafka.consumer.controller; import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import java.util.HashMap; import java.util.Map; /** * Kafka消费监听Controller. * * @author weixiong.cao * @date 2019/7/2 */ @Controller @RequestMapping(value = "/listener") public class KafkaConsumerListenerController { /** * LOG. */ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class); /** * 注入监听服务. */ @Autowired private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService; /** * 开启监听. * * @param listenerId 监听ID */ @RequestMapping("/start") @ResponseBody public Map<String, String> startListener(@RequestParam(required=false) String listenerId) { if (LOG.isInfoEnabled()) { LOG.info("开启监听...listenerId=" + listenerId); } Map<String, String> retMap = new HashMap<>(); try { kafkaConsumerListenerService.startListener(listenerId); retMap.put("respCode", "0000"); retMap.put("respMsg", "启动成功。"); } catch (Exception e) { LOG.error(e.getMessage(), e); retMap.put("respCode", "0001"); retMap.put("respMsg", "启动失败:" + e.getMessage()); } return retMap; } /** * 停止监听. * * @param listenerId 监听ID */ @RequestMapping("/stop") @ResponseBody public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) { if (LOG.isInfoEnabled()) { LOG.info("停止监听...listenerId=" + listenerId); } Map<String, String> retMap = new HashMap<>(); try { kafkaConsumerListenerService.stopListener(listenerId); retMap.put("respCode", "0000"); retMap.put("respMsg", "停止成功。"); } catch (Exception e) { LOG.error(e.getMessage(), e); retMap.put("respCode", "0001"); retMap.put("respMsg", "停止失败:" + e.getMessage()); } return retMap; } /** * 访问入口. */ @RequestMapping("/index") public String index() { return "kafka/listener"; } }
四、JSP界面
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html> <head> <title>消费监听管理</title> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <link rel="stylesheet" href="../css/common.css"> <script src="../jquery/jquery-3.3.1.min.js"></script> <script type="text/javascript"> var contextPath = "<%=request.getContextPath() %>"; /** * 开启监听. * * @param listenerId 监听ID */ function startListener(listenerId) { ajaxPostByJson(contextPath + "/listener/start?listenerId=" + listenerId); } /** * 停止监听. * * @param listenerId 监听ID */ function stopListener(listenerId) { ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + listenerId); } /** * ajax请求. * * @param url 请求url */ function ajaxPostByJson(url) { $.ajax({ type: "POST", url: url, dataType:"json", contentType : 'application/json;charset=utf-8', success: function(respData){ alert(respData.respMsg); }, error: function(res){ alert("系統異常:" + res.responseText); } }); } </script> </head> <body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4> <div id="main"> <div id="head"> <dl class="alipay_link"> <a target="_blank" href=""><span> </span></a> </dl> <span class="title">Kafka消费手动管理</span> </div> <div class="cashier-nav"> </div> <form name=query method=post> <div id="body" style="clear:left"> <dl class="content"> <dd> <span class="new-btn-login-sp"> <button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">开启【预警】消费</button> </span> <span class="new-btn-login-sp"> <button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【预警】消费</button> </span> </dd> </dl> </div> </form> </div> </body> </html>
五、功能界面