Reading Kafka Data with Spark Streaming and Writing It to ES


Introduction:
Our project already has several channels feeding data into Kafka. This post documents how to read that data from Kafka with Spark Streaming and write it to Elasticsearch, giving a real-time (strictly speaking, near-real-time; the refresh interval is configurable) view of the data.

Use case:
When the business system does multidimensional analysis, the data comes from many different sources. Most historical data is produced by scheduled daily batch jobs, but for an analytics product the T+0 (same-day) data is hard to obtain that way. My current solution for T+0 data is to have Spark Streaming read from Kafka and write to Elasticsearch; the business system then queries both the historical data and the T+0 data to present a real-time view.


First, the versions involved:

    <java.version>1.8</java.version>
    <spark.version>1.6.2</spark.version>
    <scala.version>2.10.6</scala.version>
    <elasticsearch.version>5.2.0</elasticsearch.version>
    <kafka.version>1.0</kafka.version>

Below is the project structure, built with Spring Boot:

(screenshot of the project structure omitted)

When I first studied this, the reference material used Spark 1.6.2 with Kafka 0.8, but the Kafka cluster in my own project is 1.0. I replaced the corresponding kafka_2.10-0.8.2.1.jar with kafka_2.10-0.10.0.0.jar and then hit the following exception:

    Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.midea.magiccube.spark.LoanInfoStatistic.getActionDStream(LoanInfoStatistic.java:210)
        at com.midea.magiccube.spark.LoanInfoStatistic.main(LoanInfoStatistic.java:69)

The key part is: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. After some digging, my preliminary conclusion was that this Kafka client version is incompatible with the Spark version, so I rolled the version back and the job ran fine again.

The pom.xml is as follows:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.7</java.version>
        <spark.version>1.6.2</spark.version>
        <scala.version>2.10.6</scala.version>
        <elasticsearch.version>5.2.0</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <!-- Spark -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark-to-ES support -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-13_2.10</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Next comes the actual Spark Streaming read/write code.

  1. Configure the SparkConf object and initialize the ES connection settings.

    SparkConf sc = new SparkConf();
    sc.setAppName("Name").setMaster("local[2]");  // app name; local mode with 2 threads
    sc.set("es.nodes", IP);                       // Elasticsearch node address
    sc.set("es.index.auto.create", "true");       // create the index automatically if it does not exist
    sc.set("es.mapping.id", "id");                // use the entity's id field as the ES document _id
    sc.set("es.port", PORT);                      // Elasticsearch HTTP port
  2. Bind the SparkConf and set the polling (batch) interval to 5 seconds.

    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));  // 5-second batch interval
    jssc.checkpoint("E:/checkpoint");  // local checkpoint directory
  3. Set up the Kafka configuration and read the data with KafkaUtils.createDirectStream(), which yields a JavaPairDStream<String, String> object named dStream.
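A minimal sketch of this step, continuing from the jssc created in step 2; the broker list and topic name are placeholders, and it assumes imports for KafkaUtils, kafka.serializer.StringDecoder, and the java.util collection classes:

    // Kafka connection settings for the direct (receiver-less) API
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "kafkaHost:9092"); // placeholder broker list
    Set<String> topics = Collections.singleton("topicName");   // placeholder topic

    // Key and value are both read as plain strings
    JavaPairDStream<String, String> dStream = KafkaUtils.createDirectStream(
            jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            kafkaParams, topics);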

  4. Use dStream.mapToPair() to parse the Kafka data and wrap it into a JavaPairDStream<String, YourEntity> object named entityDStream, where YourEntity is your own entity class.
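A sketch of the parsing step, assuming the Kafka messages are JSON and that LoanEntity is a hypothetical serializable bean with an id field; the class name, the getId() accessor, and the Gson parsing are placeholders for your own entity and parsing logic:

    JavaPairDStream<String, LoanEntity> entityDStream = dStream.mapToPair(
            new PairFunction<Tuple2<String, String>, String, LoanEntity>() {
                @Override
                public Tuple2<String, LoanEntity> call(Tuple2<String, String> record) {
                    // record._2() is the Kafka message value; Gson is already in the pom
                    LoanEntity entity = new Gson().fromJson(record._2(), LoanEntity.class);
                    return new Tuple2<String, LoanEntity>(entity.getId(), entity);
                }
            });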

  5. Use entityDStream.transform() to convert the data into a JavaDStream named dataDStream, which is convenient for writing to ES.
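One possible implementation, again using the hypothetical LoanEntity from step 4: drop the keys and keep only the values, so each RDD element becomes one ES document:

    JavaDStream<LoanEntity> dataDStream = entityDStream.transform(
            new Function<JavaPairRDD<String, LoanEntity>, JavaRDD<LoanEntity>>() {
                @Override
                public JavaRDD<LoanEntity> call(JavaPairRDD<String, LoanEntity> rdd) {
                    return rdd.values(); // keep only the entity beans
                }
            });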

  6. Then write the data to ES: JavaEsSparkStreaming.saveToEs(dataDStream, "indexName");
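For example (the resource name is a placeholder; with elasticsearch-spark 5.x it takes the "index/type" form, and because es.mapping.id was set to "id" in step 1, the entity's id field becomes the document _id, so reprocessed records overwrite rather than duplicate documents):

    // requires org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming
    JavaEsSparkStreaming.saveToEs(dataDStream, "loan_info/doc"); // placeholder index/type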

  7. Finally, start the JavaStreamingContext jssc and shut it down.

    jssc.start();             // start the streaming computation
    jssc.awaitTermination();  // block until the job is stopped or fails
    jssc.close();

