cdh環境下,spark streaming與flume的集成問題總結


文章發自:http://www.cnblogs.com/hark0623/p/4170156.html  轉發請注明

 

如何做集成,其實特別簡單,網上其實就是教程。

http://blog.csdn.net/fighting_one_piece/article/details/40667035  看這里就成。 我用的是第一種集成。。
 
做的時候,出現了各種問題。    大概從從2014.12.17 早晨5點搞到2014.12.17晚上18點30
 
總結起來其實很簡單,但做的時候搞了許久啊啊啊!!!!   這樣的事情,吃一塹長一智吧
問題1、  需要引用各種包,這些包要打入你的JAR中, 因為用的是spark on yarn模式,所以如果不打進去,在集群中是找不到依賴包的!!!  去哪找呢?  直接去search.maven.org找。。
 
問題2:因為搭建的spark on yarn集群,所以監聽時只能監聽localhost,不然如果你指定了ip,那么非該IP下的結點,就會因為監聽不到而出現了問題
 
問題3:cdh中的flume的啟動,你要去find / -name flume.conf ,找一下,然后找到最新的,與cloudera manager配置文件一樣的那么,flume啟動時就用這個配置文件
 
問題4:不要直接用集群,先用單點測試一下。。因為單點測試一下后會發現各種問題。 解決后再去集群測試
 
問題5:一定要注意版本!  cdh5.2中spark的版本是1.1.0,而我用的插件一直是1.1.1版本的!!! 啊, 為這事兒,我從中午搞到現在。   這個要吃一塹長一智啦!!!
 
 
 
 
spark代碼如下
package com.hark

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

/**
 * Created by Administrator on 2014-12-16.
 */
object SparkStreamingFlumeTest {
  def main(args: Array[String]) {
    //println("harkhark")



    val path = new File(".").getCanonicalPath()
    //File workaround = new File(".");
    System.getProperties().put("hadoop.home.dir", path);
    new File("./bin").mkdirs();
    new File("./bin/winutils.exe").createNewFile();

    //val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")

    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(20))

   
    //val hostname = "127.0.0.1"
   val hostname = "localhost"
    val port = 2345
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)

    flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()



    ssc.start()
    ssc.awaitTermination()


  }
}

  

flume配置文件如下
 
# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type     = exec
tier1.sources.source1.command     = tail -F /opt/data/test3/123
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
#tier1.sinks.sink1.type         = logger
tier1.sinks.sink1.type         = avro
tier1.sinks.sink1.hostname        = localhost
tier1.sinks.sink1.port        = 2345
tier1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of yhx.hadoop.dn01
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100

 

 
 
spark啟動命令如下:
spark-submit --driver-memory 512m --executor-memory 512m --executor-cores 1  --num-executors 3 --class com.hark.SparkStreamingFlumeTest --deploy-mode cluster --master yarn /opt/spark/SparkTest.jar

 

 
 
flume啟動命令如下:
flume-ng agent --conf /opt/cloudera-manager/run/cloudera-scm-agent/process/585-flume-AGENT --conf-file /opt/cloudera-manager/run/cloudera-scm-agent/process/585-flume-AGENT/flume.conf --name tier1 -Dflume.root.logger=INFO,console

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM