Kafka messages ----- scheduled cleanup


In a recent project we needed to clean up Kafka messages on a schedule. Kafka does have a built-in periodic retention mechanism, but with large data volumes we often need to manually control when a partition's already-consumed messages get deleted.

The code shown here comes from a decompiler; the relevant methods are called out below.

My own cleanup idea: roughly, get all partitions for a topic, then iterate over each partition's consumer offsets and store them in a map, comparing offsets as they are stored so that only the smallest consumed offset per partition is kept, and finally delete all data before that offset.

The decompiled program (the full listing further down) does not actually identify the minimum offset: it simply takes each partition's latest offset by creating a consumer and seeking it to the end (seekToEnd), then deletes all messages in that partition before that offset. Adapt this to your own requirements; a sketch of the minimum-offset variant is shown right below.
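If you do want the minimum-offset variant, here is a minimal sketch. It assumes you know in advance which consumer group IDs read the topics (the groupIds parameter is hypothetical); the AdminClient calls themselves are standard Kafka client API. It needs imports for org.apache.kafka.clients.admin.*, org.apache.kafka.clients.consumer.OffsetAndMetadata, org.apache.kafka.common.TopicPartition and java.util.*.

    private Map<TopicPartition, RecordsToDelete> minOffsetsToDelete(
            final AdminClient adminClient, final List<String> groupIds) throws Exception {
        // collect the smallest committed offset per partition across all groups,
        // so that no group loses messages it has not consumed yet
        final Map<TopicPartition, Long> minOffsets = new HashMap<>();
        for (final String groupId : groupIds) {
            final Map<TopicPartition, OffsetAndMetadata> committed =
                    adminClient.listConsumerGroupOffsets(groupId)
                            .partitionsToOffsetAndMetadata().get();
            committed.forEach((partition, metadata) -> {
                if (metadata != null) { // a group may have no committed offset for a partition
                    minOffsets.merge(partition, metadata.offset(), Math::min);
                }
            });
        }
        final Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
        minOffsets.forEach((partition, offset) ->
                toDelete.put(partition, RecordsToDelete.beforeOffset(offset)));
        return toDelete;
    }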

 

package com.atguigu.kafkaclean;

import org.springframework.stereotype.*;
import org.springframework.context.annotation.*;
import com.ankki.kafkaclient.mapper.*;
import javax.annotation.*;
import java.time.format.*;
import com.ankki.kafkaclient.utils.*;
import java.util.concurrent.*;
import org.apache.kafka.clients.admin.*;
import org.springframework.scheduling.annotation.*;
import com.ankki.kafkaclient.consumer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import com.ankki.kafkaclient.model.*;
import java.time.*;
import org.slf4j.*;
import java.util.*;
@Component
@Configuration
@EnableScheduling
public class SaticScheduleTask
{
    private static final Logger log = LoggerFactory.getLogger(SaticScheduleTask.class);
    private static final String SASL_VALUE = "yes";
    private static final String TOPIC = PropertyUtils.getProperty("cleanTopics");
    private static final String KAFKA_SERVER_ADDR = PropertyUtils.getProperty("bootstrapServers");
    @Resource
    private SysLogMapper sysLogMapper;
    
    @Scheduled(cron = "${cleanTime}")
    public void configureTasks() {
        log.info("Executing scheduled cleanup task at: {}",
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        final Properties props = new Properties();
        if (SASL_VALUE.equals(PropertyUtils.getProperty("openSasl"))) {
            KafkaUtils.jaasConfig(props);
        }
        props.put("bootstrap.servers", KAFKA_SERVER_ADDR);
        try (final AdminClient adminClient = AdminClient.create(props)) {
            final String[] topics = TOPIC.split(",");
            final Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new HashMap<>(16);
            for (final String topic : topics) {
                // latest offset of every partition, obtained via seekToEnd below
                final Map<Integer, Long> partitionInfoMap = getPartitionsForTopic(topic);
                for (final Map.Entry<Integer, Long> entry : partitionInfoMap.entrySet()) {
                    final TopicPartition topicPartition = new TopicPartition(topic, entry.getKey());
                    recordsToDeleteMap.put(topicPartition, RecordsToDelete.beforeOffset(entry.getValue()));
                }
            }
            log.debug("Partitions to clean: {}", recordsToDeleteMap);
            final DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDeleteMap);
            final Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks =
                    deleteRecordsResult.lowWatermarks();
            lowWatermarks.forEach((topicPartition, future) -> {
                try {
                    log.info("Cleanup result - topic: {} partition: {} low watermark: {} done: {}",
                            topicPartition.topic(), topicPartition.partition(),
                            future.get().lowWatermark(), future.isDone());
                }
                catch (InterruptedException | ExecutionException e) {
                    log.error("Exception while getting Kafka cleanup result: ", e);
                }
            });
            recordSystemLog("true");
            log.debug("Scheduled cleanup of consumed data succeeded");
        }
        catch (Exception e) {
            recordSystemLog(null);
            log.error("Scheduled Kafka cleanup task failed: ", e);
        }
    }
    
    private Consumer<Long, String> createConsumer() {
        return new KafkaConsumer<>(KafkaConsumerConfig.consumerConfigs());
    }
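    // KafkaConsumerConfig.consumerConfigs() is this project's own helper and is
    // not shown in the decompiled source. A typical implementation (hypothetical,
    // adjust to your environment) might look like:
    //
    //   final Map<String, Object> configs = new HashMap<>();
    //   configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_ADDR);
    //   configs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-clean-client");
    //   configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    //   configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    //   configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    //   return configs;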
    
    private Map<Integer, Long> getPartitionsForTopic(final String topic) {
        final Map<Integer, Long> partitionInfoMap = new HashMap<>(16);
        // the consumer is only needed to look up partitions and their end offsets,
        // so close it as soon as we are done
        try (final Consumer<Long, String> consumer = createConsumer()) {
            final List<TopicPartition> partitions = new ArrayList<>();
            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);
            for (final TopicPartition partition : partitions) {
                partitionInfoMap.put(partition.partition(), consumer.position(partition));
            }
        }
        return partitionInfoMap;
    }
    
    private void recordSystemLog(final String value) {
        final SysLog systemLog = new SysLog();
        systemLog.setLogTime(Instant.now().toEpochMilli());
        systemLog.setLogType((byte) 2);
        if ("true".equals(value)) {
            systemLog.setLogStatus((byte) 1);
            systemLog.setLogLevel((byte) 2);
            systemLog.setLogDesc("Scheduled cleanup of consumed data succeeded!");
        }
        else {
            systemLog.setLogStatus((byte) 0);
            systemLog.setLogLevel((byte) 1);
            systemLog.setLogDesc("Scheduled cleanup of consumed data failed!");
        }
        this.sysLogMapper.insertSelective(systemLog);
    }
    
}
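For reference, the configuration keys the task reads (cleanTime, openSasl, cleanTopics, bootstrapServers) could be defined along these lines; the key names come from the code above, while the values are only examples:

    cleanTime=0 0 2 * * ?
    openSasl=no
    cleanTopics=topicA,topicB
    bootstrapServers=192.168.0.1:9092,192.168.0.2:9092

Note that deleteRecords only advances each partition's log start offset (the low watermark): messages below it become unreadable immediately and the broker reclaims the disk space afterwards, so make sure every consumer group has really passed the offsets you delete.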

  

