1: Concepts
Amazon Kinesis
Amazon Kinesis is a fully managed service for processing streaming data in real time at scale. It provides several core capabilities that let you process streaming data of any size cost-effectively and with a high degree of flexibility. With Amazon Kinesis you can ingest real-time data such as video, audio, application logs, website clickstreams, and IoT telemetry data for machine learning, analytics, and other applications. You can process, analyze, and react to the data as it arrives, instead of waiting until all of it has been collected before processing can begin.
Shard
A shard is the base throughput unit of an Amazon Kinesis data stream. One shard provides 1 MB/s of data input and 2 MB/s of data output capacity, and supports up to 1,000 PUT records per second. You must specify the number of shards you need when you create a data stream.
Record
A record is the unit of data stored in an Amazon Kinesis data stream. A record is composed of a sequence number, a partition key, and a data blob. The data blob is the data of interest that the producer adds to the stream. The maximum size of a data blob (the data payload before Base64 encoding) is 1 megabyte (MB).
Partition Key
The partition key is used to group records and route them to different shards of the data stream. It is specified by the producer when adding data to the Amazon Kinesis data stream.
Sequence Number
The sequence number is the unique identifier of each record; Kinesis assigns it when the record is added to the stream.
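These pieces come together in a single PutRecord call. Below is a minimal sketch using the AWS SDK for Java v1 (the same client used later in this post); the stream name matches the one created in section 3, and the partition key and payload are purely illustrative:

import java.nio.ByteBuffer
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object PutSingleRecord {
  def main(args: Array[String]): Unit = {
    // Credentials are resolved from the local AWS credentials file / environment
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
    // A record = data blob + partition key; Kinesis assigns the sequence number
    val result = kinesisClient.putRecord(new PutRecordRequest()
      .withStreamName("word-counts-kinesis")
      .withPartitionKey("illustrative-key") // determines which shard receives the record
      .withData(ByteBuffer.wrap("hello kinesis".getBytes)))
    println(s"shard: ${result.getShardId}, sequence number: ${result.getSequenceNumber}")
  }
}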
2: Kinesis Data Stream limits
1. By default, records in a stream are accessible for up to 24 hours after they are added. This can be raised to 7 days by enabling extended data retention (see the sketch after this list).
2. The data blob within a record can be at most 1 MB.
3. Each shard supports up to 1,000 PUT records per second, and the maximum write throughput per shard is 1 MB/s.
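Limit 1 can be lifted programmatically as well as from the console. A minimal sketch, assuming the same AWS SDK for Java v1 client used by the producer below, that raises the retention of word-counts-kinesis to 7 days (168 hours):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest

object ExtendRetention {
  def main(args: Array[String]): Unit = {
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
    // Raise retention from the default 24 hours to 7 days (168 hours)
    kinesisClient.increaseStreamRetentionPeriod(
      new IncreaseStreamRetentionPeriodRequest()
        .withStreamName("word-counts-kinesis")
        .withRetentionPeriodHours(168))
  }
}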
This walkthrough is based on the example from the official Spark documentation; the original English version is here: https://spark.apache.org/docs/latest/streaming-kinesis-integration.html. It creates a Kinesis Data Stream named word-counts-kinesis with a single shard.
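To build the two Scala programs below, the project needs the Spark Streaming Kinesis integration and the AWS SDK on the classpath. A minimal build.sbt sketch; the artifact names are standard, but the versions here are assumptions and should be aligned with your Spark and Scala installation:

// build.sbt (illustrative versions; match them to your Spark and Scala versions)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"             % "2.4.5",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.5",
  "com.amazonaws"    %  "aws-java-sdk-kinesis"        % "1.11.655"
)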
3: Create the Kinesis stream
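The stream can be created from the AWS console or programmatically. A minimal sketch with the same Java SDK, assuming the stream does not already exist:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient

object CreateStream {
  def main(args: Array[String]): Unit = {
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
    // Create the stream with a single shard
    kinesisClient.createStream("word-counts-kinesis", 1)
    // Wait until the stream becomes ACTIVE before writing to it
    var status = ""
    while (status != "ACTIVE") {
      Thread.sleep(2000)
      status = kinesisClient.describeStream("word-counts-kinesis")
        .getStreamDescription.getStreamStatus
      println(s"Stream status: $status")
    }
  }
}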
4: Implement a producer that writes data to Kinesis
The implementation is as follows:
import java.nio.ByteBuffer
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.log4j.{Level, Logger}
import scala.util.Random

object KinesisWordProducerASL {
  def main(args: Array[String]): Unit = {
    // Reduce logging noise
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // Kinesis stream name
    val stream = "word-counts-kinesis"
    // Kinesis endpoint
    val endpoint = "https://kinesis.us-east-1.amazonaws.com"
    // Send 1000 records per second
    val recordsPerSecond = "1000"
    // Each record contains 10 words
    val wordsPerRecord = "10"

    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
    println("Totals for the words sent:")
    totals.foreach(println(_))
  }

  private def generate(stream: String,
                       endpoint: String,
                       recordsPerSecond: Int,
                       wordsPerRecord: Int): Seq[(String, Int)] = {
    // Word list to sample from
    val randomWords = List("spark", "hadoop", "hive", "kinesis", "kinesis")
    val totals = scala.collection.mutable.Map[String, Int]()

    // Create the Kinesis client; aws_access_key_id and aws_secret_access_key are
    // resolved from the local credentials file by the default provider chain
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpoint)

    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
      s" $recordsPerSecond records per second and $wordsPerRecord words per record")

    // Put randomly generated words into records according to
    // recordsPerSecond and wordsPerRecord; run for 2 seconds
    for (i <- 1 to 2) {
      (1 to recordsPerSecond).foreach { recordNum =>
        val data = (1 to wordsPerRecord).map { _ =>
          val randomWordIdx = Random.nextInt(randomWords.size)
          val randomWord = randomWords(randomWordIdx)
          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
          randomWord
        }.mkString(" ")
        // Partition key for this record
        val partitionKey = s"partitionKey-$recordNum"
        // Build the PutRecordRequest
        val putRecordRequest = new PutRecordRequest()
          .withStreamName(stream)
          .withPartitionKey(partitionKey)
          .withData(ByteBuffer.wrap(data.getBytes))
        // Put the record onto the stream
        kinesisClient.putRecord(putRecordRequest)
      }
      Thread.sleep(1000)
      println("Sent " + recordsPerSecond + " records")
    }
    totals.toSeq.sortBy(_._1)
  }
}
Output:
Note: when running locally, an AWS credentials file is required.
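The DefaultAWSCredentialsProviderChain used in the code looks for credentials in environment variables, JVM system properties, and the shared credentials file at ~/.aws/credentials. A typical credentials file looks like this (placeholders, not real keys):

[default]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>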
5: Implement Spark Streaming to read data from Kinesis
Spark Streaming reads from the Kinesis Data Stream once every 2 seconds (the batch interval). The Kinesis receiver checkpoints its progress to a DynamoDB table named after the checkpoint application (WordCountsApp here), so the credentials in use also need DynamoDB access.
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object KinesisWordCountASL {
  def main(args: Array[String]): Unit = {
    // Reduce logging noise
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val appName = "WordCountsApp"
    // Kinesis stream name
    val streamName = "word-counts-kinesis"
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")

    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)

    // Create one DStream per shard
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription.getShards().size()
    val numStreams = numShards

    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val regionName = getRegionNameByEndpoint(endpointUrl)

    val sparkConf = new SparkConf().setAppName("KinesisWordCountASL").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()
    }

    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def getRegionNameByEndpoint(endpoint: String): String = {
    import scala.collection.JavaConverters._
    val uri = new java.net.URI(endpoint)
    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
      .asScala
      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
      .map(_.getName)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }
}
Output:
6: Architecture overview
7: Summary
Spark Streaming + Kinesis is easy to use and quick to get started with. For example, you could rapidly build a large-scale web-crawling data pipeline: the front end runs thousands of crawlers writing data into Kinesis, while the back end uses Spark Streaming to route, filter, and analyze the data.
Source code: https://github.com/mayflower-zc/spark-kinesis-sample-project