Spark Streaming, Part 2: A Walkthrough of StreamingContext


1.1 Creating a StreamingContext object

1.1.1 Creating from a SparkContext

The source is as follows:

def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

The first parameter is the SparkContext object; the second is the batch interval.

Example:

val ssc = new StreamingContext(sc, Seconds(5))

1.1.2 Creating from a SparkConf

The source is as follows:

def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }

The first parameter is the SparkConf object; the second is the batch interval.

Example:

val conf = new SparkConf().setAppName("StreamTest")

val ssc = new StreamingContext(conf, Seconds(5))

 

1.1.3 Creating from SparkConf parameters

The source is as follows:

def this(
      master: String,
      appName: String,
      batchDuration: Duration,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map()) = {
    this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
         null, batchDuration)
  }

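Apart from batchDuration, which is the batch interval, the parameters supply the details needed to build the SparkConf: master is the Spark master URL, appName is the application name, sparkHome is the Spark installation directory (the SPARK_HOME environment variable), jars lists the JAR files to ship to the cluster, and environment holds environment variables for the worker nodes.

Example:

// jarOfClass returns an Option in Spark 1.x, so it is converted to a Seq for the jars parameter
val ssc = new StreamingContext("spark://host:port", "StreamTest", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)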

 

1.1.4 Creating from a checkpoint file

The source is as follows:

def this(path: String, hadoopConf: Configuration) =
    this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)

The first parameter is the path to the checkpoint file; the second is the Hadoop configuration.
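Recovery from a checkpoint is usually combined with a fallback that builds a fresh context when no checkpoint exists yet. StreamingContext.getOrCreate provides that pattern. The following is a minimal sketch, not the only way to do it; the checkpoint directory /tmp/streaming-checkpoint, the local master, and the socket source on localhost:9999 are all assumptions made for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecovery {
  // Hypothetical checkpoint location; replace with a real HDFS or local path.
  val checkpointDir = "/tmp/streaming-checkpoint"

  // Builds a fresh context; only invoked when no checkpoint data is found.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // The whole DStream graph must be defined here, before the context is returned.
    // An illustrative socket source with one output operation:
    ssc.socketTextStream("localhost", 9999).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores from checkpointDir if possible, otherwise calls createContext().
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Defining the streams inside the creating function matters: when the context is restored from a checkpoint, the function is not called and the graph comes from the checkpoint data instead.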

  

1.2 Creating DStream input sources

 

1.2.1 fileStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.fileStream(directory)

The source is as follows:

/**
   * Create an input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them using the given key-value types and input format.
   * Files must be written to the monitored directory by "moving" them from another
   * location within the same file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   * @tparam K Key type for reading HDFS file
   * @tparam V Value type for reading HDFS file
   * @tparam F Input format for reading HDFS file
   */
  def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory)
  }

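Parameters: K is the key type read from HDFS, V is the value type read from HDFS, and F is the input format used to read the files; directory is the HDFS path to monitor. An overload of fileStream also accepts filter, a function that selects which files in the monitored directory are processed, and newFilesOnly, which controls whether only newly added files are picked up.

By supplying a filter function, fileStream can restrict monitoring and processing to the files that satisfy the condition.

The default filter: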

def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

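This filter excludes hidden files, that is, files whose names start with a dot.

The newFilesOnly flag of fileStream controls whether files already present in the monitored directory are processed. By default (true) existing files are skipped and only newly added files are processed; when set to false, older files whose modification time falls within the remember window are processed as well.

The relevant source, in class FileInputDStream: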

private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L

 

private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
...

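modTimeIgnoreThreshold is the modification-time cutoff: files modified before it are ignored. Depending on newFilesOnly, the initial threshold is either the time the stream was created or 0, and the effective cutoff is the larger of that value and the trailing edge of the remember window.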
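To make the cutoff concrete, the expression can be reproduced outside Spark. The sketch below mirrors only the math.max computation shown above; the object name, the function signature, and the sample timestamps are illustrative assumptions, not part of FileInputDStream.

```scala
object IgnoreThreshold {
  // Mirrors the threshold expression in findNewFiles; all values are in milliseconds.
  def modTimeIgnoreThreshold(
      newFilesOnly: Boolean,
      startTimeMs: Long,     // clock time when the stream was created
      currentTimeMs: Long,   // time of the batch being generated
      rememberMs: Long       // length of the remember window
    ): Long = {
    val initial = if (newFilesOnly) startTimeMs else 0L
    math.max(initial, currentTimeMs - rememberMs)
  }

  def main(args: Array[String]): Unit = {
    // Stream created at t = 100000 ms, batch at t = 130000 ms, 60 s remember window.
    println(modTimeIgnoreThreshold(newFilesOnly = true, 100000L, 130000L, 60000L))  // 100000
    println(modTimeIgnoreThreshold(newFilesOnly = false, 100000L, 130000L, 60000L)) // 70000
  }
}
```

With newFilesOnly = true, files older than the stream itself are never picked up; with false, only the remember window limits how far back the scan reaches.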

Example:

// Define a custom filter function

   def myFilter(path: Path): Boolean = path.getName().contains("data")

// Create the fileStream, processing existing files as well (newFilesOnly = false)

val data1 = ssc.fileStream[LongWritable, Text, TextInputFormat](Spath1, pa => myFilter(pa), false).map(_._2.toString)

 

1.2.2 textFileStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.textFileStream(directory)

The source is as follows:

/**
   * Create an input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them as text files (using key as LongWritable, value
   * as Text and input format as TextInputFormat). Files must be written to the
   * monitored directory by "moving" them from another location within the same
   * file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   */
  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }

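Parameter: directory is the directory to monitor.

textFileStream is simply a specialization of fileStream, with LongWritable keys, Text values, and TextInputFormat, that keeps only the value of each record as a String.

Example: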

   val StreamFile1=ssc.textFileStream(Spath1)

 

1.2.3 socketTextStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.socketStream(hostname, port, converter, storageLevel)

The source is as follows:

/**
   * Creates an input stream from TCP source hostname:port. Data is received using
   * a TCP socket and the receive bytes it interpreted as object using the given
   * converter.
   * @param hostname      Hostname to connect to for receiving data
   * @param port          Port to connect to for receiving data
   * @param converter     Function to convert the byte stream to objects
   * @param storageLevel  Storage level to use for storing the received objects
   * @tparam T            Type of the objects received (after converting bytes to objects)
   */
  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }

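Parameters: hostname is the host to connect to, port is the port number, converter turns the incoming byte stream into objects, and storageLevel is the storage level for the received data. socketStream itself has no default storage level; socketTextStream, which wraps it to read newline-delimited text, defaults to MEMORY_AND_DISK_SER_2 (serialized, memory and disk, two replicas).

Example:

val lines = ssc.socketTextStream(serverIP, serverPort)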
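For context, a complete program around socketTextStream might look like the sketch below. It assumes a text server on localhost:9999 (for example one started with `nc -lk 9999`) and a local master; the object name and the tokenize helper are illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  // Splits a line on whitespace and drops empty tokens.
  def tokenize(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // local[2]: the receiver occupies one thread, so at least one more is needed for processing.
    val conf = new SparkConf().setAppName("SocketWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Assumed source: a server writing lines of text to localhost:9999.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Per-batch word counts. On Spark 1.2 and earlier, reduceByKey also needs
    // `import org.apache.spark.streaming.StreamingContext._`.
    val counts = lines.flatMap(line => tokenize(line)).map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```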

 

1.2.4 rawSocketStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.rawSocketStream(hostname, port, storageLevel)

The source is as follows:

/**
   * Create an input stream from network source hostname:port, where data is received
   * as serialized blocks (serialized using the Spark's serializer) that can be directly
   * pushed into the block manager without deserializing them. This is the most efficient
   * way to receive data.
   * @param hostname      Hostname to connect to for receiving data
   * @param port          Port to connect to for receiving data
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @tparam T            Type of the objects in the received blocks
   */
  def rawSocketStream[T: ClassTag](
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
    new RawInputDStream[T](this, hostname, port, storageLevel)
  }

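rawSocketStream is used like socketTextStream, except that the sender must supply data already serialized as blocks; see socketTextStream above for usage.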

 

1.2.5 networkStream

The source is as follows:

/**
   * Create an input stream with any arbitrary user implemented receiver.
   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
   * @param receiver Custom implementation of Receiver
   */
  @deprecated("Use receiverStream", "1.0.0")
  def networkStream[T: ClassTag](
    receiver: Receiver[T]): ReceiverInputDStream[T] = {
    receiverStream(receiver)
  }

networkStream is deprecated since 1.0.0 and simply delegates to receiverStream.

Example:

See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 

1.2.6 receiverStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.receiverStream(receiver)

The source is as follows:

/**
   * Create an input stream with any arbitrary user implemented receiver.
   * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
   * @param receiver Custom implementation of Receiver
   */
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
    withNamedScope("receiver stream") {
      new PluggableInputDStream[T](this, receiver)
    }
  }

See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
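As a rough illustration of what a custom receiver involves, the sketch below defines a hypothetical CounterReceiver that emits an increasing number once per second. The class name and its behavior are invented for this example; only Receiver, onStart, onStop, store, and isStopped come from the Spark API.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: emits 0, 1, 2, ... at roughly one value per second.
class CounterReceiver extends Receiver[Long](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart must return quickly, so the producing loop runs on its own thread.
    val worker = new Thread("counter-receiver") {
      override def run(): Unit = {
        var n = 0L
        while (!isStopped()) {
          store(n)            // hand one record to Spark
          n += 1
          Thread.sleep(1000)
        }
      }
    }
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to release: the loop exits on its own once isStopped() returns true.
  def onStop(): Unit = {}
}
```

It would be attached with `ssc.receiverStream(new CounterReceiver)`, which yields a ReceiverInputDStream[Long].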

 

1.2.7 actorStream

The source is as follows:

/**
   * Create an input stream with any arbitrary user implemented actor receiver.
   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
   * @param props Props object defining creation of the actor
   * @param name Name of the actor
   * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   *
   * @note An important point to note:
   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
   *       to ensure the type safety, i.e parametrized type of data received and actorStream
   *       should be same.
   */
  def actorStream[T: ClassTag](
      props: Props,
      name: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
    ): ReceiverInputDStream[T] = {
    receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
  }

Example:

val StreamFile1 = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")

See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 

1.2.8 queueStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.queueStream(queue, oneAtATime)

The source is as follows:

/**
   * Create an input stream from a queue of RDDs. In each batch,
   * it will process either one or all of the RDDs returned by the queue.
   *
   * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
   * those RDDs, so `queueStream` doesn't support checkpointing.
   *
   * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
   * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
   * @tparam T         Type of objects in the RDD
   */
  def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],
      oneAtATime: Boolean = true
    ): InputDStream[T] = {
    queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
  }
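queueStream is mostly useful for testing, because RDDs can be fed in by hand. The sketch below queues three small RDDs, doubles every element, and collects the results on the driver. The object name, the 500 ms batch interval, and the 5 second wait are assumptions chosen to keep the demo short.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object QueueStreamDemo {
  // Pushes three small RDDs through a queueStream and returns the doubled values.
  def run(): Seq[Int] = {
    val conf = new SparkConf().setAppName("QueueStreamDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(500))
    val queue = new mutable.Queue[RDD[Int]]()
    val results = mutable.ArrayBuffer[Int]()

    // oneAtATime = true: each batch dequeues a single RDD.
    ssc.queueStream(queue, oneAtATime = true).map(_ * 2).foreachRDD { rdd =>
      val batch = rdd.collect()                    // the body of foreachRDD runs on the driver
      results.synchronized { results ++= batch }
    }

    // Modifications to the queue must be synchronized, as the scaladoc notes.
    queue.synchronized {
      for (i <- 1 to 3) queue += ssc.sparkContext.makeRDD(Seq(i, i * 10))
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)            // let several batches run
    ssc.stop()
    results.synchronized { results.toList }
  }

  def main(args: Array[String]): Unit = println(run().sorted)
}
```

With oneAtATime = false, every RDD present in the queue at batch time would instead be combined into a single batch.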

 

1.2.9 union DStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.union(streams)

The source is as follows:

/**
   * Create a unified DStream from multiple DStreams of the same type and same slide duration.
   */
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
    new UnionDStream[T](streams.toArray)
  }

Merges multiple DStreams of the same type into a single new DStream. All input DStreams must have the same element type and the same slide duration.
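A small sketch of union follows, using two queue-backed streams so that it runs without any external source. The object name, the build helper, and the sample data are illustrative assumptions.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object UnionDemo {
  // Merges two queue-backed streams with the same element type into one DStream.
  def build(ssc: StreamingContext): DStream[Int] = {
    val sc = ssc.sparkContext
    val left = ssc.queueStream(mutable.Queue[RDD[Int]](sc.makeRDD(Seq(1, 2))))
    val right = ssc.queueStream(mutable.Queue[RDD[Int]](sc.makeRDD(Seq(3, 4))))
    // The explicit Seq[DStream[Int]] states the common element type union expects.
    ssc.union(Seq[DStream[Int]](left, right))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UnionDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    build(ssc).print()                     // each batch shows elements from both inputs
    ssc.start()
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop()
  }
}
```

Both inputs are created from the same StreamingContext, so they share its batch interval and satisfy the slide-duration requirement automatically.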

 

1.2.10 transform DStream

val ssc = new StreamingContext(sparkConf, Seconds(10))  
    ssc.transform(dstreams, transformFunc)

The source is as follows:

/**
   * Create a new DStream in which each RDD is generated by applying a function on RDDs of
   * the DStreams.
   */
  def transform[T: ClassTag](
      dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]
    ): DStream[T] = withScope {
    new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
  }

Applies transformFunc to the RDDs that the given DStreams produce for each batch, yielding a new DStream.
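Because transformFunc receives a Seq[RDD[_]], the element types have to be restored by casting. The sketch below intersects two streams batch by batch; the object name, the build helper, and the sample data are illustrative assumptions.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

object TransformDemo {
  // For every batch, keeps only the values that appear in both input streams.
  def build(ssc: StreamingContext): DStream[Int] = {
    val sc = ssc.sparkContext
    val a = ssc.queueStream(mutable.Queue[RDD[Int]](sc.makeRDD(Seq(1, 2, 3))))
    val b = ssc.queueStream(mutable.Queue[RDD[Int]](sc.makeRDD(Seq(2, 3, 4))))
    ssc.transform[Int](Seq[DStream[_]](a, b), (rdds: Seq[RDD[_]], time: Time) => {
      // RDDs arrive in the same order as the input DStreams; their element
      // types are erased, so the caller is responsible for the casts.
      val left = rdds(0).asInstanceOf[RDD[Int]]
      val right = rdds(1).asInstanceOf[RDD[Int]]
      left.intersection(right)
    })
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    build(ssc).print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop()
  }
}
```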

 

1.3 Checkpointing

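Stateful operations work across multiple batches of data. They include window-based operations and updateStateByKey. Because a stateful operation depends on data from earlier batches, its lineage and metadata keep accumulating over time. To truncate that growth, Spark Streaming supports periodic checkpointing, which saves intermediate results to HDFS. Writing to HDFS has a cost, so the checkpoint interval must be chosen carefully. With small batches, for example one second, checkpointing every batch can reduce throughput significantly, while an interval that is too long lets lineage and task sizes grow. As a rule of thumb, a checkpoint interval of 5 to 10 seconds is a reasonable starting point (the Spark Streaming programming guide suggests 5 to 10 sliding intervals of the DStream).

Example: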

   ssc.checkpoint("hdfs://192.168.1.100:9000/check")

   val StreamFile1=ssc.textFileStream(Spath1)

StreamFile1.checkpoint(Seconds(30))
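The snippet above only sets the checkpoint directory and interval. A stateful operation that actually depends on checkpointing could look like the following sketch of a running word count with updateStateByKey. The socket source on localhost:9999, the checkpoint path /tmp/running-word-count, and the object and function names are assumptions made for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RunningWordCount {
  // Adds this batch's counts for a key to the running total carried in the state.
  def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + newValues.sum)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RunningWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Hypothetical path; a checkpoint directory is required for stateful operations.
    ssc.checkpoint("/tmp/running-word-count")

    // Assumed source: a server writing lines of text to localhost:9999.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+")).filter(_.nonEmpty).map(word => (word, 1))

    val totals = pairs.updateStateByKey[Int](updateCount _)
    totals.checkpoint(Seconds(30))   // a multiple of the 10 s batch interval
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The update function is kept as a plain method so it can be checked on its own, without starting a streaming job.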

 

Please credit the original source when reposting:

http://blog.csdn.net/sunbow0/article/details/42966467

