使用kafka-python客戶端進行kafka kerberos認證


    之前說過python confluent kafka客戶端做kerberos認證的過程,如果使用kafka python客戶端的話同樣也可以進行kerberos的認證,具體的認證機制這里不再描述,主要敘述配置認證的過程

    需要的模塊有下面這些:

    kafka-python:https://pypi.org/project/kafka-python/

    gssapi:https://pypi.org/project/gssapi/

    decorator:https://pypi.org/project/decorator/

    six:https://pypi.org/project/six/

    kerberos環境

    kafka python開啟GSSAPI需要模塊gssapi的支持,而gssapi模塊需要依賴於decorator模塊和six模塊,但是安裝時不會校驗和提示,如果不安裝的話kafka python運行是會提示找不到gssapi lib,真正的原因還是因為decorator或者six沒有安裝,這里要注意. 

    首先安裝decorator和six這兩個模塊.

    然后安裝gssapi模塊,安裝的時候要確保decorator和six模塊正常安裝並且kerberos需要的開發包正常安裝,否則gssapi會編譯失敗,安裝kerberos庫可以使用yum命令如下:

yum install krb5-server krb5-libs krb5-auth-dialog

    然后編譯並安裝gssapi,這里是gssapi-1.6.1.tar.gz,安裝如下:

tar -xvzf gssapi-1.6.1.tar.gz
cd gssapi-1.6.1
python3 setup.py build
python3 setup.py install
cd ..

    完成之后要退出源碼目錄,因為導入模塊可能會出現沖突,然后進入python解釋器,測試一下模塊的安裝情況:

from gssapi.raw.misc import GSSError

    如果導入模塊沒問題,則說明gssapi安裝成功. 

    最后直接安裝kafka-python模塊即可. 

    然后可以開始測試python腳本認證是否正常,注意執行之前要先kinit保證klist有對應的用戶,然后再使用下面的代碼調試:

#!/usr/bin/env python3
# coding=utf-8
import time

from kafka import KafkaProducer
from kafka import KafkaConsumer

def kafka_python_producer_main():
    producer = KafkaProducer(bootstrap_servers='192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             sasl_kerberos_service_name='kafka',
                             sasl_kerberos_domain_name='hadoop.hadoop.com',
                             sasl_plain_username='kafkaclient')
    producer.send('testTopic', 'kafka python test'.encode('utf-8'))
    producer.flush()
    producer.close()
    print('done')

def kafka_python_consumer_main():
    consumer = KafkaConsumer('testTopic',
                             bootstrap_servers='192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
                             group_id='kafka-test-20191014',
                             auto_offset_reset='earliest',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             sasl_kerberos_service_name='kafka',
                             sasl_kerberos_domain_name='hadoop.hadoop.com',
                             sasl_plain_username='kafkaclient')
    for msg in consumer:
        print(msg.value)
        print(msg.partition)

if __name__ == '__main__':
    kafka_python_producer_main()
    time.sleep(1)
    kafka_python_consumer_main()

    然后執行腳本測試,如果生產和消費消息都正常,說明kafka kerberos認證成功. 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM