Our project recently needed to clean up Kafka messages on a schedule. Kafka does have a built-in periodic retention mechanism, but because the data volume in this project is large, we need to manually control the cleanup of already-consumed messages in each partition.
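For reference, the built-in mechanism is driven by per-topic retention settings. Below is a minimal sketch (not from the original project) of adjusting a topic's retention.ms through the AdminClient, assuming kafka-clients 2.3+; the broker address, topic name, and retention value are placeholders.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class RetentionConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Ask the broker to expire segments older than 3 days for this topic.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", String.valueOf(3L * 24 * 60 * 60 * 1000)),
                    AlterConfigOp.OpType.SET);
            Collection<AlterConfigOp> ops = Collections.singletonList(setRetention);
            adminClient.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}

This only controls time-based expiry on the broker side; it does not know whether consumers have actually processed the data, which is why the manual approach below is used.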
The code shown here is decompiled output; the key methods are marked below.
My own cleanup idea: roughly, fetch all partitions for a topic, iterate over each partition's offsets and put them into a map, comparing offsets on insert so that only the smallest consumer offset for each partition is kept, and then delete all data before that offset.
The decompiled code below does not actually identify that minimum offset; it simply uses each partition's latest (end) offset, obtained by creating a consumer and seeking it to the end, and then deletes all messages in the partition before that offset. Adapt it to your own requirements as needed.
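For the minimum-offset variant described above, one possible approach (not in the original code) is to read each consumer group's committed offsets through the AdminClient and keep the smallest one per partition, so deleteRecords() only removes records every group has already consumed. This is a minimal sketch under that assumption; the class name, method name, and the list of group IDs are illustrative.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class MinOffsetCleanupSketch {

    /**
     * For each partition of the given topic, find the smallest offset committed by
     * any of the given consumer groups and mark everything before it for deletion.
     */
    public static Map<TopicPartition, RecordsToDelete> minCommittedOffsets(
            AdminClient adminClient, String topic, Collection<String> groupIds)
            throws InterruptedException, ExecutionException {
        Map<TopicPartition, Long> minOffsets = new HashMap<>();
        for (String groupId : groupIds) {
            Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata()
                    .get();
            committed.forEach((tp, meta) -> {
                if (meta != null && topic.equals(tp.topic())) {
                    // Keep the smallest committed offset seen so far for this partition.
                    minOffsets.merge(tp, meta.offset(), Math::min);
                }
            });
        }
        Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
        minOffsets.forEach((tp, offset) -> toDelete.put(tp, RecordsToDelete.beforeOffset(offset)));
        return toDelete;
    }
}

The result map can be passed straight to adminClient.deleteRecords() in place of the end-offset map built in the code below.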
package com.atguigu.kafkaclean;

import com.ankki.kafkaclient.consumer.*;
import com.ankki.kafkaclient.mapper.*;
import com.ankki.kafkaclient.model.*;
import com.ankki.kafkaclient.utils.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutionException;

@Component
@Configuration
@EnableScheduling
public class SaticScheduleTask {

    private static final Logger log = LoggerFactory.getLogger(SaticScheduleTask.class);
    private static final String SASL_VALUE = "yes";
    private static final String TOPIC = PropertyUtils.getProperty("cleanTopics");
    private static final String KAFKA_SERVER_ADDRR = PropertyUtils.getProperty("bootstrapServers");

    @Resource
    private SysLogMapper sysLogMapper;

    @Scheduled(cron = "${cleanTime}")
    public void configureTasks() {
        log.info("Running scheduled cleanup task at: {}",
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        final Properties props = new Properties();
        if (SASL_VALUE.equals(PropertyUtils.getProperty("openSasl"))) {
            KafkaUtils.jaasConfig(props);
        }
        props.put("bootstrap.servers", KAFKA_SERVER_ADDRR);
        try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
            final String[] topics = TOPIC.split(",");
            // For every partition of every configured topic, mark all records
            // before the collected offset for deletion.
            final Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new HashMap<>(16);
            for (final String topic : topics) {
                final Map<Integer, Long> partitionInfoMap = getPartitionsForTopic(topic);
                for (final Map.Entry<Integer, Long> entry : partitionInfoMap.entrySet()) {
                    recordsToDeleteMap.put(new TopicPartition(topic, entry.getKey()),
                            RecordsToDelete.beforeOffset(entry.getValue()));
                }
            }
            log.debug("Records to delete: {}", recordsToDeleteMap);
            final DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDeleteMap);
            final Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks =
                    deleteRecordsResult.lowWatermarks();
            lowWatermarks.forEach((topicPartition, future) -> {
                try {
                    log.info("Cleanup result - topic: {} partition: {} low watermark: {} done: {}",
                            topicPartition.topic(), topicPartition.partition(),
                            future.get().lowWatermark(), future.isDone());
                } catch (InterruptedException | ExecutionException e) {
                    log.error("Failed to get Kafka cleanup result:", e);
                }
            });
            recordSystemLog("true");
            log.debug("Scheduled cleanup of consumed data succeeded");
        } catch (Exception e) {
            recordSystemLog(null);
            log.error("Scheduled Kafka cleanup failed:", e);
        }
    }

    private Consumer<Long, String> createConsumer() {
        return new KafkaConsumer<>(KafkaConsumerConfig.consumerConfigs());
    }

    /**
     * Returns a map of partition number -> end offset for the given topic,
     * by assigning a consumer to every partition and seeking to the end.
     */
    private Map<Integer, Long> getPartitionsForTopic(final String topic) {
        final Map<Integer, Long> partitionInfoMap = new HashMap<>(16);
        try (final Consumer<Long, String> consumer = createConsumer()) {
            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            final List<TopicPartition> topicPartitions = new ArrayList<>();
            for (final PartitionInfo info : partitionInfos) {
                topicPartitions.add(new TopicPartition(topic, info.partition()));
            }
            consumer.assign(topicPartitions);
            consumer.seekToEnd(topicPartitions);
            for (final TopicPartition tp : topicPartitions) {
                partitionInfoMap.put(tp.partition(), consumer.position(tp));
            }
        }
        return partitionInfoMap;
    }

    private void recordSystemLog(final String value) {
        final SysLog systemLog = new SysLog();
        systemLog.setLogTime(Instant.now().toEpochMilli());
        systemLog.setLogType((byte) 2);
        if ("true".equals(value)) {
            systemLog.setLogStatus((byte) 1);
            systemLog.setLogLevel((byte) 2);
            systemLog.setLogDesc("Scheduled cleanup of consumed data succeeded!");
        } else {
            systemLog.setLogStatus((byte) 0);
            systemLog.setLogLevel((byte) 1);
            systemLog.setLogDesc("Scheduled cleanup of consumed data failed!");
        }
        this.sysLogMapper.insertSelective(systemLog);
    }
}
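As a follow-up check (not part of the original code), one way to confirm the deletion took effect is to read the partition's earliest available offset after deleteRecords() completes; it should match the low watermark logged above. This is a minimal sketch assuming kafka-clients 2.5+ (for OffsetSpec/listOffsets); the class and method names are illustrative.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

public class CleanupCheckSketch {

    /** Returns the partition's earliest available offset (its log start offset). */
    public static long logStartOffset(AdminClient adminClient, TopicPartition topicPartition)
            throws InterruptedException, ExecutionException {
        return adminClient
                .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.earliest()))
                .partitionResult(topicPartition)
                .get()
                .offset();
    }
}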