Kafka encrypted communication: accessing an encrypted broker with python-kafka


1. Certificate preparation

#!/bin/bash

PASSWORD=pwd123
ADDR=kafka-single
ITEM_NAME=localhost

CAROOT_NAME=CARoot

#Step 0: Create the directories that will hold the certificates
mkdir -p /usr/ca/{root,server,client,trust,py-kfk} && cd /usr/ca/
#Add the broker hostname to /etc/hosts if it is not there yet
if [ $(awk "BEGIN{flag=0}{if(match(\$0,/$ADDR/)){flag=1}}END{print flag}" /etc/hosts) -eq 0 ];then
    echo 127.0.0.1 $ADDR >> /etc/hosts
fi

#Step 1: Generate the server keystore
KEYSTORE_S=/usr/ca/server/server.keystore.jks
keytool -keystore $KEYSTORE_S -alias $ITEM_NAME -validity 365 -keyalg RSA -genkey -keypass $PASSWORD -storepass $PASSWORD -dname "CN=SX, OU=SX, O=sx, L=SZ, ST=GD, C=CN"

#Step 2: Generate the CA root certificate and key
CA_KEY=/usr/ca/root/ca-key
CA_CERT=/usr/ca/root/ca-cert
openssl req -new -x509 -keyout $CA_KEY -out $CA_CERT -days 365 -passout pass:$PASSWORD -subj "/C=CN/ST=Guangdong/O=SX/OU=sx/CN=KAFKA-SERVER/emailAddress=sx@sx.com"

#Step 3: Import the CA certificate into truststores trusted by the server and the client
TRUSTSTORE_S=/usr/ca/trust/server.truststore.jks
TRUSTSTORE_C=/usr/ca/trust/client.truststore.jks
echo y | keytool -keystore $TRUSTSTORE_S -alias $CAROOT_NAME -import -file $CA_CERT -storepass $PASSWORD
echo y | keytool -keystore $TRUSTSTORE_C -alias $CAROOT_NAME -import -file $CA_CERT -storepass $PASSWORD

#Step 4: Export a certificate signing request for the server certificate (server-cert-file)
SERVER_CERT_FILE=/usr/ca/server/server-cert-file
keytool -keystore $KEYSTORE_S -alias $ITEM_NAME -certreq -file $SERVER_CERT_FILE -storepass $PASSWORD

#Step 5: Sign the server certificate with the CA root key
SERVER_CERT_SIGNED=/usr/ca/server/server-cert-signed
openssl x509 -req -CA $CA_CERT -CAkey $CA_KEY -in $SERVER_CERT_FILE -out $SERVER_CERT_SIGNED -days 365 -CAcreateserial -passin pass:$PASSWORD

#Step 6: Import the CA root certificate into the server keystore
echo y | keytool -keystore $KEYSTORE_S -alias $CAROOT_NAME -import -file $CA_CERT -storepass $PASSWORD

#Step 7: Import the signed server certificate into the server keystore
keytool -keystore $KEYSTORE_S -alias $ITEM_NAME -import -file $SERVER_CERT_SIGNED -storepass $PASSWORD


#2. Export PEM-format certificates for kafka-python

#List the keys and certificates in the server keystore
keytool -list -rfc -keystore $KEYSTORE_S -storepass $PASSWORD

#Export the CA-signed server certificate and the CA root certificate
SERVER_CERT_SIGNED_PEM_FORMAT=/usr/ca/py-kfk/server_signed.pem
CAROOT_PEM_FORMAT=/usr/ca/py-kfk/CARoot.pem
keytool -exportcert -alias $ITEM_NAME -keystore $KEYSTORE_S -storepass $PASSWORD -rfc -file $SERVER_CERT_SIGNED_PEM_FORMAT
keytool -exportcert -alias $CAROOT_NAME -keystore $KEYSTORE_S -storepass $PASSWORD -rfc -file $CAROOT_PEM_FORMAT
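A quick sanity check after running the script: the short Python sketch below (paths taken from the script above; the file name check_certs.py is only an example) verifies that every keystore and certificate was actually created.

# check_certs.py - confirm the script above produced all expected files
import os

expected = [
    "/usr/ca/server/server.keystore.jks",
    "/usr/ca/root/ca-key",
    "/usr/ca/root/ca-cert",
    "/usr/ca/trust/server.truststore.jks",
    "/usr/ca/trust/client.truststore.jks",
    "/usr/ca/server/server-cert-file",
    "/usr/ca/server/server-cert-signed",
]

for path in expected:
    status = "OK" if os.path.isfile(path) else "MISSING"
    print("%-45s %s" % (path, status))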

 

2. Export the certificates required by python-kafka

#Export PEM-format certificates for kafka-python
#List the keys and certificates in the server keystore
#keytool -list -rfc -keystore $KEYSTORE_S

#Export the CA-signed server certificate and the CA root certificate
SERVER_CERT_SIGNED_PEM_FORMAT=/usr/ca/py-kfk/server_signed.pem
CAROOT_PEM_FORMAT=/usr/ca/py-kfk/CARoot.pem
keytool -exportcert -alias $ITEM_NAME -keystore $KEYSTORE_S -storepass $PASSWORD -rfc -file $SERVER_CERT_SIGNED_PEM_FORMAT
keytool -exportcert -alias $CAROOT_NAME -keystore $KEYSTORE_S -storepass $PASSWORD -rfc -file $CAROOT_PEM_FORMAT
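Before wiring these files into a client, it can help to confirm that both exported PEM files are well formed. The sketch below uses only the Python standard library and assumes the paths from the commands above.

# verify_pem.py - check that the exported PEM files can be parsed by Python's ssl module
import ssl

ca_pem = "/usr/ca/py-kfk/CARoot.pem"
server_pem = "/usr/ca/py-kfk/server_signed.pem"

ctx = ssl.create_default_context()
ctx.load_verify_locations(cafile=ca_pem)      # raises ssl.SSLError if the CA PEM is malformed
print("CA certificate loaded:", ctx.cert_store_stats())

with open(server_pem) as f:
    pem_text = f.read()
der = ssl.PEM_cert_to_DER_cert(pem_text)      # raises ValueError if the PEM framing is broken
print("server certificate parsed, DER length:", len(der))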

3. Modify the Kafka server configuration

Edit the server.properties configuration:
listeners=PLAINTEXT://localhost:9092,SASL_SSL://kafka-single:9093
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=pwd123
ssl.key.password=pwd123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=pwd123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
#Use SSL only for inter-broker traffic
security.inter.broker.protocol=SSL

#When using SASL/PLAIN + TLS, use these instead:
#security.inter.broker.protocol=SASL_SSL  
#sasl.mechanism.inter.broker.protocol=PLAIN
#sasl.enabled.mechanisms=PLAIN

4. Start ZooKeeper
~/kafka # bin/zookeeper-server-start.sh config/zookeeper.properties &

5. Start the broker
~/kafka # bin/kafka-server-start.sh config/server.properties
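Once the broker is up, a bare TLS handshake from Python is a quick way to confirm that the SASL_SSL listener presents the CA-signed certificate. This is only a sketch: the host, port, and CA path are taken from the earlier steps, and hostname checking is disabled because the certificate CN does not match the broker hostname.

# tls_check.py - handshake against the SASL_SSL listener and print the broker certificate subject
import socket
import ssl

host, port = "kafka-single", 9093

ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations("/usr/ca/py-kfk/CARoot.pem")

with socket.create_connection((host, port), timeout=5) as sock:
    with ctx.wrap_socket(sock, server_hostname=host) as tls:
        print("TLS version:", tls.version())
        print("Broker certificate subject:", tls.getpeercert().get("subject"))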

 

6. Connecting to the server with python-kafka

If the broker uses SASL/PLAIN, setting sasl_mechanism="PLAIN" is enough; SCRAM-SHA-512 requires the corresponding server-side configuration (see sections 7 and 8 below).
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, KafkaProducer
import ssl


bootstrap_servers = 'kafka-single:9093'
topic = 'test'

# ssl_certfile is only needed if the broker requires client certificates; it is not used below
ssl_certfile = "/usr/ca/py-kfk/certificate.pem"
ssl_cafile = "/usr/ca/py-kfk/CARoot.pem"

# TLS context: hostname check disabled because the certificate CN does not match the broker hostname
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.load_verify_locations(ssl_cafile)

sasl_mechanism = "SCRAM-SHA-512"

username = "custom"
pwd = "pwd123"

producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    acks='all',
    retries=1,
    api_version=(0, 11, 0, 3),
    ssl_check_hostname=False,
    security_protocol="SASL_SSL",
    ssl_context=context,
    sasl_mechanism=sasl_mechanism,
    sasl_plain_username=username,
    sasl_plain_password=pwd
)
print(producer.bootstrap_connected())
print("send message")
for i in range(1):
    producer.send(topic, b'localhost test')
producer.flush()
producer.close()
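For completeness, a matching consumer sketch that reuses the names defined in the producer script above (bootstrap_servers, topic, context, sasl_mechanism, username, pwd); the group_id is an arbitrary example value.

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    group_id="test-group",            # example group name
    auto_offset_reset="earliest",
    api_version=(0, 11, 0, 3),
    security_protocol="SASL_SSL",
    ssl_context=context,
    ssl_check_hostname=False,
    sasl_mechanism=sasl_mechanism,
    sasl_plain_username=username,
    sasl_plain_password=pwd,
    consumer_timeout_ms=10000,        # stop iterating after 10 s without new messages
)
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)
consumer.close()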

 

#########################################
7. SASL/PLAIN username and password configuration

Configure the Kafka broker: in the Kafka broker's config directory, add a JAAS file like the one below, adjusted as needed. In this example it is named kafka_server_jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_custom="pwd123";
};

# Pass the file path as a JVM option; the line below can also be added directly to the startup script bin/kafka-server-start.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=$HOME/kafka/config/kafka_server_jaas.conf"
Start the broker in the foreground (before starting, remember to modify server.properties so that it supports SASL/PLAIN + TLS):
~/kafka # bin/kafka-server-start.sh config/server.properties
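With the JAAS file above in place, the python-kafka client from section 6 only needs a different mechanism and credentials. A minimal producer sketch (the user "custom" and its password come from user_custom in kafka_server_jaas.conf; broker address and CA file as before):

# plain_producer.py - SASL/PLAIN + TLS client sketch
import ssl
from kafka import KafkaProducer

context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.load_verify_locations("/usr/ca/py-kfk/CARoot.pem")

producer = KafkaProducer(
    bootstrap_servers="kafka-single:9093",
    security_protocol="SASL_SSL",
    ssl_context=context,
    sasl_mechanism="PLAIN",           # matches PlainLoginModule on the broker
    sasl_plain_username="custom",     # user_custom defined in kafka_server_jaas.conf
    sasl_plain_password="pwd123",
)
producer.send("test", b"plain sasl test")
producer.flush()
producer.close()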

8. Kafka's SCRAM mechanism relies on ZooKeeper to store credentials. When the cluster is configured for SASL_SSL/SCRAM, create the credentials in ZK:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=pwd123],SCRAM-SHA-512=[password=pwd123]' --entity-type users --entity-name custom

# jaas.conf:
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="custom"
    password="pwd123";
};

Running the command reports an error:

Error while executing config command org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /config/changes/config_change_

org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /config/changes/config_change_

Cause: the node is missing in ZK. Log in to ZK and create it:

bash bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 0] create /config changes
Node already exists: /config
[zk: localhost:2181(CONNECTED) 1] create /config/changes config_change_

 

Then run the kafka-configs.sh command above again.
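If you prefer to create the missing node from Python rather than zkCli.sh, here is a sketch using the kazoo library (an assumption: kazoo is installed via pip and ZooKeeper listens on localhost:2181):

# create_zk_node.py - create /config/changes with kazoo
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")
zk.start()
zk.ensure_path("/config/changes")   # creates /config and /config/changes if they are missing
print(zk.get_children("/config"))
zk.stop()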

