kafka-producer kerberos 原理和配置


kerberos簡單介紹

kerberos這一名詞來源於希臘神話“三個頭的狗---地獄之門守護者”后來沿用作為安全認證的概念,該系統設計上

采用客戶端/服務器結構與DES(Data Encryption Standard標准加密技術),AES(Advanced Encryption Standerd

高級加密技術)等加密技術,並且能夠進行相互認證,即客戶端和服務端均可對對方進行身份認證。可以防止竊聽、

防止replay攻擊、保護數據完整性等場合,是一種應對對稱密鑰體制進行密鑰管理的系統。

基本概念

票據授權票(TGT Ticket Granting):

用於應用程序與KDC(Key Distribution Center 密鑰分發中心)服務器建立安全會話的票據,票據授權票存在有效期,

當票據授權票失效后,應用側需要重新建立與KDC服務器的安全會話。會話有效期為24小時,不可配置。

服務票據(ST Service Ticket):

用於應用程序與服務端建立安全會話的票據,服務票據存在有效期,當服務票據失效后,應用側需要重新建立於

服務端的安全會話。默認有效期5分鍾,不可配置。

 

kafka kerberos 配置

詳見網絡說明在這里不再贅述:http://orchome.com/500

 

kafka kerberos producer客戶端配置

1)使用配置文件kafka-console-producer.sh生產數據

cat kafka_client_jaas.conf 文件配置

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
keyTab="/hbase/test.keytab"
principal="test@KERBEROS.TEST"
serviceName="kafka"
Client=true;
};

 

 

 配置環境變量KAFKA_OPTS 舉例說明

 

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/kafka_client_jaas_acl.conf"

 

 cat producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

 生產腳本kafka-console-producer.sh

./kafka-console-producer.sh --broker-list hostname:9092 --topic TEST_ACL1 --producer.config producer.properties

 

 kafka kerberos producer客戶端JAVA代碼設置

 

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

/**
 * Created by Administrator on 2018-05-24.
 */
public class mykafkaprocucerwithkerberos {

    private static final String BROKER_LIST = "127.0.0.1:9093";


    public static void main(String[] args) throws InterruptedException {

        String rootPath = System.getProperty("user.dir");
        System.setProperty("java.security.krb5.conf", rootPath + "/src/main/resources/krb5.conf");
        System.setProperty("java.security.auth.login.config",rootPath + "/src/main/resources/kafka_client_jaas.conf");

        Properties props1 = new Properties();
        Producer<String, Object> producer1=null;
        props1.put("bootstrap.servers", BROKER_LIST);
        props1.put("security.protocol","SASL_PLAINTEXT");
        props1.put("sasl.mechanism","GSSAPI");
        props1.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer1= new KafkaProducer<String,Object>(props1);
        int line = 1;
        while (line <=4) {
            ProducerRecord<String, Object>    message1= new ProducerRecord<String, Object>("TEST_ACL1","TEST_TOPIC_DATA");
            producer1.send(message1, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if( e!=null){
                        e.printStackTrace();
                        System.out.println("failed");
                    }else {
                        System.out.println(recordMetadata.topic());
                    }
                }
            });
            line++;
        }
        producer1.close();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM