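1. Kafka broker configuration and using the shell clients
While running a Kafka cluster recently, I enabled SSL encryption for security. I started with the setup described in the official documentation: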
#!/bin/bash
# Step 1: generate a key pair and certificate in the broker keystore
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
# Step 2: create your own CA, then import its certificate into the server and client truststores
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
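# Step 3: export a signing request, sign it with the CA (pass:test1234 is the CA key passphrase), then import the CA certificate and the signed certificate into the keystore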
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
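This process produces the following files:
- server.keystore.jks: the keystore holding the broker's private key and certificate
- ca-key: the CA private key; keep it safe
- ca-cert: the CA certificate (contains the public key)
- cert-file: the certificate signing request, i.e. the broker certificate before it is signed
- cert-signed: the broker certificate after it has been signed by the CA
- client.truststore.jks: the client truststore
- server.truststore.jks: the server truststore
Copy server.keystore.jks and server.truststore.jks to every broker.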
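Before copying the files to each broker, it can help to confirm that both are actually present. The following is a minimal sketch, assuming the files sit in one directory; the helper name missing_broker_files is hypothetical and not part of any Kafka tooling.

```python
from pathlib import Path

# File names produced by the keytool/openssl steps above.
BROKER_FILES = ("server.keystore.jks", "server.truststore.jks")


def missing_broker_files(ssl_dir):
    """Return the broker SSL files that are absent from ssl_dir (hypothetical helper)."""
    base = Path(ssl_dir)
    return [name for name in BROKER_FILES if not (base / name).is_file()]
```

An empty list means both files were found; for example, missing_broker_files("/home/admin/ssl") could be run on each broker after the copy.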
Kafka server.conf configuration (the broker properties file):
listeners=SSL://XXXXXX:9092
ssl.keystore.location=/home/admin/ssl/server.keystore.jks
ssl.keystore.password=XXXXX
ssl.key.password=XXXXX
ssl.truststore.location=/home/admin/ssl/server.truststore.jks
ssl.truststore.password=XXXXXX
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
security.inter.broker.protocol=SSL
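A typo in one of these keys is easy to miss, so a quick sanity check on the broker properties can be useful. The sketch below only mirrors the setup in this post (a single SSL listener, SSL between brokers); the function names and the list of required keys are my own assumptions, not a Kafka API.

```python
# Keys this post sets for SSL; an assumption for this setup, not an exhaustive list.
REQUIRED_SSL_KEYS = (
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
    "ssl.truststore.location",
    "ssl.truststore.password",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_broker_ssl(text):
    """Return a list of problems found; an empty list means none were found."""
    props = parse_properties(text)
    problems = [f"missing {key}" for key in REQUIRED_SSL_KEYS if not props.get(key)]
    if not props.get("listeners", "").startswith("SSL://"):
        problems.append("listeners does not start with SSL://")
    if props.get("security.inter.broker.protocol") != "SSL":
        problems.append("security.inter.broker.protocol is not SSL")
    return problems
```

Reading the properties file into a string and passing it to check_broker_ssl returns the list of problems. The check looks only at whether keys are present, not at whether the passwords or paths are correct.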
Kafka consumer configuration:
security.protocol=SSL
ssl.truststore.location=/home/admin/ssl/client.truststore.jks
ssl.truststore.password=XXXXXX
Kafka producer configuration:
security.protocol=SSL
ssl.truststore.location=/home/admin/ssl/client.truststore.jks
ssl.truststore.password=XXXXXX
2. Python client configuration
First, export the certificates in a format the Python client can use (PEM):
keytool -list -rfc -keystore server.keystore.jks
keytool -exportcert -alias localhost -keystore server.keystore.jks -rfc -file certificate.pem
keytool -exportcert -alias CARoot -keystore server.keystore.jks -rfc -file CARoot.pem
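To confirm that the export really produced PEM text, one option is to count the certificate blocks in each file. This is a rough sketch with a hypothetical helper name; it checks only the BEGIN/END framing and does not validate the certificate contents.

```python
import re

# Matches one PEM certificate block, including its base64 body.
PEM_CERT = re.compile(
    r"-----BEGIN CERTIFICATE-----\s+(.+?)\s+-----END CERTIFICATE-----",
    re.DOTALL,
)


def count_pem_certificates(text):
    """Count BEGIN/END CERTIFICATE blocks in PEM text (hypothetical helper)."""
    return len(PEM_CERT.findall(text))
```

For the files above, passing the contents of certificate.pem or CARoot.pem should give a count of at least 1 if the export worked.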
This example uses the Python client kafka-python==1.4.2. Because the Kafka version is v1.1.0, the client must be created with api_version = (1, 1).
import json
from kafka import KafkaProducer

# address (the broker host:port) and retries are assumed to be defined earlier
ssl_certfile = "certificate.pem"
ssl_cafile = "CARoot.pem"
producer = KafkaProducer(
    bootstrap_servers=address,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retries=retries,
    api_version=(1, 1),
    request_timeout_ms=1000,
    ssl_check_hostname=False,
    ssl_certfile=ssl_certfile,
    security_protocol="SSL",
    ssl_cafile=ssl_cafile,
)
With this configuration, the connection succeeds.
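The same SSL settings should carry over to a consumer. Below is a minimal sketch, assuming the same kafka-python version and the CARoot.pem exported above; ssl_client_kwargs and make_consumer are hypothetical helper names, and the topic and broker address are whatever your cluster uses. I have only verified the producer, so treat the consumer side as untested.

```python
def ssl_client_kwargs(bootstrap_servers, cafile):
    """SSL connection settings matching the producer above (hypothetical helper)."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SSL",
        "ssl_cafile": cafile,
        "ssl_check_hostname": False,  # same choice as the producer above
        "api_version": (1, 1),
    }


def make_consumer(topic, bootstrap_servers, cafile="CARoot.pem"):
    """Create a KafkaConsumer for topic using the SSL settings above."""
    # Imported here so ssl_client_kwargs can be used without kafka-python installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(topic, **ssl_client_kwargs(bootstrap_servers, cafile))
```

Keeping the settings in one dictionary means the producer and the consumer cannot drift apart on the protocol or the CA file.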