【Spark】Spark Streaming: Reading Data from Different Basic Data Sources



Basic Data Sources

File Data Source

Notes

1. Spark Streaming does not support monitoring nested directories.
2. Files must enter dataDirectory (the monitored directory) by being moved or renamed into it (see the sketch after this list).
3. Once a file has been moved into the directory it must not be modified; even if it is modified, the changed data will not be read.
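As a minimal sketch of point 2, a file can be uploaded to a staging location first and then renamed into the monitored directory, so that it appears there atomically. The NameNode address matches the one used later in this post; the /stream_tmp staging path and object name are hypothetical.
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MoveIntoMonitoredDir {
  def main(args: Array[String]): Unit = {
    // NameNode address taken from the textFileStream example later in this post
    val fs = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration())
    // Upload or write the file to a staging path outside the monitored directory first ...
    val staging = new Path("/stream_tmp/wordcount.txt")
    // ... then rename it into the monitored directory so it becomes visible atomically
    val target = new Path("/stream_data/wordcount.txt")
    fs.rename(staging, target)
    fs.close()
  }
}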

Steps

1. Create a Maven project and add the dependencies
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>

</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!-- <verbal>true</verbal>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Create a directory on HDFS and upload the test data
cd /export/servers/
vim wordcount.txt
hello world
abc test
hadoop hive

Create the directory on HDFS and upload the file:

hdfs dfs -mkdir /stream_data
hdfs dfs -put wordcount.txt /stream_data
3. Develop the Spark Streaming code
package cn.itcast.sparkstreaming.demo1

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object getHdfsFiles {

  /**
   * Custom update function for updateStateByKey.
   *
   * @param newValues    counts contributed by the current batch for a given key
   * @param runningCount accumulated count from previous batches for that key
   * @return the new accumulated count, wrapped in Option
   *
   * Option is a special Scala class, the parent of Some and None,
   * used mainly to avoid dealing with null values.
   */
  def updateFunc(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val finalResult: Int = newValues.sum + runningCount.getOrElse(0)
    Option(finalResult)
  }
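  // Example for updateFunc above: if the previous state for "hello" is Some(3) and the
  // current batch contributes newValues = Seq(1, 1), the function returns Option(5).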

  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("getHdfsFiles_to_wordcount").setMaster("local[6]").set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    // Checkpoint directory where the accumulated state is persisted
    streamingContext.checkpoint("./stream.check")

    // Read files from the monitored HDFS directory
    val fileStream: DStream[String] = streamingContext.textFileStream("hdfs://node01:8020/stream_data")
    // Split the lines and count the words
    val flatMapStream: DStream[String] = fileStream.flatMap(x => x.split(" "))
    val wordAndOne: DStream[(String, Int)] = flatMapStream.map(x => (x, 1))
    // reduceByKey does not accumulate counts across batches, so updateStateByKey is used instead;
    // it takes the user-defined updateFunc above as its parameter
    val byKey: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    // Print the result
    byKey.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
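To make the comment about reduceByKey concrete, here is a minimal stateless sketch (reusing the fileStream from the program above, in place of the updateStateByKey lines): each 5-second batch would then be counted in isolation, and the totals would never accumulate.
// Stateless word count: reduceByKey only combines the (word, 1) pairs within the current batch,
// so every batch starts from zero instead of carrying running totals forward.
val perBatchCounts: DStream[(String, Int)] = fileStream
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
perBatchCounts.print()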
4. After running the code, upload more files to the HDFS directory (for example, put another copy of wordcount.txt into /stream_data with hdfs dfs -put)


5. Console output. The first batches are empty until the stream picks up the initial file; the counts later double after a second copy of the test file is uploaded, because updateStateByKey keeps accumulating state across batches.
-------------------------------------------
Time: 1586856345000 ms
-------------------------------------------

-------------------------------------------
Time: 1586856350000 ms
-------------------------------------------

-------------------------------------------
Time: 1586856355000 ms
-------------------------------------------
(abc,1)
(world,1)
(hadoop,1)
(hive,1)
(hello,1)
(test,1)

-------------------------------------------
Time: 1586856360000 ms
-------------------------------------------
(abc,1)
(world,1)
(hadoop,1)
(hive,1)
(hello,1)
(test,1)

-------------------------------------------
Time: 1586856365000 ms
-------------------------------------------
(abc,1)
(world,1)
(hadoop,1)
(hive,1)
(hello,1)
(test,1)

-------------------------------------------
Time: 1586856370000 ms
-------------------------------------------
(abc,2)
(world,2)
(hadoop,2)
(hive,2)
(hello,2)
(test,2)

-------------------------------------------
Time: 1586856375000 ms
-------------------------------------------
(abc,2)
(world,2)
(hadoop,2)
(hive,2)
(hello,2)
(test,2)


Custom Data Source

Steps

1. Use the nc tool to send data to the chosen port
nc -lk 9999
2. Develop the code
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object CustomReceiver {

  /**
   * Custom update function for updateStateByKey (same as in the file-source example).
   *
   * @param newValues    counts contributed by the current batch
   * @param runningCount accumulated count from previous batches
   * @return the new accumulated count, wrapped in Option
   */
  def updateFunc(newValues:Seq[Int], runningCount:Option[Int]):Option[Int] = {
    val finalResult: Int = newValues.sum + runningCount.getOrElse(0)
    Option(finalResult)
  }

  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[6]").set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    streamingContext.checkpoint("./stream_check")

    // Read data from the custom receiver
    val stream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("node01", 9999))

    // Split and count the incoming data
    val mapStream: DStream[String] = stream.flatMap(x => x.split(" "))
    val wordAndOne: DStream[(String, Int)] = mapStream.map((_, 1))
    val byKey: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    // Print the result
    byKey.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  /** Receives data from the socket and calls store to hand each line to Spark. */
  private def receiverDatas(): Unit = {
    // Connect to the socket source
    val socket = new Socket(host, port)
    // Get the socket input stream
    val stream: InputStream = socket.getInputStream
    // Wrap the input stream in a BufferedReader so it can be read line by line
    val reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))

    // Keep reading while there is data and the receiver has not been stopped.
    // (Note: in Scala an assignment such as (line = reader.readLine()) evaluates to Unit,
    // so it cannot be compared with null inside the while condition as in Java.)
    var line: String = reader.readLine()
    while (!isStopped() && line != null) {
      store(line)
      line = reader.readLine()
    }

    reader.close()
    socket.close()
  }


  /** onStart is called once when the receiver starts; it must launch the receiving
   *  work on its own thread and return immediately. */
  override def onStart(): Unit = {
    // Start a thread that connects to the socket and receives data
    new Thread("Socket Receiver") {
      override def run(): Unit = {
        // Receive socket data until stopped
        receiverDatas()
      }
    }.start()
  }

  // Called when the receiver is stopped
  override def onStop(): Unit = {

  }
}
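As an optional refinement (a sketch, not part of the original code), the Receiver API also provides restart, which asks Spark to stop and relaunch the receiver; wrapping the socket loop like this would let MyReceiver reconnect if the connection drops. The exception handling shown is an assumption about how one might wire it up.
// Hypothetical, more defensive variant of onStart for the same MyReceiver class:
// if the connection fails or drops, ask Spark to restart the receiver instead of dying silently.
override def onStart(): Unit = {
  new Thread("Socket Receiver") {
    override def run(): Unit = {
      try {
        receiverDatas()
      } catch {
        case e: java.io.IOException =>
          restart(s"Error connecting to $host:$port", e)
      }
    }
  }.start()
}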

RDD Queue

Steps

1. Develop the code
package cn.itcast.sparkstreaming.demo3

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueneReceiver {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[6]").setAppName("queneReceiver").set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))

    // Thread-safe queue of RDDs that will feed the stream
    val queue = new mutable.SynchronizedQueue[RDD[Int]]
    // queueStream takes a queue: Queue[RDD[T]] parameter
    val inputStream: InputDStream[Int] = streamingContext.queueStream(queue)
    // Transform the DStream
    val mapStream: DStream[Int] = inputStream.map(x => x * 2)

    mapStream.print()

    streamingContext.start()
    // Push an RDD into the queue every 3 seconds
    for (x <- 1 to 100){
      queue += streamingContext.sparkContext.makeRDD(1 to 10)
      Thread.sleep(3000)
    }
    streamingContext.awaitTermination()

  }
}
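A small aside on queueStream (a sketch using the same queue and streamingContext as above): by default it takes one RDD from the queue per batch interval; passing oneAtATime = false instead folds every RDD currently in the queue into a single batch.
// Consume all RDDs currently in the queue in one batch, instead of one RDD per batch
val allAtOnce: InputDStream[Int] = streamingContext.queueStream(queue, oneAtATime = false)
allAtOnce.map(x => x * 2).print()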

