前言:
最近在研究spark 還有 kafka , 想通過kafka端獲取的數據,利用spark streaming進行一些計算,但搭建整個環境着實不易,故特此寫下該過程,分享給大家,希望大家可以少走點彎路,能幫到大家!
環境准備:
操作系統 : ubuntu14.04 LTS
hadoop 2.7.1 偽分布式搭建
sbt-0.13.9
kafka_2.11-0.8.2.2
spark-1.3.1-bin-hadoop2.6
scala 版本 : 2.10.4
注: 請重視版本問題,之前作者用的是spark-1.4.1 ,scala版本是2.11.7 結果作業提交至spark-submit 總是失敗,所以大家這點注意下!
hadooop 2.7.1 偽分布式搭建 大家可以參照 http://www.wjxfpf.com/2015/10/517149.html
kafka安裝與測試:
- 到官網http://kafka.apache.org/downloads.html 下載 kafka_2.11-0.8.2.2.tgz
- 進入下載目錄,打開終端,輸入以下命令,將其解壓至 /usr/local 目錄: sudo tar -xvzf kafka_2.11-0.8.2.2.tgz -C /usr/local
- 敲入用戶密碼后,kafka 成功解壓,繼續輸入以下命令:
- cd /usr/local 跳轉至/usr/local/ 目錄;
- sudo chmod 777 -R kafka_2.11-0.8.2.2 獲得該目錄的所有執行權; gedit ~/.bashrc 打開個人配置 末尾添加 export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.2
export PATH=$PATH:$KAFKA_HOME/bin - 保存,終端輸入 source ~/.bashrc
kafka 有其自帶默認的zookeeper 所以省去了我們一些功夫,現在可以開始測試下kafka:
- 新建終端輸入 cd $KAFKA_HOME 進入kafka 目錄 (為了方便,我們稱此終端為1號終端)
-
bin/zookeeper-server-start.sh config/zookeeper.properties & 后台運行zookeeper
-
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 新建一個叫test的topic
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test Kafka提供了一個命令行的工具,可以從輸入文件或者命令行中讀取消息並發送給Kafka集群。每一行是一條消息。
- 開啟一個新的終端(為了方便,我們稱此終端為2號終端),並進入kafka 目錄 ,輸入:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
-
現在,在1號終端輸入HAHA,如果2號終端能輸出HAHA,說明kafka測試成功!
SBT構建一個關於單詞計數的scala程序
- 新建一個文件夾,命名為spark_kafka
- 進入spark_kafka,按/src/main/scala/KafkaDemo.scala層級目錄 新建KafkaDemo.scala
- 在spark_kafka目錄下 新建project 目錄 在project下新建plugins.sbt
- 在spark_kafka目錄下新建assembly.sbt
- 最后,你所看到的目錄結構如下
-
- spark_kafka/
- spark_kafka/src
- spark_kafka/src/main
- spark_kafka/src/main/scala
- spark_kafka/src/main/scala/KafkaDemo.scala
- spark_kafka/project
- spark_kafka/project/plugins.sbt
- spark_kafka/assembly.sbt
-
其中,KafkaDemo.scala 代碼如下
import java.util.Properties import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf object KafkaDemo { def main(args: Array[String]) { val zkQuorum = "127.0.0.1:2181" val group = "test-consumer-group" val topics = "test" val numThreads = 2 val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(10)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
assmebly.sbt 代碼如下
name := "KafkaDemo" version := "1.0" scalaVersion := "2.10.4" libraryDependencies ++= Seq( ("org.apache.spark" %% "spark-core" % "1.3.1" % "provided") ) libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1" % "provided" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0" mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case PathList("org", "apache", xs @ _*) => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first case x => old(x) } } resolvers += "OSChina Maven Repository" at "http://maven.oschina.net/content/groups/public/" externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
plugins.sbt 內容如下:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
請大家注意 :
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
case x => old(x)
}
}
這段代碼只是針對我本機的解決依賴沖突的方法,如果沒有這段代碼,那么我打包的時候會有依賴沖突的發生,原因是不同包下有相同的類,解決的方法是合並依賴,下面是貼出沒有這段代碼的錯誤:
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/hadoop/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.0.jar:org/apache/spark/unused/UnusedStubClass.class
[error] /home/hadoop/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class
大家注意紅色高亮的代碼,當大家發生其他依賴沖突的時候,可以照貓畫虎,解決依賴沖突
接下來,就是在較好的網絡環境下進行打包,終端進入spark_kafka 目錄 ,敲入sbt assembly , 耐心等代下載打包
spark streaming 對接 kafka 生產消息端口
- 啟動hadoop
- 后台啟動kafka zookeeper 和 server 端
- 啟動producer 命令行(后續通過輸入字符串,spark對其進行單詞計數處理)
- 新建終端進入spark_kafka 目錄,輸入
$SPARK_HOME/bin/spark-submit --class "KafkaDemo" target/scala-2.10/KafkaDemo-assembly-1.0.jar
(打包成功的話,會有一個target 目錄,而且target下有scala-2.10/KafkaDemo-assembly-1.0.jar ) 。 - 然后在producer 輸入一系列字符串,spark streaming會進行處理
如果能看到該結果,那就恭喜你了。
弄這個其實弄了有一段時間,主要問題是依賴的解決,以及版本的問題。如果大家在做的過程發現出現有scala :no such method... 等問題的時候,說明是scala版本不符合了
其他的問題大家可以谷歌,此外強調一點,以上命令跟我個人目錄環境有關,比如$SPARK_HOME代表我自己的spark 路徑,如果你的目錄跟我不一樣,自己要換一換;
此文是面向有linux基礎的同學,懂基本環境配置,這是最起碼的要求!此文也給自己,畢竟確實辛苦!