Kafka 2.2.1 (Kerberos + LDAP + Sentry) Access and Usage
I. Client configuration for accessing Kafka (Kerberos already integrated)
Since Kafka is integrated with Kerberos, clients must first pass Kerberos authentication.
There are two ways to authenticate:
- 1. Through a configuration file (relying on the ticket cache populated by kinit)
- 2. Through a keytab file
We use the first approach here; a sketch of the second is shown below for reference.
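A minimal sketch of the keytab approach, assuming a keytab has already been exported for the user (the keytab path below is hypothetical):
[root@cdh-datanode03 kafka_client]# kinit -kt /usr/local/kafka_client/fayson.keytab fayson@GREE.IO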
First, create two files in the directory /usr/local/kafka_client: client.properties and jaas.conf.
In client.properties, write:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup
In jaas.conf, write the following (useTicketCache=true tells the client to reuse the ticket obtained with kinit):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Then run the following commands in the shell to set the environment variable (this only takes effect for the current process):
[root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
[root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS
Then run kinit to log in as a Kerberos user.
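For example, to log in as the fayson user used later in this article and confirm that a ticket was obtained:
[root@cdh-datanode03 kafka_client]# kinit fayson
[root@cdh-datanode03 kafka_client]# klist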
II. Using Kafka from the shell command line (Sentry already integrated)
1. Create a Kafka topic
[root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic
2. List topics
[root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list
3. Delete a topic
[root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic
4. Produce data to a topic (requires permission)
[root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties
5. Consume data from a topic (requires permission)
[root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties
At this point, producing reports the following error, which means the user has no permission to write data to testTopic. We now need to grant permissions to the user we logged in with via kinit:
ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]
Take the fayson user as an example; it belongs to the user group (run id <username> to check a user's groups).
- 1. First we need to create a Kafka principal for the user.
- 2. Then we grant the user group permission to write data to testTopic. Note that these operations must be performed while logged in to Kerberos as the kafka admin user, as shown below.
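A sketch of logging in as the kafka admin, assuming the kafka service keytab is available on this node (the keytab path below is hypothetical; on a CDH cluster it typically sits under the Kafka broker's process directory):
[root@cdh-datanode03 kafka_client]# kinit -kt /path/to/kafka.keytab kafka@GREE.IO
klist then confirms we hold a ticket as the kafka principal: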
[root@cdh-datanode03 kafka_client]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka@GREE.IO
Valid starting Expires Service principal
09/11/2019 20:47:25 09/12/2019 20:47:25 krbtgt/GREE.IO@GREE.IO
renew until 09/18/2019 20:47:25
- 3. Create a role
[root@cdh-datanode03 kafka_client]# kafka-sentry -cr -r kafka_role
- 4. Grant kafka_role permission to write to testTopic
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
- 5. Add the role to the user group
[root@cdh-datanode03 kafka_client]# kafka-sentry -arg -r kafka_role -g user
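To verify the binding, you can list the roles attached to the group; a quick check, assuming your kafka-sentry version supports listing roles by group (-lr -g):
[root@cdh-datanode03 kafka_client]# kafka-sentry -lr -g user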
- 6. Log in as the fayson user (enter the password)
[root@cdh-datanode03 kafka_client]# kinit fayson
After that, writing to testTopic as this user no longer reports a permission error.
We still need to give the fayson user permission to read testTopic, so we grant kafka_role the read privileges as well.
- 1. On top of the steps above, grant the kafka_role role permission to read testTopic.
- 2. The following commands must be run as the kafka user.
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=read"
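To double-check what the role can now do, list its granted privileges (the -lp flag should print every privilege attached to the role):
[root@cdh-datanode03 kafka_client]# kafka-sentry -lp -r kafka_role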
III. Accessing Kafka from code (Java)
You need to create the files consumer.properties, producer.properties, and jaas.conf, and also bring in the cluster's krb5.conf file.
Contents of producer.properties:
bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
# Serialization class implementing the Serializer interface; tells Kafka how to serialize the key
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Tells Kafka how to serialize the value
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=1
# Kafka client settings for a Kerberos-secured cluster
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
Contents of consumer.properties (note: the Sentry grants in section II covered consumer group testgroup, so the group.id used here must be covered by an equivalent grant):
bootstrap.servers=cdh-datanode04:9092
group.id=testgroup1
enable.auto.commit=true
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
Contents of jaas.conf (the KafkaClient section is used for the connection to the Kafka brokers, the Client section for the ZooKeeper connection):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
storeKey=true
renewTicket=true
keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
principal="gree1@GREE.IO";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
principal="gree1@GREE.IO";
};
- 1. pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1-cdh6.3.0</version>
</dependency>
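Note that the 2.2.1-cdh6.3.0 artifact is not published to Maven Central; it is served from Cloudera's public repository, which you would normally declare in the same pom.xml:
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>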
- 2. Producer
package producer;
import org.apache.kafka.clients.producer.*;
import java.io.IOException;
import java.util.Properties;
class MyProducer {
private static final MyProducer Instance = new MyProducer();
private MyProducer() {
}
public static MyProducer getInstance() {
return Instance;
}
public int messageNo = 1;
/**
* Get a Kafka producer instance.
*
* @return a configured KafkaProducer
*/
public KafkaProducer<String, String> Produce() {
System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
Properties props = new Properties();
try {
props.load(this.getClass().getResourceAsStream("/producer.properties"));
} catch (IOException e) {
e.printStackTrace();
}
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
}
public class ProducerStarter implements Runnable {
private int threadIndex;
public ProducerStarter(int threadIndex) {
this.threadIndex = threadIndex;
}
/**
* Produce data
*/
public void run() {
MyProducer pro = MyProducer.getInstance();
KafkaProducer prod = pro.Produce();
String topic = "testTopic";
int i = 0;
while (true) {
final int index = i++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
prod.send(new ProducerRecord<String, String>(topic, String.valueOf(index), String.valueOf(i)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
System.out.println("message send to partition " + recordMetadata.partition() + /*value*/ ": hello word " + index);
}
});
prod.flush();
// sleep 2 seconds between sends
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Start producer threads (this example starts just one)
*
* @param args
*/
public static void main(String args[]) {
for (int i = 0; i < 1; i++) {
System.out.println("啟動線程:" + i);
Thread thread = new Thread(new ProducerStarter(i));
thread.start();
}
}
}
- 3. Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
public class ConsumerStarter {
public static void main(String[] args) throws InterruptedException {
KafkaConsumer<String, String> consumer = Consumer.getInstance().Consume();
consumer.subscribe(Arrays.asList("testTopic"));
// Consume and print the results
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
}
Thread.sleep(1000);
}
}
}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.Properties;
/**
* Created by 260212 on 2018/4/12.
* Author: HarSenZhao
*/
class Consumer {
private static final Consumer Instance = new Consumer();
private Consumer() {}
public static Consumer getInstance() {
return Instance;
}
/**
* Get a Kafka consumer.
* Requires a kafka-clients version newer than 0.9.0.1, otherwise the values read back will be null.
* @return a configured KafkaConsumer
*/
public KafkaConsumer<String, String> Consume() {
System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
Properties props = new Properties();
try {
props.load(this.getClass().getResourceAsStream("/consumer.properties"));
} catch (IOException e) {
e.printStackTrace();
}
KafkaConsumer<String, String> consumerSelf = new KafkaConsumer<>(props);
return consumerSelf;
}
}
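Both MyProducer and Consumer hardcode Windows paths to jaas.conf and krb5.conf. When running on a cluster node, you could drop the two System.setProperty calls and pass the same settings as JVM flags instead; for example (the jar name here is hypothetical, and the paths are the client files from section I):
java -Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp kafka-demo.jar ConsumerStarter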