一、什么是 Spark Streaming
1、SparkStreaming 是 Spark核心API 的擴展。可實現可伸縮、高吞吐、容錯機制的實時流處理。
如圖,數據可從 Kafka、Flume、HDFS 等多種數據源獲得,最后將數據推送到 HDFS、數據庫 或者 Dashboards 上面。
2、SparkStreaming 接收到實時的數據,然后按照時間段將實時數據分成多個批次,經過Spark處理引擎的數據處理,最后按照批次輸出。
3、SparkStreaming 提供了一個高抽象的離散流或者叫做 DStream,它相當於連續的數據流。
外部數據不斷的涌入,數據按照自定義的時間將數據進行切片,每個時間段內的數據是連續的,時間段與時間段之間的關系史相互獨立的。這就是 離散流。
DStream 是 RDD 的序列化,DStream 可以看作一組 RDD 的集合。
4、在DStream上執行的任何操作都轉換為對基礎RDD的操作。
二、需要的maven包
<!-- Spark Streaming --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- config的統一配置 --> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.3</version> </dependency> <!-- json格式轉化 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency>
三、項目參數的配置文件
首先在 src/resources 路徑下創建一個 application.conf 文件,上面 maven 文件中添加的 config 依賴默認會讀取 src/resources 路徑下的 application.conf 文件。
applicaltion.conf
kafka.topic = "topicName"
kafka.group.id = "Your group"
kafka.broker.list = "your Kafka's host:port"
redis.host = "redisIP, no port"
redis.db=1
Redis 會有 16個庫,這里面 redis.db=1,代表我會將數據存入到redis的1庫中。
然后我會創建一個 Object,通過 ConfigFactory 來獲取 application 中的參數的值.
object ParamsConf {
private lazy val conf = ConfigFactory.load()
val topics = conf.getString("kafka.topic").split(",")
val groupId = conf.getString("kafka.group.id")
val brokers = conf.getString("kafka.broker.list")
val redisHost = conf.getString("redis.host")
val redisDB = conf.getInt("redis.db")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
}
這樣項目就會很方便的通過這個工具類來獲取到統一的參數配置了。
四、向 Kafka 傳入 Demo 數據
通過 KafkaProducer 創建與 Kafka 之間的連接,KafkaProducer 在創建對象的時候需要傳入生產者的配置參數。通過讀 Source Code 都可以查到對應的參數配置
val prop = new Properties() // 序列化用到的 key、value prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer") prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer") // kafka的地址,在上面的配置里 prop.put("bootstrap.servers", ParamsConf.brokers) prop.put("request.required.acks", "1") val topic = ParamsConf.topics(0) val producer = new KafkaProducer[String, String](prop)
其中 request.required.acks 是 Kafka 的發送確認,有 -1、0、1 三個級別。-1 代表 producer 會獲得所有同步replicas 都收到數據的確認,才會發下一條消息;0代表 producer 不等待確認消息,producer中有message就會發給broker;1代表 獲得leader replica已經接收了數據的確認信息。
之后利用循環編造數據,並將數據發送給 Kafka
val random = new Random() val dateFormat = FastDateFormat.getInstance("yyyyMMddHHmmss") for(i <- 1 to 100) { val time = dateFormat.format(new Date()) val userId = random.nextInt(1000).toString val courseid = random.nextInt(500).toString val fee = random.nextInt(400).toString val result = Array("0","1") // 0 表示未成功;1 表示成功 val flag = result(random.nextInt(2)).toString var orderId = UUID.randomUUID().toString() val map = new util.HashMap[String, Object]() map.put("time",time) map.put("userId",userId) map.put("courseid",courseid) map.put("fee",fee) map.put("result",result) map.put("flag",flag) map.put("orderId",orderId) val json = new JSONObject(map) producer.send(new ProducerRecord[String, String](topic, i.toString, json.toString())) }
五、 Spark Streaming 對接 Kafka
初始化 SparkStreaming 程序,首先需要創建 StreamingContext。StreamingContext的創建方式:
val conf = new SparkConf().setMaster("local[2]").setAppName("hunterV2") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(5)) ... ... Do Something... ... ssc.start() ssc.awaitTermination()
在邏輯代碼之后要調用 ssc.start() 來開始實時處理數據。ssc.awaitTermination() 方法也是必需的, 來等待應用程序的終止,也可以用 ssc.stop() 來終止程序。或者就是讓它持續不斷的運行進行計算
通過 KafkaUtils 以直連方式拉取數據,這種方式不會修改數據的偏移量,需要手動的更新
val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](ParamsConf.topics, ParamsConf.kafkaParams))
createDirectStream 有三個參數,查看 Source Code
def createDirectStream[K, V]( ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V] )
前兩個參數較明確,第三個參數中則是要添加 kafka 的 topic 以及 Kafka 的信息,在 ParamsConf 中已經定義好了,這個也可以在官網文檔中可以找到。(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html)
拉取數據之后,SparkStreaming 提供的 stream 變量是 DStream 類型數據。DStream 是一組 RDD 的序列,DStream 任何操作都轉換為對基礎 RDD 的操作。因此需要用到 DStream 的 action -- foreachRDD.
stream.foreachRDD( rdd => { val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges ... ... Do SomeThing ... stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) })
在邏輯代碼的前后有兩行 關於 offsetRanges 的語句,這是 kafka 自帶的用作管理 offset 的語句。第一句是讀取當前的偏移量的數據,邏輯執行成功之后,最后一句是將偏移量提交到 Kafka 上。(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself)