Kafka ACL使用實戰(單機版)


一、簡介

自0.9.0.0.版本引入Security之后,Kafka一直在完善security的功能。當前Kafka security主要包含3大功能:認證(authentication)、信道加密(encryption)和授權(authorization)。信道加密就是為client到broker、broker到broker以及工具腳本與broker之間的數據傳輸配置SSL;認證機制主要是指配置SASL,而授權是通過ACL接口命令來完成的。

生產環境中,用戶若要使用SASL則必須配置Kerberos,但對於一些小公司而言,他們的用戶系統並不復雜(特別是專門為Kafka集群服務的用戶可能不是很多),顯然使用Kerberos有些大材小用,而且由於運行在內網環境,SSL加密也不是很必要。因此一個SASL+PLAINTEXT的集群環境足以應付一般的使用場景。本文給出一個可運行的實例來演示一下如何在不使用Kerberos的情況下配置SASL + ACL來構建secured Kafka集群。

在開始之前,我們簡單學習下Kafka ACL的格式。根據官網的介紹,Kafka中一條ACL的格式如下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它們的含義描述如下:

  • principal:表示一個Kafka user
  • operation:表示一個具體的操作類型,如WRITE, READ, DESCRIBE等。完整的操作列表詳見:http://docs.confluent.io/current/kafka/authorization.html#overview
  • Host:表示連向Kafka集群的client的IP地址,如果是‘*’則表示所有IP。注意:當前Kafka不支持主機名,只能指定IP地址
  • Resource:表示一種Kafka資源類型。當前共有4種類型:TOPIC、CLUSTER、GROUP、TRANSACTIONID

下面我使用Kafka 2.12-2.1.0版本來演示下如何構建支持SASL + PLAINTEXT + ACL的Kafka集群環境。

 

二、實戰環境

環境說明:

操作系統 服務器地址 Dockerd地址 角色 軟件版本
ubuntu-16.04.5-server-amd64 192.168.91.128 10.0.128.2 zookeeper 3.4.13
ubuntu-16.04.5-server-amd64 192.168.91.129 10.0.129.2 Kafka_server 2.12-2.1.0
ubuntu-16.04.5-server-amd64 192.168.91.131 10.0.131.2 Kafka_client 2.12-2.1.0

 

 

 

 

 

 

這3台服務器的docker容器,務必要可以相互通信。關於3台服務器的docker如何通訊,請參考鏈接:

https://www.cnblogs.com/xiao987334176/p/10049844.html#autoid-4-5-2

里面有詳細的過程,使用一鍵腳本即可。本文就是在這個環境上,操作的!

 

架構圖:

 

只需要在Kafka_server 設置ACL規則就可以了。主要針對topic 做權限驗證!創建讀寫用戶進行驗證。

客戶端可以隨意創建topic,但是向topic里面讀寫內容,就需要做驗證了!

 

三、安裝zookeeper(docker)

登錄到zookeeper服務器,創建空目錄

mkdir /opt/zookeeper

創建以下文件

 

dockerfile

FROM ubuntu:16.04
# 修改更新源為阿里雲
ADD sources.list /etc/apt/sources.list
ADD zookeeper-3.4.13.tar.gz /
ADD zoo.cfg / 
# 安裝jdk
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && \ 
    cd /zookeeper-3.4.13 && \
    mkdir data log && \
    mv /zoo.cfg conf

EXPOSE 2181
# 添加啟動腳本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]
View Code

 

run.sh

#!/bin/bash

cd /zookeeper-3.4.13/
bin/zkServer.sh start

tail -f NOTICE.txt
View Code

 

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
View Code

 

zoo.cfg

tickTime=2000
dataDir=/zookeeper-3.4.13/data
dataLogDir=/zookeeper-3.4.13/log
clientPort=2181
View Code

 

目錄結構如下:

./
├── dockerfile
├── run.sh
├── sources.list
├── zoo.cfg
└── zookeeper-3.4.13.tar.gz

 

創建鏡像

docker build -t zookeeper /opt/zookeeper

 

運行zookeeper

docker run -d -it -p 2181:2181 zookeeper

 

四、安裝Kafka_server(docker)

登錄到Kafka_server服務器,創建空目錄

mkdir /opt/kafka_server

 

dockerfile

FROM ubuntu:16.04
# 修改更新源為阿里雲
ADD sources.list /etc/apt/sources.list
ADD kafka_2.12-2.1.0.tgz /
ADD kafka_cluster_jaas.conf /
# 安裝jdk
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && \
    cd /kafka_2.12-2.1.0 && \
    mv /kafka_cluster_jaas.conf config/ && \
    sed -i '$ s/^/#&/g' bin/kafka-server-start.sh && \
    sed -i '$ a\exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf kafka.Kafka "$@"' bin/kafka-server-start.sh

EXPOSE 9092
# 添加啟動腳本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]
View Code

 

說明:kafka依賴java環境,最后2行的sed命令表示,先把最后一行注釋掉,添加一行新內容。指定一個配置文件,下面會說到。

 

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};
View Code

 

說明:

要配置SASL和ACL,我們需要在broker端進行兩個方面的設置。首先是創建包含所有認證用戶信息的JAAS文件。本例中,我們假設有3個用戶:admin, reader和writer,其中admin是管理員,reader用戶讀取Kafka集群中topic數據,而writer用戶則負責向Kafka集群寫入消息。我們假設這3個用戶的密碼分別與用戶名相同(在實際場景中,管理員需要單獨把密碼發給各自的用戶),因此編寫JAAS文件就是上面的內容

 

run.sh

#!/bin/bash

if [ -z $zookeeper ];then
    zookeeper=`cat /etc/hosts | tail -1 | awk '{print $1}'`
fi

if [ -z $kafka ];then
    kafka=`cat /etc/hosts | tail -1 | awk '{print $1}'`
fi


cd /kafka_2.12-2.1.0
sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties

echo "

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://$kafka:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin

" >> /kafka_2.12-2.1.0/config/server.properties

# 啟動kafka
bin/kafka-server-start.sh config/server.properties


# 設置訪問權限
# 配置ACL來讓writer用戶有權限寫入topic
#bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$zookeeper:2181 --add --allow-principal User:writer --operation Write --topic test

# 為reader用戶設置test topic的讀權限
# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$zookeeper:2181 --add --allow-principal User:reader --operation Read --topic test
# 然后設置訪問group的權限
# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$zookeeper:2181 --add --allow-principal User:reader --operation Read --group test-group
View Code

 

說明:

-z 用來判斷變量是否存在,不存在時,使用docker的IP地址。讀取/etc/hosts最后一行,就是docker ip地址。

要配置SASL和ACL,需要更改server.properties才行,至少需要配置(或修改)以下這些參數:

# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
# 指定SASL安全協議 security.inter.broker.protocol= SASL_PLAINTEXT
# 配置SASL機制 sasl.mechanism.inter.broker.protocol
=PLAIN
# 啟用SASL機制 sasl.enabled.mechanisms
=PLAIN # 設置本例中admin為超級用戶 super.users=User:admin

 

run.sh中,下面有幾個ACL規則。在下面內容中,就介紹到!

 

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
View Code

 

目錄結構如下:

./
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── kafka_cluster_jaas.conf
├── run.sh
└── sources.list

 

創建鏡像

docker build -t kafka_server /opt/kafka_server

 

啟動kafka

docker run -it -p 9092:9092 -e zookeeper=10.0.128.2 -e kafka=10.0.129.2 kafka_server

 

注意:這里有一個 -e 參數。咦,這個參數是干啥的呢?我第一見它,也一臉懵逼。

ok,它就是用來指定環境變量的。

怎么使用呢?先來一個小例子,你就明白了!

再開一個窗口,啟動 alpine 鏡像,指定變量 愛好=美女

root@ubuntu:~# docker run -it -e hobby=beauty alpine
/ # echo $hobby
beauty
/ #

 

看到沒,指定-e 之后,在docker里面,可以直接使用這個變量。在docker鏡像里面,它就是一個全局變量。

那么在run.sh 這個shell腳本中,就可以直接調用了!

 

那為什么要用-e參數呢?在這篇文章,鏈接如下:

https://www.cnblogs.com/xiao987334176/p/10037395.html

啟動kafka時,使用了 --net=host 參數,也就是直接使用真實主機的IP地址。這樣才實現了kafka客戶端和server端的通訊。

但是,在k8s里面發布kafka服務時,不允許這樣。要使用docker自己的ip地址才行!因此,在kafka服務器容器啟動之前,就給它傳一個參數,使它能夠正常啟動!

 

五、安裝Kafka_client(docker)

本文直接使用kafka壓縮包里面的shell腳本,作為客戶端使用。在生產環境中,是用java代碼,作為客戶端使用的。或者還有其他語言,比如go,php等...

 

登錄到Kafka_client服務器,創建空目錄

mkdir /opt/kafka_client

 

consumer.config

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group

 

這個文件是 消費者 配置文件,表示使用SASL協議,用戶組id為test-group

 

dockerfile

FROM ubuntu:16.04
# 修改更新源為阿里雲
ADD sources.list /etc/apt/sources.list
# 添加tgz文件會自動解壓,自動刪除tgz文件
ADD kafka_2.12-2.1.0.tgz /
ADD consumer.config /kafka_2.12-2.1.0/config/
ADD producer.config /kafka_2.12-2.1.0/config/
ADD reader_jaas.conf /kafka_2.12-2.1.0/config/
ADD writer_jaas.conf /kafka_2.12-2.1.0/config/
# 安裝jdk
RUN apt-get update && apt-get install -y openjdk-8-jdk vim --allow-unauthenticated && apt-get clean all
    
#EXPOSE 9092
# 添加啟動腳本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]
View Code

 

說明:客戶端的配置,統一放在cnofig目錄。使用時,指定一下配置文件,就可以了!

 

producer.config

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 

這個是 生產者 配置文件,指定使用SASL協議

 

reader_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader";
};

 

說明,這個是只讀用戶的配置文件,所有連接到broker和broker,都需要驗證。參數為:用戶名和密碼

 

run.sh

#!/bin/bash

if [ -z $zookeeper ];then
    zookeeper=`cat /etc/hosts | tail -1 | awk '{print $1}'`
fi

if [ -z $kafka ];then
    kafka=`cat /etc/hosts | tail -1 | awk '{print $1}'`
fi

# 進入工作目錄
cd /kafka_2.12-2.1.0

# 生產者
cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh
# 最后一行注釋掉,添加#
sed -i '$ s/^/#&/g' bin/writer-kafka-console-producer.sh
# 最后一行添加內容
sed -i '$ a\exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/writer_jaas.conf kafka.tools.ConsoleProducer "$@"' bin/writer-kafka-console-producer.sh

# 消費者
cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh
sed -i '$ s/^/#&/g' bin/reader-kafka-console-consumer.sh
sed -i '$ a\exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"' bin/reader-kafka-console-consumer.sh

# 創建一個測試topic,名為test,單分區,副本因子是1
#bin/kafka-topics.sh --create --zookeeper $zookeeper:2181 --topic test --partitions 1 --replication-factor 1

# 運行生產者
# bin/writer-kafka-console-producer.sh --broker-list $kafka_server:9092 --topic test --producer.config config/producer.config

# 運行消費者
# bin/reader-kafka-console-consumer.sh --bootstrap-server $kafka_server:9092 --topic test --from-beginning --consumer.config config/consumer.config


tail -f bin/writer-kafka-console-producer.sh
View Code

 

 說明:由於是測試,所以復制了啟動腳本,然后做修改。最后一行,指定了配置文件。

 

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
View Code

 

writer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};
View Code

 

說明:這個是寫入用戶的配置

 

目錄結構如下:

./
├── consumer.config
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── producer.config
├── reader_jaas.conf
├── run.sh
├── sources.list
└── writer_jaas.conf

 

創建鏡像

docker build -t kafka_client /opt/kafka_client

 

啟動kafka_client

docker run -it -e zookeeper=10.0.128.2 -e kafka=10.0.129.2 kafka_client

 

六、測試ACL

登錄到kafka_client,查看容器進程

root@ubuntu:~# docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS        NAMES
09853dfb8891        kafka_client        "/run.sh"           28 seconds ago      Up 27 seconds        elegant_wescoff

 

進入容器,創建一個測試topic,名為test,單分區,副本因子是1

root@ubuntu:~# docker exec -it 09853dfb8891 /bin/bash
root@09853dfb8891:/# cd /kafka_2.12-2.1.0/
root@09853dfb8891:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 10.0.128.2:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".

 

下面我們先來啟動一個console-consumer和一個console-producer來看下當前是個什么狀況:

輸入haha

root@09853dfb8891:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list 10.0.129.2:9092 --topic test
>haha
[2018-12-02 13:47:46,194] WARN [Producer clientId=console-producer] Connection to node -1 (/10.0.129.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-12-02 13:47:46,247] WARN [Producer clientId=console-producer] Connection to node -1 (/10.0.129.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
...

 

發現報錯了,producer無法工作,是因為kafka設置了security的緣故

 

kafka ACL配置

登錄到kafka_server服務器,進入容器,執行命令

配置ACL來讓writer用戶有權限寫入topic

cd /kafka_2.12-2.1.0/
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:writer --operation Write --topic test

 

輸出:

Adding ACLs for resource `Topic:LITERAL:test`: 
     User:writer has Allow permission for operations: Write from hosts: * 

Current ACLs for resource `Topic:LITERAL:test`: 
     User:writer has Allow permission for operations: Write from hosts: *

 

為reader用戶設置test topic的讀權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:reader --operation Read --topic test

 

輸出:

Adding ACLs for resource `Topic:LITERAL:test`: 
     User:reader has Allow permission for operations: Read from hosts: * 

Current ACLs for resource `Topic:LITERAL:test`: 
     User:writer has Allow permission for operations: Write from hosts: *
    User:reader has Allow permission for operations: Read from hosts: *

 

然后設置訪問group的權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:reader --operation Read --group test-group

 

輸出:

Adding ACLs for resource `Group:LITERAL:test-group`: 
     User:reader has Allow permission for operations: Read from hosts: * 

Current ACLs for resource `Group:LITERAL:test-group`: 
     User:reader has Allow permission for operations: Read from hosts: *

 

登錄到kafka_client,再開一個窗口。

2個窗口,都進入到容器里面

第一個窗口進入生產者模式,輸入342

bin/writer-kafka-console-producer.sh --broker-list 10.0.129.2:9092 --topic test --producer.config config/producer.config
>342

 

第二個窗口,運行消費者

cd /kafka_2.12-2.1.0/
bin/reader-kafka-console-consumer.sh --bootstrap-server 10.0.129.2:9092 --topic test --from-beginning --consumer.config config/consumer.config

 

這個時候會接收到

342

 

 

大功告成!

上面的測試,只是針對topic為test設置ACL規則。假設kafka服務器有上百個topic,需要對所有topic設置ALC,可以使用--topic=*

比如允許寫用戶操作所有topic

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:writer --operation Write --topic=*

 

 

本文參考鏈接:

https://www.cnblogs.com/huxi2b/p/7382144.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM