flume消費需要kerberos認證的kafka集群
環境准備:
kerberos認證需要有三個認證相關文件:
jaas.conf krb5.conf .keytab密鑰文件(能實現密鑰文件,如果沒有該密鑰文件,jaas.conf文件中需要指明認證的用戶名及密碼)
flume版本:
需要保持和認證的kafka集群的版本一致,至少不能低於kafka集群。
查看flume source kafka接口的版本:
運行flume程序后,日志有輸出kafka的版本:
認證步驟:
1: flume配置文件:
agent.sources = s1
agent.sinks = k1
agent.channels = c1
agent.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s1.kafka.bootstrap.servers = 192.168.133.137:9092 # 消費的kafka集群地址
agent.sources.s1.kafka.topics = test-log # topic名稱
agent.sources.s1.kafka.consumer.group.id = test
agent.sources.s1.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent.sources.s1.kafka.consumer.sasl.mechanism = GSSAPI
agent.sources.s1.kafka.consumer.sasl.kerberos.service.name = kafka
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type = static
agent.sources.s1.interceptors.i1.key=key
agent.sources.s1.interceptors.i1.value={"agent_ip":"192.168.133.130"}
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 5000
agent.sinks.k1.type = thrift
agent.sinks.k1.connect-timeout = 1000
agent.sinks.k1.request-timeout = 1000
agent.sinks.k1.hostname = 192.168.133.137
agent.sinks.k1.port = 5330
agent.sinks.k1.connect.timeout = 0
agent.sinks.k1.request.timeout = 0
2、修改flume-en.sh文件
把krb5.onf和jass.conf文件的路徑寫入JAVA_OPTS變量中。
3、修改 jass.conf文件中密鑰的路徑:
4、確認認證server域名的映射。