Spark Streaming Integration with Kinesis


1: Concepts

Amazon Kinesis

  Amazon Kinesis is a fully managed service for real-time processing of streaming data at scale. It provides a set of core capabilities for handling streaming data of any size cost-effectively and with a high degree of flexibility. With Amazon Kinesis you can ingest real-time data such as video, audio, application logs, website clickstreams, and IoT telemetry for machine learning, analytics, and other applications. Amazon Kinesis lets you process, analyze, and respond to data as it arrives, instead of waiting for all of the data to be collected before processing begins.

Shard

  A shard is the base throughput unit of an Amazon Kinesis data stream. One shard provides 1 MB/s of data input capacity and 2 MB/s of data output capacity, and supports up to 1,000 PUT records per second. You must specify the number of shards you need when creating a data stream.

Record

  A record is the unit of data stored in an Amazon Kinesis data stream. A record consists of a sequence number, a partition key, and a data blob. The data blob is the payload that the data producer adds to the stream. The maximum size of a data blob (the data payload before Base64 encoding) is 1 megabyte (MB).

Partition Key

  A partition key is used to group records and route them to different shards of the data stream. The partition key is specified by the data producer when adding data to an Amazon Kinesis data stream.

Sequence Number

  A sequence number is the unique identifier of each record; it is assigned by Kinesis when the record is written to the stream. The sketch below shows how these pieces fit together.
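To make these concepts concrete, here is a minimal sketch that puts a single record using the AWS SDK v1 client (the same client the producer later in this post uses) and prints the shard ID and sequence number that Kinesis assigns. The object name, partition key, and sample payload are illustrative; the stream name and endpoint match the ones used in the rest of this post.

import java.nio.ByteBuffer

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object PutSingleRecordExample {
  def main(args: Array[String]): Unit = {
    // Credentials are resolved from the default provider chain (e.g. ~/.aws/credentials)
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")

    val request = new PutRecordRequest()
      .withStreamName("word-counts-kinesis")               // the stream created in section 3
      .withPartitionKey("partitionKey-demo")               // decides which shard receives the record
      .withData(ByteBuffer.wrap("hello kinesis".getBytes)) // the data blob (payload)

    val result = kinesisClient.putRecord(request)
    // Kinesis chooses the shard and assigns the sequence number
    println(s"shardId=${result.getShardId}, sequenceNumber=${result.getSequenceNumber}")
  }
}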

 

2: Kinesis Data Stream limits

  1. By default, a record in Kinesis can be accessed for up to 24 hours after it is added. This limit can be raised to 7 days by enabling extended data retention.

  2. The data blob within a record can be at most 1 MB.

  3. Each shard supports at most 1,000 PUT records per second. Combined with the 1 MB/s input limit above, this means a shard's maximum write throughput is 1 MB/s, not 1 Gb/s (whichever limit is reached first applies); the sketch below shows how these limits translate into a shard count.
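A minimal shard-sizing sketch under the per-shard limits listed above; the workload numbers in main are illustrative:

object ShardSizing {
  // Per-shard limits from the list above
  val MaxRecordsPerSecondPerShard = 1000
  val MaxWriteMBPerSecondPerShard = 1.0
  val MaxReadMBPerSecondPerShard  = 2.0

  /** Estimate how many shards a workload needs, taking the most restrictive limit. */
  def requiredShards(recordsPerSecond: Long, avgRecordSizeKB: Double, readMBPerSecond: Double): Int = {
    val writeMBPerSecond = recordsPerSecond * avgRecordSizeKB / 1024.0
    val byRecordRate     = math.ceil(recordsPerSecond.toDouble / MaxRecordsPerSecondPerShard)
    val byWriteBandwidth = math.ceil(writeMBPerSecond / MaxWriteMBPerSecondPerShard)
    val byReadBandwidth  = math.ceil(readMBPerSecond / MaxReadMBPerSecondPerShard)
    List(byRecordRate, byWriteBandwidth, byReadBandwidth, 1.0).max.toInt
  }

  def main(args: Array[String]): Unit = {
    // Example: 1,000 records/s of roughly 0.1 KB each, read once by Spark Streaming
    println(requiredShards(recordsPerSecond = 1000, avgRecordSizeKB = 0.1, readMBPerSecond = 0.1))
    // => 1, which is why a single shard is enough for this experiment
  }
}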

  

  This walkthrough is based on the example from the official Spark documentation; the English original is at https://spark.apache.org/docs/latest/streaming-kinesis-integration.html. It creates a Kinesis Data Stream named word-counts-kinesis with a shard count of 1.
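Before the code below compiles, the project needs the Kinesis integration artifact in addition to Spark Streaming itself (the integration artifact also pulls in the AWS Kinesis client libraries transitively). A minimal build.sbt sketch; the version numbers are placeholders and should match your Spark and Scala installation:

// build.sbt (sketch) -- pick versions that match your Spark/Scala setup
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"             % "3.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.4.1"
)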

 

3: Create the Kinesis stream
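The stream can be created from the AWS console, or programmatically. Below is a minimal sketch using the same AWS SDK v1 client as the rest of this post; the object name and the polling loop are illustrative.

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient

object CreateWordCountsStream {
  def main(args: Array[String]): Unit = {
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")

    // Create the stream used throughout this post: 1 shard is enough for the experiment
    kinesisClient.createStream("word-counts-kinesis", 1)

    // Poll until the stream becomes ACTIVE before writing to it
    var status = ""
    while (status != "ACTIVE") {
      Thread.sleep(5000)
      status = kinesisClient.describeStream("word-counts-kinesis")
        .getStreamDescription.getStreamStatus
      println(s"Stream status: $status")
    }
  }
}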

 

 

 

4: Implement a producer that writes data to Kinesis

The implementation is as follows:

import java.nio.ByteBuffer

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.log4j.{Level, Logger}

import scala.util.Random

object KinesisWordProducerASL {

  def main(args: Array[String]): Unit = {
    // Adjust log levels
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Kinesis stream name
    val stream = "word-counts-kinesis"
    // Kinesis endpoint URL
    val endpoint = "https://kinesis.us-east-1.amazonaws.com"
    // Send 1,000 records per second
    val recordsPerSecond = "1000"
    // Each record contains 10 words
    val wordsPerRecord = "10"

    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)

    println("Totals for the words send")
    totals.foreach(println(_))
  }

  private def generate(stream: String,
                       endpoint: String,
                       recordsPerSecond: Int,
                       wordsPerRecord: Int): Seq[(String, Int)] = {
    // Word list to pick random words from
    val randomWords = List("spark", "hadoop", "hive", "kinesis", "kinesis")
    val totals = scala.collection.mutable.Map[String, Int]()

    // Create the Kinesis client; aws_access_key_id and aws_secret_access_key are read from the local credentials file
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpoint)

    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
      s" $recordsPerSecond records per second and $wordsPerRecord words per record")

    // Build records from randomly chosen words, according to recordsPerSecond and wordsPerRecord
    for (_ <- 1 to 2) {
      (1 to recordsPerSecond).foreach {
        recordNum =>
          val data = (1 to wordsPerRecord).map(x => {
            val randomWordIdx = Random.nextInt(randomWords.size)
            val randomWord = randomWords(randomWordIdx)

            totals(randomWord) = totals.getOrElse(randomWord, 0) + 1

            randomWord
          }).mkString(" ")

          // Create a partition key for this record
          val partitionKey = s"partitionKey-$recordNum"
          // Build the PutRecordRequest
          val putRecordRequest = new PutRecordRequest().withStreamName(stream)
            .withPartitionKey(partitionKey)
            .withData(ByteBuffer.wrap(data.getBytes))
          // Put the record onto the stream
          kinesisClient.putRecord(putRecordRequest)
      }

      Thread.sleep(1000)
      println("Sent " + recordsPerSecond + " records")
    }

    totals.toSeq.sortBy(_._1)
  }
}

  Run result

Note: an AWS credentials file is required when running locally (the DefaultAWSCredentialsProviderChain resolves credentials from environment variables, Java system properties, and the ~/.aws/credentials profile file).

 

5: Implement Spark Streaming reading data from Kinesis

Spark Streaming reads from the Kinesis Data Stream once every 2 seconds (a 2-second batch interval).

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object KinesisWordCountASL {
  def main(args: Array[String]): Unit = {
    // Adjust log levels
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val appName = "WordCountsApp"
    // Kinesis stream name
    val streamName = "word-counts-kinesis"
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null, "No AWS credentials found. Please specify credentials using one of the methods specified " +
      "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription.getShards().size()

    // Create one Kinesis input DStream per shard
    val numStreams = numShards

    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val regionName = getRegionNameByEndpoint(endpointUrl)

    val sparkConf = new SparkConf().setAppName("KinesisWordCountASL").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)                    // also used as the name of the KCL DynamoDB checkpoint table
        .checkpointInterval(kinesisCheckpointInterval) // how often the KCL checkpoints its position in the stream
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()
    }

    //Union all the streams
    val unionStreams = ssc.union(kinesisStreams)

    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))

    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def getRegionNameByEndpoint(endpoint: String): String = {
    import scala.collection.JavaConverters._
    val uri = new java.net.URI(endpoint)
    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
      .asScala
      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
      .map(_.getName)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }
}

  Run result

6: Architecture overview

7: Summary

Spark Streaming + Kinesis is easy to use and quick to get started with. You can very quickly build a large-scale data-crawling pipeline: the front end runs thousands of crawlers writing data into Kinesis, while the back end uses Spark Streaming to distribute, filter, and analyze the data.

Source code: https://github.com/mayflower-zc/spark-kinesis-sample-project

