Kafka運維大全來了!優化、監控、故障處理


Kafka運維大全來了!優化、監控、故障處理……

 

 

Kafka概念

 

Kafka是分布式發布-訂閱消息系統。它最初由LinkedIn公司開發,之后成為Apache項目的一部分。Kafka是一個分布式的、可划分的、冗余備份的、持久性的日志服務。它主要用於處理活躍的流式數據。分布式系統,易於向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴展機器。

 

Kafka設計方案

 

消息持久化及其緩存

 

磁盤性能:在傳統的磁盤寫入很慢,因為它使用隨機寫入50k/s(6個7200轉的sata硬盤組成的raid-5),但是線性寫入速度有300ms/s的速度,所以Kafka利用線性寫入的方式。      

 

線性寫入:將數據調用操作系統文件接口寫到文件系統里面去這樣就緩存到操作系統的頁面緩存中,然后傳統意思來說將其flush一下到磁盤中,但是Kafka並沒有這樣,而是保存在頁面緩存中(相當於放在內存當中)並沒有進行flush操作,這樣他就會提供比較高的讀的性能,下次讀就從內核頁面緩存中讀數據,但是內存中存儲數量不是無限大的,所以我們配置參數(每當接收到N條信息或者每過M秒),進行一個flush操作,從而可以為系統硬件崩潰時“處於危險之中”的數據在量上加個上限。      

 

Kafka的緩存不是在內存中保存盡可能多的數據並在需要時將這些數刷新到文件系統,而是做完全相反的事情,將所有的數據立即寫入文件系統中的持久化的日志中,但不進行刷新數據的調用,實際這么做意味着數據被傳輸到os內核的頁面緩存中去了,隨后在根據配置刷新到硬盤。

 

Kafka安裝

 

安裝優化主要修改config目錄下的server.properties文件,需要修改的參數值主要有 broker.id、host.name、log.dirs。

 

brokerid是對Kafka集群各個節點的一個標識,比如xx.xxx.xx.1 當做節點一,則brokerid=1;xx.xxx.xx.2 當做節點二,則brokerid=2 ;host.name需要配置的是本機ip或者主機名映射。如下圖:

 

 

log.dirs是配置Kafka數據日志的本地磁盤。

 

 

生產集群中,我們還需要配置Kafka進程的啟動內存,通過配置kafka-server-start.sh,分配10g內存,5g初始化內存。如下圖: 

 

 

啟動Kafka集群並檢查zk路徑上Kafka節點是否全部上線。

 

 

Kafka優化

 

以下為實際生產集群Kafka優化配置項,標紅部分為權限控制配置,后續會有專門一章來描述。

 

下面兩個參數,如果在生產集群中寫死了無法批量修改配置。

 

broker.id=2

listeners=SASL_PLAINTEXT://hosip:9092

 

可以按如下配置,將自動生成brokeid,自動識別host.name。

 

#broker.id=2

listeners=SASL_PLAINTEXT://:9092      

zookeeper.connect=zkip1:2181,zkip2:2181,zkip3:2181/kafka

 

# Timeout in ms for connecting to zookeeper

delete.topic.enable=true

zookeeper.connection.timeout.ms=60000

zookeeper.session.timeout.ms=60000

controlled.shutdown.enable=true

#很重要

unclean.leader.election.enable=true

auto.create.topics.enable=false

#副本拉取線程數

num.replica.fetchers=4

auto.leader.rebalance.enable=true

leader.imbalance.per.broker.percentage=10

leader.imbalance.check.interval.seconds=3600

#副本拉取的最小大小1mb

replica.fetch.min.bytes=1

#副本拉取的最大大小20mb

replica.fetch.max.bytes=20971520

#多長時間拉取一次副本

replica.fetch.wait.max.ms=500

#超過多長時間副本退出isr

replica.socket.timeout.ms=60000

#replica.fetch.wait.max.ms=1000

#緩存大小

replica.socket.receive.buffer.bytes=131072

num.network.threads=7

num.io.threads=13

#每當producer寫入10000條消息時,刷數據到磁盤

log.flush.interval.messages=10000

#每間隔1秒鍾時間,刷數據到磁盤

log.flush.interval.ms=1000

socket.receive.buffer.bytes=1048576

socket.send.buffer.bytes=1048576

queued.max.requests=10000

sasl.enabled.mechanisms=PLAIN  

sasl.mechanism.inter.broker.protocol=PLAIN 

allow.everyone.if.no.acl.found=false

super.users=User:admin

authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer

security.inter.broker.protocol=SASL_PLAINTEXT

 

Kafka常用操作

 

啟動Kafka進程:

 

nohup kafka-server-start.sh /usr/local/kafka/config/server.properties >/dev/null 2>&1 &  注意

 

創建主題:

 

$KAFKA_HOME/bin/kafka-topics.sh --create --topic logstash-yarnnodelog --replication-factor 3 --partitions 9 --zookeeper zkip:2181/kafka

 

主題列表:

 

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper zkip:2181

 

啟動消費者進程:

 

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper zkip:2181 --topic topic-test --from-beginning

kafka-console-consumer.sh --bootstrap-server brokerip:9092 --from-beginning --topic logstash --new-consumer --consumer.config=/opt/beh/core/kafka/config/consumer.properties

 

啟動生產者進程:

 

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list brokerip:9092 --topic topic-test

 

刪除主題:

 

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zkip:2181 --delete --topic topic-test

 

描述主題:

 

$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper  zkip:2181/ --topic test20160807

 

Kafka權限控制

 

配置服務端權限控制屬性server.properties:

 

vi /opt/beh/core/kafka/config/server.properties

修改brokerid 

 

zookeeper.connect=zkip1:2181, zkip2:2181, zkip3:2181

 

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000

listeners=SASL_PLAINTEXT://:9092  

security.inter.broker.protocol=SASL_PLAINTEXT  

sasl.enabled.mechanisms=PLAIN  

sasl.mechanism.inter.broker.protocol=PLAIN  

auto.create.topics.enable=false

allow.everyone.if.no.acl.found=false

delete.topic.enable=true

super.users=User:admin

authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer

 

配置服務端權限控制用戶:

 

KafkaServer {

        org.apache.kafka.common.security.plain.PlainLoginModule required

        username="admin"

        password="admin"

        user_admin="admin"

        user_hadoop="hadoop"

        user_producer1="producer1_test"

        user_consumer1="consumer1_test"

        user_producer2="producer2_test"

        user_consumer2="consumer2_test";

        };

 

配置客戶端權限控制用戶:

 

vi kafka_client_consumer_jaas.conf

 

KafkaClient {

        org.apache.kafka.common.security.plain.PlainLoginModule required

        username="consumer1"

        password="consumer1_test";

        };

 

Vi kafka_client_producer_jaas.conf

 

KafkaClient {

        org.apache.kafka.common.security.plain.PlainLoginModule required

        username="producer1"

        password="producer1_test";

        };

 

配置生產及消費權限控制屬性producer.properties:

 

consumer.properties

echo security.protocol=SASL_PLAINTEXT >> producer.properties

echo sasl.mechanism=PLAIN >> producer.properties

echo security.protocol=SASL_PLAINTEXT >> consumer.properties

echo sasl.mechanism=PLAIN >> consumer.properties

 

vi producer.properties

 

security.protocol=SASL_PLAINTEXT  

sasl.mechanism=PLAIN

 

vi consumer.properties

 

security.protocol=SASL_PLAINTEXT  

sasl.mechanism=PLAIN

 

配置服務端啟動腳本:

 

/opt/beh/core/kafka/bin/

vi server-start 

 

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/beh/core/kafka/config/kafka_server_jaas.conf"

nohup kafka-server-start.sh /opt/beh/core/kafka/config/server.properties &

 

配置生產消費運行腳本:

 

vi kafka-console-producer.sh

 

 

if [  "x$KAFKA_OPTS" ]; then

 export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/beh/core/kafka/config/kafka_client_jaas.conf"

fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

    export KAFKA_HEAP_OPTS="-Xmx512M"

fi

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

 

 

 

vi kafka-console-consumer.sh

 

if [  "x$KAFKA_OPTS" ]; then

 export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/beh/core/kafka/config/kafka_client_jaas.conf"

fi

 

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

    export KAFKA_HEAP_OPTS="-Xmx512M"

fi

 

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

 

賦權命令

 

未賦予任何權限時:

 

 

測試命令:

 

啟動服務:

 

nohup kafka-server-start.sh /opt/beh/core/kafka/config/server.properties &

 

確認環境無授權信息:

 

kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181

 

賦予某個用戶處理集群的權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:admin  --operation ClusterAction --cluster --add  (更新metedata權限)

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:admin   --cluster --add

 

創建主題:

 

$KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-test1 --replication-factor 2 --partitions 4 --zookeeper localhost:2181

 

賦予topic權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host xxx.xx.xx.0 --allow-host xxx.xx.xx.1 --operation Read --operation Write --topic Test-topic


- 指定主題指定用戶 -


為主題賦予*某個*用戶的*生產*權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:producer1  --topic=topic-test --operation Write   --add

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:producer1  --topic=test1 --operation Write   --add

 

為主題賦予*某個*用戶在*所有*消費者組*下*消費*權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:producer1  --consumer --topic=topic-test --group=*  --add

 

為主題賦予*某個*用戶在*某個*消費者組*下*消費*權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:hadoop  --consumer --topic=topic-test1 --group=test-consumer-group  --add

 

- 指定主題全部用戶 -

 

為主題賦予*全部*用戶的*生產*權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:*  --producer --topic=topic-test1  --add

 

為主題賦予*全部*用戶在*所有*消費者組*下*消費*權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:*  --consumer --topic=topic-test1 --group=*  --add

 

為主題賦予*全部*用戶在*某個*消費者組*下*消費*權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:*  --consumer --topic=topic-test1 --group=test-consumer-group  --add

 

- 所有主題指定用戶 -

 

為所有主題賦予某個用戶的生產權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:producer1  --topic=* --operation Write   --add

 

為所有主題賦予某個用戶在某個消費者組消費權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:hadoop  --consumer --topic=* --group=test-consumer-group  --add

 

為所有主題賦予某個用戶在全部消費者組消費權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:hadoop  --consumer --topic=* --group=*  --add

 

- 所有主題全部用戶 -

 

為所有主題賦予全部用戶的生產權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:*  --topic=* --operation Write   --add

 

為所有主題賦予全部用戶在某個消費者組消費權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:*  --consumer --topic=* --group=topic-test  --add

 

為所有主題賦予全部用戶在全部消費者組消費權限:

 

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:*  --consumer --topic=* --group=*  --add

 

移除權限:

 

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host xxx.xx.xx.0 --allow-host xxx.xx.xx.1 --operation Read --operation Write --topic test

 

查看權限:

 

查看所有用戶的所有權限:

 

kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181

 

查看某個用戶所擁有的權限:

 

kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181 User:hadoop

 

查看某個主題所擁有的權限:

 

kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181 --topic=topic-test1

 

生產消費測試

 

啟動生產者:

 

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:9092 --topic topic-test --producer.config=/opt/beh/core/kafka/config/producer.properties

 

啟動消費者:

 

kafka-console-consumer.sh --bootstrap-server broker1:9092 --from-beginning --topic topic-test --new-consumer --consumer.config=/opt/beh/core/kafka/config/consumer.properties

 

Kafka權限控制的java代碼示例:

 

put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"consumer1\" password=\"consumer1_test\";");

                put("security.protocol", "SASL_PLAINTEXT");

                put("sasl.mechanism", "PLAIN");

 

Kafka維護存儲均衡

 

  • 評估數據量:要求研發提前評估topic一個周期全量的數據大小。

  • 計算磁盤總存儲:如一塊盤825g,一個節點20快盤,10個節點。那么磁盤總存儲就是165000g。

  • 預估實際數據存儲占比:topic一個周期全量數據大小占磁盤總存儲的百分比,超過百分之六十,即要求研發減少存儲周期。

  • 計算磁盤總塊數:一個節點20快盤,10個節點,總磁盤塊數200個。

  • 合理預分區:分區數量為磁盤總數的整數倍。如所有的topic總數據量為50000gb,磁盤個數為200,那么就可以設置總分區數為200,400,600.具體多少分區數視業務決定。若分區數為400,那么一個分區的大小約125g。例如某一個topic:cbss001的預估數據量是210g,那么通過計算可以將其分成兩個分區。這樣根據Kafka副本落盤策略,各個主機磁盤就能保證最大限度的存儲均衡。

 

Kafka常見故障處理

 

  • 壞盤會導致節點宕掉,及時更換壞盤,重啟節點即可。

  • unclean.leader.election.enable  該參數為true配置到topic中會引起消息重復消費。但為false時,會引起節點9092端口斷開連接,導致Kafka進程假死。

  • 內存溢出,其會導致節點副本不能上線isr。

  • 進程,文件數限制也會造成節點報錯,后續調優中會給出優化參數。

  • flower副本不能及時同步leader副本,同步超時導致副本下線isr。

  • 消費offset越界,這種情況首先重啟節點,若還是報錯,則找到該offset越界的分區,刪除幾條message,再次查看。知道不報錯為止。

 

Kafka集群擴容下線節點

 

使用自動遷移工具

 

下面的示例將把foo1,foo2兩個主題的所有分區都遷移到新的broker機器5,6上。最后,foo1,foo2兩個主題的所有分區都厚在brokers 5,6上。

 

vi topics-to-move.json 

{"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 

--topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment 

{"version":1,

"partitions":[

{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]},{"topic":"foo2","partition":1,"replicas":[2,3]}

]

Proposed partition reassignment configuration 

{"version":1, 

"partitions":[

{"topic":"foo1","partition":2,"replicas":[5,6]},{"topic":"foo1","partition":0,"replicas":[5,6]},

{"topic":"foo2","partition":2,"replicas":[5,6]},{"topic":"foo2","partition":0,"replicas":[5,6]},

{"topic":"foo1","partition":1,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[5,6]}

}

 

工具生成了一個把主題foo1,foo2所有分區遷移到brokers 5,6上的計划。注意,分區遷移還沒有開始。它只是告訴你當前分配計划和新計划的提議。為了防止萬一需要回滾,新的計划應該保存起來。

 

新的調整計划應該保存成一個json文件(如:expand-cluster-reassignment.json),並以–execute選項的方式,如下:

 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute 

Current partition replica assignment 

{"version":1, 

"partitions":[

{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}

] } 

Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions 

{"version":1, 

"partitions":[

{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}

]

 }

執行驗證:–verify

 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify 

Status of partition reassignment: Reassignment of partition [foo1,0] 

completed successfully 

Reassignment of partition [foo2,1] 

completed successfully

 

Kafka日志保留周期設置

 

log.retention.bytes (一個topic的大小限制 =分區數*log.retention.bytes)

log.retention.minutes

log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行數據刪除

 

Kafka指定topic賦參

 

kafka-configs.sh --zookeeper zkip1:2181  --describe --entity-type topics --entity-name CdrNormal

Configs for topics:CdrNormal are retention.ms=86400000

 

Kafka集群監控

 

Python腳本監控Kafka存活節點:

 

#!/usr/bin/python

#_*_coding:utf-8_*_

import pycurl

import json

import StringIO

import time

import sys

import zookeeper

 

zk=zookeeper.init("zkip1:2181")

t = zookeeper.get_children(zk,"/brokers/ids")

d=0

for i in t:

  d=d+1

b=16-d

if d == 16:

   print "ok  cb實時kafka1節點存活正常"

   sys.exit(0)         

else:

   print "Critical  cb實時kafka1節點有:",b,"個死去節點"

   sys.exit(2)


Python腳本監控Kafka各節點磁盤存儲:

 

#!/usr/bin/python

#_*_coding:utf-8_*_

import paramiko

import sys

hostname = ['IP1',' IP2']

username = sys.argv[1]

password = sys.argv[2]

percent = sys.argv[3]

disk={}

error=""

ssh = paramiko.SSHClient()

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

for i in range(0,len(hostname)):

    ssh.connect(hostname[i],22,username,password)

    stdin,stdout,stderr = ssh.exec_command("df -TPh|awk '+$6>%s {print $7}'" % percent)

    path = stdout.readlines()

    #print path

    disk[hostname[i]]=path

    #print disk

#it=iter(disk.keys())

#print disk.values()

#for key in hostname:

#    print i

#print disk[hostname[i]]

#print disk[next(it)]

#print len(disk[next(it)])

#if len(disk[next(it)])==0:

if not disk:

    print("未采集到集群信息!")

    sys.exit(0)

else:

    for i in disk.keys():

        #print disk.get(i)

        if not disk.get(i):

            continue

        else:

            error += "節點"+i+":"

        for j in range(0,len(disk[i])):

            if j == len(disk[i])-1:

               error += disk[i][j].encode('utf-8')+"。"

            else:

               error += disk[i][j].encode('utf-8')+","

    if not error:

        print("cb_rt_kafka業務數據采集集群正常")

        sys.exit(0)

    else:

        #print ("cb_rt_kafka業務數據采集集群,%s,磁盤存儲超出百分之七十") % error.replace("\n", "")

        print ("cb_rt_kafka業務數據采集集群,%s,磁盤存儲超出百分之%s") % (error.replace("\n", ""),percent)

        sys.exit(2)

ssh.close()

 

 

摘自: https://mp.weixin.qq.com/s?__biz=MzA5MTc0NTMwNQ==&mid=2650718499&idx=2&sn=4623b2fe878ff2b2eea186934165e666&chksm=887ddc55bf0a5543cd41d694922f278a28f2a8264b54d3befe775aa6da680c7914cd25a7bed6&mpshare=1&scene=1&srcid=&sharer_sharetime=1571962165607&sharer_shareid=baa4e888a0ca6622375423a4f5fc2003&key=f2f51bf817f17703025a1edd40778287767cc23feb37df322f9093b18d3ff52de9a790f160ce0649db14aa01c2f3ef8414817c68c356ea28a0fe590958697366d7976b73d1e67c6a215ec8259941a1f2&ascene=1&uin=MTA1MzEwMTMzNw%3D%3D&devicetype=Windows+10&version=62070141&lang=zh_CN&pass_ticket=M37rI126jjXZNFJCyR79TfHcex8iB%2Fol%2BpfE3E4M6RBt%2F%2FnY5lOfaVn8SWEoa2tu


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM