一、Kafka Tool使用
1、添加cluster
2、開啟SASL_PLAINTEXT
如果kafka 開啟SASL_PLAINTEXT認證(用戶名和密碼認證)
3、高級設置
如果設置的是SASL Plaintext,則必須將sasl.mechanism
客戶端屬性更改為PLAIN
。可以在“高級”部分下的“ SASL機制”文本字段中輸入此屬性。
4、JAAS配置
org.apache.kafka.common.security.plain.PlainLoginModule required username = "kafka" password = "123456";
二、腳本修改
相應的腳本也需要對應的進行修改
1、生產者
producer = KafkaProducer(
sasl_mechanism="PLAIN",
security_protocol='SASL_PLAINTEXT',
sasl_plain_username=self.username,
sasl_plain_password=self.password,
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda m: json.dumps(m).encode())
2、消費者
consumer = KafkaConsumer(self.topic,
sasl_mechanism="PLAIN",
security_protocol='SASL_PLAINTEXT',
sasl_plain_username=self.username,
sasl_plain_password=self.password,
bootstrap_servers=self.bootstrap_servers,
consumer_timeout_ms=5000,
group_id=group_id,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit
)
if consumer.bootstrap_connected():
lk_kafka = []
for message in consumer:
msg = message.value.decode()
lk_kafka.append(msg)
log.warning("數據讀取完成,數據讀取超時,自動斷開連接")
return lk_kafka
else:
log.error("連接kafka失敗,請確認連接信息是否正確")