Spark Streaming和Flume-NG對接實驗


  Spark Streaming是一個新的實時計算的利器,而且還在快速的發展。它將輸入流切分成一個個的DStream轉換為RDD,從而可以使用Spark來處理。它直接支持多種數據源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函數:mapreducejoinwindow等。

  本文將Spark Streaming和Flume-NG進行對接,然后以官方內置的JavaFlumeEventCount作參考,稍作修改然后放到集群上去運行。 

  一、下載spark streaming的flume插件包,我們這里的spark版本是1.0.0(standlone),這個插件包的版本選擇spark-streaming-flume_2.10-1.0.1.jar,這個版本修復了一個重要的bug,參考下面參考中的7。

  二、把spark的編譯后的jar包以及上面flume的插件,放入工程,編寫如下類(參考8中的例子修改而來),代碼如下:

 1 package com.spark_streaming;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.function.Function;
 5 import org.apache.spark.streaming.*;
 6 import org.apache.spark.streaming.api.java.*;
 7 import org.apache.spark.streaming.flume.FlumeUtils;
 8 import org.apache.spark.streaming.flume.SparkFlumeEvent;
 9 
10 public final class JavaFlumeEventCount {
11   private JavaFlumeEventCount() {
12   }
13 
14   public static void main(String[] args) {
15 
16     String host = args[0];
17     int port = Integer.parseInt(args[1]);
18 
19     Duration batchInterval = new Duration(Integer.parseInt(args[2]));
20     SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
21     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
22     JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
23 
24     flumeStream.count();
25 
26     flumeStream.count().map(new Function<Long, String>() {
27       @Override
28       public String call(Long in) {
29         return "Received " + in + " flume events.";
30       }
31     }).print();
32 
33     ssc.start();
34     ssc.awaitTermination();
35   }
36 }

  這個和官方的區別是刪除了參數個數檢查和增加了自定義時間間隔(分割流),也就是第三個參數。這個類並沒有做太多處理,入門為主。

  三、打包這個類到ifeng_spark.jar,連同spark-streaming-flume_2.10-1.0.1.jar一起上傳到spark集群中的節點上。

  四、啟動flume,這個flume的sink要用avro,指定要發送到的spark集群中的一個節點,我們這里是10.32.21.165:11000。

  五、在spark安裝根目錄下執行如下命令:

  ./bin/spark-submit  --master spark://10.32.21.165:8070  --driver-memory 4G  --executor-memory 4G --jars /usr/lib/spark-1.0.0-cdh4/lib/spark-streaming-flume_2.10-1.0.1.jar,/usr/lib/flume-ng-1.4-cdh4.6.0/lib/flume-ng-sdk-1.4.0-cdh6.0.jar  /usr/lib/spark-1.0.0-cdh4/ifeng_spark.jar   --class com.spark_streaming.JavaFlumeEventCount 10.32.21.165 11000 2000

   這個命令中的參數解釋請參考下面參考3中的解釋,也可以自己增加一些參數,需要注意的是配置內存,自己根據需要自行增加內存(driver、executor)防止OOM。另外jars可以同時加載多個jar包,逗號分隔。記得指定類后需要指定3個參數。

  如果沒有指定Flume的sdk包,會爆如下錯誤:

  java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;沒有找到類。這個類在flume的sdk包內,在jars參數中指定jar包位置就可以。

  還有就是要將自己定義的業務類的jar單獨列出,不要放在jars參數指定,否則也會有錯誤拋出。

 

  運行后可以看到大量的輸出信息,然后可以看到有數據的RDD會統計出這個RDD有多少行,截圖如下,最后的部分就是這2秒(上面命令最后的參數設定的)統計結果:

     

  至此,flume-ng與spark的對接成功,這只是一個入門實驗。可根據需要靈活編寫相關的業務類來實現實時處理Flume傳輸的數據。

  spark streaming和一些數據傳輸工具對接可以達到實時處理的目的。

 

  參考:

  1、https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html

  2、http://www.cnblogs.com/cenyuhai/p/3577204.html

  3、http://blog.csdn.net/book_mmicky/article/details/25714545 , 重要的參數解釋

  4、http://blog.csdn.net/lskyne/article/details/37561235 , 這是一個例子

  5、http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20 , spark-flume插件下載

  6、http://outofmemory.cn/spark/configuration , spark一些可配置參數說明

  7、https://issues.apache.org/jira/browse/SPARK-1916  ,這是1.0.1之前版本中spark streaming與flume對接的一個bug信息

  8、https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming , 這是java版本的spark streaming的一些例子,里面有flume的一個


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM