一、KafkaListener消費
/** * 手動提交監聽. * * @param record 消息記錄 * @param ack 確認實例 */ @Override @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning") public void ackListener(ConsumerRecord record, Acknowledgment ack) { if (LOG.isInfoEnabled()) { LOG.info("###################預警ackListener接收到消息###################"); } boolean ackFlag = true; long beginTime = System.currentTimeMillis(); try { WarningInfo warningInfo = parseConsumerRecord(record); if (null == warningInfo) { dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, new Object[]{record.topic(), record.value()})); } else { warningBusinessHandle.doHandle(record, warningInfo); } // } catch (BusinessException ex) { // LOG.error(record.topic() + "消費失敗:" + ex.getMessage(), ex); // // 業務處理失敗(目前暫無此場景),把消息發送至重試主題 // this.sendRetryTopic(record, this.interceptErrMessage(ex.getMessage())); } catch (Exception e) { LOG.error("[" + record.topic() + "]消費發生運行時異常:" + e.getMessage(), e); ackFlag = false; consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING); dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, new Object[]{record.topic()})); } finally { if (ackFlag) { // 手動提交offset ack.acknowledge(); } LOG.info("###################預警ackListener處理完消息,耗時" + (System.currentTimeMillis()-beginTime) + "ms ###################"); } }
二、使用KafkaListenerEndpointRegistry實現啟動和停止功能
下面參數里面的listenerId值,必須是消費時@KafkaListener注解中指定的id值:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING
package com.macaupass.kafka.consumer.service.impl; import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Service; /** * Kafka消費監聽服務實現類. * * @author weixiong.cao * @date 2019/7/2 */ @Service public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService { /** * LOG. */ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class); /** * registry. */ @Autowired private KafkaListenerEndpointRegistry registry; /** * 開啟監聽. * * @param listenerId 監聽ID */ @Override public void startListener(String listenerId) { //判斷監聽容器是否啟動,未啟動則將其啟動 if (!registry.getListenerContainer(listenerId).isRunning()) { registry.getListenerContainer(listenerId).start(); } //項目啟動的時候監聽容器是未啟動狀態,而resume是恢復的意思不是啟動的意思 registry.getListenerContainer(listenerId).resume(); LOG.info(listenerId + "開啟監聽成功。"); } /** * 停止監聽. * * @param listenerId 監聽ID */ @Override public void stopListener(String listenerId) { registry.getListenerContainer(listenerId).stop(); LOG.info(listenerId + "停止監聽成功。"); } }
三、Controller
package com.macaupass.kafka.consumer.controller; import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import java.util.HashMap; import java.util.Map; /** * Kafka消費監聽Controller. * * @author weixiong.cao * @date 2019/7/2 */ @Controller @RequestMapping(value = "/listener") public class KafkaConsumerListenerController { /** * LOG. */ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class); /** * 注入監聽服務. */ @Autowired private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService; /** * 開啟監聽. * * @param listenerId 監聽ID */ @RequestMapping("/start") @ResponseBody public Map<String, String> startListener(@RequestParam(required=false) String listenerId) { if (LOG.isInfoEnabled()) { LOG.info("開啟監聽...listenerId=" + listenerId); } Map<String, String> retMap = new HashMap<>(); try { kafkaConsumerListenerService.startListener(listenerId); retMap.put("respCode", "0000"); retMap.put("respMsg", "啟動成功。"); } catch (Exception e) { LOG.error(e.getMessage(), e); retMap.put("respCode", "0001"); retMap.put("respMsg", "啟動失敗:" + e.getMessage()); } return retMap; } /** * 停止監聽. * * @param listenerId 監聽ID */ @RequestMapping("/stop") @ResponseBody public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) { if (LOG.isInfoEnabled()) { LOG.info("停止監聽...listenerId=" + listenerId); } Map<String, String> retMap = new HashMap<>(); try { kafkaConsumerListenerService.stopListener(listenerId); retMap.put("respCode", "0000"); retMap.put("respMsg", "停止成功。"); } catch (Exception e) { LOG.error(e.getMessage(), e); retMap.put("respCode", "0001"); retMap.put("respMsg", "停止失敗:" + e.getMessage()); } return retMap; } /** * 訪問入口. */ @RequestMapping("/index") public String index() { return "kafka/listener"; } }
四、JSP界面
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html> <head> <title>消費監聽管理</title> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <link rel="stylesheet" href="../css/common.css"> <script src="../jquery/jquery-3.3.1.min.js"></script> <script type="text/javascript"> var contextPath = "<%=request.getContextPath() %>"; /** * 開啟監聽. * * @param listenerId 監聽ID */ function startListener(listenerId) { ajaxPostByJson(contextPath + "/listener/start?listenerId=" + listenerId); } /** * 停止監聽. * * @param listenerId 監聽ID */ function stopListener(listenerId) { ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + listenerId); } /** * ajax請求. * * @param url 請求url */ function ajaxPostByJson(url) { $.ajax({ type: "POST", url: url, dataType:"json", contentType : 'application/json;charset=utf-8', success: function(respData){ alert(respData.respMsg); }, error: function(res){ alert("系統異常:" + res.responseText); } }); } </script> </head> <body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4> <div id="main"> <div id="head"> <dl class="alipay_link"> <a target="_blank" href=""><span> </span></a> </dl> <span class="title">Kafka消費手動管理</span> </div> <div class="cashier-nav"> </div> <form name=query method=post> <div id="body" style="clear:left"> <dl class="content"> <dd> <span class="new-btn-login-sp"> <button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">開啟【預警】消費</button> </span> <span class="new-btn-login-sp"> <button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【預警】消費</button> </span> </dd> </dl> </div> </form> </div> </body> </html>
五、功能界面