使用場景:
某些時候,當幾個topic生產者突發產生大量消息時,會造成磁盤空間緊張,這時,除了增加磁盤,另一個方法就是修改配置文件,將日志的保存時間修改小一點,但這兩種方案,都必須停機和重啟kafka,顯然,這在生產集群上,是不能這么處理的。這里,可以通過在線修改單個topic的配置,以覆蓋默認配置,臨時解決磁盤空間緊張的問題。
優點:在線修改,不需要重啟和停機
修改后,新的配置會在 log.retention.check.interval.ms 時間內被檢查並應用到整個集群,該值在 kafka/config/server.properties 中配置,默認為 300 秒
注意,修改前日志保存時長,必然會清除掉超過這個時長的舊數據,在生產環境中,這需要和業務方共同評估和確認
下面以修改名為 my_test_topic 的 topic 為例
#1,查看當前topic配置
./kafka-topics.sh --describe --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster
#2,調整topic配置
./kafka-topics.sh --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster --alter --config retention.ms=43200000
# 時長毫秒 43200000ms=12h
#3,檢查修改的配置是否生效
同第一步,查看輸出的第一行,類似如下:
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,delete.retention.ms=86400000,retention.ms=43200000,cleanup.policy=delete,compression.type=producer
其他可選的調整參數:
delete.retention.ms=86400000 #對於壓縮日志保留的最長時間,也是客戶端消費消息的最長時間,與retention.ms的區別在於一個控制 未壓縮數據,一個控制 壓縮后的數據
retention.ms=86400000 #如果使用“delete”的retention策略,這項配置就是指刪除日志前日志保存的時間
cleanup.policy=delete #默認方式 delete 將會丟棄舊的部分 compact 將會進行日志壓縮
compression.type=producer #壓縮類型,此配置接受標准壓縮編碼 gzip, snappy, lz4 ,另外接受 uncompressed 相當於不壓縮, producer 意味着壓縮類型由producer指定
./zookeeper-shell.sh test.myzk.com:2181/kafkacluster #查看zk中kafka集群信息
另外,需要注意的是:
kafka 0.10+ 之后的版本,有個 __consumer_offsets 的topic也是需要清理的,需要定期注意該topic占用空間情況
生產環境kafka內核優化參數
vm.min_free_kbytes=4194304 即4G 系統16C32G內存
cat /proc/sys/vm/min_free_kbytes
該值表示強制Linux VM最低保留多少空閑內存 單位Kbytes
當可用內存低於該參數時,系統開始回收cache內存,以釋放內存,直到可用內存大於該值
目的:讓系統更加積極的回收cache內存
vm.zone_reclaim_mode=1
cat /proc/sys/vm/zone_reclaim_mode
管理當一個內存區域zone內部的內存耗盡時,是從其內部進行內存回收還是可以從其他zone進行回收
0 關閉zone_reclaim模式,允許從其他zone或NUMA節點回收內存[默認]
1 打開zone_reclaim模式,這樣內存回收只會發生在本地節點內
2 在本地回收內存時,可以將cache中的臟數據寫回硬盤,以回收內存
4 可以用swap方式回收內存
目的:限制內存回收不跨zone
清空cache (可選)
echo 1 > /proc/sys/vm/drop_caches
生產集群參數參考
zookeeper
zoo.cfg 配置文件
1
2
3
4
5
6
7
8
9
10
11
|
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/server/zkdata
clientPort=2181
maxClientCnxns=600
autopurge.snapRetainCount=60
autopurge.purgeInterval=24
server.1=192.168.1.100:2888:3888
server.2=yyyy:2888:3888
server.3=zzzz:2888:3888
|
echo "1">/data/server/zkdata/myid
./zkServer.sh status
./zkCleanup.sh /data/server/zookeeper/data -n 100
./zkCleanup.sh 參數1 -n 參數2
參數1,zk data目錄,即zoo.cfg文件中dataDir值
參數2,保存最近的多少個快照
kafka
server.properties 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
broker.id=0
listeners=PLAINTEXT://192.168.1.100:9092
port=9092
host.name=192.168.1.100
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
log.dirs=/data/server/kafkadata
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.100:2181,yyyy:2181,zzzz:2181/kafkacluster
zookeeper.connection.timeout.ms=30000
default.replication.factor=3
delete.topic.enable=true
auto.create.topics.enable=true
|
在kafka啟動腳本中,需要添加JMX的支持,方便在kafkamanager中查看到更加豐富的數據
kafka-server-start.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#!/bin/bash
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export JMX_PORT="8999"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=192.168.1.100
-Djava.net.preferIPv4Stack=true"
fi
EXTRA_ARGS="-name kafkaServer -loggc"
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
|
使用supervisor管理zookeeper和kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
[program:zookeeper]
directory=/data/server/zookeeper
command= sh bin/zkServer.sh start-foreground
numprocs=1
user=kafka
autostart=true
autorestart=true
stdout_logfile=/data/server/zookeeper/logs/zookeeper.log
redirect_stderr=true
priority=5
[program:kafka]
directory=/data/server/kafka
command=/bin/bash bin/kafka-server-start.sh config/server.properties
numprocs=1
user=kafka
autostart=true
autorestart=true
stdout_logfile=/data/server/kafka/logs/kafka.log
redirect_stderr=true
stdout_logfile_maxbytes=1GB
priority=6
|
kafka-manager
項目地址:https://github.com/yahoo/kafka-manager
讓一般用戶免密登錄,且只有查看權限:
修改conf/application.conf
1
2
3
4
5
6
|
application.features=[""]
#application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
...
basicAuthentication.enabled=false
#basicAuthentication.enabled=true
#basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}
|
topic 操作
Delete Topic 刪除 topic
Reassign Partitions 平衡集群負載
Add Partitions 增加分區
Update Config Topic 配置信息更新
Manual Partition Assignments 手動為每個分區下的副本分配 broker
Generate Partition Assignments 系統自動為每個分區下的副本分配 broker
一般而言,手動調整、系統自動分配分區和添加分區之后,都需要調用 Reassign Partition
1
2
3
4
5
6
7
8
9
10
11
|
[program:kafka-manager]
directory=/data/server/kafka-manager
command=sh bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000
numprocs=1
user=kafka
autostart=true
autorestart=true
stdout_logfile=/data/server/kafka-manager/logs/kafka-manager.log
redirect_stderr=true
stdout_logfile_maxbytes=1GB
priority=6
|
轉載請注明:輕風博客 » Kafka在線修改topic日志保存時長(不停機,不重啟)