Adding SASL Authentication to Kafka


Kafka version: kafka_2.12-2.3.0

Start ZooKeeper using the instance bundled with Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka:

bin/kafka-server-start.sh config/server.properties

nohup bin/kafka-server-start.sh config/server.properties > logs/server-start.log 2>&1 &
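The second command starts the broker in the background: nohup keeps it running after the terminal is closed, and the redirection writes the output that would otherwise go to the console into logs/server-start.log. server-start.log is a file name I chose myself; it does not exist in the logs directory by default.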

Steps to configure SASL:

1. Edit config/server.properties

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://ip:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://ip:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
All other values are left unchanged.

2. Create config/kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};
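In the KafkaServer section above, username and password are the credentials the broker itself uses for inter-broker connections, and each user_<name>="<password>" entry defines an account that clients may log in with. As a minimal sketch of that layout, the following Java snippet renders such a section from a map of accounts. The class name JaasSectionBuilder and its method are hypothetical helpers for illustration, not part of Kafka, and the sketch assumes passwords contain no double quotes or backslashes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JaasSectionBuilder {

    /** Renders a KafkaServer section for the PLAIN login module (no escaping is applied). */
    static String kafkaServerSection(String brokerUser, String brokerPassword,
                                     Map<String, String> accounts) {
        StringBuilder sb = new StringBuilder();
        sb.append("KafkaServer {\n");
        sb.append("    org.apache.kafka.common.security.plain.PlainLoginModule required\n");
        // Credentials the broker uses when it connects to other brokers.
        sb.append("    username=\"").append(brokerUser).append("\"\n");
        sb.append("    password=\"").append(brokerPassword).append("\"");
        // One user_<name> entry per account that clients may authenticate as.
        for (Map.Entry<String, String> e : accounts.entrySet()) {
            sb.append("\n    user_").append(e.getKey())
              .append("=\"").append(e.getValue()).append("\"");
        }
        // The option list ends with a semicolon, and so does the section.
        sb.append(";\n};\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> accounts = new LinkedHashMap<>();
        accounts.put("admin", "admin");
        accounts.put("alice", "alice");
        System.out.print(kafkaServerSection("admin", "admin", accounts));
    }
}
```

Run with the sample accounts, this prints a section with the same entries as the file above. Note that the broker's own user (admin here) also needs a matching user_admin entry, otherwise inter-broker logins fail.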
Create config/kafka_client_jaas.conf. This file is optional; I created it because I want to consume data from the console.

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin";
};

3. Edit bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
fi

Edit bin/kafka-console-consumer.sh and bin/kafka-console-producer.sh


# Point the console clients at the client JAAS file unless KAFKA_OPTS is already set
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"
fi

Edit config/consumer.properties and config/producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

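The client-side changes above are not strictly required; I made them only because I want to consume data from the console.

4. Start a consumer from the console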

bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.8:9092 --topic test --from-beginning --consumer.config config/consumer.properties

5. A Java application acting as the producer:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestDemo {

    public static void main(String[] args) throws Exception {
        // Set the JVM system property that points to the client JAAS file; use the path on your own machine.
        System.setProperty("java.security.auth.login.config", "/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:9092");
        // props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        // props.put("linger.ms", 1);
        // props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // props.put("partitioner.class", HashPartitioner.class.getName());
        // props.put("interceptor.classes", EvenProducerInterceptor.class.getName());

        // These must match the broker's SASL_PLAINTEXT listener and PLAIN mechanism.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // for (int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("test", null, "hello world 20190909 fox"));
        // close() waits for buffered records to be sent before returning.
        producer.close();
    }

}
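As an alternative to the JAAS file and the java.security.auth.login.config system property, Kafka clients also accept the login module inline through the sasl.jaas.config client property. The following is a minimal sketch, assuming the same admin/admin account as in the JAAS files; the class name SaslClientProps and its helper method are hypothetical. The resulting Properties would still need the serializer settings before being passed to a KafkaProducer.

```java
import java.util.Properties;

public class SaslClientProps {

    /** Builds SASL/PLAIN client settings without an external JAAS file. */
    static Properties saslPlainProps(String bootstrapServers, String user, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline equivalent of the KafkaClient section in kafka_client_jaas.conf.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + user + "\" "
                        + "password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProps("ip:9092", "admin", "admin");
        // Print the settings so they can be compared with consumer.properties.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keeping the credentials in the client configuration avoids editing the launcher scripts, at the cost of having the password in whatever file or code builds the properties.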



7. Result: messages are sent and consumed normally
zhufei@SilverRiver:~/software/kafka_2.12-2.3.0$ bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning --consumer.config config/consumer.properties
hello world 20190909 fox

8. The Kafka documentation describes how to use SASL/PLAIN in production, namely by configuring:
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.CustomPlainServerCallbackHandler
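Here listener.name is a fixed prefix, and sasl_plaintext is the listener name in all lowercase, taken from the listener_name part of listeners = listener_name://host_name:port.
The next segment, plain, is the SASL mechanism in lowercase, and the trailing sasl.server.callback.handler.class is fixed.
com.example.CustomPlainServerCallbackHandler is your own callback handler implementation.
Package that class as a jar, place it in Kafka's libs directory, and restart Kafka for it to take effect.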
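A handler configured this way implements Kafka's org.apache.kafka.common.security.auth.AuthenticateCallbackHandler interface; for PLAIN, its handle method receives the user name and the supplied password and reports whether they match. The sketch below shows only the credential check that such a handler could delegate to, kept free of Kafka classes so that it runs on a plain JDK. The class PlainCredentialStore, its in-memory map, and the sample account are assumptions for illustration; a production handler would more likely look credentials up in an external store.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

public class PlainCredentialStore {

    // Hypothetical in-memory account table: user name -> password.
    private final Map<String, char[]> passwords = new HashMap<>();

    void addUser(String user, String password) {
        passwords.put(user, password.toCharArray());
    }

    /** Returns true only if the user exists and the supplied password matches. */
    boolean authenticate(String user, char[] supplied) {
        char[] expected = passwords.get(user);
        if (expected == null || supplied == null) {
            return false;
        }
        byte[] a = new String(expected).getBytes(StandardCharsets.UTF_8);
        byte[] b = new String(supplied).getBytes(StandardCharsets.UTF_8);
        // MessageDigest.isEqual is designed not to stop at the first differing byte.
        return MessageDigest.isEqual(a, b);
    }

    public static void main(String[] args) {
        PlainCredentialStore store = new PlainCredentialStore();
        store.addUser("alice", "alice");
        System.out.println(store.authenticate("alice", "alice".toCharArray())); // true
        System.out.println(store.authenticate("alice", "wrong".toCharArray())); // false
        System.out.println(store.authenticate("bob", "alice".toCharArray()));   // false
    }
}
```

In a real handler, the result of authenticate would be reported back through the PLAIN authentication callback that Kafka passes to handle.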




 

