2. Spark Streaming: a simple word count over data read from a socket


Spark 1.5.2 Spark Streaming study notes and programming exercises

Overview

  Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.  


[Figure: Spark Streaming architecture — http://spark.apache.org/docs/1.5.2/img/streaming-arch.png]

  Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

[Figure: Spark Streaming batching model — input stream divided into batches processed by the Spark engine]

  Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.

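To make "a sequence of RDDs" concrete, here is a minimal illustrative sketch (the class name DStreamAsRdds and the record-count action are mine, not from the original notes): the 1.x Java API's foreachRDD hands the callback the single RDD backing each batch.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Illustrative sketch: each batch interval of a DStream is backed by one RDD.
 */
public class DStreamAsRdds {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsRdds");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2L));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // foreachRDD exposes the single RDD that backs each 2-second batch.
        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                System.out.println("batch RDD with " + rdd.count() + " records");
                return null;
            }
        });
        jssc.start();
        jssc.awaitTerminationOrTimeout(20 * 1000L);
    }
}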

Java programming exercise:

  An example of Spark Streaming reading data from a socket and counting words (the project's pom file must include the relevant Spark dependencies; a sketch follows):
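A minimal sketch of what that dependency block might look like, assuming the Scala 2.10 builds of Spark 1.5.2 (adjust the artifact suffix and version to your environment):

<dependencies>
  <!-- Assumed coordinates: Spark 1.5.2 compiled against Scala 2.10 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>
</dependencies>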

  Socket server code:

  Description: start a socket server that waits for connections; once a client connects, it repeatedly writes a string to the connected socket. The server runs on the local machine, port 9999.

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * A minimal socket server whose only job is to push data to a connected client.
 */
public class SocketServerPut {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(9999);
            while (true) {
                Socket socket = serverSocket.accept();
                // Write plain text lines; DataOutputStream.writeUTF would prepend
                // two length bytes that socketTextStream cannot parse as text.
                PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
                String str = "Connect 123 test spark streaming abc xyz hik";
                while (socket.isConnected()) {
                    // Send one line every 500 ms.
                    writer.println(str);
                    System.out.println(str);
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
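(If you would rather not run a custom server, a Netcat listener started with nc -lk 9999 can likewise feed test lines into port 9999, as the official Spark quick example does.)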

  Spark Streaming processing code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Spark Streaming word count over data received from a socket.
 */
public class StreamingFromSocket {
    public static void main(String[] args) {
        // Run in local mode with two threads and set the application name.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingFromSocketTest");
        // Initialize the context with a batch interval of 2 seconds.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2L));
        // Read data from port 9999 on the local machine.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // Split each line into individual words on spaces.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });
        // Count how many times each word occurs within a batch.
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        // Print the per-batch word counts.
        wordCounts.print();
        jssc.start();
        // Wait up to 20 seconds, then stop the context.
        jssc.awaitTerminationOrTimeout(20 * 1000L);
        jssc.stop();
    }
}
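One note on the master setting above: local[2] is used rather than local because the socket receiver permanently occupies one of the threads; with only one thread the receiver would run but no batches would ever be processed.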
Output:
-------------------------------------------
Time: 1470385522000 ms
-------------------------------------------
(hik,4)
(123,4)
(streaming,4)
(abc,4)
(test,4)
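These counts line up with the setup: the server emits one line every 500 ms, so each 2-second batch contains four copies of the line and every word in it appears four times.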

Initializing a StreamingContext

  1. Method one: initialize with a SparkConf

import org.apache.spark.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

  2. Method two: initialize from an existing SparkContext

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   // existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

  

After a context is defined, you have to do the following.

  1. Define the input sources by creating input DStreams.
  2. Define the streaming computations by applying transformation and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  5. The processing can be manually stopped using streamingContext.stop().
Points to remember:
  • Once a context has been started, no new streaming computations can be set up or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  • A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
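A minimal sketch of the last two points, with illustrative variable names and an assumed existing JavaSparkContext sc: stop only the StreamingContext, then reuse the SparkContext for a new one.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

JavaSparkContext sc = ...   // existing JavaSparkContext
JavaStreamingContext ssc1 = new JavaStreamingContext(sc, Durations.seconds(1));
// ... define input DStreams and computations, then:
ssc1.start();
ssc1.awaitTerminationOrTimeout(10 * 1000L);
// Pass stopSparkContext = false so only the StreamingContext stops.
ssc1.stop(false);
// The still-running SparkContext can now back a second StreamingContext.
JavaStreamingContext ssc2 = new JavaStreamingContext(sc, Durations.seconds(2));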


To be continued.
