Spark Streaming是一個新的實時計算的利器,而且還在快速的發展。它將輸入流切分成一個個的DStream轉換為RDD,從而可以使用Spark來處理。它直接支持多種數據源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函數:map, reduce, join, window等。
本文將Spark Streaming和Flume-NG進行對接,然后以官方內置的JavaFlumeEventCount作參考,稍作修改然后放到集群上去運行。
一、下載spark streaming的flume插件包,我們這里的spark版本是1.0.0(standlone),這個插件包的版本選擇spark-streaming-flume_2.10-1.0.1.jar,這個版本修復了一個重要的bug,參考下面參考中的7。
二、把spark的編譯后的jar包以及上面flume的插件,放入工程,編寫如下類(參考8中的例子修改而來),代碼如下:
1 package com.spark_streaming; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.function.Function; 5 import org.apache.spark.streaming.*; 6 import org.apache.spark.streaming.api.java.*; 7 import org.apache.spark.streaming.flume.FlumeUtils; 8 import org.apache.spark.streaming.flume.SparkFlumeEvent; 9 10 public final class JavaFlumeEventCount { 11 private JavaFlumeEventCount() { 12 } 13 14 public static void main(String[] args) { 15 16 String host = args[0]; 17 int port = Integer.parseInt(args[1]); 18 19 Duration batchInterval = new Duration(Integer.parseInt(args[2])); 20 SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); 21 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); 22 JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); 23 24 flumeStream.count(); 25 26 flumeStream.count().map(new Function<Long, String>() { 27 @Override 28 public String call(Long in) { 29 return "Received " + in + " flume events."; 30 } 31 }).print(); 32 33 ssc.start(); 34 ssc.awaitTermination(); 35 } 36 }
這個和官方的區別是刪除了參數個數檢查和增加了自定義時間間隔(分割流),也就是第三個參數。這個類並沒有做太多處理,入門為主。
三、打包這個類到ifeng_spark.jar,連同spark-streaming-flume_2.10-1.0.1.jar一起上傳到spark集群中的節點上。
四、啟動flume,這個flume的sink要用avro,指定要發送到的spark集群中的一個節點,我們這里是10.32.21.165:11000。
五、在spark安裝根目錄下執行如下命令:
./bin/spark-submit --master spark://10.32.21.165:8070 --driver-memory 4G --executor-memory 4G --jars /usr/lib/spark-1.0.0-cdh4/lib/spark-streaming-flume_2.10-1.0.1.jar,/usr/lib/flume-ng-1.4-cdh4.6.0/lib/flume-ng-sdk-1.4.0-cdh6.0.jar /usr/lib/spark-1.0.0-cdh4/ifeng_spark.jar --class com.spark_streaming.JavaFlumeEventCount 10.32.21.165 11000 2000
這個命令中的參數解釋請參考下面參考3中的解釋,也可以自己增加一些參數,需要注意的是配置內存,自己根據需要自行增加內存(driver、executor)防止OOM。另外jars可以同時加載多個jar包,逗號分隔。記得指定類后需要指定3個參數。
如果沒有指定Flume的sdk包,會爆如下錯誤:
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;沒有找到類。這個類在flume的sdk包內,在jars參數中指定jar包位置就可以。
還有就是要將自己定義的業務類的jar單獨列出,不要放在jars參數指定,否則也會有錯誤拋出。
運行后可以看到大量的輸出信息,然后可以看到有數據的RDD會統計出這個RDD有多少行,截圖如下,最后的部分就是這2秒(上面命令最后的參數設定的)統計結果:

至此,flume-ng與spark的對接成功,這只是一個入門實驗。可根據需要靈活編寫相關的業務類來實現實時處理Flume傳輸的數據。
spark streaming和一些數據傳輸工具對接可以達到實時處理的目的。
參考:
1、https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html
2、http://www.cnblogs.com/cenyuhai/p/3577204.html
3、http://blog.csdn.net/book_mmicky/article/details/25714545 , 重要的參數解釋
4、http://blog.csdn.net/lskyne/article/details/37561235 , 這是一個例子
5、http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20 , spark-flume插件下載
6、http://outofmemory.cn/spark/configuration , spark一些可配置參數說明
7、https://issues.apache.org/jira/browse/SPARK-1916 ,這是1.0.1之前版本中spark streaming與flume對接的一個bug信息
8、https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming , 這是java版本的spark streaming的一些例子,里面有flume的一個
