基於Spark通用計算平台,可以很好地擴展各種計算類型的應用,尤其是Spark提供了內建的計算庫支持,像Spark Streaming、Spark SQL、MLlib、GraphX,這些內建庫都提供了高級抽象,可以用非常簡潔的代碼實現復雜的計算邏輯、這也得益於Scala編程語言的簡潔性。這里,我們基於1.3.0版本的Spark搭建了計算平台,實現基於Spark Streaming的實時計算。
我們的應用場景是分析用戶使用手機App的行為,描述如下所示:
- 手機客戶端會收集用戶的行為事件(我們以點擊事件為例),將數據發送到數據服務器,我們假設這里直接進入到Kafka消息隊列
- 后端的實時服務會從Kafka消費數據,將數據讀出來並進行實時分析,這里選擇Spark Streaming,因為Spark Streaming提供了與Kafka整合的內置支持
- 經過Spark Streaming實時計算程序分析,將結果寫入Redis,可以實時獲取用戶的行為數據,並可以導出進行離線綜合統計分析
Spark Streaming介紹
Spark Streaming提供了一個叫做DStream(Discretized Stream)的高級抽象,DStream表示一個持續不斷輸入的數據流,可以基於Kafka、TCP Socket、Flume等輸入數據流創建。在內部,一個DStream實際上是由一個RDD序列組成的。Sparking Streaming是基於Spark平台的,也就繼承了Spark平台的各種特性,如容錯(Fault-tolerant)、可擴展(Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每個DStream包含了一個時間間隔之內的數據項的集合,我們可以理解為指定時間間隔之內的一個batch,每一個batch就構成一個RDD數據集,所以DStream就是一個個batch的有序序列,時間是連續的,按照時間間隔將數據流分割成一個個離散的RDD數據集,如圖所示(來自官網):
我們都知道,Spark支持兩種類型操作:Transformations和Actions。Transformation從一個已知的RDD數據集經過轉換得到一個新的RDD數據集,這些Transformation操作包括map、filter、flatMap、union、join等,而且Transformation具有lazy的特性,調用這些操作並沒有立刻執行對已知RDD數據集的計算操作,而是在調用了另一類型的Action操作才會真正地執行。Action執行,會真正地對RDD數據集進行操作,返回一個計算結果給Driver程序,或者沒有返回結果,如將計算結果數據進行持久化,Action操作包括reduceByKey、count、foreach、collect等。關於Transformations和Actions更詳細內容,可以查看官網文檔。
同樣、Spark Streaming提供了類似Spark的兩種操作類型,分別為Transformations和Output操作,它們的操作對象是DStream,作用也和Spark類似:Transformation從一個已知的DStream經過轉換得到一個新的DStream,而且Spark Streaming還額外增加了一類針對Window的操作,當然它也是Transformation,但是可以更靈活地控制DStream的大小(時間間隔大小、數據元素個數),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允許我們將DStream數據輸出到一個外部的存儲系統,如數據庫或文件系統等,執行Output操作類似執行Spark的Action操作,使得該操作之前lazy的Transformation操作序列真正地執行。
Kafka+Spark Streaming+Redis編程實踐
下面,我們根據上面提到的應用場景,來編程實現這個實時計算應用。
首先,創建一個scala工程,創建方法見 三、使用maven創建scala工程(scala和java混一起)
引入kafka、redis、json等相關的包,pom.xml 如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>fvp</artifactId> <groupId>com.sf.fvp</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>com.sf</groupId> <artifactId>scalademo3</artifactId> <version>1.0-SNAPSHOT</version> <name>${project.artifactId}</name> <description>My wonderfull scala app</description> <inceptionYear>2015</inceptionYear> <licenses> <license> <name>My License</name> <url>http://....</url> <distribution>repo</distribution> </license> </licenses> <properties> <maven.compiler.source>1.6</maven.compiler.source> <maven.compiler.target>1.6</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.5</scala.version> <scala.compat.version>2.11</scala.compat.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.2.3</version> </dependency> <dependency> <groupId>org.codehaus.jettison</groupId> <artifactId>jettison</artifactId> <version>1.3.8</version> </dependency> <dependency> <groupId>net.sf.ezmorph</groupId> <artifactId>ezmorph</artifactId> <version>1.0.6</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.5.2</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.2</version> </dependency> <!-- Test --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.specs2</groupId> <artifactId>specs2-core_${scala.compat.version}</artifactId> <version>2.4.16</version> <scope>test</scope> </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.compat.version}</artifactId> <version>2.2.4</version> <scope>test</scope> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <!-- see http://davidb.github.com/scala-maven-plugin --> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <!-- If you have classpath issue like NoDefClassError,... --> <!-- useManifestOnlyJar>false</useManifestOnlyJar --> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> </plugins> </build> </project>
二、寫了一個Kafka Producer模擬程序,用來模擬向Kafka實時寫入用戶行為的事件數據,數據是JSON格式,示例如下:
{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}
一個事件包含4個字段:
- uid:用戶編號
- event_time:事件發生時間戳
- os_type:手機App操作系統類型
- click_count:點擊次數
下面是我們實現的代碼,如下所示:
package com.sf.scalademo3 import java.util.Properties import scala.util.Properties import org.codehaus.jettison.json.JSONObject import kafka.javaapi.producer.Producer import kafka.producer.KeyedMessage import kafka.producer.KeyedMessage import kafka.producer.ProducerConfig import scala.util.Random object KafkaEventProducer { private val users = Array( "4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f", "011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf", "068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706", "d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a", "6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d") private val random = new Random() private var pointer = -1 def getUserID(): String = { pointer = pointer + 1 if (pointer >= users.length) { pointer = 0 users(pointer) } else { users(pointer) } } def click(): Double = { random.nextInt(10) } // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2 // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events // bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning def main(args: Array[String]): Unit = { val topic = "user_events" val brokers = "localhost:9092" val props = new Properties() props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val kafkaConfig = new ProducerConfig(props) val producer = new Producer[String, String](kafkaConfig) while (true) { // prepare event data val event = new JSONObject() event .put("uid", getUserID) .put("event_time", System.currentTimeMillis.toString) .put("os_type", "Android") .put("click_count", click) // produce event message producer.send(new KeyedMessage[String, String](topic, event.toString)) println("Message sent: " + event) Thread.sleep(200) } } }
通過控制上面程序最后一行的時間間隔來控制模擬寫入速度。
三、下面我們來討論實現實時統計每個用戶的點擊次數,它是按照用戶分組進行累加次數,邏輯比較簡單,關鍵是在實現過程中要注意一些問題,如對象序列化等。先看實現代碼,稍后我們再詳細討論,代碼實現如下所示:
package com.sf.scalademo3 import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils import net.sf.json.JSONObject import redis.clients.jedis.JedisPool import org.apache.commons.pool2.impl.GenericObjectPoolConfig object UserClickCountAnalytics { def main(args: Array[String]): Unit = { var masterUrl = "local[1]" if (args.length > 0) { masterUrl = args(0) } // Create a StreamingContext with the given master URL val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat") val ssc = new StreamingContext(conf, Seconds(5)) // Kafka configurations val topics = Set("user_events") val brokers = "localhost:9092" val kafkaParams = Map[String, String]( "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder") val dbIndex = 1 val clickHashKey = "app::users::click" // Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val events = kafkaStream.flatMap(line => { val data = JSONObject.fromObject(line._2) Some(data) }) // Compute user click times val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _) userClicks.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { partitionOfRecords.foreach(pair => { val uid = pair._1 val clickCount = pair._2 val jedis = RedisClient.pool.getResource jedis.select(dbIndex) jedis.hincrBy(clickHashKey, uid, clickCount) RedisClient.pool.returnResource(jedis) }) }) }) ssc.start() ssc.awaitTermination() } }
上面代碼使用了Jedis客戶端來操作Redis,將分組計數結果數據累加寫入Redis存儲,如果其他系統需要實時獲取該數據,直接從Redis實時讀取即可。RedisClient實現代碼如下所示:
package com.sf.scalademo3 import redis.clients.jedis.JedisPool import org.apache.commons.pool2.impl.GenericObjectPoolConfig object RedisClient extends Serializable { val redisHost = "10.202.34.232" val redisPort = 6383 val redisTimeout = 30000 lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) lazy val hook = new Thread { override def run = { println("Execute hook thread: " + this) pool.destroy() } } sys.addShutdownHook(hook.run) }
上面代碼我們分別在local[K]和Spark Standalone集群模式下運行通過。
如果我們是在開發環境進行調試的時候,也就是使用local[K]部署模式,在本地啟動K個Worker線程來計算,這K個Worker在同一個JVM實例里,上面的代碼默認情況是,如果沒有傳參數則是local[K]模式,所以如果使用這種方式在創建Redis連接池或連接的時候,可能非常容易調試通過,但是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集群部署模式的時候,就會報錯,主要是由於在處理Redis連接池或連接的時候出錯了。我們可以看一下Spark架構,如圖所示(來自官網):
無論是在本地模式、Standalone模式,還是在Mesos或YARN模式下,整個Spark集群的結構都可以用上圖抽象表示,只是各個組件的運行環境不同,導致組件可能是分布式的,或本地的,或單個JVM實例的。如在本地模式,則上圖表現為在同一節點上的單個進程之內的多個組件;而在YARN Client模式下,Driver程序是在YARN集群之外的一個節點上提交Spark Application,其他的組件都運行在YARN集群管理的節點上。
在Spark集群環境部署Application后,在進行計算的時候會將作用於RDD數據集上的函數(Functions)發送到集群中Worker上的Executor上(在Spark Streaming中是作用於DStream的操作),那么這些函數操作所作用的對象(Elements)必須是可序列化的,通過Scala也可以使用lazy引用來解決,否則這些對象(Elements)在跨節點序列化傳輸后,無法正確地執行反序列化重構成實際可用的對象。上面代碼我們使用lazy引用(Lazy Reference)來實現的,代碼如下所示:
package com.sf.scalademo3 import org.apache.commons.pool2.impl.GenericObjectPoolConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kafka.KafkaUtils import kafka.serializer.StringDecoder import net.sf.json.JSONObject import redis.clients.jedis.JedisPool object UserClickCountAnalytics2 { def main(args: Array[String]): Unit = { var masterUrl = "local[1]" if (args.length > 0) { masterUrl = args(0) } // Create a StreamingContext with the given master URL val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat") val ssc = new StreamingContext(conf, Seconds(5)) // Kafka configurations val topics = Set("user_events") val brokers = "localhost:9092" val kafkaParams = Map[String, String]( "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder") val dbIndex = 1 val clickHashKey = "app::users::click" // Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val events = kafkaStream.flatMap(line => { val data = JSONObject.fromObject(line._2) Some(data) }) // Compute user click times val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _) userClicks.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { partitionOfRecords.foreach(pair => { /** * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool} */ object InternalRedisClient extends Serializable { @transient private var pool: JedisPool = null def makePool(redisHost: String, redisPort: Int, redisTimeout: Int, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = { makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000) } def makePool(redisHost: String, redisPort: Int, redisTimeout: Int, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = { if (pool == null) { val poolConfig = new GenericObjectPoolConfig() poolConfig.setMaxTotal(maxTotal) poolConfig.setMaxIdle(maxIdle) poolConfig.setMinIdle(minIdle) poolConfig.setTestOnBorrow(testOnBorrow) poolConfig.setTestOnReturn(testOnReturn) poolConfig.setMaxWaitMillis(maxWaitMillis) pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout) val hook = new Thread { override def run = pool.destroy() } sys.addShutdownHook(hook.run) } } def getPool: JedisPool = { assert(pool != null) pool } } // Redis configurations val maxTotal = 10 val maxIdle = 10 val minIdle = 1 val redisHost = "10.202.34.232" val redisPort = 6383 val redisTimeout = 30000 val dbIndex = 1 InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle) val uid = pair._1 val clickCount = pair._2 val jedis = InternalRedisClient.getPool.getResource jedis.select(dbIndex) //原子操作--Redis HINCRBY命令用於增加存儲在字段中存儲由增量鍵哈希的數量。 //如果鍵不存在,新的key被哈希創建。如果字段不存在,值被設置為0之前進行操作。 jedis.hincrBy(clickHashKey, uid, clickCount) InternalRedisClient.getPool.returnResource(jedis) }) }) }) ssc.start() ssc.awaitTermination() } }
上面代碼實現,得益於Scala語言的特性,可以在代碼中任何位置進行class或object的定義,我們將用來管理Redis連接的代碼放在了特定操作的內部,就避免了瞬態(Transient)對象跨節點序列化的問題。這樣做還要求我們能夠了解Spark內部是如何操作RDD數據集的,更多可以參考RDD或Spark相關文檔。
在集群上,以Standalone模式運行,執行如下命令:
cd /usr/local/spark 2 ./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077
可以查看集群中各個Worker節點執行計算任務的狀態,也可以非常方便地通過Web頁面查看。
下面,看一下我們存儲到Redis中的計算結果,如下所示:

有關更多關於Spark Streaming的詳細內容,可以參考官方文檔。
附錄
這里,附上前面開發的應用所對應的依賴,以及打包Spark Streaming應用程序的Maven配置,以供參考。如果使用maven-shade-plugin插件,配置有問題的話,打包后在Spark集群上提交Application時候可能會報錯Invalid signature file digest for Manifest main attributes。