1.Spark Streaming簡介
Spark Streaming從各種輸入源中讀取數據,並把數據分組為小的批次。新的批次按均勻的時間間隔創建出來。在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次停止增長,時間區間的大小是由批次間隔這個參數決定的。批次間隔一般設在500毫秒到幾秒之間,由開發者配置。每個輸入批次都形成一個RDD,以 Spark 作業的方式處理並生成其他的 RDD。 處理的結果可以以批處理的方式傳給外部系統,Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個 RDD 序列,每個RDD代表數據流中一個時間片內的數據。另外加入了窗口操作和狀態轉化,其他和批次處理類似。
與StructedStreaming的區別
StructedStreaming誕生於2.x后,主要用於處理結構化數據,除了實現與Spark Streaming的批處理,還實現了long-running的task,主要理解為處理的時機可以是數據的生產時間,而非收到數據的時間,可以細看下表:
流處理模式 | SparkStreaming | Structed Streaming |
---|---|---|
執行模式 | Micro Batch | Micro batch / Streaming |
API | Dstream/streamingContext | Dataset/DataFrame,SparkSession |
Job 生成方式 | Timer定時器定時生成job | Trigger觸發 |
支持數據源 | Socket,filstream,kafka,zeroMq,flume,kinesis | Socket,filstream,kafka,ratesource |
executed-based | Executed based on dstream api | Executed based on sparksql |
Time based | Processing Time | ProcessingTime & eventTIme |
UI | Built-in | No |
對於流處理,現在生產環境下使用Flink較多,數據源方式,現在基本是以kafka為主,所以本文對Spark Streaming的場景即ETL流處理結構化日志,將結果輸入Kafka隊列
2.Spark Sreaming的運行流程
1、客戶端提交Spark Streaming作業后啟動Driver,Driver啟動Receiver,Receiver接收數據源的數據
2、每個作業包含多個Executor,每個Executor以線程的方式運行task,SparkStreaming至少包含一個receiver task(一般情況下)
3、Receiver接收數據后生成Block,並把BlockId匯報給Driver,然后備份到另外一個 Executor 上
4、ReceiverTracker維護 Reciver 匯報的BlockId
5、Driver定時啟動JobGenerator,根據Dstream的關系生成邏輯RDD,然后創建Jobset,交給JobScheduler
6、JobScheduler負責調度Jobset,交給DAGScheduler,DAGScheduler根據邏輯RDD,生成相應的Stages,每個stage包含一到多個Task,將TaskSet提交給TaskSchedule
7、TaskScheduler負責把 Task 調度到 Executor 上,並維護 Task 的運行狀態
常用數據源的讀取方式
常數據流:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
Socket:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
RDD隊列:
val queue = new Queue[RDD[Int]]()
val queueDStream: InputDStream[Int] = ssc.queueStream(queue)
文件夾:
val lines: DStream[String] = ssc.textFileStream("data/log/")
3.案例說明
生產上,常用流程如下,批處理原始Kafka日志,比如請求打點日志等,使用Spark Streaming來將數據清洗轉變為一定格式再導入Kafka中,為了保證exact-once,會將offer自己來保存,主要保存在redis-offset中
數據地址:鏈接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ提取碼:hell
3.1 原始Kafka日志
sample.log格式如下:
我們將它先放到文件里,模擬生產環境下xx.log
3.2 創建兩個topic,並創建KafkaProducer來嫁給你數據寫入mytopic1
一個用來放原始的日志數據,一個用來放處理過后的日志
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1
啟動redis服務:
./redis-server redis.conf
查看mytopic1數據
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning
3.3 代碼實現
第一部分,處理原始文件數據寫入mytopic1
package com.hoult.Streaming.work
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object FilerToKafka {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
// 定義 kafka producer參數
val lines: RDD[String] = sc.textFile("data/sample.log")
// 定義 kafka producer參數
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// 將讀取到的數據發送到mytopic1
lines.foreachPartition{iter =>
// KafkaProducer
val producer = new KafkaProducer[String, String](prop)
iter.foreach{line =>
val record = new ProducerRecord[String, String]("mytopic1", line)
producer.send(record)
}
producer.close()
}
}
}
第二部分,streaming讀取mytopic1的數據,寫入mytopic2
package com.hoult.Streaming.work
import java.util.Properties
import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 每秒處理Kafka數據,生成結構化數據,輸入另外一個Kafka topic
*/
object KafkaStreamingETL {
val log = Logger.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
// 需要消費的topic
val topics: Array[String] = Array("mytopic1")
val groupid = "mygroup1"
// 定義kafka相關參數
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
// 從Redis獲取offset
val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)
// 創建DStream
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
// 從kafka中讀取數據
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)
// 轉換后的數據發送到另一個topic
dstream.foreachRDD{rdd =>
if (!rdd.isEmpty) {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(process)
// 將offset保存到Redis
OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
}
}
// 啟動作業
ssc.start()
ssc.awaitTermination()
}
def process(iter: Iterator[ConsumerRecord[String, String]]) = {
iter.map(line => parse(line.value))
.filter(!_.isEmpty)
// .foreach(println)
.foreach(line =>sendMsg2Topic(line, "mytopic2"))
}
def parse(text: String): String = {
try{
val arr = text.replace("<<<!>>>", "").split(",")
if (arr.length != 15) return ""
arr.mkString("|")
} catch {
case e: Exception =>
log.error("解析數據出錯!", e)
""
}
}
def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
}
def getKafkaProducerParameters(): Properties = {
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop
}
def sendMsg2Topic(msg: String, topic: String): Unit = {
val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
val record = new ProducerRecord[String, String](topic, msg)
producer.send(record)
}
}
第三部分,從redis中讀寫offset的工具
package com.hoult.Streaming.kafka
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import scala.collection.mutable
object OffsetsWithRedisUtils {
// 定義Redis參數
private val redisHost = "linux121"
private val redisPort = 6379
// 獲取Redis的連接
private val config = new JedisPoolConfig
// 最大空閑數
config.setMaxIdle(5)
// 最大連接數
config.setMaxTotal(10)
private val pool = new JedisPool(config, redisHost, redisPort, 10000)
private def getRedisConnection: Jedis = pool.getResource
private val topicPrefix = "kafka:topic"
// Key:kafka:topic:TopicName:groupid
private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"
// 根據 key 獲取offsets
def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
val jedis: Jedis = getRedisConnection
val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
val key = getKey(topic, groupId)
import scala.collection.JavaConverters._
jedis.hgetAll(key)
.asScala
.map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
}
// 歸還資源
jedis.close()
offsets.flatten.toMap
}
// 將offsets保存到Redis中
def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
// 獲取連接
val jedis: Jedis = getRedisConnection
// 組織數據
offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
.groupBy(_._1)
.foreach{case (topic, buffer) =>
val key: String = getKey(topic, groupId)
import scala.collection.JavaConverters._
val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava
// 保存數據
jedis.hmset(key, maps)
}
jedis.close()
}
def main(args: Array[String]): Unit = {
val topics = Array("mytopic1")
val groupid = "mygroup1"
val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
x.foreach(println)
}
}
3.4 演示
-
啟動redis
./redis-server ./redis.conf
-
啟動kafka並創建topic
sh scripts/kafka.sh start
3.2 創建兩個topic,並創建KafkaProducer來嫁給你數據寫入mytopic1 -
啟動FilerToKafka 和 KafkaStreamingETL
完整代碼:https://github.com/hulichao/bigdata-spark/blob/master/src/main/scala/com/hoult/Streaming/work
4.spark-streamin注意事項
spark-streaming讀文件讀不到的問題 ,讀取本地文件時候,要注意,它不會讀取原本就存在於該文件里的文本,只會讀取在監聽期間,傳進文件夾里的數據,而且本文本還有要求,必須是它組后一次更改並且保存的操作,是在監聽開始的那一刻
之后的,其實意思就是,如果要向被監聽的文件夾里傳一個文本,你就要在監聽開始之后,先打開這個文本,隨便輸入幾個空格,或者回車,或者其他不影響文本內容的操作,然后保存,最后再傳進文件夾里,這樣它才能
檢測到這個被傳進來的文本。(估計它這個用意是只監聽被更改過的文本吧),參考:https://www.codeleading.com/article/9561702251/
吳邪,小三爺,混跡於后台,大數據,人工智能領域的小菜鳥。
更多請關注