一、KafkaListener消費
/**
* 手動提交監聽.
*
* @param record 消息記錄
* @param ack 確認實例
*/
@Override
@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning")
public void ackListener(ConsumerRecord record, Acknowledgment ack) {
if (LOG.isInfoEnabled()) {
LOG.info("###################預警ackListener接收到消息###################");
}
boolean ackFlag = true;
long beginTime = System.currentTimeMillis();
try {
WarningInfo warningInfo = parseConsumerRecord(record);
if (null == warningInfo) {
dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, new Object[]{record.topic(), record.value()}));
} else {
warningBusinessHandle.doHandle(record, warningInfo);
}
// } catch (BusinessException ex) {
// LOG.error(record.topic() + "消費失敗:" + ex.getMessage(), ex);
// // 業務處理失敗(目前暫無此場景),把消息發送至重試主題
// this.sendRetryTopic(record, this.interceptErrMessage(ex.getMessage()));
} catch (Exception e) {
LOG.error("[" + record.topic() + "]消費發生運行時異常:" + e.getMessage(), e);
ackFlag = false;
consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING);
dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, new Object[]{record.topic()}));
} finally {
if (ackFlag) {
// 手動提交offset
ack.acknowledge();
}
LOG.info("###################預警ackListener處理完消息,耗時" + (System.currentTimeMillis()-beginTime) + "ms ###################");
}
}
二、使用KafkaListenerEndpointRegistry實現啟動和停止功能
下面參數里面的listenerId值,必須是消費時@KafkaListener注解中指定的id值:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING
package com.macaupass.kafka.consumer.service.impl;
import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;
/**
* Kafka消費監聽服務實現類.
*
* @author weixiong.cao
* @date 2019/7/2
*/
@Service
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {
/**
* LOG.
*/
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class);
/**
* registry.
*/
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 開啟監聽.
*
* @param listenerId 監聽ID
*/
@Override
public void startListener(String listenerId) {
//判斷監聽容器是否啟動,未啟動則將其啟動
if (!registry.getListenerContainer(listenerId).isRunning()) {
registry.getListenerContainer(listenerId).start();
}
//項目啟動的時候監聽容器是未啟動狀態,而resume是恢復的意思不是啟動的意思
registry.getListenerContainer(listenerId).resume();
LOG.info(listenerId + "開啟監聽成功。");
}
/**
* 停止監聽.
*
* @param listenerId 監聽ID
*/
@Override
public void stopListener(String listenerId) {
registry.getListenerContainer(listenerId).stop();
LOG.info(listenerId + "停止監聽成功。");
}
}
三、Controller
package com.macaupass.kafka.consumer.controller;
import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka消費監聽Controller.
*
* @author weixiong.cao
* @date 2019/7/2
*/
@Controller
@RequestMapping(value = "/listener")
public class KafkaConsumerListenerController {
/**
* LOG.
*/
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class);
/**
* 注入監聽服務.
*/
@Autowired
private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService;
/**
* 開啟監聽.
*
* @param listenerId 監聽ID
*/
@RequestMapping("/start")
@ResponseBody
public Map<String, String> startListener(@RequestParam(required=false) String listenerId) {
if (LOG.isInfoEnabled()) {
LOG.info("開啟監聽...listenerId=" + listenerId);
}
Map<String, String> retMap = new HashMap<>();
try {
kafkaConsumerListenerService.startListener(listenerId);
retMap.put("respCode", "0000");
retMap.put("respMsg", "啟動成功。");
} catch (Exception e) {
LOG.error(e.getMessage(), e);
retMap.put("respCode", "0001");
retMap.put("respMsg", "啟動失敗:" + e.getMessage());
}
return retMap;
}
/**
* 停止監聽.
*
* @param listenerId 監聽ID
*/
@RequestMapping("/stop")
@ResponseBody
public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) {
if (LOG.isInfoEnabled()) {
LOG.info("停止監聽...listenerId=" + listenerId);
}
Map<String, String> retMap = new HashMap<>();
try {
kafkaConsumerListenerService.stopListener(listenerId);
retMap.put("respCode", "0000");
retMap.put("respMsg", "停止成功。");
} catch (Exception e) {
LOG.error(e.getMessage(), e);
retMap.put("respCode", "0001");
retMap.put("respMsg", "停止失敗:" + e.getMessage());
}
return retMap;
}
/**
* 訪問入口.
*/
@RequestMapping("/index")
public String index() {
return "kafka/listener";
}
}
四、JSP界面
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html>
<head>
<title>消費監聽管理</title>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<link rel="stylesheet" href="../css/common.css">
<script src="../jquery/jquery-3.3.1.min.js"></script>
<script type="text/javascript">
var contextPath = "<%=request.getContextPath() %>";
/**
* 開啟監聽.
*
* @param listenerId 監聽ID
*/
function startListener(listenerId) {
ajaxPostByJson(contextPath + "/listener/start?listenerId=" + listenerId);
}
/**
* 停止監聽.
*
* @param listenerId 監聽ID
*/
function stopListener(listenerId) {
ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + listenerId);
}
/**
* ajax請求.
*
* @param url 請求url
*/
function ajaxPostByJson(url) {
$.ajax({
type: "POST",
url: url,
dataType:"json",
contentType : 'application/json;charset=utf-8',
success: function(respData){
alert(respData.respMsg);
},
error: function(res){
alert("系統異常:" + res.responseText);
}
});
}
</script>
</head>
<body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4>
<div id="main">
<div id="head">
<dl class="alipay_link">
<a target="_blank" href=""><span> </span></a>
</dl>
<span class="title">Kafka消費手動管理</span>
</div>
<div class="cashier-nav">
</div>
<form name=query method=post>
<div id="body" style="clear:left">
<dl class="content">
<dd>
<span class="new-btn-login-sp">
<button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">開啟【預警】消費</button>
</span>
<span class="new-btn-login-sp">
<button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【預警】消費</button>
</span>
</dd>
</dl>
</div>
</form>
</div>
</body>
</html>
五、功能界面
