kafka(2.2.1)(kerberos+LDAP+Sentry)訪問使用


kafka(2.2.1)(kerberos+LDAP+Sentry)訪問使用

一.訪問的kafka的一些配置(已集成kerberos )

由於kafka集成了kerberos 所以需要通過kerberos的認證

認證方式有兩種

  • 1.通過配置文件
  • 2.通過keytab文件

我們這里采用第一種

首先先在目錄/usr/local/kafka_client下創建兩個文件一個是client.properties,一個是jaas.conf

在client.properties文件里面寫入

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup

在jaas.conf寫入

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};

之后在shell命令行執行一下命令來配置環境變量(這樣只針對當前進程有效)

[root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
[root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS

在執行kinit命令登陸kerberos用戶

二.Shell 命令行使用Kafka(已集成sentry)

1.創建kafka topic

[root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic

2.查看Topic列表

[root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list

3.刪除Topic

[root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic

4.向Topic生產數據(需要權限)

[root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties

5.消費Topic數據(需要權限)

[root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties

此時會報以下錯誤 表示沒有權限向testTopicTopic 寫入數據此時我們需要給我們kinit登陸的用戶賦予權限

ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]

我們以fayson用戶為例 它屬於user組(id+用戶名 查看組)

  • 1.我們需要首先創建一個kafka的principle
  • 2.我們給user用戶組賦權可以寫入數據到testTopic,注意需要使用管理員kafka用戶登錄Kerberos才能進行操作
[root@cdh-datanode03 kafka_client]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka@GREE.IO

Valid starting       Expires              Service principal
09/11/2019 20:47:25  09/12/2019 20:47:25  krbtgt/GREE.IO@GREE.IO
	renew until 09/18/2019 20:47:25
  • 3.創建一個role
[root@cdh-datanode03 kafka_client]#  kafka-sentry -cr -r kafka_role
  • 4.給kafka_role賦予寫入testTopic權限
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
  • 5.將角色加入到user組下面
[root@cdh-datanode03 kafka_client]#  kafka-sentry -arg -r kafka_role -g user
  • 6.以fayson用戶登錄(輸入密碼)
[root@cdh-datanode03 kafka_client]# kinit fayson

之后以此用戶寫入testTopic 就不會報權限問題了

此時我們還需要給 fayson 用戶賦予讀取testTopic的權限,所以需要給kafka_role賦予讀取testtopic的權限

  • 1.我們在上面完成的基礎之上需要對kafka_role角色賦予讀取testTopic 的權限
  • 2.執行以下命令需要使用kafka 用戶
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=zhcTestTopic->action=read"

三.代碼訪問(java)

需要創建consumer.properties,producer.properties,jaas.conf文件 還要引入krb5.conf文件

producer.properties文件內容

bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
#實現了Serializer接口的序列化類。用於告訴kafka如何序列化key
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#告訴kafka如何序列化value
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=1
#訪問kerberos的kafka client 配置
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

consumer.properties文件內容

bootstrap.servers=cdh-datanode04:9092
group.id=testgroup1
enable.auto.commit=true
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

jaas.conf文件內容

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  storeKey=true
  renewTicket=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="gree1@GREE.IO";
};


Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="gree1@GREE.IO";
};
  • 1.pom.xml
    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.2.1-cdh6.3.0</version>
      </dependency>
  • 2.producer
package producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

class MyProducer {
    private static final MyProducer Instance = new MyProducer();

    private MyProducer() {
    }

    public static MyProducer getInstance() {
        return Instance;
    }

    public int messageNo = 1;

    /**
     * 獲得一個Kafka生產者實例
     *
     * @return
     */
    public KafkaProducer Produce() {
        System.setProperty("java.security.auth.login.config",             "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
       
        Properties props = new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaProducer producer = new KafkaProducer(props);
        return producer;
    }
}

public class ProducerStarter implements Runnable {
    private int threadIndex;

    public ProducerStarter(int threadIndex) {
        this.threadIndex = threadIndex;
    }

    /**
     * 生產數據
     */

    public void run() {
        MyProducer pro = MyProducer.getInstance();
        KafkaProducer prod = pro.Produce();
        String topic = "testTopic";
        int i = 0;
        while (1 == 1) {
            final int index = i++; 
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            prod.send(new ProducerRecord<String, String>(topic,String.valueOf(index), String.valueOf(i)), new Callback() {

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                    System.out.println("message send to partition " + recordMetadata.partition() + /*value*/ ": hello word " + index);
                }
        });
            prod.flush();
            //sleep 1min
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 啟動200個線程,生產
     *
     * @param args
     */
    public static void main(String args[]) {

        for (int i = 0; i < 1; i++) {
            System.out.println("啟動線程:" + i);
            Thread thread = new Thread(new ProducerStarter(i));
            thread.start();
        }
    }
}

  • 3.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


import java.util.Arrays;

public class ConsumerStarter {
    public static void main(String[] args) throws InterruptedException {

        KafkaConsumer consumer = Consumer.getInstance().Consume();
        consumer.subscribe(Arrays.asList("testTopic"));
        //消費並打印消費結果
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record: records) {
                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(1000);
        }
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Properties;

/**
 * Created by 260212 on 2018/4/12.
 * Author:HarSenZhao
 * 描述:
 */
class Consumer {
    private static final Consumer Instance=new Consumer();
    private Consumer(){}
    public static Consumer getInstance(){
        return Instance;
    }

    /**
     * 獲得一個Kafka消費者
     * kafka-clients版本要高於0.9.0.1,否則會取出為null
     * @return
     */
    public KafkaConsumer Consume (){
        System.setProperty("java.security.auth.login.config",  "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
        Properties props=new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaConsumer consumerSelf=new KafkaConsumer<String,String>(props);
        return consumerSelf;
    }
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM