Spring-Kafka —— KafkaListener手動啟動和停止


 

一、KafkaListener消費

    /**
     * Consumes one record from the warning topic using manual offset acknowledgment.
     *
     * <p>The record is parsed into a {@code WarningInfo}. When parsing yields {@code null} a
     * DingTalk notification (message template {@code DING_TALK_MSG_1}) is sent instead of running
     * the business handler; otherwise the record is passed to {@code warningBusinessHandle}.
     * In both of those cases the offset is acknowledged.
     *
     * <p>If any exception is thrown while processing, the offset is NOT acknowledged, the
     * listener identified by {@code ConsumerConst.LISTENER_ID_WARNING} is stopped, and a DingTalk
     * notification (message template {@code DING_TALK_MSG_2}) is sent.
     *
     * @param record the consumed record
     * @param ack    handle used to acknowledge (commit) the offset manually
     */
    @Override
    @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        if (LOG.isInfoEnabled()) {
            LOG.info("###################預警ackListener接收到消息###################");
        }

        final long startedAt = System.currentTimeMillis();
        // Flipped to false only when processing fails, so the offset stays uncommitted.
        boolean shouldAck = true;
        try {
            WarningInfo parsed = parseConsumerRecord(record);
            if (parsed != null) {
                warningBusinessHandle.doHandle(record, parsed);
            } else {
                // Unparseable payload: report it, then fall through to the acknowledgment below.
                dingTalkService.sendMessage(
                        MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, record.topic(), record.value()));
            }
            // A dedicated BusinessException branch (forwarding the record to a retry topic) was
            // sketched here by the original author but is disabled: no such scenario exists yet.
        } catch (Exception e) {
            LOG.error("[" + record.topic() + "]消費發生運行時異常:" + e.getMessage(), e);
            shouldAck = false;
            // NOTE(review): this stops the container from inside its own listener invocation —
            // confirm that stop() does not block here until the container's shutdown timeout.
            consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING);
            dingTalkService.sendMessage(
                    MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, record.topic()));
        } finally {
            if (shouldAck) {
                // Commit the offset manually.
                ack.acknowledge();
            }
            long elapsedMs = System.currentTimeMillis() - startedAt;
            LOG.info("###################預警ackListener處理完消息,耗時" + elapsedMs + "ms ###################");
        }
    }

 

 

二、使用KafkaListenerEndpointRegistry實現啟動和停止功能

下面方法參數中的listenerId值,必須與消費時@KafkaListener注解中指定的id值一致,例如:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, ...)
package com.macaupass.kafka.consumer.service.impl;

import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

/**
 * Kafka consumer listener service implementation.
 *
 * <p>Starts and stops listener containers by looking them up in the
 * {@link KafkaListenerEndpointRegistry} using the id declared on the corresponding
 * {@code @KafkaListener} annotation.
 *
 * @author weixiong.cao
 * @date 2019/7/2
 */
@Service
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {

    /**
     * LOG.
     */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class);

    /**
     * Registry holding the listener containers, keyed by listener id.
     */
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * Starts listening.
     *
     * @param listenerId listener id
     * @throws IllegalArgumentException if no listener container is registered under {@code listenerId}
     */
    @Override
    public void startListener(String listenerId) {
        MessageListenerContainer container = requireContainer(listenerId);
        // Start the container only when it is not already running.
        if (!container.isRunning()) {
            container.start();
        }
        // Original author's note: the container is not running when the project boots, and
        // resume() only resumes — it does not start — hence the explicit start() above.
        container.resume();
        LOG.info("{}開啟監聽成功。", listenerId);
    }

    /**
     * Stops listening.
     *
     * @param listenerId listener id
     * @throws IllegalArgumentException if no listener container is registered under {@code listenerId}
     */
    @Override
    public void stopListener(String listenerId) {
        requireContainer(listenerId).stop();
        LOG.info("{}停止監聽成功。", listenerId);
    }

    /**
     * Looks up the container once and fails with a descriptive message when the id is unknown,
     * instead of letting a message-less NullPointerException escape to the caller.
     *
     * @param listenerId listener id
     * @return the registered container, never {@code null}
     * @throws IllegalArgumentException if the registry returns no container for {@code listenerId}
     */
    private MessageListenerContainer requireContainer(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            throw new IllegalArgumentException("找不到監聽容器,listenerId=" + listenerId);
        }
        return container;
    }

}

 

三、Controller

package com.macaupass.kafka.consumer.controller;

import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.HashMap;
import java.util.Map;

/**
 * Controller exposing manual start/stop operations for Kafka consumer listeners.
 *
 * <p>The start and stop endpoints return a map with two entries: {@code respCode}
 * ({@code "0000"} on success, {@code "0001"} on failure) and {@code respMsg}
 * (a human-readable result message).
 *
 * @author weixiong.cao
 * @date 2019/7/2
 */
@Controller
@RequestMapping(value = "/listener")
public class KafkaConsumerListenerController {

    /**
     * LOG.
     */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class);

    /** Response code returned when the operation succeeded. */
    private static final String RESP_CODE_SUCCESS = "0000";

    /** Response code returned when the operation failed. */
    private static final String RESP_CODE_FAILURE = "0001";

    /** Failure detail used when the request carries no usable listener id. */
    private static final String MSG_LISTENER_ID_REQUIRED = "listenerId不能為空";

    /**
     * Injected listener service.
     */
    @Autowired
    private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService;

    /**
     * Starts listening.
     *
     * @param listenerId listener id; a missing or blank value yields a failure response
     * @return map with {@code respCode} and {@code respMsg}
     */
    @RequestMapping("/start")
    @ResponseBody
    public Map<String, String> startListener(@RequestParam(required=false) String listenerId) {
        LOG.info("開啟監聽...listenerId={}", listenerId);

        // Reject an absent id up front so the caller gets a clear message rather than
        // whatever exception text the lower layers happen to produce.
        if (isBlank(listenerId)) {
            return buildResponse(RESP_CODE_FAILURE, "啟動失敗:" + MSG_LISTENER_ID_REQUIRED);
        }
        try {
            kafkaConsumerListenerService.startListener(listenerId);
            return buildResponse(RESP_CODE_SUCCESS, "啟動成功。");
        } catch (Exception e) {
            LOG.error("開啟監聽失敗,listenerId={}", listenerId, e);
            return buildResponse(RESP_CODE_FAILURE, "啟動失敗:" + e.getMessage());
        }
    }

    /**
     * Stops listening.
     *
     * @param listenerId listener id; a missing or blank value yields a failure response
     * @return map with {@code respCode} and {@code respMsg}
     */
    @RequestMapping("/stop")
    @ResponseBody
    public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) {
        LOG.info("停止監聽...listenerId={}", listenerId);

        if (isBlank(listenerId)) {
            return buildResponse(RESP_CODE_FAILURE, "停止失敗:" + MSG_LISTENER_ID_REQUIRED);
        }
        try {
            kafkaConsumerListenerService.stopListener(listenerId);
            return buildResponse(RESP_CODE_SUCCESS, "停止成功。");
        } catch (Exception e) {
            LOG.error("停止監聽失敗,listenerId={}", listenerId, e);
            return buildResponse(RESP_CODE_FAILURE, "停止失敗:" + e.getMessage());
        }
    }

    /**
     * Entry page.
     *
     * @return the view name {@code kafka/listener}
     */
    @RequestMapping("/index")
    public String index() {
        return "kafka/listener";
    }

    /**
     * Builds the two-entry response map shared by the start and stop endpoints.
     *
     * @param respCode response code
     * @param respMsg  response message
     * @return a new mutable map holding both entries
     */
    private static Map<String, String> buildResponse(String respCode, String respMsg) {
        Map<String, String> retMap = new HashMap<>();
        retMap.put("respCode", respCode);
        retMap.put("respMsg", respMsg);
        return retMap;
    }

    /**
     * Tells whether the given value is {@code null}, empty, or whitespace only.
     *
     * @param value value to check
     * @return {@code true} when the value carries no visible characters
     */
    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

}

 

四、JSP界面

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html>
<head>
    <title>消費監聽管理</title>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <link rel="stylesheet" href="../css/common.css">
    <script src="../jquery/jquery-3.3.1.min.js"></script>
    <script type="text/javascript">
        // Application context path rendered by the JSP; used as the prefix of the request URLs built below.
        var contextPath = "<%=request.getContextPath() %>";

        /**
         * Starts listening.
         *
         * The id is URL-encoded so that reserved characters (e.g. "&", "#", spaces)
         * cannot corrupt the query string.
         *
         * @param listenerId listener id
         */
        function startListener(listenerId) {
            ajaxPostByJson(contextPath + "/listener/start?listenerId=" + encodeURIComponent(listenerId));
        }

        /**
         * Stops listening.
         *
         * The id is URL-encoded so that reserved characters (e.g. "&", "#", spaces)
         * cannot corrupt the query string.
         *
         * @param listenerId listener id
         */
        function stopListener(listenerId) {
            ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + encodeURIComponent(listenerId));
        }

        /**
         * Issues a POST request to the given URL, expecting a JSON response, and
         * reports the outcome to the user through an alert dialog.
         *
         * @param url request url
         */
        function ajaxPostByJson(url) {
            // Show the server-provided result message.
            var onSuccess = function (respData) {
                alert(respData.respMsg);
            };
            // Show the raw response text when the request itself fails.
            var onError = function (res) {
                alert("系統異常:" + res.responseText);
            };

            $.ajax({
                url: url,
                type: "POST",
                contentType: 'application/json;charset=utf-8',
                dataType: "json",
                success: onSuccess,
                error: onError
            });
        }
    </script>
</head>
<body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4>
<div id="main">
    <div id="head">
        <dl class="alipay_link">
            <a target="_blank" href=""><span>&nbsp;</span></a>
        </dl>
        <span class="title">Kafka消費手動管理</span>
    </div>
    <div class="cashier-nav">
    </div>
    <form name=query method=post>
        <div id="body" style="clear:left">
            <dl class="content">
                <dd>
                    <span class="new-btn-login-sp">
                            <button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">開啟【預警】消費</button>
                    </span>
                    <span class="new-btn-login-sp">
                            <button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【預警】消費</button>
                    </span>
                </dd>
            </dl>
        </div>
    </form>
</div>
</body>
</html>

 

五、功能界面

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM