A NetworkInputDStream is not truly streaming: data read from the source is not processed directly, but first written into blocks, from which downstream RDDs then read the data for further processing.
This is the process that discretizes the stream.
NetworkInputDStream encapsulates the logic of reading data from the source and putting it into blocks (the Receiver thread).
We also need a role that manages the NetworkInputDStreams and deploys each NetworkInputDStream.Receiver to run on the cluster; that role is the NetworkInputTracker.
The NetworkInputTracker runs a dedicated job that distributes the Receivers, as tasks of an RDD, to the worker nodes for execution.
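To make this concrete, here is a minimal usage sketch (assuming the classic StreamingContext API of this Spark version; host "localhost" and port 9999 are just placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .setMaster("local[2]")  // at least 2 threads locally: one for the receiver, one for processing
val ssc = new StreamingContext(conf, Seconds(1))  // 1-second batch duration

// socketTextStream creates a SocketInputDStream (a NetworkInputDStream);
// its Receiver is shipped to a worker node by the NetworkInputTracker
val lines = ssc.socketTextStream("localhost", 9999)

lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()             // starts the scheduler, the tracker and the receivers
ssc.awaitTermination()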
InputDStream
/**
 * This is the abstract base class for all input streams. This class provides methods
 * start() and stop() which are called by the Spark Streaming system to start and stop receiving data.
 * Input streams that can generate RDDs from new data by running a service/thread only on
 * the driver node (that is, without running a receiver on worker nodes), can be
 * implemented by directly inheriting this InputDStream. For example,
 * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
 * new files and generates RDDs with the new files. For implementing input streams
 * that require running a receiver on the worker nodes, use
 * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
 *
 * @param ssc_ Streaming context that will execute this input stream
 */
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

  private[streaming] var lastValidTime: Time = null

  ssc.graph.addInputStream(this)  // first, add this InputStream to the graph

  override def dependencies = List()

  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }

  /** Method called to start receiving data. Subclasses must implement this method. */
  def start()

  /** Method called to stop receiving data. Subclasses must implement this method. */
  def stop()
}
NetworkInputDStream
NetworkInputDStream is the more typical kind of input. It has two main interfaces:
getReceiver: the Receiver is the crux of a NetworkInputDStream; it encapsulates how data is read from the source, and how it is chunked and written into blocks.
compute: since the Receiver only writes data into blocks, how do we get that data back out?
Whenever the Receiver writes a block, it also sends an event to the NetworkInputTracker to register that block,
so NetworkInputDStream.compute cannot compute the data directly; instead it first queries the NetworkInputTracker for the block ids, then reads the data from the BlockManager.
/**
 * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
 * that has to start a receiver on worker nodes to receive external data.
 * Specific implementations of NetworkInputDStream must
 * define the getReceiver() function that gets the receiver object of type
 * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
 * to the workers to receive data.
 * @param ssc_ Streaming context that will execute this input stream
 * @tparam T Class type of the object of this stream
 */
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  // This is a unique identifier that is used to match the network receiver with the
  // corresponding network input stream.
  val id = ssc.getNewNetworkStreamId()  // network stream id

  /**
   * Gets the receiver object that will be sent to the worker nodes
   * to receive data. This method needs to be defined by any specific implementation
   * of a NetworkInputDStream.
   */
  def getReceiver(): NetworkReceiver[T]

  override def compute(validTime: Time): Option[RDD[T]] = {
    // If this is called for any time before the start time of the context,
    // then this returns an empty RDD. This may happen when recovering from a
    // master failure
    if (validTime >= graph.startTime) {
      val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)  // query the block ids from the networkInputTracker
      Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
  }
}
NetworkReceiver
private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
  extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage

/**
 * Abstract class of a receiver that can be run on worker nodes to receive external data. See
 * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
 */
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {

  lazy protected val env = SparkEnv.get

  lazy protected val actor = env.actorSystem.actorOf(  // lazily create the NetworkReceiverActor, used to communicate with the NetworkInputTracker
    Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId)

  lazy protected val receivingThread = Thread.currentThread()

  protected var streamId: Int = -1

  /**
   * This method will be called to start receiving data. All your receiver
   * starting code should be implemented by defining this function.
   */
  protected def onStart()

  /** This method will be called to stop receiving data. */
  protected def onStop()

  /** Conveys a placement preference (hostname) for this receiver. */
  def getLocationPreference() : Option[String] = None

  /**
   * Starts the receiver. First it accesses all the lazy members to
   * materialize them. Then it calls the user-defined onStart() method to start
   * other threads, etc required to receive the data.
   */
  def start() {
    try {
      // Access the lazy vals to materialize them
      env
      actor
      receivingThread

      // Call user-defined onStart()
      onStart()
    } catch {
      case ie: InterruptedException =>
        logInfo("Receiving thread interrupted")
      case e: Exception =>
        stopOnError(e)
    }
  }

  /**
   * Stops the receiver. First it interrupts the main receiving thread,
   * that is, the thread that called receiver.start(). Then it calls the user-defined
   * onStop() method to stop other threads and/or do cleanup.
   */
  def stop() {
    receivingThread.interrupt()
    onStop()
    //TODO: terminate the actor
  }

  /**
   * Stops the receiver and reports exception to the tracker.
   * This should be called whenever an exception is to be handled on any thread
   * of the receiver.
   */
  protected def stopOnError(e: Exception) {
    logError("Error receiving data", e)
    stop()
    actor ! ReportError(e.toString)
  }

  /**
   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
   */
  def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
    env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
    actor ! ReportBlock(blockId, metadata)
  }

  /**
   * Pushes a block (as bytes) into the block manager.
   */
  def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
    env.blockManager.putBytes(blockId, bytes, level)
    actor ! ReportBlock(blockId, metadata)
  }
}
NetworkReceiverActor
Forwards the Receiver's events to the tracker actor.
  /** A helper actor that communicates with the NetworkInputTracker */
  private class NetworkReceiverActor extends Actor {
    logInfo("Attempting to register with tracker")
    val ip = env.conf.get("spark.driver.host", "localhost")
    val port = env.conf.getInt("spark.driver.port", 7077)
    val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
    val tracker = env.actorSystem.actorSelection(url)
    val timeout = 5.seconds

    override def preStart() {
      val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
      Await.result(future, timeout)
    }

    override def receive() = {
      case ReportBlock(blockId, metadata) =>
        tracker ! AddBlocks(streamId, Array(blockId), metadata)
      case ReportError(msg) =>
        tracker ! DeregisterReceiver(streamId, msg)
      case StopReceiver(msg) =>
        stop()
        tracker ! DeregisterReceiver(streamId, msg)
    }
  }

  protected[streaming] def setStreamId(id: Int) {
    streamId = id
  }
BlockGenerator
BlockGenerator has three key interfaces (it is an inner class of NetworkReceiver, hence the references to NetworkReceiver.this in the code below):
+=, which callers use to keep appending data to currentBuffer
updateCurrentBuffer, which periodically wraps the data accumulated in currentBuffer into a Block object and puts it on the blocksForPushing queue (invoked by blockIntervalTimer; with the default spark.streaming.blockInterval of 200 ms and a 1-second batch, each batch yields about 5 blocks, i.e. 5 partitions of the resulting BlockRDD)
keepPushingBlocks, which keeps taking blocks off the blocksForPushing queue and writing them to the BlockManager (run by blockPushingThread)
  /**
   * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
   * them into appropriately named blocks at regular intervals. This class starts two threads,
   * one to periodically start a new batch and prepare the previous batch as a block,
   * the other to push the blocks into the block manager.
   */
  class BlockGenerator(storageLevel: StorageLevel)
    extends Serializable with Logging {

    case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)

    val clock = new SystemClock()
    val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
    val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
    val blockStorageLevel = storageLevel
    val blocksForPushing = new ArrayBlockingQueue[Block](1000)
    val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

    var currentBuffer = new ArrayBuffer[T]

    def start() {
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Data handler started")
    }

    def stop() {
      blockIntervalTimer.stop()
      blockPushingThread.interrupt()
      logInfo("Data handler stopped")
    }

    def += (obj: T): Unit = synchronized {
      currentBuffer += obj
    }

    private def updateCurrentBuffer(time: Long): Unit = synchronized {
      try {
        // swap in a fresh buffer, wrap the old one as a Block and queue it
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[T]
        if (newBlockBuffer.size > 0) {
          val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval)
          val newBlock = new Block(blockId, newBlockBuffer)
          blocksForPushing.add(newBlock)
        }
      } catch {
        case ie: InterruptedException =>
          logInfo("Block interval timer thread interrupted")
        case e: Exception =>
          NetworkReceiver.this.stop()
      }
    }

    private def keepPushingBlocks() {
      logInfo("Block pushing thread started")
      try {
        while(true) {
          val block = blocksForPushing.take()
          NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
        }
      } catch {
        case ie: InterruptedException =>
          logInfo("Block pushing thread interrupted")
        case e: Exception =>
          NetworkReceiver.this.stop()
      }
    }
  }
SocketInputDStream
The socket source is the most typical NetworkInputDStream; let's see how it is implemented (a sketch of a custom receiver follows the code below).
For SocketInputDStream, the key is implementing the getReceiver interface, which returns a SocketReceiver object.
For SocketReceiver, the key is implementing the onStart interface, which writes the data read from the socket into blockGenerator's currentBuffer.
private[streaming] class SocketInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends NetworkInputDStream[T](ssc_) {

  def getReceiver(): NetworkReceiver[T] = {  // the key is implementing the getReceiver interface
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

private[streaming] class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends NetworkReceiver[T] {

  lazy protected val blockGenerator = new BlockGenerator(storageLevel)  // create the BlockGenerator

  override def getLocationPreference = None

  protected def onStart() {
    val socket = new Socket(host, port)
    blockGenerator.start()
    val iterator = bytesToObjects(socket.getInputStream())
    while(iterator.hasNext) {
      val obj = iterator.next
      blockGenerator += obj  // core logic: append data read from the socket to blockGenerator's currentBuffer
    }
  }

  protected def onStop() {
    blockGenerator.stop()
  }
}
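The same pattern extends to custom sources. Here is a minimal sketch of a hypothetical CounterReceiver (not part of Spark, purely illustrative) that emits one sequence number every 100 ms, showing the getReceiver/onStart contract against the APIs quoted above:

private[streaming] class CounterInputDStream(
    @transient ssc_ : StreamingContext,
    storageLevel: StorageLevel
  ) extends NetworkInputDStream[Long](ssc_) {

  def getReceiver(): NetworkReceiver[Long] = new CounterReceiver(storageLevel)
}

private[streaming] class CounterReceiver(storageLevel: StorageLevel)
  extends NetworkReceiver[Long] {

  lazy protected val blockGenerator = new BlockGenerator(storageLevel)

  protected def onStart() {
    blockGenerator.start()
    var n = 0L
    while (!Thread.currentThread().isInterrupted) {
      blockGenerator += n  // same pattern as SocketReceiver: feed the BlockGenerator
      n += 1
      Thread.sleep(100)    // stop() interrupts this thread, ending the loop
    }
  }

  protected def onStop() {
    blockGenerator.stop()
  }
}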
NetworkInputTracker
NetworkInputTracker manages and monitors all the NetworkInputDStreams.
First, the NetworkInputTrackerActor receives RegisterReceiver, AddBlocks, and DeregisterReceiver events from the NetworkInputDStreams' receivers,
so the tracker knows how many NetworkInputDStreams exist, and how many blocks each one has read and stored.
Second, in the ReceiverExecutor it is responsible for starting the Receivers of all the NetworkInputDStreams. The approach is rather unusual, and again relies on RDDs:
each receiver is wrapped into one partition of an RDD, each partition is scheduled as a task, and finally runJob executes startReceiver, so every receiver is started when its task runs (a toy illustration of this trick follows the code below).
Finally, callers use getBlockIds to obtain all the block ids belonging to a given NetworkInputDStream, and with those ids fetch the data.
private[streaming] sealed trait NetworkInputTrackerMessage  // event types the tracker can receive from receivers
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
  extends NetworkInputTrackerMessage
private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
  extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
  extends NetworkInputTrackerMessage

/**
 * This class manages the execution of the receivers of NetworkInputDStreams. An instance of
 * this class must be created after all input streams have been added and StreamingContext.start()
 * has been called because it needs the final set of input streams at the time of instantiation.
 */
private[streaming] class NetworkInputTracker(ssc: StreamingContext) extends Logging {

  val networkInputStreams = ssc.graph.getNetworkInputStreams()  // get all the networkInputStreams
  val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
  val receiverExecutor = new ReceiverExecutor()
  val receiverInfo = new HashMap[Int, ActorRef]            // records info about every receiver
  val receivedBlockIds = new HashMap[Int, Queue[BlockId]]  // records the block ids received for each InputDStream
  val timeout = AkkaUtils.askTimeout(ssc.conf)

  // actor is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  var actor: ActorRef = null
  var currentTime: Time = null

  /** Start the actor and receiver execution thread. */
  def start() {
    if (!networkInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),  // create the NetworkInputTrackerActor, used to talk to the receivers
        "NetworkInputTracker")
      receiverExecutor.start()  // start the receiverExecutor
    }
  }

  /** Stop the receiver execution thread. */
  def stop() {
    if (!networkInputStreams.isEmpty && actor != null) {
      receiverExecutor.interrupt()
      receiverExecutor.stopReceivers()
      ssc.env.actorSystem.stop(actor)
      logInfo("NetworkInputTracker stopped")
    }
  }

  /** Return all the blocks received from a receiver. */
  def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {  // get the block ids belonging to a given InputDStream
    val queue = receivedBlockIds.synchronized {
      receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
    }
    val result = queue.synchronized {
      queue.dequeueAll(x => true)
    }
    logInfo("Stream " + receiverId + " received " + result.size + " blocks")
    result.toArray
  }

  /** Actor to receive messages from the receivers. */
  private class NetworkInputTrackerActor extends Actor {
    def receive = {
      case RegisterReceiver(streamId, receiverActor) => {  // register event sent by a Receiver to the tracker
        receiverInfo += ((streamId, receiverActor))        // add this Receiver to receiverInfo
        sender ! true
      }
      case AddBlocks(streamId, blockIds, metadata) => {
        val tmp = receivedBlockIds.synchronized {
          if (!receivedBlockIds.contains(streamId)) {
            receivedBlockIds += ((streamId, new Queue[BlockId]))  // a Receiver tells the tracker it has stored new blocks
          }
          receivedBlockIds(streamId)
        }
        tmp.synchronized {
          tmp ++= blockIds
        }
        networkInputStreamMap(streamId).addMetadata(metadata)
      }
      case DeregisterReceiver(streamId, msg) => {  // a Receiver deregisters
        receiverInfo -= streamId
      }
    }
  }

  /** This thread class runs all the receivers on the cluster. */
  class ReceiverExecutor extends Thread {
    val env = ssc.env

    override def run() {
      try {
        SparkEnv.set(env)
        startReceivers()  // start all the Receivers
      } catch {
        case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
      } finally {
        stopReceivers()
      }
    }

    /**
     * Gets the receivers from the NetworkInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    def startReceivers() {
      val receivers = networkInputStreams.map(nis => {  // collect the receivers of all networkInputStreams
        val rcvr = nis.getReceiver()
        rcvr.setStreamId(nis.id)
        rcvr
      })

      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)  // check whether every receiver has a location preference
        .reduce(_ && _)

      // Create the parallel collection of receivers to distribute them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences =
            receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
          ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)  // makeRDD builds a ParallelCollectionRDD; each receiver becomes one partition (hence one task) of the RDD
        }

      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
        if (!iterator.hasNext) {
          throw new Exception("Could not start receiver as details not found.")
        }
        iterator.next().start()  // start the receiver
      }

      // Run the dummy Spark job to ensure that all slaves have registered.
      // This prevents all the receivers from being scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // Distribute the receivers and start them
      ssc.sparkContext.runJob(tempRDD, startReceiver)  // each receiver, packaged as a task, runs startReceiver on a worker node, which calls receiver.start()
    }

    /** Stops the receivers. */
    def stopReceivers() {
      // Signal the receivers to stop
      receiverInfo.values.foreach(_ ! StopReceiver)
    }
  }
}
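The "receivers as RDD tasks" trick in startReceivers() is worth isolating. Here is a toy sketch of the same pattern (not Spark Streaming code; it assumes an existing SparkContext named sc): makeRDD creates one partition per element, and runJob then executes the given function once per partition, i.e. once per element, somewhere on the cluster:

// Toy illustration of the startReceivers() pattern
val workers = sc.makeRDD(Seq("r0", "r1", "r2"), 3)  // 3 elements, 3 partitions
sc.runJob(workers, (iter: Iterator[String]) => {
  // in NetworkInputTracker this function is startReceiver: it calls
  // iterator.next().start(), which blocks for the receiver's whole lifetime
  println("starting " + iter.next())
})

Because each receiver's start() never returns, these "tasks" occupy their slots for the lifetime of the streaming application; this is also why the dummy job above is run first, so that the receivers are spread across nodes instead of piling onto one.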