SBT 構建 spark streaming集成kafka (scala版本)


前言:

    
     最近在研究spark 還有 kafka , 想通過kafka端獲取的數據,利用spark streaming進行一些計算,但搭建整個環境着實不易,故特此寫下該過程,分享給大家,希望大家可以少走點彎路,能幫到大家!

 

環境准備:

        操作系統 : ubuntu14.04 LTS

      hadoop 2.7.1   偽分布式搭建

      sbt-0.13.9

      kafka_2.11-0.8.2.2

      spark-1.3.1-bin-hadoop2.6

      scala 版本 : 2.10.4

     

      注: 請重視版本問題,之前作者用的是spark-1.4.1 ,scala版本是2.11.7  結果作業提交至spark-submit 總是失敗,所以大家這點注意下!             

     

                   hadooop 2.7.1 偽分布式搭建 大家可以參照 http://www.wjxfpf.com/2015/10/517149.html

 

    kafka安裝與測試:        

  1. 到官網http://kafka.apache.org/downloads.html 下載 kafka_2.11-0.8.2.2.tgz    
  2. 進入下載目錄,打開終端,輸入以下命令,將其解壓至 /usr/local 目錄: sudo tar -xvzf   kafka_2.11-0.8.2.2.tgz -C /usr/local
  3. 敲入用戶密碼后,kafka 成功解壓,繼續輸入以下命令:
    1. cd  /usr/local    跳轉至/usr/local/ 目錄;
    2. sudo  chmod 777 -R  kafka_2.11-0.8.2.2   獲得該目錄的所有執行權;  gedit  ~/.bashrc  打開個人配置 末尾添加 export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.2 
      export PATH=$PATH:$KAFKA_HOME/bin
    3. 保存,終端輸入 source ~/.bashrc

 

kafka 有其自帶默認的zookeeper 所以省去了我們一些功夫,現在可以開始測試下kafka:

  • 新建終端輸入 cd $KAFKA_HOME 進入kafka 目錄    (為了方便,我們稱此終端為1號終端)
  • bin/zookeeper-server-start.sh config/zookeeper.properties &   后台運行zookeeper

  •  

    bin/kafka-server-start.sh  config/server.properties & 后台啟動kafka-server 
  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test    新建一個叫test的topic 
  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test   Kafka提供了一個命令行的工具,可以從輸入文件或者命令行中讀取消息並發送給Kafka集群。每一行是一條消息。
  • 開啟一個新的終端(為了方便,我們稱此終端為2號終端),並進入kafka 目錄 ,輸入:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 
  • 現在,在1號終端輸入HAHA,如果2號終端能輸出HAHA,說明kafka測試成功!

 

 

SBT構建一個關於單詞計數的scala程序

  • 新建一個文件夾,命名為spark_kafka      
  • 進入spark_kafka,按/src/main/scala/KafkaDemo.scala層級目錄   新建KafkaDemo.scala
  • 在spark_kafka目錄下 新建project 目錄 在project下新建plugins.sbt
  • 在spark_kafka目錄下新建assembly.sbt
  • 最后,你所看到的目錄結構如下 
      • spark_kafka/
      • spark_kafka/src
      • spark_kafka/src/main
      • spark_kafka/src/main/scala
      • spark_kafka/src/main/scala/KafkaDemo.scala
      • spark_kafka/project
      • spark_kafka/project/plugins.sbt
      • spark_kafka/assembly.sbt
                            

其中,KafkaDemo.scala 代碼如下

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object KafkaDemo {
    def main(args: Array[String]) {
  val zkQuorum = "127.0.0.1:2181"
  val group = "test-consumer-group"
  val topics = "test"
  val numThreads = 2
  val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
  val ssc =  new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint("checkpoint")

  val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
  }
}

 

assmebly.sbt 代碼如下

name := "KafkaDemo"
version := "1.0"
scalaVersion := "2.10.4"


libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
)



libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"


mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("org", "apache", xs @ _*)         => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
    case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
    case x => old(x)
  }
}



resolvers += "OSChina Maven Repository" at "http://maven.oschina.net/content/groups/public/"



externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)

  

plugins.sbt 內容如下:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

 

 

 請大家注意 :

  

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
case x => old(x)
}
}

這段代碼只是針對我本機的解決依賴沖突的方法,如果沒有這段代碼,那么我打包的時候會有依賴沖突的發生,原因是不同包下有相同的類,解決的方法是合並依賴,下面是貼出沒有這段代碼的錯誤:

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/hadoop/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.0.jar:org/apache/spark/unused/UnusedStubClass.class
[error] /home/hadoop/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class

 

大家注意紅色高亮的代碼,當大家發生其他依賴沖突的時候,可以照貓畫虎,解決依賴沖突

 

接下來,就是在較好的網絡環境下進行打包,終端進入spark_kafka 目錄 ,敲入sbt assembly , 耐心等代下載打包

 

spark streaming 對接 kafka 生產消息端口

  • 啟動hadoop
  • 后台啟動kafka zookeeper 和 server 端
  • 啟動producer 命令行(后續通過輸入字符串,spark對其進行單詞計數處理)
  • 新建終端進入spark_kafka 目錄,輸入

    $SPARK_HOME/bin/spark-submit --class "KafkaDemo" target/scala-2.10/KafkaDemo-assembly-1.0.jar

    (打包成功的話,會有一個target 目錄,而且target下有scala-2.10/KafkaDemo-assembly-1.0.jar ) 。  
  • 然后在producer 輸入一系列字符串,spark streaming會進行處理

 

如果能看到該結果,那就恭喜你了。

弄這個其實弄了有一段時間,主要問題是依賴的解決,以及版本的問題。如果大家在做的過程發現出現有scala :no such method...    等問題的時候,說明是scala版本不符合了

其他的問題大家可以谷歌,此外強調一點,以上命令跟我個人目錄環境有關,比如$SPARK_HOME代表我自己的spark 路徑,如果你的目錄跟我不一樣,自己要換一換;

此文是面向有linux基礎的同學,懂基本環境配置,這是最起碼的要求!此文也給自己,畢竟確實辛苦!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM