【調試背景】
目前測試kafka集群有兩套,版本為 0.10.x。有一套是添加了Kerberos+Sentry認證,另一套沒有添加。
現在需要通過sparkStreaming接入kafka做實時分析。
【總體結論】
實驗1:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,無Kerberos+Sentry認證,用createStream,可以從zk中獲取broker,接入成功;
實驗2:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,有Kerberos+Sentry認證,用createStream,無法zk中獲取broker,接入失敗,報空指針;
實驗3:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,有Kerberos+Sentry認證,用createDirectStream,直接設置broker,接入失敗,報EOFException;
實驗4:2.1.x版本spark的jar包,010版本的spark-streaming-kafka,有Kerberos+Sentry認證,用createDirectStream,直接設置broker,需要修改“KafkaUtils”源碼,接入成功;
PS :2.1.x版本spark的jar包,010版本的spark-streaming-kafka,無Kerberos+Sentry認證,用createDirectStream,直接設置broker,接入成功;
PS :2.1.x版本spark的jar包,010版本的spark-streaming-kafka,無Kerberos+Sentry認證,用createDirectStream,直接設置broker,接入成功;
【實驗1】可以正常運行
(1)kafka環境:無Kerberos+Sentry認證
(2)使用jar包:
(3)核心代碼:
package com.xx.kafka; import java.util.HashMap; import java.util.Map; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; public class PrintRecMsg { public static void main(String[] args) { Map<String, Integer> topicmap = new HashMap<>(); topicmap.put(args[0], 2); SparkConf sparkConf = new SparkConf().setAppName("PrintRecMsg").setMaster("local[2]"); final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); String zkQuorum ="host1:2181,host2:2181,host3:2181"; String group = "mygroup"; JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap); JavaDStream<String> msg = lines.map(new Function<Tuple2<String,String>, String>() { @Override public String call(Tuple2<String, String> tuple2) throws Exception { return tuple2._1() + "," + tuple2._2(); } }); msg.print(20); jssc.start(); jssc.awaitTermination(); } }
(4)分析:
創建的流只需要三個和Kafka有關的參數:zk集群地址,消費者組,topicMap。
createStream是走的Zookeeper去獲取對應集群broker信息,然后進行消費。
【實驗2】無法運行
(1)kafka環境:kafka+Kerberos+Sentry認證
(2)使用jar包:(同上)
(3)核心代碼:由於需要添加額外的kafka參數,因此采用了另一個“createStream”的重載方法。
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]"); final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); Map<String, Integer> topicmap = new HashMap<>(); topicmap.put("gaoweiurl", 2); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181";); kafkaParams.put("group.id", "mygroup"); kafkaParams.put("auto.offset.reset", "largest"); /** 以下是和Kerberos+sentry認證相關 **/ kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); kafkaParams.put("sasl.mechanism", "GSSAPI"); kafkaParams.put("sasl.kerberos.service.name", "kafka"); System.setProperty("java.security.auth.login.config", "/xx/xx/kafka-jaas.conf"); System.setProperty("java.security.krb5.conf", "/xx/xx/krb5.conf"); JavaPairInputDStream<String, String> lines = KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap, StorageLevel.MEMORY_AND_DISK_SER_2()); JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) throws Exception { return tuple2._1() + "," + tuple2._2(); } }); msg.print(20); jssc.start(); jssc.awaitTermination(); }
(4)問題及分析

Zookeeper可以正常連接,而且日志顯示,已經成功通過Kerberos+sentry認證。
但是在開始消費消息的時候,一直報一個錯誤:

通過異常堆棧一行行的查找源碼,首先:
==>“at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)”
拋出了空指針異常。進入代碼發現:

說明傳入host為空。
==>“at kafka.cluster.Broker.connectionString(Broker.scala:62)”
說明是創建Broker對象的時候,調用“connectionString(host,port)”出現的。
查看這個方法,是Broker的一個成員方法

創建Broker的時候,入參host為空,那么是誰創建Broker對象呢?
==>“at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)”

這行代碼說明,是有人先創建了“brokers”,然后在執行“fetchTopicMetadata”方法的時候,
執行broker.map(_.connectionString),而brokers的“host”為空,所以空指針異常。
這里的結果已經明確了,但是,還沒有找到brokers對象創建的地方。
異常堆棧在這個地方的時候就斷了。剩下的就是找到brokers的創建位置。
接下來從下往上看異常:
==>“at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)”

線程“LeaderFinderThread”啟動之后,開始執行doWork()方法。
然后創建了brokers,然后通過“ClientUtlis.fetchTopicMetadata”觸發了前面的空指針錯誤。
那么“getAllBrokersInCluster(zkClient)”方法是怎么生成brokers的呢?
首先,這個例子是通過Zookeeper獲取broker的,這個地方基本上可以確定是通過zk獲取kafka的broker信息。
通過

進入“getAllBrokersInCluster”方法:

其中,BrokerIdspath為:

接下來,

總之,就是會讀取“brokers/ids/id”中的信息,返回brokerInfo,然后創建broker。
回到Broker,進入“createBroker”方法查看:

最終,看看配置了Kerberos+Sentry的kafka的broker信息吧,

呵呵,比較下不加Kerberos+sentry的kafka的broker信息:

呵呵。
【實驗3】無法運行
(1)kafka環境:kafka+Kerberos+Sentry認證
(2)使用jar包:(同上)
(3)核心代碼:使用createDirectStream,不用Zookeeper,直接連接broker。
import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import kafka.serializer.StringDecoder; import scala.Tuple2; public class PrintDirectMsgDirect { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]"); final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); int numThreads = 2; Map<String, Integer> topicmap = new HashMap<>(); topicmap.put("gaoweiurl", numThreads); Set<String> topicSet = new HashSet<>(); topicSet.add("gaoweiurl"); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("bootstrap.servers", "node86:9092,node99:9092,node101:9092"); kafkaParams.put("metadata.broker.list", "node86:9092,node99:9092,node101:9092"); kafkaParams.put("group.id", "spark-executor-kafka_shjs_wlpt"); kafkaParams.put("auto.offset.reset", "smallest"); kafkaParams.put("enable.auto.commit", "true"); kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); kafkaParams.put("sasl.mechanism", "GSSAPI"); kafkaParams.put("sasl.kerberos.service.name", "kafka"); System.setProperty("java.security.auth.login.config", "D:\\Kerberos\\kafka-jaas.conf"); System.setProperty("java.security.krb5.conf", "D:\\Kerberos\\krb5.conf"); JavaPairInputDStream<String, String> lines= KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet); JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) throws Exception { return tuple2._1() + "," + tuple2._2(); } }); msg.print(20); jssc.start(); jssc.awaitTermination(); } }
(4)結果及分析

然后,百度了下,呵呵。

Spark的1.x版本不支持“Secure Kafka”。
【實驗4】失敗后成功
換了高版本的spark和sparkStreaming和sparkStreamingKafka的jar包
(1)kafka環境:kafka+Kerberos+Sentry認證
(2)使用jar包:全部是高版本
<spark.version>2.1.0</spark.version>
<spark-streaming-kafka.version>2.1.0</spark-streaming-kafka.version>
<!-- spark core 核心依賴包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark streaming 依賴包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark streaming kafka 依賴包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
<version>${spark-streaming-kafka.version}</version>
</dependency>
(3)核心代碼:
package com.ustcinfo.ishare.bdp.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent /** * @Description: 所有spark任務的總入口 * @author: Beethoven.S * @date: 2017/9/14 13:50 * @e-mail: sheng.gang@ustcinfo.com */ object sparkJobExecutor { /** krb5.conf配置文件 **/ val KRB5_CONF: String = "D:\\Kerberos\\krb5.conf"
/** JAAS配置文件 **/ val KAFKA_JAAS_CONF: String = "D:\\Kerberos\\kafka-jaas.conf"
/** kafka broker地址,多個broker用逗號分開 **/ val KAFKA_BROKERS: String = "node86:9092,node99:9092,node101:9092" def main(args: Array[String]) { /** 添加Kerberos認證所需的JAAS配置文件到運行時環境 **/ System.setProperty("java.security.auth.login.config", KAFKA_JAAS_CONF) /** 添加krb5配置文件到運行時環境 **/ System.setProperty("java.security.krb5.conf", KRB5_CONF) val sparkConf = new SparkConf().setMaster("local[4]").setAppName("sparkStreaming") val streamingContext = new StreamingContext(sparkConf, Seconds(10)) val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array("gaoweiurl"), Map( "bootstrap.servers" -> "node86:9092,node99:9092,node101:9092", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "group.id" -> "kafka_shjs_wlpt", "enable.auto.commit" -> "true", "auto.offset.reset" -> "earliest", "sasl.kerberos.service.name" -> "kafka", "security.protocol" -> "SASL_PLAINTEXT" )) ) stream.foreachRDD(kv => { println("============> ") kv.foreach(x => println("RDD==> " + x)) }) streamingContext.start() streamingContext.awaitTermination() } }
然后,在沒有數據寫入的時候,很正常,但是一旦開始接收數據之后,就會出現如下錯誤:

沒錯,sparkStreaming開始監聽並且連接的時候,用的消費者組ID確實是我代碼中配置的:

但是,一旦開始接收消息,通過RDD讀取數據的時候,groupId居然被自動添加了“spark-executor-”的前綴。
(我這個開始設置的“spark-executor-kafka_shjs_wlpt”這個就是看看是不是有了前綴就不再加的,結果,
還是自動的添加了前綴,成了“spark-executor-spark-executor-kafka_shjs_wlpt”)。
然后,在spark運行的時候發現了這句話:

找到“KafkaUtils”代碼中,追蹤到萬惡之源:

而且是,只要收到消息,創建RDD的時候會這么干:

解決方式就是覆蓋這個源碼:

注意覆蓋的時候,包的名稱路徑必須要和源碼路徑一模一樣,否則會出現scala的私有依賴引用問題。
然后再次,執行:

kafka的數據可以正常讀取。
