kafka 暫停消費


1、代碼實現

kafkaListener

需要指定id,例如這里是:full-part-id。

@KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group")
public void listenFullPart(ConsumerRecord<String, String> record) {
    Optional<String> recordOptional = Optional.fromNullable(record.value());
    if (recordOptional.isPresent()) {
        List<PartStockInfoVo> partStockInfoVos = JSONObject.parseArray(recordOptional.get(), PartStockInfoVo.class);
        esPartInfoClient.updateFullIndex(partStockInfoVos);
    }
}

消費開關

@RestController
public class KafkaManageController {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @RequestMapping("/stop")
    public void stop() {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.stop();
    }

    @RequestMapping("/start")
    public void start() {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.start();
    }
}

 

 

參考:

1、How can i stop consumers from consuming?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM