1. 創建 maven 工程 只加 spark-streaming 這個包就可以
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.spark</groupId> <artifactId>sparkStream</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-graphx_2.10</artifactId> <version>1.6.1</version> </dependency> </dependencies> </project>
2. 示例代碼
package com.dt.spark.sparkapps.sparkstreaming; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; public class WordCountOnLine { public static <U> void main(String[] args) { /** * 第一步:配置SparkConf, * 1. 因為 Spark Streaming 應用程序至少有一條線程用於不斷的循環結束數據,並且至少有一條線程用於處理 * 接收的數據(否則的話無線程用於處理數據,隨着時間的推移,內存和磁盤都會不堪重負) * 2. 對於集群而已,每個 Executor 一般肯定不止一個線程,那對於處理 Spark Streaming應用程序而言,每個 Executor 一般分配多少Core * 比較合適?根據我們過去的經驗,5個左右的 Core 是最佳的(一個段子分配為基數 Core 表現最佳,) */ // SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreaming"); SparkConf conf = new SparkConf().setMaster("spark://hadoop1:7077").setAppName("sparkStreaming"); /** * 第二步:創建 SparkStreamingContext, * 1.這個 SparkStreaming 應用程序所有功能的起始點和程序調度的核心 * SparkStreamingContext 的構建可以基於 SparkConf參數,也可基於持久化的 SaprkStreamingContext的內容來回復過來 * (典型的場景是 Driver 奔潰后重新啟動,由於 Spark Streaming 具有連續 7*24 小時不間斷運行的特征,所有需要在 Driver 重新啟動后繼續上一次的狀態, * 此時的狀態恢復需要基於曾經的 Checkpoint) * 2.在一個Spark Streaming 應用程序中可以創建若干個 SaprkStreamingContext對象,使用下一個 SaprkStreamingContext * 之前需要把前面正在運行的 SparkStreamingContext 對象關閉掉,由此,我們獲得一個重大的啟發 SparkStreaming框架也只是Spark Core上的一個應用程序而言 * 只不過 Spark Streaming 框架要運行的話需要Spark工程師寫業務邏輯處理代碼; */ JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 第三步: 創建 Spark Streaming 輸入數據來源 input Stream * 1.數據輸入來源可以基於 File,HDFS, Flume,Kafka,Socket等; * 2.在這里我們制定數據來源於網絡 Socket端口,Spark Streaming鏈接上改端口並在運行的時候一直監聽該端口的數據(當然該端口服務首先必須存在),並且在后續會根據業務需要不斷的 * 有數據產生(當然對於Spark Streaming 引用程序的運行而言,有無數據其處理流程都是一樣的) * 3.如果經常在每隔 5 秒鍾沒有數據的話不斷的啟動空的 Job 其實是會造成調度資源的浪費,因為彬沒有數據需要發生計算;真實的企業級生產環境的代碼在具體提交 Job 前會判斷是否有數據,如果沒有的話 * 不再提交 Job; */ JavaReceiverInputDStream<String> lines = jsc.socketTextStream("hadoop1", 9999); /** * 第四步:接下來就是 對於 Rdd編程一樣基於 DStream進行編程!!!原因是DStream是RDD產生的模板(或者說類), 在 Saprk Stream發生計算前,其實質是把每個 Batch的DStream的操作翻譯 * 成為 Rdd 的操作!!! */ JavaDStream<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String line) throws Exception { String[] split = line.split(" "); return Arrays.asList(split); } }); JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); /** * 此處print並不會直接觸發 job 的執行,因為現在的一切都是在 Spark Streaming 框架的控制之下的,對於 Spark Streaming 而言具體是否觸發真正的 job 運行 * 是基設置的 Duration 時間間隔觸發 * 一定要注意的是 Spark Streaming應用程序要想執行具體的Job,對DStream就必須有 output Stream操作 * output Stream有很多類型的函數觸發,類print,saveAsTextFile,saveAsHadoopFile等,最為重要的一個方法是 foreachRDD,因為Spark Streaming處理的結果一般都會放在 Redis,DB, * DashBoard等上面,foreachRDD主要就是用來完成這些功能的,而且可以隨意的自定義具體數據到底放在那里 */ reduceByKey.print(); /** * Spark Streaming 執行引擎也就是Driver開始運行,Driver啟動的時候是位於一條新的線程中的,當然其內部有消息接收應用程序本身或者 Executor 中的消息; * */ jsc.start(); jsc.awaitTermination(); } }