1.概述
最近有同學問道,除了使用 Storm 充當實時計算的模型外,還有木有其他的方式來實現實時計算的業務。了解到,在使用 Storm 時,需要編寫基於編程語言的代碼。比如,要實現一個流水指標的統計,需要去編寫相應的業務代碼,能不能有一種簡便的方式來實現這一需求。在解答了該同學的疑惑后,整理了該實現方案的一個案例,供后面的同學學習參考。
2.內容
實現該方案,整體的流程是不變的,我這里只是替換了其計算模型,將 Storm 替換為 Spark,原先的數據收集,存儲依然可以保留。
2.1 Spark Overview
Spark 出來也是很久了,說起它,應該並不會陌生。它是一個開源的類似於 Hadoop MapReduce 的通用並行計算模型,它擁有 Hadoop MapReduce 所具有的有點,但與其不同的是,MapReduce 的 JOB 中間輸出結果可以保存在內存中,不再需要回寫磁盤,因而,Spark 能更好的適用於需要迭代的業務場景。
2.2 Flow
上面只是對 Spark 進行了一個簡要的概述,讓大家知道其作用,由於本篇博客的主要內容並不是講述 Spark 的工作原理和計算方法,多的內容,這里筆者就不再贅述,若是大家想詳細了解 Spark 的相關內容,可參考官方文檔。[參考地址]
接下來,筆者為大家呈現本案例的一個實現流程圖,如下圖所示:

通過上圖,我們可以看出,首先是采集上報的日志數據,將其存放於消息中間件,這里消息中間件采用的是 Kafka,然后在使用計算模型按照業務指標實現相應的計算內容,最后是將計算后的結果進行持久化,DB 的選擇可以多樣化,這里筆者就直接使用了 Redis 來作為演示的存儲介質,大家所示在使用中,可以替換該存儲介質,比如將結果存放到 HDFS,HBase Cluster,或是 MySQL 等都行。這里,我們使用 Spark SQL 來替換掉 Storm 的業務實現編寫。
3.實現
在介紹完上面的內容后,我們接下來就去實現該內容,首先我們要生產數據源,實際的場景下,會有上報好的日志數據,這里,我們就直接寫一個模擬數據類,實現代碼如下所示:
object KafkaIPLoginProducer {
private val uid = Array("123dfe", "234weq","213ssf")
private val random = new Random()
private var pointer = -1
def getUserID(): String = {
pointer = pointer + 1
if (pointer >= users.length) {
pointer = 0
uid(pointer)
} else {
uid(pointer)
}
}
def plat(): String = {
random.nextInt(10) + "10"
}
def ip(): String = {
random.nextInt(10) + ".12.1.211"
}
def country(): String = {
"中國" + random.nextInt(10)
}
def city(): String = {
"深圳" + random.nextInt(10)
}
def location(): JSONArray = {
JSON.parseArray("[" + random.nextInt(10) + "," + random.nextInt(10) + "]")
}
def main(args: Array[String]): Unit = {
val topic = "test_data3"
val brokers = "dn1:9092,dn2:9092,dn3:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val kafkaConfig = new ProducerConfig(props)
val producer = new Producer[String, String](kafkaConfig)
while (true) {
val event = new JSONObject()
event
.put("_plat", "1001")
.put("_uid", "10001")
.put("_tm", (System.currentTimeMillis / 1000).toString())
.put("ip", ip)
.put("country", country)
.put("city", city)
.put("location", JSON.parseArray("[0,1]"))
println("Message sent: " + event)
producer.send(new KeyedMessage[String, String](topic, event.toString))
event
.put("_plat", "1001")
.put("_uid", "10001")
.put("_tm", (System.currentTimeMillis / 1000).toString())
.put("ip", ip)
.put("country", country)
.put("city", city)
.put("location", JSON.parseArray("[0,1]"))
println("Message sent: " + event)
producer.send(new KeyedMessage[String, String](topic, event.toString))
event
.put("_plat", "1001")
.put("_uid", "10002")
.put("_tm", (System.currentTimeMillis / 1000).toString())
.put("ip", ip)
.put("country", country)
.put("city", city)
.put("location", JSON.parseArray("[0,1]"))
println("Message sent: " + event)
producer.send(new KeyedMessage[String, String](topic, event.toString))
event
.put("_plat", "1002")
.put("_uid", "10001")
.put("_tm", (System.currentTimeMillis / 1000).toString())
.put("ip", ip)
.put("country", country)
.put("city", city)
.put("location", JSON.parseArray("[0,1]"))
println("Message sent: " + event)
producer.send(new KeyedMessage[String, String](topic, event.toString))
Thread.sleep(30000)
}
}
}
上面代碼,通過 Thread.sleep() 來控制數據生產的速度。接下來,我們來看看如何實現每個用戶在各個區域所分布的情況,它是按照坐標分組,平台和用戶ID過濾進行累加次數,邏輯用 SQL 實現較為簡單,關鍵是在實現過程中需要注意的一些問題,比如對象的序列化問題。這里,細節的問題,我們先不討論,先看下實現的代碼,如下所示:
object IPLoginAnalytics {
def main(args: Array[String]): Unit = {
val sdf = new SimpleDateFormat("yyyyMMdd")
var masterUrl = "local[2]"
if (args.length > 0) {
masterUrl = args(0)
}
// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("IPLoginCountStat")
val ssc = new StreamingContext(conf, Seconds(5))
// Kafka configurations
val topics = Set("test_data3")
val brokers = "dn1:9092,dn2:9092,dn3:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
val ipLoginHashKey = "mf::ip::login::" + sdf.format(new Date())
// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})
def func(iter: Iterator[(String, String)]): Unit = {
while (iter.hasNext) {
val item = iter.next()
println(item._1 + "," + item._2)
}
}
events.foreachRDD { rdd =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.map(f => Record(f.getString("_plat"), f.getString("_uid"), f.getString("_tm"), f.getString("country"), f.getString("location"))).toDF()
// Register as table
wordsDataFrame.registerTempTable("events")
// Do word count on table using SQL and print it
val wordCountsDataFrame = sqlContext.sql("select location,count(distinct plat,uid) as value from events where from_unixtime(tm,'yyyyMMdd') = '" + sdf.format(new Date()) + "' group by location")
var results = wordCountsDataFrame.collect().iterator
/**
* Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
*/
object InternalRedisClient extends Serializable {
@transient private var pool: JedisPool = null
def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
}
def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if (pool == null) {
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
poolConfig.setTestOnBorrow(testOnBorrow)
poolConfig.setTestOnReturn(testOnReturn)
poolConfig.setMaxWaitMillis(maxWaitMillis)
pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)
val hook = new Thread {
override def run = pool.destroy()
}
sys.addShutdownHook(hook.run)
}
}
def getPool: JedisPool = {
assert(pool != null)
pool
}
}
// Redis configurations
val maxTotal = 10
val maxIdle = 10
val minIdle = 1
val redisHost = "dn1"
val redisPort = 6379
val redisTimeout = 30000
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
val jedis = InternalRedisClient.getPool.getResource
while (results.hasNext) {
var item = results.next()
var key = item.getString(0)
var value = item.getLong(1)
jedis.hincrBy(ipLoginHashKey, key, value)
}
}
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class Record(plat: String, uid: String, tm: String, country: String, location: String)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
我們在開發環境進行測試的時候,使用 local[k] 部署模式,在本地啟動 K 個 Worker 線程來進行計算,而這 K 個 Worker 在同一個 JVM 中,上面的示例,默認使用 local[k] 模式。這里我們需要普及一下 Spark 的架構,架構圖來自 Spark 的官網,[鏈接地址]

這里,不管是在 local[k] 模式,Standalone 模式,還是 Mesos 或是 YARN 模式,整個 Spark Cluster 的結構都可以用改圖來闡述,只是各個組件的運行環境略有不同,從而導致他們可能運行在分布式環境,本地環境,亦或是一個 JVM 實利當中。例如,在 local[k] 模式,上圖表示在同一節點上的單個進程上的多個組件,而對於 YARN 模式,驅動程序是在 YARN Cluster 之外的節點上提交 Spark 應用,其他組件都是運行在 YARN Cluster 管理的節點上的。
而對於 Spark Cluster 部署應用后,在進行相關計算的時候會將 RDD 數據集上的函數發送到集群中的 Worker 上的 Executor,然而,這些函數做操作的對象必須是可序列化的。上述代碼利用 Scala 的語言特性,解決了這一問題。
4.結果預覽
在完成上述代碼后,我們執行代碼,看看預覽結果如下,執行結果,如下所示:
4.1 啟動生產線程

4.2 Redis 結果預覽

5.總結
整體的實現內容不算太復雜,統計的業務指標,這里我們使用 SQL 來完成這部分工作,對比 Storm 來說,我們專注 SQL 的編寫就好,難度不算太大。可操作性較為友好。
6.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
