正文
SparkStreaming的入口是StreamingContext,通過scala實現 一個簡單的實時獲取數據。代碼SparkStreaming官網也可以找到。
object SocketDStreamTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("streamTest")
val ssc = new StreamingContext(conf, Seconds(2))
val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
textStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
通過maven配置相關的配置,可能有時候會遇到如下情況:
找到你的maven配置pop.xml,將對應的spark-streaming那個節點,你會發現有一個scope節點。這個節點我認為就是一個作用范圍的設置,如果從maven的官網上找配置那么會有這一個節點<scope>provided</scope>,配置為provided。而我們的SparkStreaming的官網上沒有這個節點,具體原因我還沒有去看。解決完這個問題,直接運行代碼等待。在Linux系統中,啟動$ nc -lk 9999 官網寫的,啟動會因為我們代碼中有對9999端口的監視,然后我們往里面添加數據。它是以一行一行的去讀取的。
但是缺陷就是他不會累加前后的讀取結果,那么怎么去累加呢,之后會在另開一個博客講述,當然這些官網上都是有的。就這么多了,如有不對之處,還望指點。
it's a long way for success