Spring Kafka(7)KafkaListener定時啟動(禁止自啟動)


定時啟動的意義何在

如果只學習技術不討論其應用范圍那就是在耍流氓啊,為了不做那個流氓,我還是犧牲一下色相吧$_$

在這里我舉一個定時啟動的應用場景:

比如現在單機環境下,我們需要利用Kafka做數據持久化的功能,由於用戶活躍的時間為早上10點至晚上12點,

那在這個時間段做一個大數據量的持久化可能會影響數據庫性能導致用戶體驗降低,

我們可以選擇在用戶活躍度低的時間段去做持久化的操作,也就是晚上12點后到第二條的早上10點前。

使用KafkaListenerEndpointRegistry

這里需要提及一下,@KafkaListener這個注解所標注的方法並沒有在IOC容器中注冊為Bean,

而是會被注冊在KafkaListenerEndpointRegistry中,KafkaListenerEndpointRegistry在SpringIOC中已經被注冊為Bean,

具體可以看一下該類的源碼,當然不是使用注解方式注冊啦...

public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> { protected final Log logger = LogFactory.getLog(this.getClass()); private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap(); private int phase = 2147483547; private ConfigurableApplicationContext applicationContext; private boolean contextRefreshed; ...... }
View Code

那我們怎么讓KafkaListener定時啟動呢?

禁止KafkaListener自啟動(AutoStartup)

編寫兩個定時任務,一個晚上12點,一個早上10點

分別在12點的任務上啟動KafkaListener,在10點的任務上關閉KafkaListener

這里需要注意一下啟動監聽容器的方法,項目啟動的時候監聽容器是未啟動狀態,

而resume是恢復的意思不是啟動的意思,所以我們需要判斷容器是否運行,如果運行則調用resume方法,否則調用start方法

@Component @EnableScheduling public class TaskListener{ private static final Logger log= LoggerFactory.getLogger(TaskListener.class); @Autowired private KafkaListenerEndpointRegistry registry; @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自動啟動
        container.setAutoStartup(false); return container; } @KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory") public void durableListener(String data) { //這里做數據持久化的操作
        log.info("topic.quick.durable receive : " + data); } //定時器,每天凌晨0點開啟監聽
    @Scheduled(cron = "0 0 0 * * ?") public void startListener() { log.info("開啟監聽"); //判斷監聽容器是否啟動,未啟動則將其啟動
        if (!registry.getListenerContainer("durable").isRunning()) { registry.getListenerContainer("durable").start(); } registry.getListenerContainer("durable").resume(); } //定時器,每天早上10點關閉監聽
    @Scheduled(cron = "0 0 10 * * ?") public void shutDownListener() { log.info("關閉監聽"); registry.getListenerContainer("durable").pause(); } }
View Code

原本不想寫測試的,奈何擔心有人寄刀片

修改修改一下定時器注解,修改為距離現在時間較近的時間點,然后寫入些數據,啟動SpringBoot項目,靜靜的等待時間的到來

 //這個代表16:24執行
    @Scheduled(cron = "0 24 16 * * ?") @Test public void testTask() { for (int i = 0; i < 10; i++) { kafkaTemplate.send("topic.quick.durable", "this is durable message"); } }
View Code

這里可以看到在16:24的時候啟動了監聽容器,監聽容器也成功從Topic中獲取到了數據,

等到16:28的時候容器被暫停了,這個時候可以運行一下測試方法,看看監聽容器是否還能獲取數據,答案肯定是不行的鴨。

2018-09-12 16:24:00.003  INFO 2872 --- [pool-1-thread-1] com.viu.kafka.listen.TaskListener : 開啟監聽 2018-09-12 16:24:00.004  INFO 2872 --- [pool-1-thread-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 1000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = durable heartbeat.interval.ms = 3000 interceptor.classes = null internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 15000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2018-09-12 16:24:00.007  INFO 2872 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2018-09-12 16:24:00.007  INFO 2872 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 2a121f7b1d402825 2018-09-12 16:24:00.007  INFO 2872 --- [pool-1-thread-1] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 2018-09-12 16:24:00.012  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Discovered group coordinator admin-PC:9092 (id: 2147483647 rack: null) 2018-09-12 16:24:00.013  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Revoking previously assigned partitions [] 2018-09-12 16:24:00.014  INFO 2872 --- [  durable-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [] 2018-09-12 16:24:00.014  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=durable] (Re-)joining group 2018-09-12 16:24:00.021  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Successfully joined group with generation 6
2018-09-12 16:24:00.021  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Setting newly assigned partitions [topic.quick.durable-0] 2018-09-12 16:24:00.024  INFO 2872 --- [  durable-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.durable-0] 2018-09-12 16:24:00.042  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message 2018-09-12 16:28:00.023  INFO 2872 --- [pool-1-thread-1] com.viu.kafka.listen.TaskListener        : 關閉監聽
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM