Kafka 入門實戰(4)--開啟 Kerberos 認證


本主要介紹在 Kafka 中如何配置 Kerberos 認證,文中所使用到的軟件版本:Java 1.8.0_191、Kafka 2.13-2.4.1、Kerberos 1.15.1。

1、Kerberos 安裝

要使用 Kerberos 服務,需先安裝 Kerberos,安裝方法可參考:Kerberos 入門實戰(2)--Kerberos 安裝及使用

2、Zookeeper 開啟 Kerberos 認證

Zookeeper 開啟 Kerberos 認證的方法參見:Zookeeper 入門實戰(4)--開啟 Kerberos 認證

3、Kafka 開啟 Kerberos 認證

假設 Kafka 集群信息如下:

ip 主機名 描述
10.49.196.10 pxc1 zookeeper、kafka
10.49.196.11 pxc2 zookeeper、kafka
10.49.196.12 pxc3 zookeeper、kafka

3.1、創建 keytab

在安裝 Kerberos 的機器上進入 kadmin(Kerberos 服務端上使用 kadmin.local,安裝了 Kerberos Client 的機器上可以使用 kadmin),然后執行如下命令分別創建服務端和客戶端的 keytab:

kadmin.local:  add_principal -randkey kafka-server/pxc1@ABC.COM
kadmin.local:  add_principal -randkey kafka-server/pxc2@ABC.COM
kadmin.local:  add_principal -randkey kafka-server/pxc3@ABC.COM
kadmin.local:  add_principal -randkey kafka-client@ABC.COM
kadmin.local:  xst -k /root/zk-server.keytab kafka-server/pxc1@ABC.COM
kadmin.local:  xst -k /root/zk-server.keytab kafka-server/pxc2@ABC.COM
kadmin.local:  xst -k /root/zk-server.keytab kafka-server/pxc3@ABC.COM
kadmin.local:  xst -k /root/zk-client.keytab kafka-client@ABC.COM

3.2、配置

3.2.1、配置 hosts 文件

10.49.196.10    pxc1
10.49.196.11    pxc2
10.49.196.12    pxc3

3.2.2、Kerberos 相關配置

拷貝 krb5.conf 及 keytab 文件到所有安裝 Kafka 的機器,這里把文件都放到 Kafka 的 config/kerveros 目錄下(kerberos 目錄需新建)。

2.2.3、Kafka 服務端配置

A、修改 config/server.properties,增加如下配置:

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka-server

B、新建 kafka-server-jaas.conf 文件,該文件也放到 Kafka 的 config/kerveros 目錄下。

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-server/pxc1@ABC.COM";  #不同的主機,需修改為本機的主機名
};

//Zookeeper client authentication
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-server/pxc1@ABC.COM"; #不同的主機,需修改為本機的主機名
};

C、修改 bin/kafka-server-start.sh 腳本,倒數第二行增加如下配置:

export KAFKA_OPTS="-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server-jaas.conf"

如果 Zookeeper 沒有開啟 Kerberos 認證,則這里 zookeeper.sasl.client 可設為 false;僅僅啟用 Kafka 的 Kerberos 認證。

3.2.4、Kafka 客戶端配置

該配置主要為了使用 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 等命令。

A、新建 client.properties 文件,該文件也放到 Kafka 的 config/kerveros 目錄下。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka-server

B、新建 kafka-client-jaas.conf 文件,該文件也放到 Kafka 的 config/kerveros 目錄下。

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-client.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-client@ABC.COM";
};

C、修改 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 腳本,倒數第二行增加如下配置:

export KAFKA_OPTS="-Djava.security.krb5.conf=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-client-jaas.conf"

3.3、啟動並測試

配置完成后就可以啟動 Kafka 集群了:

cd /home/hadoop/app/kafka_2.13-2.4.1/bin
./kafka-server-start.sh -daemon ../config/server.properties

啟動完成后可以使用 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 來測試:

查看 topic 信息:

cd /home/hadoop/app/kafka_2.13-2.4.1/bin
./kafka-topics.sh --list --bootstrap-server 10.49.196.10:9092 --command-config ../config/kerberos/client.properties

發送消息:

cd /home/hadoop/app/kafka_2.13-2.4.1/bin
./kafka-console-producer.sh --broker-list 10.49.196.10:9092 --topic test --producer.config ../config/kerberos/client.properties

接受消息:

cd /home/hadoop/app/kafka_2.13-2.4.1/bin
./kafka-console-consumer.sh --bootstrap-server 10.49.196.10:9092 --topic test --from-beginning --consumer.config ../config/kerberos/client.properties

3.4、java 程序連接 Zookeeper

java 可以使用 JAAS 來進行 Kerberos 認證,需要 JAAS 配置文件、keytab 文件及 Kerberos 配置文件。

A、配置文件

JAAS 配置文件(kafka-client-jaas.conf):

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-client@ABC.COM";
};

keytab 文件:

從 Kerberos 服務器上拷貝到目標機器,拷貝路徑即為 JAAS 配置中間配置的路徑:D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client.keytab。

Kerberos 配置文件(krb5.conf):

從 Kerberos 服務器上拷貝 /etc/krb5.conf 到目標機器即可。

B、配置 hosts 文件

在 hosts 文件中添加:

10.49.196.10    pxc1
10.49.196.11    pxc2
10.49.196.12    pxc3

C、引入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.1</version>
</dependency>

D、樣例程序

package com.abc.demo.general.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;
import java.util.Properties;

public class KafkaKerberos {
    private AdminClient adminClient;

    @Before
    public void before() {
        System.setProperty("java.security.auth.login.config", "D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client-jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\krb5.conf");

        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092,10.49.196.11:9092,10.49.196.12:9092");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka-server");
        adminClient = AdminClient.create(props);
    }

    @After
    public void after() {
        adminClient.close();
    }

    @Test
    public void listTopics() throws Exception {
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        //是否羅列內部主題
        listTopicsOptions.listInternal(true);
        ListTopicsResult result = adminClient.listTopics(listTopicsOptions);
        Collection<TopicListing> list = result.listings().get();
        System.out.println(list);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM