《Apache kafka實戰》讀書筆記-管理Kafka集群安全之ACL篇


               《Apache kafka實戰》讀書筆記-管理Kafka集群安全之ACL篇

                                              作者:尹正傑

版權聲明:原創作品,謝絕轉載!否則將追究法律責任。

 

  

  想必大家能看到這篇博客的小伙伴,估計你對kafka已經有了深入對一步了解了,因為現在的你已經不考慮如何部署kafka以及調優了,而是考慮到kafka安全的問題。其實,在很多企業中,很少有人考慮到kafka的安全,小到幾十人的小型互聯網公司,達到某些雲平台的雲服務(我這里就不說是哪家雲公司了),他們默認都是不會給kafka配置相關安全策略的,而是要求用戶自己配置,而很多公司既然要買你的雲服務,運維的事情自然也就外包出去了,換句話說,公司可能就沒有運維(畢竟不是每家公司都招的起運維的!)。

  很多企業或組織對於安全性都有着很高的要求(我所在的公司之前購買的雲服務並沒有配置安全策略,但是我們公司自建數據中心后,這些服務都是我從頭部署,而我對kafka的安全很是看重)。在0.9.0.0版本之前,kafka並未提供任何形式的安全配置,用戶只能通過粗粒度的網絡配置“一刀切式”地限制對kafka集群的訪問。其實關於kafka安全的解決方案有很多,比如,我們可以通過Linux自帶的iptables控制訪問來源,也可以配置相應的ACL,當然也可以用現在主流的kerberos + sentry這一套組合來配置kafka的安全認證策略。官方提供來關於ACL的命令行工具“kafka-acls.sh”可以用於處理與客戶端訪問控制相關的問題,它的文檔可以在Apache kafka官方網站上找到:http://kafka.apache.org/documentation/#security_authz_cli。如果對kafka在公網傳輸對建議使用Kerberos,官方文檔也有介紹,詳情請參考:http://kafka.apache.org/documentation/#security_sasl_kerberos

 

一.kafka安全

  隨着對kafka集群提供安全配置的呼聲越來越高,社區終於在0.9.0.0版本正式添加了安全特性,並在0.10.0.0版本中進一步完善。

1>.Kafka的主要安全特性

  第一:鏈接認證機制,包含服務器與客戶端(生產者/消費者)鏈接,服務器間鏈接以及服務器與工具間鏈接。支持的認證機制包括SSL(TLS)或SASL。

  第二:服務器與zookeeper之間的鏈接認證機制。

  第三:基於SSL的鏈接通道數據傳輸加密。

  第四:客戶端讀/寫授權。

  第五:支持可插拔的授權服務和外部授權服務的集成。

2>.認證(authentication)和授權(authorization)的區別

  認證就是證明你是誰的過程。當訪問kafka服務時你必須顯示地提供身份信息來證明你的身份是合法的。

  授權是驗證你能訪問那些服務的過程。

  舉一個簡單的例子,在一個配置了認證和授權的kafka集群中,只有合法的生產者(已認證)才被允許向Kafka集群發送Produce請求,而發給broker的Produce請求是否會被真正處理則需要該生產者有對應的權限,通常生產者必須擁有主題寫入的權限(topic write),即該生產者必須是已授權的。

3>.Kafka安全主要包含三大功能

  如前所述,自0.9.0.0版本引入安全配置后,Kafka一只在完善安全特性的功能。當前Kafka安全主要包含三大功能:認證(authenication),信道加密(encryption)和授權(authorization),而其中的認證機制主要是指配置SASL,而授權是通過ACL借口命令來完成的。

  在生產環境中,用戶若要使用SASL,通常會配置Kerberos,但對一些小公司而言,他們的用戶系統並不復雜(即需要訪問Kafka集群服務的用戶書並不是很多),顯然使用Kerberos有些大材小用,而且由於運行在內網,SSL加密也不是很必要。因此一個基於明文傳輸(PLAITEXT)的SASL集群環境足以應對一般的使用場景。本片博客我們將給出一個可運行的實例來掩飾一下如何在不實用kerberos的情況下配置SASL+ACL以構建安全的Kafka集群。當然kerberos認證kafka的博客我有時間也會給大家整理出來。

 

二.SASL+ACL

1>.broker端的配置

  若要開啟SASL和ACL機制,我們需要在broker端進行三個方面的設置。

    第一:編寫JAAS配置文件;

    第二:修改broker的啟動腳本,並指定JAAS的配置文件存放路徑;

    第三:配置“server.properties”的文件內容;

  首先是創建包含所有認證用戶信息的JAAS文件。在本片博客中,我使用的是Kafka1.1.0,我們假設有三個用戶:yinzhengjie,reader和writer,其中yinzhengjie這個用戶是集群管理員,reader用戶負責讀取kafka集群中的topic數據,而writer用戶則負責向kafka集群寫入消息。好了,話不多說,我們開始在kafka的config目錄下編寫JAAS文件如下:(修改后最好同步到其他broker節點)

[root@node101 config]# hostname
node101.yinzhengjie.org.cn
[root@node101 config]# 
[root@node101 config]# pwd
/soft/kafka/config
[root@node101 config]# 
[root@node101 config]# cat /soft/kafka/config/yinzhengjie_kafka_server_jaas.conf 
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="yinzhengjie"
   password="yinzhengjie-kafka"
   user_admin="yinzhengjie"
   user_reader="reader"
   user_writer="writer";              #注意,這里是倒數第二行,最后的分號千萬別丟了,如果不寫這個結尾的分號,那么JAAS文件將被視為無效。
};
[root@node101 config]# 

  由於“kafka-server-start.sh”只接收“server.properties”的位置,不接受其他任何參數,故需要修改Kafka啟動腳本,具體做法如下:(修改后最好同步到其他broker節點)

[root@node101 ~]# hostname
node101.yinzhengjie.org.cn
[root@node101 ~]# 
[root@node101 ~]# cp /soft/kafka/bin/kafka-server-start.sh /soft/kafka/bin/yinzhengjie-security-kafka-server-start.sh 
[root@node101 ~]# 
[root@node101 ~]# tail -1 /soft/kafka/bin/yinzhengjie-security-kafka-server-start.sh             #編輯配置文件,指定JAAS文件的所在位置。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/soft/kafka/config/yinzhengjie_kafka_server_jaas.conf kafka.Kafka "$@"
[root@node101 ~]#  

  做完上看的步驟后,我們就在kafka的bin目錄下做好了一份新的Kafka啟動腳本(當然,如果你是老司機的話也可以直接在原來的腳本繼續修改,但考慮到后期我需要給大家展示kerberos的配置方法,我有必要保留一份最初始的腳本)。接下來開始修改broker啟動所需要的“server.properties” 配置文件,我的配置文件參數如下,也做了相應的中文注釋。 

[root@node101 ~]# cat /soft/kafka/config/server.properties          
#kafka配置文檔可參考自官網:
#       http://kafka.apache.org/11/documentation.html#brokerconfigs。
#       @author :yinzhengjie
#       blog:http://www.cnblogs.com/yinzhengjie
#       EMAIL:y1053419035@qq.com

############################# Server Basics #############################

# 每一個broker在集群中的唯一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況
broker.id=101

# 這就是說,這條命令其實並不執行刪除動作,僅僅是在zookeeper上標記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關,否則刪除動作是不會執行的。推薦設置為true。
delete.topic.enable=true

# 是否允許自動創建topic,默認值是true,若是false,就需要通過命令創建topic,推薦設置為false
auto.create.topics.enable=false

# 0.11.0.0版本開始unclean.leader.election.enable參數的默認值由原來的true改為false,可以關閉unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不會被提升為新的leader partition。kafka集群的持久化力大於可用性,如果ISR中沒有其它的replica,會導致這個partition不能讀寫。
unclean.leader.election.enable=false

############################# Socket Server Settings #############################
# broker 監聽器的CSV列表,格式為:[協議]://[主機名]:[端口],[[協議]]://[主機名]:[端口]].該參數主要用於可混搭鏈接broker使用,可以認為是broker端開放給clients的監聽端口。如果不指定主機名,則表示綁定默認網卡;如果主機名是0.0.0.0,則表示綁定所有網卡。Kafka當前支持的協議類型包括PLAINTEXT,SSL及SASL_SSL等。對於新版本的Kafka,推薦值設置listerners一個參數就夠了,對於已經過時的兩個參數host.name和port,就不用再配置了。對於我未啟用安全的Kafka集群,使用PLAINTEXT協議足以。如果啟用的安全認證,可以考慮使用SSL或SAS_SSL協議。
listeners=SASL_PLAINTEXT://node101.yinzhengjie.org.cn:9092

# 設置協議,本例使用SASL_PLAINTEXT
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

# allow.everyone.if.no.acl.found參數是broker端端參數,不過有些奇怪的是,他並沒記錄在官網的broker端參數列表中(但我們可以在官網文檔中看到這個參數,他只出現過一次:http://kafka.apache.org/documentation/#security_authz。)。當設置為true時,整個ACL機制將改為黑名單機制,即只有在黑名單中端用戶才無法訪問資源,非黑名單用戶可以通常無阻端訪問任何kafka資源;當參數為false時,也就是他端默認值時,ACL機制是白名單機制,只有白名單用戶才能訪問設定的資源,其他用戶都屬於未授權用戶。
allow.everyone.if.no.acl.found=true

# 設置yinzhengjie為超級用戶,注意,這個用戶是你需要在JAAS文件中指定的喲~
super.users=User:yinzhengjie

# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# 和listeners類似,該參數也是用於發布給clients的監聽器,不過該參數主要用於IaaS環境,比如運上的機器通常都配有多塊網卡(私網網卡和公網網卡)。對於這種機器,用戶可以設置該參數綁定公網IP供外部clients使用。然后配置上面的listers來綁定私網IP供broker間通信使用。當然不設置該參數也是可以的,只是雲上的機器很容易出現clients無法獲取數據的問題,原因就是listeners綁定的是默認網卡,而默認網卡通常都是綁定綁定私網IP的。在實際使用場景中,對於配有多塊網卡的機器而言,這個參數通常都是需要配置的。
#advertised.listeners=SASL_PLAINTEXT://node101.yinzhengjie.org.cn:9092

# 將偵聽器名稱映射到安全協議,默認情況下是相同的。在偵聽器名稱和安全協議之間映射。對於要在多個端口或IP中使用的相同安全協議,必須定義該安全協議。例如,內部和外部通信量可以分開,即使兩者都需要SSL。具體地說,用戶可以定義名稱為INTERNAL和EXTERNAL的偵聽器,該屬性為:“INTERNAL:SSL,EXTERNAL:SSL”。如圖所示,鍵和值用冒號分隔,映射項用逗號分隔。每個偵聽器名稱只在地圖中出現一次。可以通過向配置名稱添加規范化前綴(偵聽器名稱小寫)來為每個偵聽器配置不同的安全性(SSL和SASL)設置。例如,為了為INTERNAL偵聽器設置不同的密鑰存儲,將設置名為“listener.name...ssl.keystore.location”的配置。如果未設置偵聽器名稱的配置,則配置將返回到通用配置(即`ssl.keystore.location')。詳情請參考官網文檔:http://kafka.apache.org/11/documentation.html#brokerconfigs。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


# 它控制來一個broker在后台用於處理網絡請求的線程數,默認是3。通常情況下,broker啟動時會創建多個線程處理來自其他broker和clients發送過來的各種請求。注意,這里的“處理”其實只是負責轉發請求,它會將接收到的請求轉發到后面的處理線程中。在實際生產環境中,咱們需要不斷的監控NetworkProcessorAvgIdlePercent JMX指標。如果該參數指標持續低於0.3,建議適當增加該參數的值。
num.network.threads=30

# 這個參數就是控制broker端實際處理網絡請求的線程數,默認值是8,即kafka borker默認創建8個線程以輪詢的方式不停地監聽轉發過來的網絡請求並進行實時處理。Kafka同樣也為請求處理提供了一個JMX監控指標Request HandlerAvgIdlePercent。如果該參數指標持續低於0.3,建議適當增加該參數的值。
num.io.threads=30

# 接字服務器使用的發送緩沖區(SOYSNDBUF), Socket服務器套接字的SOYSNDBUF緩沖區。如果值為-1,則將使用OS默認值。
socket.send.buffer.bytes=5242880

# 接字服務器使用的接收緩沖區(SOYRCVBUF),用於網絡請求的套接字接收緩沖器.
socket.receive.buffer.bytes=5242880

# 接字服務器將接受的請求的最大大小(對OOM的保護).
socket.request.max.bytes=104857600

# I/O線程等待隊列中的最大的請求數,超過這個數量,network線程就不會再接收一個新的請求。應該是一種自我保護機制。
queued.max.requests=2000

############################# Log Basics #############################

# 該參數制定了Kafka持久化消息的目錄。若待保存的消息數量非常多,那么最好確保該文件夾有充足的磁盤空間。該參數可以設置多個目錄,以都好進行分隔。指定多個目錄的做法通常是被推薦的,配置多個目錄可以提升讀寫的能力,換句話說,就是提升Kafka的吞吐量。
log.dirs=/home/yinzhengjie/data/kafka/logs,/home/yinzhengjie/data/kafka/logs2,/home/yinzhengjie/data/kafka/logs3


# 在啟動時恢復日志和關閉時刷盤日志時每個數據目錄的線程的數量,默認1。這個值總共開啟的線程數=num.recovery.threads.per.data.dir * log.dirs的數量。舉個例子,假如log.dirs指定了3個目錄,且num.recovery.threads.per.data.dir配置的是10,那么總共會開啟30個線程會在broker停止或者啟動時去加載或寫入數據到磁盤中。理論上說我們應該適當增大這個值,以達到啟動或者停止broker更快的目的,這個參數也就在broker啟動和停止時才會起到一定的作用,而broker在啟動或者停止的期間borker是無法正常提供服務的,因此我建議大家適當增大這個值以減少broker的啟動時間。
num.recovery.threads.per.data.dir=10

# 服務器接受單個消息的最大大小,即消息體的最大大小,單位是字節
message.max.bytes=104857600

# 每當broker停止或崩潰領導時,該broker的分區轉移到其他副本。這意味着默認情況下,當代理重新啟動時,它將只是所有分區的跟隨者,這意味着它不會用於客戶端讀取和寫入。為了避免這種不平衡,Kafka有一個首選復制品的概念。如果分區的副本列表是1,5,9,則節點1首選作為節點5或9的引導者,因為它在副本列表中較早。您可以讓Kafka群集通過運行命令嘗試恢復已恢復副本的領導:"kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot " 由於運行此命令可能很繁瑣,您還可以通過設置以下配置來配置Kafka自動執行此操作:auto.leader.rebalance.enable=true。當然,你可以不配置該參數,因為官方默認的配置是true。 
auto.leader.rebalance.enable=true


# 在強制fsync一個partition的log文件之前暫存的消息數量。調低這個值會更頻繁的sync數據到磁盤,影響性能。通常建議人家使用replication來確保持久性,而不是依靠單機上的fsync,但是這可以帶來更多的可靠性.
log.flush.interval.ms=30000

############################# Log Retention Policy #############################


# 這個參數是指定時間數據可以在kafka集群的保留時間。Kafka通常根據時間來決定數據可以被保留多久。默認使用log.retention.hours參數來配置時間,默認是168小時,也就是1周。除此之外,還有其他兩個參數log.retention.minutes和log.retention.ms。這3個參數的作用是一樣的,都是決定消息多久以后被刪除,不過還是推薦使用log.retention.ms。如果指定了不止一個參數,Kafka會優先使用具有最小值的那個參數。
log.retention.hours=168

# 另一種方式是通過保留的消息字節數來判斷消息是否過期。他的值通過參數log.retention.bytes來指定,作用在每一個分區上,也就是說,如果一個包含8個分區的主題,並且log.retention.bytes被設置為1GB,那么這個主題最多可以保留8GB的數據。所以,當主題的分區個數增加時,整個主題可以保留的數據也隨之增加。溫馨提示:如果同時指定log.retention.bytes和log.retention.ms(或者log.retention.hours或log.retention.minutes),只要任意一個條件得到滿足,消息就會被刪除。
#log.retention.bytes=1073741824

# 以上的設置都作用在日志片段上,而不是作用在單個消息上。當消息達到broker時,他們被加分區的當前日志片段上。當日志片段大小達到log.segment.bytes指定的上線(默認是1GB)時,當前日志片段就會被關閉,一個新的日志片段被打開。如果一個日志片段被關閉,就開始等待過期。這個參數的值越小,就越會頻繁的關閉和分破新文件,從而降低磁盤寫入的整體效率。換句話說:它是控制日志segment文件的大小,超出該大小則追加到一個新的日志segment文件中(-1表示沒有限制)
log.segment.bytes=536870912


# 日志片段文件的檢查周期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 用於保存broker元數據的zookeeper地址是通過zookeeper.connect來指定的。大家發現沒有,下面就是我5台zookeeper地址,最后的“kafka_yinzhengjie_cluster1”表示zookeeper路徑,作為kafka的chroot環境,如果不指定,默認使用根路徑。推薦大家配置chroot路徑,因為這樣的話,當你部署多個kafka集群時可以共享同一套zookeeper集群,方便你維護。當然如果你不指定的話,它默認是在zookeeper的根路徑創建,這樣當你其他業務在使用zookeeper集群時,管理起來就很不方便啦!
zookeeper.connect=10.1.2.114:2181,10.1.2.115:2181,10.1.2.116:2181,10.1.2.117:2181,10.1.2.118:2181/yinzhengjie_kafka_test

# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的消息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息,連接zk的超時時間
zookeeper.connection.timeout.ms=3000

# 求的最大大小為字節,請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請的數目,以防發出巨量的請求。
max.request.size=104857600

############################# Group Coordinator Settings #############################

# 在執行第一次重新平衡之前,組協調器將等待更多消費者加入新組的時間。更長的延遲意味着潛在的更少的重新平衡,但是增加了直到處理開始的時間。
group.initial.rebalance.delay.ms=30000
[root@node101 ~]# 
[root@node101 ~]# cat /soft/kafka/config/server.properties                    #node101.yinzhengjie.org.cn的broker節點配置信息
[root@node102 ~]# cat /soft/kafka/config/server.properties 
#kafka配置文檔可參考自官網:
#       http://kafka.apache.org/11/documentation.html#brokerconfigs。
#       @author :yinzhengjie
#       blog:http://www.cnblogs.com/yinzhengjie
#       EMAIL:y1053419035@qq.com

############################# Server Basics #############################

# 每一個broker在集群中的唯一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況
broker.id=102

# 這就是說,這條命令其實並不執行刪除動作,僅僅是在zookeeper上標記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關,否則刪除動作是不會執行的。推薦設置為true。
delete.topic.enable=true

# 是否允許自動創建topic,默認值是true,若是false,就需要通過命令創建topic,推薦設置為false
auto.create.topics.enable=false

# 0.11.0.0版本開始unclean.leader.election.enable參數的默認值由原來的true改為false,可以關閉unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不會被提升為新的leader partition。kafka集群的持久化力大於可用性,如果ISR中沒有其它的replica,會導致這個partition不能讀寫。
unclean.leader.election.enable=false

############################# Socket Server Settings #############################
# broker 監聽器的CSV列表,格式為:[協議]://[主機名]:[端口],[[協議]]://[主機名]:[端口]].該參數主要用於可混搭鏈接broker使用,可以認為是broker端開放給clients的監聽端口。如果不指定主機名,則表示綁定默認網卡;如果主機名是0.0.0.0,則表示綁定所有網卡。Kafka當前支持的協議類型包括PLAINTEXT,SSL及SASL_SSL等。對於新版本的Kafka,推薦值設置listerners一個參數就夠了,對於已經過時的兩個參數host.name和port,就不用再配置了。對於我未啟用安全的Kafka集群,使用PLAINTEXT協議足以。如果啟用的安全認證,可以考慮使用SSL或SAS_SSL協議。
listeners=SASL_PLAINTEXT://node102.yinzhengjie.org.cn:9092

# 設置協議,本例使用SASL_PLAINTEXT
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

# allow.everyone.if.no.acl.found參數是broker端端參數,不過有些奇怪的是,他並沒記錄在官網的broker端參數列表中(但我們可以在官網文檔中看到這個參數,他只出現過一次:http://kafka.apache.org/documentation/#security_authz。)。當設置為true時,整個ACL機制將改為黑名單機制,即只有在黑名單中端用戶才無法訪問資源,非黑名單用戶可以通常無阻端訪問任何kafka資源;當參數為false時,也就是他端默認值時,ACL機制是白名單機制,只有白名單用戶才能訪問設定的資源,其他用戶都屬於未授權用戶。
allow.everyone.if.no.acl.found=true

# 設置yinzhengjie為超級用戶,注意,這個用戶是你需要在JAAS文件中指定的喲~
super.users=User:yinzhengjie

# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# 和listeners類似,該參數也是用於發布給clients的監聽器,不過該參數主要用於IaaS環境,比如運上的機器通常都配有多塊網卡(私網網卡和公網網卡)。對於這種機器,用戶可以設置該參數綁定公網IP供外部clients使用。然后配置上面的listers來綁定私網IP供broker間通信使用。當然不設置該參數也是可以的,只是雲上的機器很容易出現clients無法獲取數據的問題,原因就是listeners綁定的是默認網卡,而默認網卡通常都是綁定綁定私網IP的。在實際使用場景中,對於配有多塊網卡的機器而言,這個參數通常都是需要配置的。
#advertised.listeners=SASL_PLAINTEXT://node102.yinzhengjie.org.cn:9092

# 將偵聽器名稱映射到安全協議,默認情況下是相同的。在偵聽器名稱和安全協議之間映射。對於要在多個端口或IP中使用的相同安全協議,必須定義該安全協議。例如,內部和外部通信量可以分開,即使兩者都需要SSL。具體地說,用戶可以定義名稱為INTERNAL和EXTERNAL的偵聽器,該屬性為:“INTERNAL:SSL,EXTERNAL:SSL”。如圖所示,鍵和值用冒號分隔,映射項用逗號分隔。每個偵聽器名稱只在地圖中出現一次。可以通過向配置名稱添加規范化前綴(偵聽器名稱小寫)來為每個偵聽器配置不同的安全性(SSL和SASL)設置。例如,為了為INTERNAL偵聽器設置不同的密鑰存儲,將設置名為“listener.name...ssl.keystore.location”的配置。如果未設置偵聽器名稱的配置,則配置將返回到通用配置(即`ssl.keystore.location')。詳情請參考官網文檔:http://kafka.apache.org/11/documentation.html#brokerconfigs。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


# 它控制來一個broker在后台用於處理網絡請求的線程數,默認是3。通常情況下,broker啟動時會創建多個線程處理來自其他broker和clients發送過來的各種請求。注意,這里的“處理”其實只是負責轉發請求,它會將接收到的請求轉發到后面的處理線程中。在實際生產環境中,咱們需要不斷的監控NetworkProcessorAvgIdlePercent JMX指標。如果該參數指標持續低於0.3,建議適當增加該參數的值。
num.network.threads=30

# 這個參數就是控制broker端實際處理網絡請求的線程數,默認值是8,即kafka borker默認創建8個線程以輪詢的方式不停地監聽轉發過來的網絡請求並進行實時處理。Kafka同樣也為請求處理提供了一個JMX監控指標Request HandlerAvgIdlePercent。如果該參數指標持續低於0.3,建議適當增加該參數的值。
num.io.threads=30

# 接字服務器使用的發送緩沖區(SOYSNDBUF), Socket服務器套接字的SOYSNDBUF緩沖區。如果值為-1,則將使用OS默認值。
socket.send.buffer.bytes=5242880

# 接字服務器使用的接收緩沖區(SOYRCVBUF),用於網絡請求的套接字接收緩沖器.
socket.receive.buffer.bytes=5242880

# 接字服務器將接受的請求的最大大小(對OOM的保護).
socket.request.max.bytes=104857600

# I/O線程等待隊列中的最大的請求數,超過這個數量,network線程就不會再接收一個新的請求。應該是一種自我保護機制。
queued.max.requests=2000

############################# Log Basics #############################

# 該參數制定了Kafka持久化消息的目錄。若待保存的消息數量非常多,那么最好確保該文件夾有充足的磁盤空間。該參數可以設置多個目錄,以都好進行分隔。指定多個目錄的做法通常是被推薦的,配置多個目錄可以提升讀寫的能力,換句話說,就是提升Kafka的吞吐量。
log.dirs=/home/yinzhengjie/data/kafka/logs,/home/yinzhengjie/data/kafka/logs2,/home/yinzhengjie/data/kafka/logs3


# 在啟動時恢復日志和關閉時刷盤日志時每個數據目錄的線程的數量,默認1。這個值總共開啟的線程數=num.recovery.threads.per.data.dir * log.dirs的數量。舉個例子,假如log.dirs指定了3個目錄,且num.recovery.threads.per.data.dir配置的是10,那么總共會開啟30個線程會在broker停止或者啟動時去加載或寫入數據到磁盤中。理論上說我們應該適當增大這個值,以達到啟動或者停止broker更快的目的,這個參數也就在broker啟動和停止時才會起到一定的作用,而broker在啟動或者停止的期間borker是無法正常提供服務的,因此我建議大家適當增大這個值以減少broker的啟動時間。
num.recovery.threads.per.data.dir=10

# 服務器接受單個消息的最大大小,即消息體的最大大小,單位是字節
message.max.bytes=104857600

# 每當broker停止或崩潰領導時,該broker的分區轉移到其他副本。這意味着默認情況下,當代理重新啟動時,它將只是所有分區的跟隨者,這意味着它不會用於客戶端讀取和寫入。為了避免這種不平衡,Kafka有一個首選復制品的概念。如果分區的副本列表是1,5,9,則節點1首選作為節點5或9的引導者,因為它在副本列表中較早。您可以讓Kafka群集通過運行命令嘗試恢復已恢復副本的領導:"kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot " 由於運行此命令可能很繁瑣,您還可以通過設置以下配置來配置Kafka自動執行此操作:auto.leader.rebalance.enable=true。當然,你可以不配置該參數,因為官方默認的配置是true。 
auto.leader.rebalance.enable=true


# 在強制fsync一個partition的log文件之前暫存的消息數量。調低這個值會更頻繁的sync數據到磁盤,影響性能。通常建議人家使用replication來確保持久性,而不是依靠單機上的fsync,但是這可以帶來更多的可靠性.
log.flush.interval.ms=30000

############################# Log Retention Policy #############################


# 這個參數是指定時間數據可以在kafka集群的保留時間。Kafka通常根據時間來決定數據可以被保留多久。默認使用log.retention.hours參數來配置時間,默認是168小時,也就是1周。除此之外,還有其他兩個參數log.retention.minutes和log.retention.ms。這3個參數的作用是一樣的,都是決定消息多久以后被刪除,不過還是推薦使用log.retention.ms。如果指定了不止一個參數,Kafka會優先使用具有最小值的那個參數。
log.retention.hours=168

# 另一種方式是通過保留的消息字節數來判斷消息是否過期。他的值通過參數log.retention.bytes來指定,作用在每一個分區上,也就是說,如果一個包含8個分區的主題,並且log.retention.bytes被設置為1GB,那么這個主題最多可以保留8GB的數據。所以,當主題的分區個數增加時,整個主題可以保留的數據也隨之增加。溫馨提示:如果同時指定log.retention.bytes和log.retention.ms(或者log.retention.hours或log.retention.minutes),只要任意一個條件得到滿足,消息就會被刪除。
#log.retention.bytes=1073741824

# 以上的設置都作用在日志片段上,而不是作用在單個消息上。當消息達到broker時,他們被加分區的當前日志片段上。當日志片段大小達到log.segment.bytes指定的上線(默認是1GB)時,當前日志片段就會被關閉,一個新的日志片段被打開。如果一個日志片段被關閉,就開始等待過期。這個參數的值越小,就越會頻繁的關閉和分破新文件,從而降低磁盤寫入的整體效率。換句話說:它是控制日志segment文件的大小,超出該大小則追加到一個新的日志segment文件中(-1表示沒有限制)
log.segment.bytes=536870912


# 日志片段文件的檢查周期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 用於保存broker元數據的zookeeper地址是通過zookeeper.connect來指定的。大家發現沒有,下面就是我5台zookeeper地址,最后的“kafka_yinzhengjie_cluster1”表示zookeeper路徑,作為kafka的chroot環境,如果不指定,默認使用根路徑。推薦大家配置chroot路徑,因為這樣的話,當你部署多個kafka集群時可以共享同一套zookeeper集群,方便你維護。當然如果你不指定的話,它默認是在zookeeper的根路徑創建,這樣當你其他業務在使用zookeeper集群時,管理起來就很不方便啦!
zookeeper.connect=10.1.2.114:2181,10.1.2.115:2181,10.1.2.116:2181,10.1.2.117:2181,10.1.2.118:2181/yinzhengjie_kafka_test

# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的消息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息,連接zk的超時時間
zookeeper.connection.timeout.ms=3000

# 求的最大大小為字節,請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請的數目,以防發出巨量的請求。
max.request.size=104857600

############################# Group Coordinator Settings #############################

# 在執行第一次重新平衡之前,組協調器將等待更多消費者加入新組的時間。更長的延遲意味着潛在的更少的重新平衡,但是增加了直到處理開始的時間。
group.initial.rebalance.delay.ms=30000
[root@node102 ~]# 
[root@node102 ~]# cat /soft/kafka/config/server.properties                    #node102.yinzhengjie.org.cn的broker節點配置信息
[root@node103 ~]# cat /soft/kafka/config/server.properties 
#kafka配置文檔可參考自官網:
#       http://kafka.apache.org/11/documentation.html#brokerconfigs。
#       @author :yinzhengjie
#       blog:http://www.cnblogs.com/yinzhengjie
#       EMAIL:y1053419035@qq.com

############################# Server Basics #############################

# 每一個broker在集群中的唯一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況
broker.id=103

# 這就是說,這條命令其實並不執行刪除動作,僅僅是在zookeeper上標記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關,否則刪除動作是不會執行的。推薦設置為true。
delete.topic.enable=true

# 是否允許自動創建topic,默認值是true,若是false,就需要通過命令創建topic,推薦設置為false
auto.create.topics.enable=false

# 0.11.0.0版本開始unclean.leader.election.enable參數的默認值由原來的true改為false,可以關閉unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不會被提升為新的leader partition。kafka集群的持久化力大於可用性,如果ISR中沒有其它的replica,會導致這個partition不能讀寫。
unclean.leader.election.enable=false

############################# Socket Server Settings #############################
# broker 監聽器的CSV列表,格式為:[協議]://[主機名]:[端口],[[協議]]://[主機名]:[端口]].該參數主要用於可混搭鏈接broker使用,可以認為是broker端開放給clients的監聽端口。如果不指定主機名,則表示綁定默認網卡;如果主機名是0.0.0.0,則表示綁定所有網卡。Kafka當前支持的協議類型包括PLAINTEXT,SSL及SASL_SSL等。對於新版本的Kafka,推薦值設置listerners一個參數就夠了,對於已經過時的兩個參數host.name和port,就不用再配置了。對於我未啟用安全的Kafka集群,使用PLAINTEXT協議足以。如果啟用的安全認證,可以考慮使用SSL或SAS_SSL協議。
listeners=SASL_PLAINTEXT://node103.yinzhengjie.org.cn:9092

# 設置協議,本例使用SASL_PLAINTEXT
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

# allow.everyone.if.no.acl.found參數是broker端端參數,不過有些奇怪的是,他並沒記錄在官網的broker端參數列表中(但我們可以在官網文檔中看到這個參數,他只出現過一次:http://kafka.apache.org/documentation/#security_authz。)。當設置為true時,整個ACL機制將改為黑名單機制,即只有在黑名單中端用戶才無法訪問資源,非黑名單用戶可以通常無阻端訪問任何kafka資源;當參數為false時,也就是他端默認值時,ACL機制是白名單機制,只有白名單用戶才能訪問設定的資源,其他用戶都屬於未授權用戶。
allow.everyone.if.no.acl.found=true

# 設置yinzhengjie為超級用戶,注意,這個用戶是你需要在JAAS文件中指定的喲~
super.users=User:yinzhengjie

# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# 和listeners類似,該參數也是用於發布給clients的監聽器,不過該參數主要用於IaaS環境,比如運上的機器通常都配有多塊網卡(私網網卡和公網網卡)。對於這種機器,用戶可以設置該參數綁定公網IP供外部clients使用。然后配置上面的listers來綁定私網IP供broker間通信使用。當然不設置該參數也是可以的,只是雲上的機器很容易出現clients無法獲取數據的問題,原因就是listeners綁定的是默認網卡,而默認網卡通常都是綁定綁定私網IP的。在實際使用場景中,對於配有多塊網卡的機器而言,這個參數通常都是需要配置的。
#advertised.listeners=SASL_PLAINTEXT://node103.yinzhengjie.org.cn:9092

# 將偵聽器名稱映射到安全協議,默認情況下是相同的。在偵聽器名稱和安全協議之間映射。對於要在多個端口或IP中使用的相同安全協議,必須定義該安全協議。例如,內部和外部通信量可以分開,即使兩者都需要SSL。具體地說,用戶可以定義名稱為INTERNAL和EXTERNAL的偵聽器,該屬性為:“INTERNAL:SSL,EXTERNAL:SSL”。如圖所示,鍵和值用冒號分隔,映射項用逗號分隔。每個偵聽器名稱只在地圖中出現一次。可以通過向配置名稱添加規范化前綴(偵聽器名稱小寫)來為每個偵聽器配置不同的安全性(SSL和SASL)設置。例如,為了為INTERNAL偵聽器設置不同的密鑰存儲,將設置名為“listener.name...ssl.keystore.location”的配置。如果未設置偵聽器名稱的配置,則配置將返回到通用配置(即`ssl.keystore.location')。詳情請參考官網文檔:http://kafka.apache.org/11/documentation.html#brokerconfigs。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


# 它控制來一個broker在后台用於處理網絡請求的線程數,默認是3。通常情況下,broker啟動時會創建多個線程處理來自其他broker和clients發送過來的各種請求。注意,這里的“處理”其實只是負責轉發請求,它會將接收到的請求轉發到后面的處理線程中。在實際生產環境中,咱們需要不斷的監控NetworkProcessorAvgIdlePercent JMX指標。如果該參數指標持續低於0.3,建議適當增加該參數的值。
num.network.threads=30

# 這個參數就是控制broker端實際處理網絡請求的線程數,默認值是8,即kafka borker默認創建8個線程以輪詢的方式不停地監聽轉發過來的網絡請求並進行實時處理。Kafka同樣也為請求處理提供了一個JMX監控指標Request HandlerAvgIdlePercent。如果該參數指標持續低於0.3,建議適當增加該參數的值。
num.io.threads=30

# 接字服務器使用的發送緩沖區(SOYSNDBUF), Socket服務器套接字的SOYSNDBUF緩沖區。如果值為-1,則將使用OS默認值。
socket.send.buffer.bytes=5242880

# 接字服務器使用的接收緩沖區(SOYRCVBUF),用於網絡請求的套接字接收緩沖器.
socket.receive.buffer.bytes=5242880

# 接字服務器將接受的請求的最大大小(對OOM的保護).
socket.request.max.bytes=104857600

# I/O線程等待隊列中的最大的請求數,超過這個數量,network線程就不會再接收一個新的請求。應該是一種自我保護機制。
queued.max.requests=2000

############################# Log Basics #############################

# 該參數制定了Kafka持久化消息的目錄。若待保存的消息數量非常多,那么最好確保該文件夾有充足的磁盤空間。該參數可以設置多個目錄,以都好進行分隔。指定多個目錄的做法通常是被推薦的,配置多個目錄可以提升讀寫的能力,換句話說,就是提升Kafka的吞吐量。
log.dirs=/home/yinzhengjie/data/kafka/logs,/home/yinzhengjie/data/kafka/logs2,/home/yinzhengjie/data/kafka/logs3


# 在啟動時恢復日志和關閉時刷盤日志時每個數據目錄的線程的數量,默認1。這個值總共開啟的線程數=num.recovery.threads.per.data.dir * log.dirs的數量。舉個例子,假如log.dirs指定了3個目錄,且num.recovery.threads.per.data.dir配置的是10,那么總共會開啟30個線程會在broker停止或者啟動時去加載或寫入數據到磁盤中。理論上說我們應該適當增大這個值,以達到啟動或者停止broker更快的目的,這個參數也就在broker啟動和停止時才會起到一定的作用,而broker在啟動或者停止的期間borker是無法正常提供服務的,因此我建議大家適當增大這個值以減少broker的啟動時間。
num.recovery.threads.per.data.dir=10

# 服務器接受單個消息的最大大小,即消息體的最大大小,單位是字節
message.max.bytes=104857600

# 每當broker停止或崩潰領導時,該broker的分區轉移到其他副本。這意味着默認情況下,當代理重新啟動時,它將只是所有分區的跟隨者,這意味着它不會用於客戶端讀取和寫入。為了避免這種不平衡,Kafka有一個首選復制品的概念。如果分區的副本列表是1,5,9,則節點1首選作為節點5或9的引導者,因為它在副本列表中較早。您可以讓Kafka群集通過運行命令嘗試恢復已恢復副本的領導:"kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot " 由於運行此命令可能很繁瑣,您還可以通過設置以下配置來配置Kafka自動執行此操作:auto.leader.rebalance.enable=true。當然,你可以不配置該參數,因為官方默認的配置是true。 
auto.leader.rebalance.enable=true


# 在強制fsync一個partition的log文件之前暫存的消息數量。調低這個值會更頻繁的sync數據到磁盤,影響性能。通常建議人家使用replication來確保持久性,而不是依靠單機上的fsync,但是這可以帶來更多的可靠性.
log.flush.interval.ms=30000

############################# Log Retention Policy #############################


# 這個參數是指定時間數據可以在kafka集群的保留時間。Kafka通常根據時間來決定數據可以被保留多久。默認使用log.retention.hours參數來配置時間,默認是168小時,也就是1周。除此之外,還有其他兩個參數log.retention.minutes和log.retention.ms。這3個參數的作用是一樣的,都是決定消息多久以后被刪除,不過還是推薦使用log.retention.ms。如果指定了不止一個參數,Kafka會優先使用具有最小值的那個參數。
log.retention.hours=168

# 另一種方式是通過保留的消息字節數來判斷消息是否過期。他的值通過參數log.retention.bytes來指定,作用在每一個分區上,也就是說,如果一個包含8個分區的主題,並且log.retention.bytes被設置為1GB,那么這個主題最多可以保留8GB的數據。所以,當主題的分區個數增加時,整個主題可以保留的數據也隨之增加。溫馨提示:如果同時指定log.retention.bytes和log.retention.ms(或者log.retention.hours或log.retention.minutes),只要任意一個條件得到滿足,消息就會被刪除。
#log.retention.bytes=1073741824

# 以上的設置都作用在日志片段上,而不是作用在單個消息上。當消息達到broker時,他們被加分區的當前日志片段上。當日志片段大小達到log.segment.bytes指定的上線(默認是1GB)時,當前日志片段就會被關閉,一個新的日志片段被打開。如果一個日志片段被關閉,就開始等待過期。這個參數的值越小,就越會頻繁的關閉和分破新文件,從而降低磁盤寫入的整體效率。換句話說:它是控制日志segment文件的大小,超出該大小則追加到一個新的日志segment文件中(-1表示沒有限制)
log.segment.bytes=536870912


# 日志片段文件的檢查周期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 用於保存broker元數據的zookeeper地址是通過zookeeper.connect來指定的。大家發現沒有,下面就是我5台zookeeper地址,最后的“kafka_yinzhengjie_cluster1”表示zookeeper路徑,作為kafka的chroot環境,如果不指定,默認使用根路徑。推薦大家配置chroot路徑,因為這樣的話,當你部署多個kafka集群時可以共享同一套zookeeper集群,方便你維護。當然如果你不指定的話,它默認是在zookeeper的根路徑創建,這樣當你其他業務在使用zookeeper集群時,管理起來就很不方便啦!
zookeeper.connect=10.1.2.114:2181,10.1.2.115:2181,10.1.2.116:2181,10.1.2.117:2181,10.1.2.118:2181/yinzhengjie_kafka_test

# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的消息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息,連接zk的超時時間
zookeeper.connection.timeout.ms=3000

# 求的最大大小為字節,請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請的數目,以防發出巨量的請求。
max.request.size=104857600

############################# Group Coordinator Settings #############################

# 在執行第一次重新平衡之前,組協調器將等待更多消費者加入新組的時間。更長的延遲意味着潛在的更少的重新平衡,但是增加了直到處理開始的時間。
group.initial.rebalance.delay.ms=30000
[root@node103 ~]# 
[root@node103 ~]# cat /soft/kafka/config/server.properties                    #node103.yinzhengjie.org.cn的broker節點配置信息

  啟動broker服務器。

[root@node101 ~]# which xkafka.sh              
/usr/local/bin/xkafka.sh
[root@node101 ~]# 
[root@node101 ~]# 
[root@node101 ~]# cat /usr/local/bin/xkafka.sh 
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com

#判斷用戶是否傳參
if [ $# -ne 1 ];then
    echo "無效參數,用法為: $0  {start|stop}"
    exit
fi

#獲取用戶輸入的命令
cmd=$1



for (( i=101 ; i<=103 ; i++ )) ; do
    tput setaf 2
    echo ========== node${i}.yinzhengjie.org.cn  $cmd ================
    tput setaf 9
    case $cmd in
        start)
            ssh node${i}.yinzhengjie.org.cn  "source /etc/profile ; yinzhengjie-security-kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
            echo node${i}.yinzhengjie.org.cn  "服務已啟動"
            ;;
        stop) 
            ssh node${i}.yinzhengjie.org.cn  "source /etc/profile ; kafka-server-stop.sh" 
            echo node${i}.yinzhengjie.org.cn  "服務已停止"
            ;;
            *) 
            echo "無效參數,用法為: $0  {start|stop}"
            exit 
            ;;
     esac
done
[root@node101 ~]# 
[root@node101 ~]# cat /usr/local/bin/xkafka.sh                       #編寫kafka集群的啟動腳本,在測試環境中可以使用,但在實際生產環境中不推薦使用!
[root@node101 logs]# xkafka.sh start
========== node101.yinzhengjie.org.cn start ================
node101.yinzhengjie.org.cn 服務已啟動
========== node102.yinzhengjie.org.cn start ================
node102.yinzhengjie.org.cn 服務已啟動
========== node103.yinzhengjie.org.cn start ================
node103.yinzhengjie.org.cn 服務已啟動
[root@node101 logs]# 
[root@node101 logs]# 
[root@node101 logs]# xcall.sh jps
============= node101.yinzhengjie.org.cn : jps ============
19616 Kafka
19765 Jps
2348 Main
命令執行成功
============= node102.yinzhengjie.org.cn : jps ============
22946 Kafka
23081 Jps
命令執行成功
============= node103.yinzhengjie.org.cn : jps ============
28069 Kafka
28206 Jps
命令執行成功
[root@node101 logs]# 
[root@node101 logs]# xkafka.sh start                            #啟動kafka集群
[root@yinzhengjie ~]# kafka-topics.sh --zookeeper 10.1.2.118/yinzhengjie_kafka_test --list                            #kafka啟動后,我們在zookeeper查看元數據信息,發現並沒topic
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# kafka-topics.sh --create --zookeeper 10.1.2.118/yinzhengjie_kafka_test --topic yinzhengjie-kafka --partitions 10 --replication-factor 2    #於是,我們可以創建kafka topic便於測試kafka寫入和讀取
Created topic "yinzhengjie-kafka".
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# kafka-topics.sh --zookeeper 10.1.2.118/yinzhengjie_kafka_test --list                            #再次查看kafka的topic列表
yinzhengjie-kafka
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# kafka-topics.sh --zookeeper 10.1.2.118/yinzhengjie_kafka_test --describe --topic yinzhengjie-kafka          #查看kafka的詳細信息
Topic:yinzhengjie-kafka PartitionCount:10       ReplicationFactor:2     Configs:
        Topic: yinzhengjie-kafka        Partition: 0    Leader: 101     Replicas: 101,102       Isr: 101,102
        Topic: yinzhengjie-kafka        Partition: 1    Leader: 102     Replicas: 102,103       Isr: 102,103
        Topic: yinzhengjie-kafka        Partition: 2    Leader: 103     Replicas: 103,101       Isr: 103,101
        Topic: yinzhengjie-kafka        Partition: 3    Leader: 101     Replicas: 101,103       Isr: 101,103
        Topic: yinzhengjie-kafka        Partition: 4    Leader: 102     Replicas: 102,101       Isr: 102,101
        Topic: yinzhengjie-kafka        Partition: 5    Leader: 103     Replicas: 103,102       Isr: 103,102
        Topic: yinzhengjie-kafka        Partition: 6    Leader: 101     Replicas: 101,102       Isr: 101,102
        Topic: yinzhengjie-kafka        Partition: 7    Leader: 102     Replicas: 102,103       Isr: 102,103
        Topic: yinzhengjie-kafka        Partition: 8    Leader: 103     Replicas: 103,101       Isr: 103,101
        Topic: yinzhengjie-kafka        Partition: 9    Leader: 101     Replicas: 101,103       Isr: 101,103
[root@yinzhengjie ~]# 

   細心的同學可能已經發現了,我們不是啟用了ACL嗎?為什么客戶端還能成功創建topic呢?這是因為當前“kafka-topics.sh”腳本是直接鏈接zookeeper的,完全繞過了ACL審查機制,故不受ACL的限制。所以無論是否配置了ACL,用戶總是可以使用kafka-topics.sh來管理topic。所以,在實際使用過程中,最好能對鏈接zookeeper的用戶也增加認證機制。言歸正傳,本例中我們的目的是要測試是否用戶writer向“yinzhengjie-kafka”topic寫入消息以及用戶reader從“yinzhengjie-kafka”讀取消息。很希然,當前的生產者和消費者都無法工作,報的錯誤就是無法鏈接broker(node101.yinzhengjie.org.cn:9092)。這是因為我們開啟了認證機制。下面是日志信息。

[root@1yinzhengjie ~]# kafka-console-producer.sh --broker-list node101.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka
尹正傑到此一游!!!
[2018-11-30 14:08:54,275] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,333] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,391] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,449] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,507] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,566] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,624] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,683] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,741] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,799] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,857] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,914] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:54,972] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,029] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,087] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,145] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,203] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,261] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,325] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,382] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,440] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,498] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:55,555] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
^C[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# 
[root@1yinzhengjie ~]# kafka-console-producer.sh --broker-list node101.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka                      #啟動生產者
[root@yinzhengjie ~]# kafka-console-consumer.sh --bootstrap-server node101.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka --from-beginning
[2018-11-30 14:08:34,472] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,530] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,588] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,647] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,704] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,762] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,819] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,877] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,934] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:34,992] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,049] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,107] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,165] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,223] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,280] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,338] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,397] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,456] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,514] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,571] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,629] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,686] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,744] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,801] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,859] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,916] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:35,974] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,031] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,088] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,146] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,203] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,260] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,318] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2018-11-30 14:08:36,375] WARN Bootstrap broker node101.yinzhengjie.org.cn:9092 disconnected (org.apache.kafka.clients.NetworkClient)
^C[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# kafka-console-consumer.sh --bootstrap-server node101.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka --from-beginning         #啟動消費者

  其實,我們可以查看此時的broker的日志信息,會有以下報錯:

2>.produce端的配置

  若要往開啟SASL和ACL機制的kafka集群寫入數據,我們需要在produce端進行三個方面的設置。

    第一:編寫JAAS配置文件;

    第二:修改produce端的啟動腳本,將第一步編寫的JAAS文件的路徑寫入;

    第三:使用kafka-alcs.sh進行寫入權限的授權操作;

  下面我們為writer用戶破誒值認證信息,首先需要創建一個屬於writer用戶的JAAS文件,該文件中指定用戶writer的鏈接信息如下:

[root@yinzhengjie ~]# cat /yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/config/yinzhengjie-writer-jaas.conf
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="writer"
   password="writer";        #注意這個分號別忘記寫啦!
};
[root@yinzhengjie ~]# 

  編寫kafka的啟動腳本,指定“yinzhengjie-writer-jaas.conf”配置文件的絕對路徑。

[root@yinzhengjie ~]# cp /yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/bin/kafka-console-producer.sh /yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/bin/yinzhengjie-writer-kafka-console-producer.sh 
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# tail -1 /yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/bin/yinzhengjie-writer-kafka-console-producer.sh  
exec $(dirname $0)/kafka-run-class.sh  -Djava.security.auth.login.config=/yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/config/yinzhengjie-writer-jaas.conf kafka.tools.ConsoleProducer "$@"
[root@yinzhengjie ~]# 

 接下來,我們需要給produce端進行授權操作,即運行那個用戶寫入數據,授權操作需要使用“kafka-acls.sh”這個腳本。這個腳本的使用大家可以參考Apache kafka官網文檔,也可以參考confluent官網:https://docs.confluent.io/current/kafka/authorization.html#overview。既然是生產者,我們需要賦給它對應topic的寫入權限,故執行以下命令創建對應的ACL規則:

[root@yinzhengjie ~]# kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.1.2.118:2181 --add --allow-principal User:writer --operation Write --topic yinzhengjie-kafka
Adding ACLs for resource `Topic:yinzhengjie-kafka`: 
        User:writer has Allow permission for operations: Write from hosts: * 

Current ACLs for resource `Topic:yinzhengjie-kafka`: 
        User:writer has Allow permission for operations: Write from hosts: * 

[root@yinzhengjie ~]# 

上述重要參數介紹:
  principal:
    表示一個kafka user。
  operation:
    表示一個具體的操作類型,如WRITE,READ,DESCRIBE等。
  Host:
    表示連向Kafka集群的client的IP地址,如果是“*”則表示所有IP。注意,當前Kafka不支持主機名,只能指定IP地址。
  Resource:
    表示一種kafka資源類型,當前共有4種類型:TOPIC,CLUSTER,GROUP和TRANSCTIONID。

  授權后,我們就可以使用咱們修改后的生產者來往broker寫入數據:

[root@10-19-104-189 config]# yinzhengjie-writer-kafka-console-producer.sh --broker-list node101.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka --producer-property security.protocol=SASL_PLAINTEXT --producer-property sals.mechanism=PLAIN

  當然,如果你不想每次啟動時指定生產者的參數,我們可以把“--producer-property security.protocol=SASL_PLAINTEXT --producer-property sals.mechanism=PLAIN”這2個屬性寫入生產者的配置文件中。

[root@yinzhengjie ~]# tail -3 /yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/config/producer.properties  
#Add by yinzhengjie
security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN
[root@yinzhengjie ~]# 

  修改之后,我們啟動生產者時需要指定生產者的配置文件,具體操作如下:

[root@yinzhengjie ~]# yinzhengjie-writer-kafka-console-producer.sh --broker-list 10.1.2.101:9092 --topic yinzhengjie-kafka --producer.config /yinzhengjie/softwares/kafkasoft/kafka/kafka_2.11-1.1.0/config/producer.properties

   很奇怪的一個現象,我啟動生產者總是報錯說認證失敗,而大家也看到我配置了認證,始終報錯認證失敗:(這個原因我暫時還沒有解決掉,仍在調研中.....)

[root@yinzhengjie ~]# yinzhengjie-writer-kafka-console-producer.sh --broker-list 10.1.2.101:9092 --topic yinzhengjie-kafka --producer.config /yinzhengjie/softwares/kafkasoft/ka
fka/kafka_2.11-1.1.0/config/producer.properties                          
1
[2018-12-03 15:40:52,249] WARN Error while fetching metadata with correlation id 1 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,349] WARN Error while fetching metadata with correlation id 2 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,454] WARN Error while fetching metadata with correlation id 3 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,557] WARN Error while fetching metadata with correlation id 4 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,661] WARN Error while fetching metadata with correlation id 5 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,765] WARN Error while fetching metadata with correlation id 6 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,869] WARN Error while fetching metadata with correlation id 7 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:52,973] WARN Error while fetching metadata with correlation id 8 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,077] WARN Error while fetching metadata with correlation id 9 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,181] WARN Error while fetching metadata with correlation id 10 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,285] WARN Error while fetching metadata with correlation id 11 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,389] WARN Error while fetching metadata with correlation id 12 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,493] WARN Error while fetching metadata with correlation id 13 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,597] WARN Error while fetching metadata with correlation id 14 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,702] WARN Error while fetching metadata with correlation id 15 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,806] WARN Error while fetching metadata with correlation id 16 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:53,910] WARN Error while fetching metadata with correlation id 17 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:54,015] WARN Error while fetching metadata with correlation id 18 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2018-12-03 15:40:54,118] WARN Error while fetching metadata with correlation id 19 : {yinzhengjie-kafka=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
^C[root@yinzhengjie ~]# 

   

  搗鼓了一天,沒有搗鼓明白, 我決定先放一放,如果哪位大佬看到我的問題歡迎幫我指正,我先調研一下kafka集群監控度量指標。

 

3>.consumer端端配置

   

三.SSL加密

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM