Kafka的認證機制
概述
- GSSAPI:使用的Kerberos認證,可以集成目錄服務,比如AD。Kafka最小版本 0.9
- PLAIN:使用簡單用戶名和密碼形式,Kafka最小版本 0.10
- SCRAM:主要解決PLAIN動態更新問題以及安全機制,Kafka最小版本 0.10.2
- OAUTHBEARER:基於OAuth 2認證框架,Kafka最小版本 2.0
配置SASL\PLAIN
創建kafka_server_jaas.conf文件
這個文件是配置Broker服務端的JAAS,進入Kafka程序目錄的config目錄中,執行下面的命令:
touch kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_rex="123456"
user_alice="123456";
user_lucy="123456";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin";
};
上面的文件配置要頂格寫。另外上面的
username="admin" password="admin"
只是定義了broker間通信使用的用戶名和密碼,但是並不是創建用戶的函數以,所以后面要寫user_admin="admin"
,來創建創建用,如果你不創建,那么broker啟動將會報驗證失敗的錯誤。
KafkaServer
字段是用來配置broker間通信使用的用戶名和密碼以及客戶端連接時需要的用戶名和密碼,其中username和password是broker用於初始化連接到其他的broker,kafka用戶為broker間的通訊。在一個Kafka集群中,這個文件的內容要一樣,集群中每個Borker去連接其他Broker的時候都使用這個文件中定義的username和password來讓對方進行認證。
而user_NAME
語句,是用來配置客戶端連接Broker時使用的用戶名和密碼,也就是新建用戶的用戶名都是以user_
開頭的,等號后面是密碼,密碼可以是明文且沒有復雜度要求。完整語句是user_USERNAME="PASSWORD"
。
Client
部分是用來設置與Zookeeper的連接的,它還允許broker設置 SASL ACL 到zookeeper 節點,鎖定這些節點,只有broker可以修改它。如果Zookeeper與Broker之間不設置認證,那么就可以不配置。
創建kafka_client_jaas.conf文件
進入Kafka程序目錄的config目錄中,執行下面的命令:
touch kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin";
};
這個文件是客戶端連接Broker使用的,username和password是用來配置客戶端連接broker的用戶,在上面配置中,客戶端使用admin用戶連接到broker。
這些客戶端包括kafka的二進制包中bin目錄下的客戶端工具以及其他外部客戶端,比如Java、Python程序等。
內部客戶端工具
kafka自帶的生產和消費客戶端工具
比如下面的客戶端工具:
- kafka-console-consumer.sh
- kafka-console-producer.sh
- kafka-consumer-groups.sh
針對kafka-console-consumer.sh
和kafka-console-producer.sh
,可以在consumer.properties
和producer.properties
,加入下面的內容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
在kafka-console-consumer.sh和kafka-console-producer.sh中添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
目的是讓客戶端啟動時帶上身份信息,也就是它使用的用戶名和密碼
輸入命令,啟動生產者:
./kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic test --producer.config ../config/producer.properties
輸入命令,啟動消費者:
./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties
這兩個加上
../config/consumer.properties
這個的目的是設置認證協議。這個東西要和Broker端設置的一樣。
還有另外一種設置,我們在啟動命令行生產者和消費者時不需要引入外部文件,只需要在consumer.properties
和producer.properties
中加入下面的內容,這樣就不需要建立kafka_client_jaas.conf
文件:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
注意上面的
sasl.jaas.config
后面要加分號。
kafka-consumer-groups.sh工具
直接啟動會拒絕。針對kafka-consumer-groups.sh
工具你可以單獨寫一個配置文件,包含上面的內容,在啟動程序的時候加上一個參數引入這個文件即可,比如:
touch sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
./kafka-consumer-groups.sh --bootstrap-server X.X.X.X:9092 --list --command-config ../config/sasl.properties
外部客戶端工具
Python
config._kwargs = {
"bootstrap_servers": "x.x.x.x:9092",
"client_id": "ClientId",
"group_id": "GroupID",
"enable_auto_commit": False,
"auto_offset_reset": "latest",
"key_deserializer": lambda m: json.loads(m.decode('utf-8')),
"value_deserializer": lambda m: json.loads(m.decode('utf-8')),
# 配置驗證信息
"security_protocol": "SASL_PLAINTEXT",
"sasl_mechanism": "PLAIN",
"sasl_plain_username": "rex",
"sasl_plain_password": "123456",
}
try:
self._consumer = KafkaConsumer(**config._kwargs)
except Exception as err:
print("Consumer init failed, %s" % err)
security_protocol
:與Borker通信協議,默認是PLAINTEXT
,可選值為:PLAINTEXT
, SSL
,SASL_PLAINTEXT
。
sasl_mechanism
:當security_protocol
被配置為SASL_PLAINTEXT
或者SASL_SSL
時,該參數的可用值為PLAIN
、GSSAPI
、OAUTHBEARER
。
sasl_plain_usernam
和sasl_plain_password
,設置SASL PLAIN認證使用的用戶名和密碼,這個信息就是在Broker使用的kafka_server_jaas.conf
文件中配置的。如果sasl_mechanism
設置成PLAIN
的話,這兩個必須配置。
生產者和消費者都是這么配置,至於這個用戶有什么權限,比如讀、寫等需要在服務端設置ACL。如果遇到check_version錯誤,則在配置中需要制定版本
api_version=(0,10)
。
Java
方式一:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “node01:9092”);
props.put(“acks”, “all”);
props.put(“retries”, 0);
props.put(“batch.size”, 16384);
props.put(“linger.ms”, 1);
props.put(“buffer.memory”, 33554432);
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
// 這里設置SASL連接
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, “SASL_PLAINTEXT”);
props.put(SaslConfigs.SASL_MECHANISM, “PLAIN”);
props.put(“sasl.jaas.config”,
“org.apache.kafka.common.security.plain.PlainLoginModule required username=\”alice\” password=\”123456\”;”);
方式二:
System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf"); //配置文件路徑
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
而上面那個文件的內容就是:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="123456";
};
修改Broker配置文件
在server.properties
增加如下內容:
# 配置listener
listeners=SASL_PLAINTEXT://x-x-x-x:9092
advertised.listeners=SASL_PLAINTEXT://x-x-x-x:9092
# 認證方面
# 表示Broker間通信使用SASL
security.inter.broker.protocol=SASL_PLAINTEXT
# 表示開啟PLAIN認證機制
sasl.enabled.mechanisms=PLAIN
# 表示Broker間通信也啟用PLAIN機制
sasl.mechanism.inter.broker.protocol=PLAIN
# 授權方面
# 設置身份驗證使用的類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 設置超級賬號,如果是多個需要分號分割,例如:User:admin;User:root
super.users=User:admin
# 對所有用戶topic可見,要禁用。
allow.everyone.if.no.acl.found=false
服務端的另外一種配置
Broker還可以使用sasl.jaas.config
配置JAAS。字段是:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
修改server.propertis文件
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin" \
user_admin="admin" \
user_tom="123456" \
user_alice="123456";
listener.name
:固定格式
{listenerName}
:就是上面的sasl_plaintext
,這里和linsteners
中配置的協議要一致。
{saslMechanism}
:就是上面的plain
sasl.jaas.config
:固定格式
下面是另外一種listener.name,這種是使用sasl的scram,:
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
所謂listener
就是:listeners=SASL_PLAINTEXT://x-x-x-x:9092
就是這個語句配置的,因為你可以看到地址前面配置了協議。換句話說Broker可以配置多個驗證方式,用逗號分隔即可PLAINTEXT://localhost:9092,SSL://localhost:9091
。
注意:如果使用在
server.propertis
文件中設置JAAS的方式,那么就不用建立kafka_server_jaas.conf
文件。不過由於PLAIN形式是基於文件配置的用戶名和密碼,所以建議使用單獨的kafka_server_jaas.conf
,畢竟用戶多了會造成server.propertis
比較亂。對於SCRAM機制的話可以配置在server.propertis
里,畢竟它的用戶是存放在Zookeeper中。
修改Broker啟動腳本
編輯./kafka-server-start.sh
增加一個環節變量:
export SECURITY_OPTS="-Djava.security.auth.login.config=/opt/java/kafka/config/kafka_server_jaas.conf"
主要是讓kafka-run-class.sh
啟動腳本加載這個文件。在kafka-run-class.sh
增加下面的內容:
KAFKA_OPTS="$SECURITY_OPTS $KAFKA_OPTS"
默認kafka-run-class.sh
文件中的$KAFKA_OPTS
是空的,如下:
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""
fi
所以你可以直接修改這個變量為-Djava.security.auth.login.config=/opt/java/kafka/config/kafka_server_jaas.conf"
,而不需要建立SECURITY_OPTS
環境變量。
說明:如果Broker的JAAS通過在
server.properties
文件中的sasl.jaas.config
參數配置,那么就不需要修改啟動腳本。因為修改啟動腳本是為了讓它去加載外部的JAAS文件,如果這個文件的內容都在Broker啟動使用的server.properties
文件中配置了,那么在啟動的時候就無需加載外部文件了。
配置用戶的ACL
列出ACL設置
列出所有主題的ACL設置
kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --list
列出指定主題的ACL設置
kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --list --topic TOPIC_NAME
授權用戶
# 允許 alice 用戶讀取和寫入test主題
./kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --add --allow-principal User:alice --operation Read --operation Write --topic test
# 允許GroupA組使用alice用戶對主題test讀寫操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --add --allow-principal User:alice --operation Read --operation Write --topic test --group GroupA
# 允許所有用戶對某個主題有讀寫操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --add --allow-principal User:* --consumer --topic test
# 拒絕某個主機使用alice賬號讀寫,允許其他所有的主機使用任何賬號進行讀寫某個主題
./kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --add --allow-principal User:* --allow-host * --deny-principal User:alice --deny-host 198.51.100.3 --consumer --topic test
如果是刪除操作就把上面的 --add 改成 --remove。
--operation支持的操作有:READ、WRITE、DELETE、CREATE、ALTER、DESCRIBE、ALL,所以一般我們快捷方式授權:
- 授權給某個消費者組讀取和訂閱權限就直接使用
--consumer
參數而不使用--operation
來指定,前者就包含了允許消費者在主題上READ、DESCRIBE以及在消費者組在主題上READ。 - 授權給某個消費者組寫和訂閱權限就直接使用
--producer
參數,它包括了生產者對主題有寫和訂閱權限以及在集群上建立主題的權限。
# 快捷設置允許某個組使用某用戶對某主題讀操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --add --allow-principal User:alice --consumer --topic test --group GroupA
# 快捷設置允許某用戶對某主題寫操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.x:2181/CHROOT --add --allow-principal User:alice --producer --topic test
如果授權用戶同上讀寫那么就要手動設置
--operation
參數
配置SASL\SCRAM
SASL\SCRAM都屬於SASL認證框架范圍,它主要解決了PLAIN機制中密碼銘文問題以及動態更新問題,解決動態更新用戶的機制就是用戶信息存放在Zookeeper中。
創建kafka_server_jaas.conf文件
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
這個文件的作用上面以及說過了,不過在這種SASL\SCRAM機制下,這個文件只需要配置這個部分,因為用戶信息在Zookeeper中。
修改Broker配置文件
# 設置監聽使用SASL而不是SSL
listeners=SASL_PLAINTEXT://172.16.247.100:9092
advertised.listeners=SASL_PLAINTEXT://172.16.247.100:9092
# Zk連接設置
zookeeper.connect=172.16.247.100:2181/kafka
# 認證部分
# 表示開啟SCRAM認證機制,並啟用SHA-256算法
sasl.enabled.mechanisms=SCRAM-SHA-256
# 表示Broker間通信也要開啟SCRAM認證,同樣適用SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# 表示Broker間通信使用SASL
security.inter.broker.protocol=SASL_PLAINTEXT
#
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin";
# 授權方面
# 設置身份驗證使用的類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 設置超級賬號,如果是多個需要分號分割,例如:User:admin;User:root
super.users=User:admin
# 對所有用戶topic可見,要禁用。
allow.everyone.if.no.acl.found=false
這里我通過sasl.jaas.config
這種方式來配置,那么上面的kafka_server_jaas.conf
這個文件就可以不用,之所以寫主要是為了說明一下,如果你希望通過使用外部文件的方式那么你就需要這個文件,並且啟動時修改啟動腳本增加環境變量來引用這個文件。
請注意這里的ZK連接設置,我這里使用了chroot,172.16.247.100:2181/kafka
,指定了一個路徑,這個路徑要提前在ZK中創建,create /kafka kafka
。
說明:官網這里設置的
security.inter.broker.protocol
這個參數是SASL_SSL
,包括listeners
也是SASL_SSL
,需要說明一下如果你啟用了SSL加密通信的話你就設置,如果你沒有啟用那么就設置SASL_PLAINTEXT
就好。
修改Broker啟動腳本
如果使用外部kafka_server_jaas.conf
文件方式,則需要修改啟動腳本,把下面的環境變量添加到kafka-server-start.sh
腳本中。如果采用配置方式則無需修改腳本。
export SECURITY_OPTS="-Djava.security.auth.login.config=/work/kafka_2.11-2.3.0/config/kafka_server_jaas.conf"
export KAFKA_OPTS="$SECURITY_OPTS $KAFKA_OPTS"
創建用戶
添加三個用戶
./kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
./kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name tom
./kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name lucy
創建用戶時ZK的連接路徑也要寫上chroot,如果你使用了chroot的話。如果你kafka配置中寫了,而這里創建用戶沒有寫,那么你啟動kafka集群就會報驗證失敗的錯誤。
另外再說一下為什么要創建admin
用戶,在配置文件sasl.jaas.config
中或者是kafka_server_jaas.conf
文件中的KafkaServer
字段,定義了Broker間連接使用的用戶名和密碼,這個用戶名和密碼可以是你自己定義的,不一定是admin
,這只是一個名字。另外這里只定義了Broker連接使用的用戶名和密碼,但是不是創建,所以之后我們要創建這個用戶,所以就是上面的語句為什么要創建admin
賬號。即使是單台單實例的場景下,Broker啟動會連接ZK,如果啟用了SASL,那么它會使用配置的賬號到ZK中去驗證。/KAFKA_CHROOT/config/users/
節點下存放着用戶。
查看所有用戶
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
創建主題
創建主題的時候有兩種方式一個是使用--zookeeper
和--bootstrap-server
,前者不需要認證因為它不連接broker,但是后者需要。
創建文件kafka_client_jaas.conf
創建主題的命令也是客戶端,你broker配置了認證,那么這個客戶端工具連接broker需要攜帶認證信息,這里配置的就是客戶端使用的用戶名和密碼,這里我們就使用之前創建的admin
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
創建文件adminclient-configs.conf
內容如下:這里的設置要和broker配置的認證方式一致
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
修改kafka-topics.sh
腳本內容
添加如下內容:
export KAFKA_OPTS="-Djava.security.auth.login.config=/work/kafka_2.11-2.3.0/config/kafka_client_jaas.conf"
運行kafka-topics.sh命令
./kafka-topics.sh --list --bootstrap-server 172.16.42.100:9092 --command-config ../config/adminclient-configs.conf
命令行工具測試生產和消費
修改consumer.properties
配置文件
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="lucy" password="123456";
修改producer.properties
配置文件
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="tom" password="123456";
配置ACL
這個和之前是一致的,所以不再進行說明。
Kafka在什么時候會檢測權限
kafka授權,如果使用**非管理賬號*,程序啟動會連接broker來獲取元數據,從metadata返回的topic列表時,也是根據acl過濾過的,也就是這個賬號沒有讀取某個主題的權限,那么返回的列表中將不會有這個主題。
我們工作中有一個kafka監控程序,用於監控消費者的偏移量,使用的是一個開源的go開發的exporter,它會讀取broker的元數據以及消費__consumer_offsets
主題。我們使用的非管理賬號,在授權的時候我們給這個賬號授予所有主題的讀取權限,因為通過讀取__consumer_offsets
獲取主題的消費者偏移量,也會受到你所使用賬號是不是對某個主題有讀取權限的限制。所以如果使用非管理賬號那么最好的辦法就是授權該賬號對所有主題具有讀權限。