1: Concepts
Amazon Kinesis
Amazon Kinesis is a fully managed service for processing streaming data in real time at scale. It offers several core capabilities that let you process streaming data of any size cost-effectively and with a high degree of flexibility. With Amazon Kinesis you can ingest real-time data such as video, audio, application logs, website clickstreams, and IoT telemetry for machine learning, analytics, and other applications. Amazon Kinesis lets you process, analyze, and react to data as it arrives, without having to wait until all of the data has been collected before processing can begin.
Shard
A shard is the base throughput unit of an Amazon Kinesis data stream. One shard provides 1 MB/s of data input and 2 MB/s of data output capacity, and supports up to 1,000 PUT records per second. When you create a data stream, you must specify the number of shards you need.
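These per-shard figures are what drive shard sizing. As a minimal sketch (the target throughput numbers below are made-up examples, not part of the walkthrough), the number of shards a stream needs could be estimated like this:

object ShardEstimator {
  // Per-shard write limits, taken from the figures above.
  val mbPerShard = 1.0          // 1 MB/s data input per shard
  val recordsPerShard = 1000.0  // 1,000 PUT records/s per shard

  // Estimate how many shards are needed for a given write load.
  def requiredShards(targetMBPerSecond: Double, targetRecordsPerSecond: Double): Int = {
    val byThroughput = math.ceil(targetMBPerSecond / mbPerShard).toInt
    val byRecords    = math.ceil(targetRecordsPerSecond / recordsPerShard).toInt
    math.max(byThroughput, byRecords)
  }

  def main(args: Array[String]): Unit = {
    // Example: 5 MB/s arriving as 3,500 records per second -> 5 shards.
    println(requiredShards(targetMBPerSecond = 5.0, targetRecordsPerSecond = 3500.0))
  }
}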
Record
A record is the unit of data stored in an Amazon Kinesis data stream. A record is composed of a sequence number, a partition key, and a data blob. The data blob is the data of interest that the producer adds to the stream. The maximum size of a data blob (the data payload before Base64 encoding) is 1 megabyte (MB).
Partition Key
The partition key is used to group records and route them to different shards of the data stream. The partition key is specified by the data producer when it adds data to an Amazon Kinesis data stream.
Sequence Number
The sequence number is the unique identifier of each record.
2: Kinesis Data Stream limits
1. By default, records in Kinesis are accessible for up to 24 hours after they are added. You can enable extended data retention to raise this limit to 7 days.
2. The data blob within a record can be at most 1 MB.
3. Each shard supports up to 1,000 PUT records per second, with a maximum write throughput of 1 MB/s per shard.
This walkthrough follows the example from the Spark documentation; the original English text is at https://spark.apache.org/docs/latest/streaming-kinesis-integration.html. It creates a Kinesis Data Stream named word-counts-kinesis with a shard count of 1.
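To build the examples, the Spark Kinesis integration library (published separately from core Spark under the Amazon Software License) needs to be on the classpath. A minimal sbt sketch, assuming Scala 2.12 and Spark 3.x; adjust the versions to match your environment:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "3.5.0" % "provided",
  // Kinesis connector used by the consumer in section 5
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.5.0"
)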
3: Create the Kinesis stream
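The stream can be created from the AWS console or programmatically. Below is a minimal sketch using the same AWS SDK v1 client as the producer in the next section; the stream name and endpoint match the ones used throughout this walkthrough, and credentials are assumed to be configured locally:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient

object CreateStream {
  def main(args: Array[String]): Unit = {
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
    // Create the stream used in this walkthrough with a single shard.
    kinesisClient.createStream("word-counts-kinesis", 1)
    // The stream takes a short time to become ACTIVE before it accepts records.
    println(kinesisClient.describeStream("word-counts-kinesis").getStreamDescription.getStreamStatus)
  }
}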

4: Implement a producer that writes data to Kinesis
The code is as follows:
import java.nio.ByteBuffer
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.log4j.{Level, Logger}
import scala.util.Random

object KinesisWordProducerASL {
  def main(args: Array[String]): Unit = {
    // Adjust log levels
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // Kinesis stream name
    val stream = "word-counts-kinesis"
    // Kinesis endpoint
    val endpoint = "https://kinesis.us-east-1.amazonaws.com"
    // Send 1,000 records per second
    val recordsPerSecond = "1000"
    // Each record contains 10 words
    val wordsPerRecord = "10"
    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
    println("Totals for the words sent")
    totals.foreach(println(_))
  }

  private def generate(stream: String,
                       endpoint: String,
                       recordsPerSecond: Int,
                       wordsPerRecord: Int): Seq[(String, Int)] = {
    // Word list to sample from
    val randomWords = List("spark", "hadoop", "hive", "kinesis", "kinesis")
    val totals = scala.collection.mutable.Map[String, Int]()
    // Create the Kinesis client; aws_access_key_id and aws_secret_access_key are read from the local credentials file
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpoint)
    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
      s" $recordsPerSecond records per second and $wordsPerRecord words per record")
    // Put randomly generated words into records according to recordsPerSecond and wordsPerRecord
    for (i <- 1 to 2) {
      (1 to recordsPerSecond).foreach { recordNum =>
        val data = (1 to wordsPerRecord).map(x => {
          val randomWordIdx = Random.nextInt(randomWords.size)
          val randomWord = randomWords(randomWordIdx)
          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
          randomWord
        }).mkString(" ")
        // Build a partition key
        val partitionKey = s"partitionKey-$recordNum"
        // Build a PutRecordRequest
        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
          .withPartitionKey(partitionKey)
          .withData(ByteBuffer.wrap(data.getBytes))
        // Put the record onto the stream
        kinesisClient.putRecord(putRecordRequest)
      }
      Thread.sleep(1000)
      println("Sent " + recordsPerSecond + " records")
    }
    totals.toSeq.sortBy(_._1)
  }
}
Execution result
Note: an AWS credentials file is required when running locally.
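The DefaultAWSCredentialsProviderChain used above looks in several places, including the shared credentials file at ~/.aws/credentials. A minimal example of that file, with placeholder values:

[default]
aws_access_key_id = <your access key id>
aws_secret_access_key = <your secret access key>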

5: Implement a Spark Streaming job that reads data from Kinesis
Spark reads from the Kinesis Data Stream once every 2 seconds.
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object KinesisWordCountASL {
  def main(args: Array[String]): Unit = {
    // Adjust log levels
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val appName = "WordCountsApp"
    // Kinesis stream name
    val streamName = "word-counts-kinesis"
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null, "No AWS credentials found. Please specify credentials using one of the methods specified " +
      "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    // Create one input DStream per shard
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription.getShards().size()
    val numStreams = numShards
    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val regionName = getRegionNameByEndpoint(endpointUrl)
    val sparkConf = new SparkConf().setAppName("KinesisWordCountASL").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()
    }
    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
    // Each Kinesis record arrives as a byte array; decode it and split it back into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

  def getRegionNameByEndpoint(endpoint: String): String = {
    import scala.collection.JavaConverters._
    val uri = new java.net.URI(endpoint)
    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
      .asScala
      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
      .map(_.getName)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }
}
Execution result

6: Architecture

7: Summary
Spark Streaming + Kinesis is easy to use and quick to get started with. You can rapidly build a large-scale data-crawling pipeline: the front end runs thousands of crawlers writing data into Kinesis, while the back end uses Spark Streaming to distribute, filter, and analyze the data.
Source code: https://github.com/mayflower-zc/spark-kinesis-sample-project
