一、簡介
在Kafka0.9版本之前,Kafka集群時沒有安全機制的。Kafka Client應用可以通過連接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。來獲取存儲在Zookeeper中的Kafka元數據信息。拿到Kafka Broker地址后,連接到Kafka集群,就可以操作集群上的所有主題了。由於沒有權限控制,集群核心的業務主題時存在風險的。
本文主要使用SASL+ACL
二、技術關鍵點
配置文件
修改broker啟動所需的server.properties文件,你至少需要配置(或修改)以下這些參數:
listeners=SASL_PLAINTEXT://:9092 advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN allow.everyone.if.no.acl.found=true super.users=User:admin
其他參數講解,請參考鏈接:
https://www.cnblogs.com/xiao987334176/p/10065844.html
這里主要講解幾個重點參數
默認情況下,如果資源R沒有關聯acl,除了超級用戶,沒有用戶允許訪問。如果你想改變這種方式你可以做如下配置
allow.everyone.if.no.acl.found=true
什么意思呢?上面的配置已經啟動了acl,除了超級用戶之外,其他用戶無法訪問。那么問題就來了,在kafka集群中,其它節點需要同步數據,需要相互訪問。
它默認會使用ANONYMOUS的用戶名連接集群。在這種情況下,啟動kafka集群,必然失敗!所以這個參數一定要配置才行!
listeners=SASL_PLAINTEXT://:9092
這個參數,表示kafka監聽的地址。此參數必須要配置,默認是注釋掉的。默認會使用listeners=PLAINTEXT://:9092,但是我現在開啟了SASL,必須使用SASL協議連接才行。
//:9092 這里雖然沒有寫IP地址,根據官方解釋,它會監聽所有IP。注意:這里只能是IP地址,不能是域名。否則啟動時,會提示無法綁定IP。
advertised.listeners 這個參數,表示外部的連接地址。這里可以寫域名,也可以寫IP地址。建議使用域名,為什么呢?因為IP可能會變動,但是主機名是不會變動的。
所以在java代碼里面寫死,就可以了!注意:必須是SASL協議才行!
super.users=User:admin 表示啟動超級用戶admin,注意:此用戶名不允許更改,否則使用生產模式時,會有異常!
啟動腳本
bin/kafka-server-start.sh 這個是kafka的啟動腳本,要使用ACL,需要增加一個參數才行。
有2種方法修改,這里分別介紹一下:
1. 增加環境變量KAFKA_OPTS(推薦)
先來看一下,默認的bin/kafka-server-start.sh的最后一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
只需要在最后一行的上面一行,添加一個環境變量即可
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
2. 增加參數-Djava.security.auth.login.config
直接將最后一行修改為
exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"
JAAS文件
kafka_cluster_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_reader="123456" user_writer="123456"; };
這個文件,是專門用來做認證的。用戶名和密碼的格式如下:
user_用戶名="密碼"
注意:對於超級用戶,這幾行是固定的
username="admin" password="123456" user_admin="admin"
這里指定的是admin用戶密碼為123456,密碼可自行更改。
下面的,才是普通用戶。最后一個用戶,要有一個分號才行!
三、正式部署
環境介紹
本文采用的環境,參考以下鏈接
https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0
使用了3台zookeeper和5台kafka。都是在一台服務器上面運行的!
其中zookeeper的鏡像,不需要變動,直接啟動即可。
但是kafka的鏡像,需要重新構建,請看下面的內容。
創建鏡像
創建空目錄
mkdir /opt/kafka_cluster_acl
dockerfile
FROM ubuntu:16.04 # 修改更新源為阿里雲 ADD sources.list /etc/apt/sources.list ADD kafka_2.12-2.1.0.tgz / ADD kafka_cluster_jaas.conf / # 安裝jdk RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all EXPOSE 9092 # 添加啟動腳本 ADD run.sh . RUN chmod 755 run.sh ENTRYPOINT [ "/run.sh"]
kafka_cluster_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_reader="123456" user_writer="123456"; };
run.sh

#!/bin/bash if [ -z $broker_id ];then echo "broker_id變量不能為空" exit 1 fi if [ -z $zookeeper ];then echo "zookeeper變量不能為空" exit 2 fi if [ -z $advertised_hostname ];then echo "advertised_hostname變量不能為空" exit 3 fi # 開啟kafka acl驗證 echo " listeners=SASL_PLAINTEXT://:9092 advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN allow.everyone.if.no.acl.found=true super.users=User:admin " >> /kafka_2.12-2.1.0/config/server.properties cd /kafka_2.12-2.1.0 # 設置唯一id sed -i "21s/0/$broker_id/" /kafka_2.12-2.1.0/config/server.properties # 設置zookeeper連接地址 sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties # 配置啟動腳本,最后一行之前添加環境變量 sed -i -e "44"i'\export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"' bin/kafka-server-start.sh # 添加配置文件 mv /kafka_cluster_jaas.conf /kafka_2.12-2.1.0/config/ # 臨時添加5條hosts echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts # 啟動kafka bin/kafka-server-start.sh config/server.properties
注意:由於沒有DNS,這里臨時添加了5條hosts記錄。5台kafka之間,必須要相互連通,否則會報錯
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sources.list
deb http://mirrors.aliyun.com/ubuntu/ xenial main deb-src http://mirrors.aliyun.com/ubuntu/ xenial main deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb http://mirrors.aliyun.com/ubuntu/ xenial universe deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe deb http://mirrors.aliyun.com/ubuntu/ xenial-security main deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
此時,目錄結構如下:
./ ├── dockerfile ├── kafka_2.12-2.1.0.tgz ├── kafka_cluster_jaas.conf ├── run.sh └── sources.list
生成鏡像
docker build -t kafka_cluster_acl /opt/kafka_cluster_acl
啟動鏡像
請確保已經啟動了3台zookeeper的鏡像!
第一個kafka節點
docker run -it -p 9092:9092 -e broker_id=1 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-1.default.svc.cluster.local --network br1 --ip=172.168.0.5 kafka_cluster_acl
第二個kafka節點
docker run -it -p 9093:9092 -e broker_id=2 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-2.default.svc.cluster.local --network br1 --ip=172.168.0.6 kafka_cluster_acl
第三個kafka節點
docker run -it -p 9094:9092 -e broker_id=3 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-3.default.svc.cluster.local --network br1 --ip=172.168.0.7 kafka_cluster_acl
第四個kafka節點
docker run -it -p 9095:9092 -e broker_id=4 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-4.default.svc.cluster.local --network br1 --ip=172.168.0.8 kafka_cluster_acl
第五個kafka節點
docker run -it -p 9096:9092 -e broker_id=5 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-5.default.svc.cluster.local --network br1 --ip=172.168.0.9 kafka_cluster_acl
客戶端測試
shell腳本客戶端
先來查看docker進程
root@jqb-node128:~# docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES a5ff3c8f5c2a kafka_cluster_acl "/run.sh" About a minute ago Up About a minute 0.0.0.0:9096->9092/tcp gifted_jones 36a4d94054b5 kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9095->9092/tcp modest_khorana f614d734ac8b kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9094->9092/tcp tender_kare 29ef9a2edd08 kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9093->9092/tcp reverent_jepsen d9cd45c62e86 kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9092->9092/tcp silly_mcclintock 69dba560bc09 zookeeper_cluster "/run.sh" 4 minutes ago Up 4 minutes 0.0.0.0:2183->2181/tcp confident_fermat d73a01e76949 zookeeper_cluster "/run.sh" 4 minutes ago Up 4 minutes 0.0.0.0:2182->2181/tcp admiring_snyder 7ccab68252e7 zookeeper_cluster "/run.sh" 4 minutes ago Up 4 minutes 0.0.0.0:2181->2181/tcp gifted_wilson
確保已經運行了5個kafka和3個zk
隨便進入一個kafka容器
root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/
新增一個配置文件 kafka_client_jaas.conf
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# apt-get install -y vim root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi config/kafka_client_jaas.conf
內容如下:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="writer" password="123456"; };
同理我們也要將配置文件內容傳遞給JVM, 因此需要修改。
生產者
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-producer.sh
最后一行的上面,添加 KAFKA_OPTS 變量
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf" exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改生產者配置文件,最后一行追加2行內容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
使用echo 追加
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/producer.properties root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/producer.properties
消費者
修改生產者配置文件,使用echo追加
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/consumer.properties root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/consumer.properties
編輯測試腳本
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-consumer.sh
最后一行的上面,添加 KAFKA_OPTS 變量
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf" exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
測試生產者
目前還沒有topic,先來創建一個topic
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --topic test --partitions 1 --replication-factor 1 Created topic "test". root@a5ff3c8f5c2a:/kafka_2.12-2.1.0#
進入生產者模式,指定kafka的服務器為第一個kafka。當然,只要是5個kafka中的任意一個即可!
輸入消息 fdsa,回車
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties >fdsa [2018-12-17 08:45:15,455] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] [2018-12-17 08:45:15,457] ERROR [Producer clientId=console-producer] Connection to node -1 (d9cd45c62e86.br1/172.168.0.5:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
會出現報錯,則說明配置的security 已生效, 要想普通用戶能讀寫消息,需要配置ACL
配置ACL
kafka的ACL規則,是存儲在zookeeper中的,只需要連接zookeeper即可!
topic權限
允許writer用戶有所有權限,訪問所有topic
--operation All 表示所有權限,
--topic=* 表示所有topic
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All --topic=* Adding ACLs for resource `Topic:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: * Current ACLs for resource `Topic:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: *
組權限
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All -group=* Adding ACLs for resource `Group:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: * Current ACLs for resource `Group:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: *
再次測試
root@e0bb740ac0ce:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties >123>
注意:在config/server.properties 文件中,設置了
advertised.listeners=SASL_PLAINTEXT://kafka-1.default.svc.cluster.local:9092
所以連接地址,必須是指定域名才可以!
再開一個窗口,連接同樣的容器
root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/
啟動消費者模式
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning --consumer.config config/consumer.properties 123
收到123表示成功了!
python客戶端測試
由於真實主機無法直接連接到網橋的地址172.168.0.5,那么因此代碼需要在
創建空目錄
mkdir /opt/py_test
放2個文件
sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main deb-src http://mirrors.aliyun.com/ubuntu/ xenial main deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb http://mirrors.aliyun.com/ubuntu/ xenial universe deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe deb http://mirrors.aliyun.com/ubuntu/ xenial-security main deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
produer_consumer_acl_test.py

#!/usr/bin/env python3 # coding: utf-8 # 注意:需要手動創建topic才行執行此腳本 import sys import io def setup_io(): # 設置默認屏幕輸出為utf-8編碼 sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True) sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True) setup_io() import time from kafka import KafkaProducer from kafka import KafkaConsumer class KafkaClient(object): # kafka客戶端程序 def __init__(self, kafka_server, port, topic,content,username,password): self.kafka_server = kafka_server # kafka服務器ip地址 self.port = port # kafka端口 self.topic = topic # topic名 self.content = content # 發送內容 self.username = username # 用戶名 self.password = password # 密碼 def producer(self): """ 生產者模式 :return: object """ # 連接kafka服務器,比如['192.138.150.193:9092'] producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)], security_protocol="SASL_PLAINTEXT", # 指定SASL安全協議 sasl_mechanism='PLAIN', # 配置SASL機制 sasl_plain_username=self.username, # 認證用戶名 sasl_plain_password=self.password, # 密碼 ) producer.send(self.topic, self.content) # 發送消息,必須是二進制 producer.flush() # flush確保所有meg都傳送給broker producer.close() return producer def consumer(self): """ 消費者模式 :return: object """ # 連接kafka,指定組為test_group consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)], sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username=self.username, sasl_plain_password=self.password, ) return consumer # for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # print(recv) def main(self): startime = time.time() # 開始時間 client = KafkaClient(self.kafka_server, self.port, self.topic, self.content,self.username,self.password) # 實例化客戶端 client.producer() # 執行生產者 print('執行生產者') consumer = client.consumer() # 執行消費者 print('執行消費者') print('等待結果....') flag = False for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # 判斷生產的消息和消費的消息是否一致 print(msg.value) # print(self.content) if msg.value == self.content: flag = True break consumer.close() # 關閉消費者對象 endtime = time.time() # 結束時間 if flag: # %.2f %(xx) 表示保留小數點2位 return "kafka驗證消息成功,花費時間", '%.2f 秒' % (endtime - startime) else: return "kafka驗證消息失敗,花費時間", '%.2f 秒' % (endtime - startime) if __name__ == '__main__': kafka_server = "kafka-1.default.svc.cluster.local" port = "9092" topic = "test" content = "hello honey".encode('utf-8') username = "writer" password = "123456" client = KafkaClient(kafka_server,port,topic,content,username,password) # 實例化客戶端 print(client.main())
此時目錄結構如下:
./
├── produer_consumer_acl_test.py
└── sources.list
進入容器,更新ubuntu更新源
root@jqb-node128:/opt/py_test# docker run -it -v /opt/py_test:/mnt --network br1 --ip=172.168.0.10 ubuntu:16.04 root@064f2f97aad2:/# cp /mnt/sources.list /etc/apt/ root@064f2f97aad2:/# apt-get update
安裝python3-pip
root@064f2f97aad2:/# apt-get install -y python3-pip
安裝kafka模塊
root@064f2f97aad2:/# pip3 install kafka
添加hosts記錄
echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts
執行Python文件
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py 執行生產者 執行消費者 等待結果.... b'hello honey' ('kafka驗證消息成功,花費時間', '28.59 秒')
注意:第一次執行時,會非常慢。等待30秒,如果沒有輸出hello honey。終止掉,再次執行。
反復5次。就可以了!
之后再次執行幾次,就會很快了!
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py 執行生產者 執行消費者 等待結果.... b'hello honey' ('kafka驗證消息成功,花費時間', '5.37 秒') root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py 執行生產者 執行消費者 等待結果.... b'hello honey' ('kafka驗證消息成功,花費時間', '0.43 秒')
為啥,前面幾次會很慢。之后就很快了,什么原因,我也不知道!
總之,只要經歷過慢的階段,之后就很快了!
本文參考鏈接:
http://blog.51cto.com/xiaoyouyou/2061143