Kafka安裝及開啟SASL_PLAINTEXT認證(用戶名和密碼認證)


前些日子要封裝一個kafka的客戶端驅動,配置了下kafka環境,發現配置復雜度完爆rabbitmq很多倍啊,而且發布訂閱模式使用起來也很麻煩,可能就勝在分布式了吧。

kafka需要java環境,自行安裝java sdk 1.8+.

http://kafka.apache.org/downloads

官方加載安裝包,當前為kafka_2.11-1.1.0.tgz

(新版kafka包內集成了zookeeper,直接啟動即可)

解壓縮后kafka目錄下文件:

 

配置:

  config路徑下配置文件:

    consumer.properites 消費者配置,這個配置文件用於配置開啟的消費者,默認即可

    producer.properties 生產者配置,這個配置文件用於配置開啟的生產者,默認即可

    server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,核心配置如下:

     (1)broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,並且集群中的每一個kafka服務器的id都應是唯一的,默認即可

     (2)listeners 申明此kafka服務器需要監聽的端口號,如果是在本機上跑虛擬機運行可以不用配置本項,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,

        例如:listeners=PLAINTEXT:// 192.168.1.1:9092。並確保服務器的9092端口能夠訪問

     (3)zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由於本次使用的是kafka高版本中自帶zookeeper,默認即可,

        例如:zookeeper.connect=localhost:2181

啟動zookeeper:

  bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台運行)

啟動kafka:

  bin/kafka-server-start.sh config/server.properties &

創建一個名為test的topic:

  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已經創建的topic:

  bin/kafka-topics.sh --list --zookeeper localhost:2181

更改tioic分區:

  bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --alter --partitions 4

查看指定topic信息:

  bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

創建一個消息消費者:

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

  (如果彈出WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available,需將上文中localhost改為隊列配置的地址)

創建一個消息生產者:

  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  出現 > 符號后輸入消息發送,消費者連接可以看到輸出。

 (如果彈出WARN [Consumer clientId=consumer-1, groupId=console-consumer-59948] Connection to node -1 could not be established. Broker may not be available,

  需將上文中localhost改為隊列配置的地址)

Kafka安裝完成。

 

Kafka開啟使用 SASL_PLAINTEXT認證:

  輸入下面命令,關閉kafka:

    bin/kafka-server-stop.sh

  輸入下面命令,關閉zookeeper:

    bin/zookeeper-server-stop.sh

   進入config目錄,增加如下配置文件:

    cd config

    (1)touch kafka_server_jaas.conf

      配置如下:

        KafkaServer {

            org.apache.kafka.common.security.plain.PlainLoginModule required

            username="admin"

            password="admin"

            user_admin="admin"

            user_alice="alice";

        };

      在KafkaServer部分,username和password是broker用於初始化連接到其他的broker,在上面配置中,admin用戶為broker間的通訊,

      user_userName定義了所有連接到 broker和 broker驗證的所有的客戶端連接包括其他 broker的用戶密碼,user_userName必須配置admin用戶,否則報錯。  

    (2)touch kafka_cilent_jaas.conf

      配置如下:

        KafkaClient {

                org.apache.kafka.common.security.plain.PlainLoginModule required

                username="admin"

                password="admin";

        };

      在KafkaClient部分,username和password是客戶端用來配置客戶端連接broker的用戶,在上面配置中,客戶端使用admin用戶連接到broker。 

  更改server.properties配置文件:

    listeners=SASL_PLAINTEXT://ip:9092

    # 使用的認證協議 

    security.inter.broker.protocol=SASL_PLAINTEXT

    #SASL機制 

    sasl.enabled.mechanisms=PLAIN  

    sasl.mechanism.inter.broker.protocol=PLAIN   

    # 完成身份驗證的類 

    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 

     # 如果沒有找到ACL(訪問控制列表)配置,則允許任何操作。 

    #allow.everyone.if.no.acl.found=true

    super.users=User:admin

  修改consuer和producer的配置文件consumer.properties和producer.properties,分別增加如下配置:

    security.protocol=SASL_PLAINTEXT

    sasl.mechanism=PLAIN

 

  切換到kafka目錄下bin路徑:

    cd ..

    cd bin

   JAAS文件作為每個broker的jvm參數,在kafka-server-start.sh腳本中增加如下配置(可在最上面):

    export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"

   在kafka-console-consumer.sh和kafka-console-producer.sh中添加:

    export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"

    

  啟動zookeeper和kafka:

    bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台運行)

    bin/kafka-server-start.sh config/server.properties &

   輸入命令,啟動生產者:

    bin/kafka-console-producer.sh --broker-list 10.100.17.79:9092 --topic test --producer.config config/producer.properties

   輸入命令,啟動消費者

    bin/kafka-console-consumer.sh --bootstrap-server 10.100.17.79:9092 --topic test --from-beginning --consumer.config config/consumer.properties

 

  如果消費者啟動不了或者無法消費指定topic,嘗試設置所使用用戶的組權限(當前使用用戶admin為超級用戶,不需要配置權限):

    利用kafka-acls.sh為topic設置ACL:

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal  User:admin  --group test-consumer-group --topic test

  注意,如果admin要作為消費端連接alice-topic的話,必須對其使用的group(test-consumer-group)也賦權(group 在consumer.properties中有默認配置group.id)

  權限操作例子:

    增加權限:

      # 為用戶 alice 在 test(topic)上添加讀寫的權限

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read --operation Write --topic test

      # 對於 topic 為 test 的消息隊列,拒絕來自 ip 為192.168.1.100賬戶為 zhangsan 進行 read 操作,其他用戶都允許

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan

       --deny-host 192.168.1.100 --operation Read --topic test

      # 為 zhangsan 和 alice 添加all,以允許來自 ip 為192.168.1.100或者192.168.1.101的讀寫請求

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test

    獲取權限列表:

      # 列出 topic 為 test 的所有權限賬戶

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --list --topic test

    移除權限:

      # 移除 acl

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --remove --allow-principal User:zhangsan --allow-principal User:Alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test

 

以上完成Kafka的SASL_PLAINTEXT認證。

 

多節點zookeeper下認證:

  增加配置文件:

    touch kafka_zoo_jaas.conf

    配置如下:

      ZKServer{

        org.apache.kafka.common.security.plain.PlainLoginModule required

          username="admin"

          password="admin"

          user_admin="admin";

      };

    配置應用名稱為ZKServer,zookeeper默認使用的JAAS應用名稱是Server(或者zookeeper)。

  設置的環境變量其實只是傳入到JVM的參數。這里設置的環境變量是KAFKA_OPTS。修改zookeeper的啟動腳本zookeeper-server-start.sh如下:

    export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf  -Dzookeeper.sasl.serverconfig=ZKServer"

  如果配置使用默認名稱,則只需要添加:

    export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf“”

   修改zookeeper.properties配置文件,增加配置:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

    requireClientAuthScheme=sasl

    jaasLoginRenew=3600000

 

以上配置完成后可啟動嘗試多節點zookeeper身份認證

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM