一、概述
在Kafka0.9版本之前,Kafka集群時沒有安全機制的。Kafka Client應用可以通過連接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。來獲取存儲在Zookeeper中的Kafka元數據信息。拿到Kafka Broker地址后,連接到Kafka集群,就可以操作集群上的所有主題了。由於沒有權限控制,集群核心的業務主題時存在風險的。
權限控制類型
kafka權限控制整體可以分為三種類型:
- 基於SSL
- 基於Kerberos(此認證一般基於CDH,本文不與討論)
- 基於acl的
第一種類型,需要創建ca,給證書簽名,server和client配置SSL通訊。實現比較麻煩!
第二種類型,需要搭建一台Kerberos認證服務器,實現較復雜!
第三種類型,是kakfa內置的,實現簡單。
本文將重點介紹基於ACL的認證實現。
身份認證
Kafka的認證范圍包含如下:
- Client與Broker之間
- Broker與Broker之間
- Broker與Zookeeper之間
當前Kafka系統支持多種認證機制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。
本文所使用的是基於SASL,認證范圍主要是Client與Broker之間。
SASL認證流程
在Kafka系統中,SASL機制包含三種,它們分別是Kerberos、PLAIN、SCRAM。
以PLAIN認證為示例,下面給大家介紹PLAIN認證流程。
先來簡述一下核心步驟,請勿操作!
配置Server
要配置SASL和ACL,我們需要在broker端進行兩個方面的設置。首先是創建包含所有認證用戶信息的JAAS文件。本例中,我們假設有3個用戶:admin, reader和writer,其中admin是管理員,reader用戶讀取Kafka集群中topic數據,而writer用戶則負責向Kafka集群寫入消息。我們假設這3個用戶的密碼分別與用戶名相同(在實際場景中,管理員需要單獨把密碼發給各自的用戶),因此我們可以這樣編寫JAAS文件:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_reader="reader" user_writer="writer"; };
保存該文件為kafka_cluster_jaas.conf,之后我們需要把該文件的完整路徑作為一個JVM參數傳遞給Kafka的啟動腳本。不過由於bin/kafka-server-start.sh只接收server.properties的位置,不再接收其他任何參數,故我們需要修改該啟動腳本。具體做法如下:
vim bin/kafka-server-start.sh
把該文件中的這行:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改為下面這行,然后保存退出
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/path/kafka_cluster_jaas.conf kafka.Kafka "$@"
配置好JAAS文件后,我們開始修改broker啟動所需的server.properties文件,你至少需要配置(或修改)以下這些參數:
# 配置ACL入口類 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # 本例使用SASL_PLAINTEXT listeners=SASL_PLAINTEXT://:9092 # 指定SASL安全協議 security.inter.broker.protocol= SASL_PLAINTEXT # 配置SASL機制 sasl.mechanism.inter.broker.protocol=PLAIN # 啟用SASL機制 sasl.enabled.mechanisms=PLAIN # 設置本例中admin為超級用戶 super.users=User:admin
Ok,現在我們可以啟動broker了(當前肯定要先啟動Zookeeper)
bin/ kafka-server-start.sh ../config/server.properties
可見,Kafka broker已經成功啟動了。不過當前該broker只會接收已認證client發來的請求。下面我們繼續clients端的配置。
Client端配置
當Kafka Server端配置啟用了SASL/PLAIN,那么Client連接的時候需要配置認證信息,Client配置一個kafka_client_jaas.conf文件,內容如下:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="writer" password="writer"; };
然后,在producer.properties和consumer.properties文件中設置認證協議,內容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
最后,在kafka-console-producer.sh腳本和kafka-console-producer.sh腳本中添加JAAS文件的路徑,內容如下:
把該文件中的這行:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改為下面這行,然后保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/path/writer_jaas.conf kafka.tools.ConsoleProducer "$@"
ACL操作
在配置好SASL后,啟動Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh腳本來操作ACL機制。
(1)查看:在kafka-acls.sh腳本中傳入list參數來查看ACL授權新
kafka-acls.sh --list --authorizer-properties zookeeper.connect=zookeeper_server:2181
(2)創建:創建待授權主題之前,在kafka-acls.sh腳本中指定JAAS文件路徑,然后在執行創建操作
kafka-topics.sh --create --zookeeper zookeeper_server:2181 --replication-factor 1 --partitions 1 --topic kafka_acl_topic
(3)生產者授權:對生產者執行授權操作
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation Write --topic=*
(4)消費者授權:對生產者執行授權后,通過消費者來進行驗證
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –topic=*
(5)組授權:允許只讀用戶的所有組操作
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –group=*
二、環境說明
操作系統 | 服務器地址 | K8s角色 | 服務 |
ubuntu-16.04.5-server-amd64 | 192.168.0.121 | master | ks8主控端 |
ubuntu-16.04.5-server-amd64 | 192.168.0.88 | node_1 | etcd |
ubuntu-16.04.5-server-amd64 | 192.168.0.89 | node_2 | docker私有庫 |
每台服務器的硬件配置為,1核3G,20G硬盤。請確保有2G的可用內存!
請確保已經安裝好了k8s集群,關於k8s的安裝,請參考連接:
https://www.cnblogs.com/xiao987334176/p/9947548.html
里面有詳細的過程,使用一鍵腳本即可。本文就是在這個環境上,操作的!
架構圖:
只需要在Kafka_server 設置ACL規則就可以了。主要針對topic 做權限驗證!創建讀寫用戶進行驗證。
客戶端可以隨意創建topic,但是向topic里面讀寫內容,就需要做驗證了!
三、安裝zookeeper(docker)
登錄到node2服務器
mkdir /opt/zookeeper
目錄結構如下:
./ ├── dockerfile ├── run.sh ├── sources.list ├── zoo.cfg └── zookeeper-3.4.13.tar.gz
具體文件內容,請參考鏈接:
https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-2-2-0
記住,先不要把docker run起來。后面會用k8s 啟動鏡像。
四、安裝kafka_server(docker)
登錄到node2服務器
mkdir /opt/kafka_server
目錄結構如下:
./ ├── dockerfile ├── kafka_2.12-2.1.0.tgz ├── kafka_cluster_jaas.conf ├── run.sh └── sources.list
具體文件內容,請參考鏈接:
https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-3-6-0
記住,先不要把docker run起來。后面會用k8s 啟動鏡像。
五、安裝kafka_client(docker)
登錄到node2服務器
mkdir /opt/kafka_client
目錄結構如下:
./ ├── consumer.config ├── dockerfile ├── kafka_2.12-2.1.0.tgz ├── producer.config ├── reader_jaas.conf ├── run.sh ├── sources.list └── writer_jaas.conf
具體文件內容,請參考鏈接:
https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-4-6-0
記住,先不要把docker run起來。后面會用介紹如何啟動鏡像。
六、推送鏡像到私有倉庫
登錄到node2 服務器,將zookeeper和kafka_server鏡像推送到私有倉庫
docker tag zookeeper 192.168.0.89:5000/zookeeper_v1 docker push 192.168.0.89:5000/zookeeper_v1 docker tag kafka_server 192.168.0.89:5000/kafka_server_v1 docker push 192.168.0.89:5000/kafka_server_v1
七、使用k8s部署服務
zookeeper
登錄到k8s主控制服務器,新建zookeeper.yaml

apiVersion: extensions/v1beta1 kind: Deployment metadata: name: zookeeper-1 spec: replicas: 1 template: metadata: labels: name: zookeeper-1 spec: containers: - name: zookeeper-1 image: 192.168.0.89:5000/zookeeper_v1 ports: - containerPort: 2128 --- apiVersion: v1 kind: Service metadata: name: zookeeper-1 labels: name: zookeeper-1 spec: #type: NodePort ports: - name: client port: 2181 protocol: TCP #nodePort: 12182 - name: followers port: 2888 protocol: TCP - name: leader port: 3888 protocol: TCP - name: jmx port: 7071 protocol: TCP #nodePort: 17072 selector: name: zookeeper-1
kafka_server
新建文件kafka_server.yaml

apiVersion: extensions/v1beta1 kind: Deployment metadata: name: kafka-server-1 spec: replicas: 1 template: metadata: labels: name: kafka-server-1 spec: containers: - name: kafka-server-1 image: 192.168.0.89:5000/kafka_server_v1 env: - name: zookeeper value: "zookeeper-1.default.svc.cluster.local" - name: kafka valueFrom: fieldRef: fieldPath: status.podIP ports: - containerPort: 9092 --- apiVersion: v1 kind: Service metadata: name: kafka-server-1 labels: name: kafka-server-1 spec: type: NodePort ports: targetPort: 9092 protocol: TCP nodePort: 9092 selector: name: kafka-server-1
注意:這里的kafka_server的listeners地址由kafka變量決定,它是pod ip。
在之前的文章,鏈接如下:
https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-3-6-0
啟動kafka時,依賴2個變量。一個是zookeeper地址,一個是kafka監聽地址。
看下面這段,就是制定了2個變量,分別是zookeeper和kafka。它對應就是run.sh中的2個變量
env: - name: zookeeper value: "zookeeper-1.default.svc.cluster.local" - name: kafka valueFrom: fieldRef: fieldPath: status.podIP
env表示環境變量。
kafka_server.yaml無法直接獲取zookeeper的pod ip。所以使用 zookeeper-1.default.svc.cluster.local 來獲取。其中zookeeper-1對應的是zookeeper.yaml中的name,后面的值,是固定的。
要想獲取kafka_server的pod id,需要使用這種寫法
valueFrom:
fieldRef:
fieldPath: status.podIP
創建應用
kubectl create -f zookeeper.yaml --validate
kubectl create -f kafka_server.yaml --validate
等待1分鍾,查看狀態
root@k8s-master001:~# kubectl get pods -o wide NAME READY STATUS RESTARTS AGE IP NODE kafka-server-1-5c58954d49-kxgj6 1/1 Running 0 2h 192.138.150.193 k8s-node001 zookeeper-1-f84745dd8-84xr8 1/1 Running 0 2h 192.138.6.129 k8s-node002
如果啟動失敗,使用以下命令查看日志
kubectl describe po zookeeper-1-f84745dd8-84xr8
八、客戶端測試
Shell客戶端測試
使用docker run一個鏡像
docker run -it -e zookeeper=192.169.6.131 -e kafka=192.169.150.195 kafka_client
注意:-e 參數后面的ip地址要正確,就是pod ip
進入容器
docker exec -it ada31484e3d6 /bin/bash
創建一個測試topic,名為test,單分區,副本因子是1
cd /kafka_2.12-2.1.0/ bin/kafka-topics.sh --create --zookeeper 192.169.6.131:2181 --topic test --partitions 1 --replication-factor 1
配置ACL來讓writer用戶有權限寫入所有topic
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation Write --topic=*
為reader用戶設置所有topic的讀取權限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –topic=*
然后設置reader用戶訪問group的權限,-group=* 表示允許所有組
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –group=*
登錄到kafka_client,再開一個窗口。
第一個窗口進入生產者模式,輸入342
bin/writer-kafka-console-producer.sh --broker-list 192.138.150.193:9092 --topic test --producer.config config/producer.config >342
第二個窗口,運行消費者
cd /kafka_2.12-2.1.0/ bin/reader-kafka-console-consumer.sh --bootstrap-server 192.138.150.193:9092 --topic test --from-beginning --consumer.config config/consumer.config
這個時候會接收到
342
Shell腳本的客戶端,測試完成。
如果需要給writer 用戶所有權限,可以使用以下命令:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation All --topic=*
Java 客戶端測試
在使用java 客戶端測試之前,確保客戶端能直接連接k8s 中的 pod ip。
登錄k8s 主控端,增加一條iptables規則。192.138.0.0/16是pod網段
iptables -t nat -I POSTROUTING -s 192.168.0.0/24 -d 192.138.0.0/16 -o tunl0 -j MASQUERADE
客戶端是window 10電腦,增加一條路由,確保有管理權限
route add 192.138.0.0 MASK 255.255.0.0 192.168.0.121
測試是否能夠ping通 kafka_server的ip地址
ping 192.138.150.193
使用 java客戶端的測試,代碼如下:

public void send() { String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate, "writer", "writer"); Properties props = new Properties(); props.put("bootstrap.servers", "192.138.150.193:9092"); props.put("acks", "all"); props.put("batch.size", 16384); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", jaasCfg); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 20; i++) { producer.send(new ProducerRecord<String, String>("test", "game", Integer.toString(i))); } producer.close(); } public void receive() { String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate1, "reader", "reader"); Properties props = new Properties(); props.put("bootstrap.servers", "192.138.150.193:9092"); props.put("group.id", "xxx"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", jaasCfg); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } } }
如果輸出0~19,則測試生產者和消費者正常。
使用Python代碼測試
先安裝模塊,本文使用的python版本為3.5.2
pip3 install kafka
新建文件kafka_client.py,代碼如下:

#!/usr/bin/env python3 # coding: utf-8 from kafka import KafkaProducer from kafka import KafkaConsumer class KafkaClient(object): # kafka客戶端程序 def __init__(self, kafka_server, port, topic): self.kafka_server = kafka_server # kafka服務器ip地址 self.port = port # kafka端口 self.topic = topic # topic名 def producer(self, username, password, content): """ 生產者模式 :param username: 用戶名 :param password: 密碼 :param content: 發送內容 :return: object """ # 連接kafka服務器,比如['192.138.150.193:9092'] producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)], security_protocol="SASL_PLAINTEXT", # 指定SASL安全協議 sasl_mechanism='PLAIN', # 配置SASL機制 sasl_plain_username=username, # 認證用戶名 sasl_plain_password=password, # 密碼 ) producer.send(self.topic, content.encode('utf-8')) # 發送消息,必須是二進制 producer.flush() # flush確保所有meg都傳送給broker # producer.close() return producer def consumer(self, username, password): """ 消費者模式 :param username: 用戶名 :param password: 密碼 :return: object """ # 連接kafka,指定組為test_group consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)], sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username=username, sasl_plain_password=password, ) return consumer # for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # print(recv) kafka_server = "192.138.150.193" port = "9092" topic = "test" ### 生產者###################################################### username = "writer" password = "writer" kafka_client = KafkaClient(kafka_server, port, topic) result = kafka_client.producer(username, password, "hello") # 發送消息hello print("生產者執行完畢!") ### 消費者###################################################### username = "reader" password = "reader" consumer = kafka_client.consumer(username, password) # 消費消息 print("消費者已執行,等待輸出結果:") for msg in consumer: # 遍歷結果 # 輸出topic,partition,offset,key,value recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv)
執行代碼,輸出:
生產者執行完畢! 消費者已執行,等待輸出結果: test:0:218: key=None value=b'hello'
如果出現hello,表示成功!