kafka刪除topic數據


一、概述

生產環境中,有一個topic的數據量非常大。這些數據不是非常重要,需要定期清理。

要求:默認保持24小時,某些topic 需要保留2小時或者6小時

 

二、清除方式

主要有3個:

1. 基於時間

2. 基於日志大小

3. 基於日志起始偏移量

 

詳情,請參考鏈接:

https://blog.csdn.net/u013256816/article/details/80418297

 

接下來,主要介紹基於時間的清除!

kafka版本為:  2.11-1.1.0

zk版本為:  3.4.13

 

三、kafka配置

# 啟用刪除主題
delete.topic.enable=true
# 檢查日志段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。
log.retention.check.interval.ms=1000

 

注意:這2行配置必須存在,否則清除策略失效!

log.retention.check.interval.ms 參數的單位是毫秒,這里表示間隔1秒鍾

 

四、清除策略

全局topic

在 server.properties 文件中配置的是全局策略,針對每一個topic

比如:

log.retention.hours=3

表示保留3個小時

 

單個topic

針對單個topic策略,需要使用腳本kafka-configs.sh

此腳本不需要重啟kafka就會生效!

 

首先來查看一下,當前的topic策略,比如test

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

 

參數解釋:

--describe  詳細信息

--entity-type 實體類型

--entity-name 指定topic名

 

輸出:

Configs for topic 'test' are

 

這個表示為策略為空

 

刪除topic數據

如果需要刪除topic所有數據,使用命令

bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-1.default.svc.cluster.local:2181

 

這個命令,請謹慎執行!!!

 

如果想保留主題,只刪除主題現有數據(log)。可以通過修改數據保留時間實現

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

 

執行輸出:

Completed Updating config for entity: topic 'test'.

 

注意:修改保留時間為10秒鍾,並不是10秒鍾就馬上刪掉。kafka是采用輪詢的方式,輪詢到這個topic時,刪除10秒鍾前的數據。

時間由server.properties里面的log.retention.check.interval.ms選項為主

 

假設說 log.retention.check.interval.ms 值為1分鍾,那么等待70秒,這個topic的數據就會自動被刪除!

 

再次查看topic策略

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

 

輸出:

Configs for topic 'test' are retention.ms=10000

 

發現目前的刪除策略為 retention.ms=10000

 

刪除策略

如果需要刪除上面的10秒策略,使用以下命令:

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms

 

輸出:

Completed Updating config for entity: topic 'test'.

 

再次查看topic策略

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

 

輸出:

Configs for topic 'test' are

 

發現策略為空,說明刪除成功了!

 

五、測試清除策略

測試思路

 

說明:

第一步,設置清除策略為保留10秒

第二步,進入生產者模式,輸入消息 a

第三步,等待5秒,再次進入生產者模式,輸入消息 b

第四部,進入消費者模式,看輸出的消息是a還是b

 

判斷標准:

在進行第三步時,a這條消息,應該已經被刪除了。所以在第15秒進入消費者模式時,應該輸出 b,這樣的話,策略才是成功的!

 

設置策略

topic 為test的數據保留10秒

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

 

 

生產模式

進入生產模式,輸入a

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test
> a

 

等待5秒后,再次進入生產模式,輸入b

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test
> b

 

消費者模式

等待5秒后,進入 消費者模式

bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning

b

 

如果消費者輸出為b,表示策略成功!


備注:

如果生產環境中,正在不斷的進行生產和消費,執行kafka-configs.sh 腳本,是否會有影響呢?

答案是不會的,它是動態策略!

 

 

本文參考鏈接:

https://blog.csdn.net/forrest_ou/article/details/78999983

 

 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM