轉自 http://dblab.xmu.edu.cn/post/8274/
0.案例概述
本案例利用Spark+Kafka實時分析男女生每秒購物人數,利用Spark Streaming實時處理用戶購物日志,然后利用websocket將數據實時推送給瀏覽器,最后瀏覽器將接收到的數據實時展現,案例的整體框架圖如下:
下面分析詳細分析下上述步驟:
- 應用程序將購物日志發送給Kafka,topic為”sex”,因為這里只是統計購物男女生人數,所以只需要發送購物日志中性別屬性即可。這里采用模擬的方式發送購物日志,即讀取購物日志數據,每間隔相同的時間發送給Kafka。
- 接着利用Spark Streaming從Kafka主題”sex”讀取並處理消息。這里按滑動窗口的大小按順序讀取數據,例如可以按每5秒作為窗口大小讀取一次數據,然后再處理數據。
- Spark將處理后的數據發送給Kafka,topic為”result”。
- 然后利用Flask搭建一個web應用程序,接收Kafka主題為”result”的消息。
- 利用Flask-SocketIO將數據實時推送給客戶端。
- 客戶端瀏覽器利用js框架socketio實時接收數據,然后利用js可視化庫hightlights.js庫動態展示。
至此,本案例的整體架構已介紹完畢。
一、實驗環境准備
實驗系統和軟件要求
Ubuntu: 16.04
Spark: 2.1.0
Scala: 2.11.8
kafka: 0.8.2.2
Python: 3.x(3.0以上版本)
Flask: 0.12.1
Flask-SocketIO: 2.8.6
kafka-python: 1.3.3
系統和軟件的安裝
Spark安裝(前續文檔已經安裝)
Kafka安裝
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群機來提供實時的消費。下面介紹有關Kafka的簡單安裝和使用, 簡單介紹參考KAFKA簡介, 想全面了解Kafka,請訪問Kafka的官方博客。
我選擇的是kafka_2.11-0.10.1.0.tgz(注意,此處版本號,在后面spark使用時是有要求的,見集成指南)版本。
sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-0.10.1.0/ ./kafka
sudo chown -R hadoop ./kafka
接下來在Ubuntu系統環境下測試簡單的實例。Mac系統請自己按照安裝的位置,切換到相應的指令。按順序執行如下命令:
cd /usr/local/kafka # 進入kafka所在的目錄
bin/zookeeper-server-start.sh config/zookeeper.properties
命令執行后不會返回Shell命令輸入狀態,zookeeper就會按照默認的配置文件啟動服務,請千萬不要關閉當前終端.啟動新的終端,輸入如下命令:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
kafka服務端就啟動了,請千萬不要關閉當前終端。啟動另外一個終端,輸入如下命令:
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
topic是發布消息發布的category,以單節點的配置創建了一個叫dblab的topic.可以用list列出所有創建的topics,來查看剛才創建的主題是否存在。
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以在結果中查看到dblab這個topic存在。接下來用producer生產點數據:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
並嘗試輸入如下信息:
hello hadoop
hello xmu
hadoop world
然后再次開啟新的終端或者直接按CTRL+C退出。然后使用consumer來接收數據,輸入如下命令:
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
便可以看到剛才產生的三條信息。說明kafka安裝成功。
Python安裝
Ubuntu16.04系統自帶Python2.7和Python3.5,本案例直接使用Ubuntu16.04自帶Python3.5;
Python依賴庫
案例主要使用了兩個Python庫,Flask和Flask-SocketIO,這兩個庫的安裝非常簡單,請啟動進入Ubuntu系統,打開一個命令行終端。
Python之所以強大,其中一個原因是其豐富的第三方庫。pip則是python第三方庫的包管理工具。Python3對應的包管理工具是pip3。因此,需要首先在Ubuntu系統中安裝pip3,命令如下:
sudo apt-get install python3-pip
安裝完pip3以后,可以使用如下Shell命令完成Flask和Flask-SocketIO這兩個Python第三方庫的安裝以及與Kafka相關的Python庫的安裝:
pip3 install flask
pip3 install flask-socketio
pip3 install kafka-python
這些安裝好的庫在我們的程序文件的開頭可以直接用來引用。比如下面的例子。
from flask import Flask
from flask_socketio import SocketIO
from kafka import KafkaConsumer
from import 跟直接import的區別舉個例子來說明。
import socket的話,要用socket.AF_INET,因為AF_INET這個值在socket的名稱空間下。
from socket import* 是把socket下的所有名字引入當前名稱空間。
二、數據處理和Python操作Kafka
本案例采用的數據集壓縮包為data_format.zip點擊這里下載data_format.zip數據集,該數據集壓縮包是淘寶2015年雙11前6個月(包含雙11)的交易數據(交易數據有偏移,但是不影響實驗的結果),里面包含3個文件,分別是用戶行為日志文件user_log.csv 、回頭客訓練集train.csv 、回頭客測試集test.csv. 在這個案例中只是用user_log.csv這個文件,下面列出文件user_log.csv的數據格式定義:
用戶行為日志user_log.csv,日志中的字段定義如下:
- user_id | 買家id
- item_id | 商品id
- cat_id | 商品類別id
- merchant_id | 賣家id
- brand_id | 品牌id
- month | 交易時間:月
- day | 交易事件:日
- action | 行為,取值范圍{0,1,2,3},0表示點擊,1表示加入購物車,2表示購買,3表示關注商品
- age_range | 買家年齡分段:1表示年齡<18,2表示年齡在[18,24],3表示年齡在[25,29],4表示年齡在[30,34],5表示年齡在[35,39],6表示年齡在[40,49],7和8表示年齡>=50,0和NULL則表示未知
- gender | 性別:0表示女性,1表示男性,2和NULL表示未知
- province| 收獲地址省份
數據具體格式如下:
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,內蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,內蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江
這個案例實時統計每秒中男女生購物人數,因此針對每條購物日志,我們只需要獲取gender即可,然后發送給Kafka,接下來Spark Streaming再接收gender進行處理。
數據預處理
接着可以寫如下Python代碼,文件名為producer.py:(具體的工程文件結構參照步驟一)
mkdir -p ~/kafka-exp/scripts
cd ~/kafka-exp/scripts
vim producer.py
添加如入內容:
# coding: utf-8
import csv
import time
from kafka import KafkaProducer
# 實例化一個KafkaProducer示例,用於向Kafka投遞消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打開數據文件
csvfile = open("../data/user_log.csv","r")
# 生成一個可用於讀取csv文件的reader
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性別在每行日志代碼的第9個元素
if gender == 'gender':
continue # 去除第一行表頭
time.sleep(0.1) # 每隔0.1秒發送一行數據
# 發送數據,topic為'sex'
producer.send('sex',line[9].encode('utf8'))
上述代碼很簡單,首先是先實例化一個Kafka生產者。然后讀取用戶日志文件,每次讀取一行,接着每隔0.1秒發送給Kafka,這樣1秒發送10條購物日志。這里發送給Kafka的topic為’sex’。
Python操作Kafka
我們可以寫一個KafkaConsumer測試數據是否投遞成功,代碼如下,文件名為consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('sex')
for msg in consumer:
print((msg.value).decode('utf8'))
在開啟上述KafkaProducer和KafkaConsumer之前,需要先開啟Kafka。然后再開兩個終端,分別用作發布消息與消費消息,執行命令如下:
cd ~/kafka-exp/scripts
python3 producer.py #啟動生產者發送消息給Kafaka
打開另外一個命令行 終端窗口,消費消息,執行如下命令:
cd ~/kafka-exp/scripts
python3 consumer.py #啟動消費者從Kafaka接收消息
運行上面這條命令以后,這時,你會看到屏幕上會輸出一行又一行的數字,類似下面的樣子:
2
1
1
1
.......
三、Spark Streaming實時處理數據
本案例在於實時統計每秒中男女生購物人數,而Spark Streaming接收的數據為1,1,0,2…,其中0代表女性,1代表男性,所以對於2或者null值,則不考慮。其實通過分析,可以發現這個就是典型的wordcount問題,而且是基於Spark流計算。女生的數量,即為0的個數,男生的數量,即為1的個數。
因此利用Spark Streaming接口reduceByKeyAndWindow,設置窗口大小為1,滑動步長為1,這樣統計出的0和1的個數即為每秒男生女生的人數。
Spark准備工作
Kafka和Flume等高級輸入源,需要依賴獨立的庫(jar文件)。按照我們前面安裝好的Spark版本,這些jar包都不在里面,為了證明這一點,我們現在可以測試一下。請打開一個新的終端,然后啟動spark-shell:
cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7
./bin/spark-shell
啟動成功后,在spark-shell中執行下面import語句:
scala> import org.apache.spark.streaming.kafka010._
<console>:25: error: object kafka is not a member of package org.apache.spark.streaming
import org.apache.spark.streaming.kafka010._
^
你可以看到,馬上會報錯,因為找不到相關的jar包。然后我們退出spark-shell。
根據Spark官網的說明,對於Spark2.3.0版本,如果要使用Kafka,則需要下載spark-streaming-kafka-0-10_2.11相關jar包。
現在請在Linux系統中,打開一個火狐瀏覽器,請點擊這里訪問Spark官網,里面有提供spark-streaming-kafka-0-10_2.11-2.3.0.jar文件的下載,其中,2.11表示scala的版本,2.3.0表示Spark版本號。下載后的文件會被默認保存在當前Linux登錄用戶的下載目錄下,本教程統一使用hadoop用戶名登錄Linux系統,所以,我們就把這個文件復制到Spark目錄的jars目錄下。請新打開一個終端,輸入下面命令:
mkdir /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka
cp ./spark-streaming-kafka-0-10_2.11-2.3.0.jar /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka
下面還要繼續把Kafka安裝目錄的libs目錄下的所有jar文件復制到“/usr/local/spark/jars/kafka”目錄下,請在終端中執行下面命令:
cd /usr/local/kafka/libs
ls
cp ./* /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka
建立Spark項目
之前有很多教程都有說明如何創建Spark項目,這里再次說明。首先在/usr/local/spark/mycode新建項目主目錄kafka,然后在kafka目錄下新建scala文件存放目錄以及scala工程文件
mkdir -p /usr/local/spark/mycode/kafka/src/main/scala
接着在src/main/scala文件下創建兩個文件,一個是用於設置日志,一個是項目工程主文件,設置日志文件為StreamingExamples.scala
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
這個文件不做過多解釋,因為這只是一個輔助文件,下面着重介紹工程主文件,文件名為KafkaTest.scala
package org.apache.spark.examples.streaming
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.Interval
import org.apache.spark.streaming.kafka010._
object KafkaWordCount {
implicit val formats = DefaultFormats//數據格式化時需要
def main(args: Array[String]): Unit={
if (args.length < 3) {
System.err.println("Usage: KafkaWordCount <brokers> <groupId> <topics>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(brokers, groupId, topics) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("checkpoint")
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))//將輸入的每行用空格分割成一個個word
// 對每一秒的輸入數據進行reduce,然后將reduce后的數據發送給Kafka
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => {
if(rdd.count !=0 ){
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
// 實例化一個Kafka生產者
val producer = new KafkaProducer[String, String](props)
// rdd.colect即將rdd中數據轉化為數組,然后write函數將rdd內容轉化為json格式
val str = write(rdd.collect)
// 封裝成Kafka消息,topic為"result"
val message = new ProducerRecord[String, String]("result", null, str)
// 給Kafka發送消息
producer.send(message)
}
})
ssc.start()
ssc.awaitTermination()
}
}
上述代碼注釋已經也很清楚了,下面在簡要說明下:
- 首先按每秒的頻率讀取Kafka消息;
- 然后對每秒的數據執行wordcount算法,統計出0的個數,1的個數,2的個數;
- 最后將上述結果封裝成json發送給Kafka。
另外,需要注意,上面代碼中有一行如下代碼:
ssc.checkpoint(".")
這行代碼表示把檢查點文件寫入分布式文件系統HDFS,所以一定要事先啟動Hadoop。如果沒有啟動Hadoop,則后面運行時會出現“拒絕連接”的錯誤提示。如果你還沒有啟動Hadoop,則可以現在在Ubuntu終端中,使用如下Shell命令啟動Hadoop:
cd /usr/local/hadoop #這是hadoop的安裝目錄
./sbin/start-dfs.sh
另外,如果不想把檢查點寫入HDFS,而是直接把檢查點寫入本地磁盤文件(這樣就不用啟動Hadoop),則可以對ssc.checkpoint()方法中的文件路徑進行指定,比如下面這個例子:
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")
checkpoint的意思就是建立檢查點,類似於快照,例如在spark計算里面 計算流程DAG特別長,服務器需要將整個DAG計算完成得出結果,但是如果在這很長的計算流程中突然中間算出的數據丟失了,spark又會根據RDD的依賴關系從頭到尾計算一遍,這樣子就很費性能,當然我們可以將中間的計算結果通過cache或者persist放到內存或者磁盤中,但是這樣也不能保證數據完全不會丟失,存儲的這個內存出問題了或者磁盤壞了,也會導致spark從頭再根據RDD計算一遍,所以就有了checkpoint,其中checkpoint的作用就是將DAG中比較重要的中間數據做一個檢查點將結果存儲到一個高可用的地方(通常這個地方就是HDFS里面)
運行項目
編寫好程序之后,下面介紹下如何打包運行程序。在/usr/local/spark/mycode/kafka目錄下新建文件simple.sbt,輸入如下內容:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.3.0"
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"
然后,即可編譯打包程序,輸入如下命令
/usr/local/sbt/sbt package
打包成功之后,接下來編寫運行腳本,在/usr/local/spark/mycode/kafka目錄下新建startup.sh文件,輸入如下內容:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/*:/usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:9092 1 sex
其中最后四個為輸入參數,含義如下
- 127.0.0.1:9092為brokerer地址
- 1 為consumer group標簽
- sex為消費者接收的topic
最后在/usr/local/spark/mycode/kafka目錄下,運行如下命令即可執行剛編寫好的Spark Streaming程序
sh startup.sh
程序運行成功之后,下面通過之前的KafkaProducer和KafkaConsumer來檢測程序。
測試程序
下面開啟之前編寫的KafkaProducer投遞消息,然后將KafkaConsumer中接收的topic改為result,驗證是否能接收topic為result的消息,更改之后的KafkaConsumer為
from kafka import KafkaConsumer
consumer = KafkaConsumer('result')
for msg in consumer:
print((msg.value).decode('utf8'))
在同時開啟Spark Streaming項目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer運行窗口,出現以下類似數據:
[{"0":1},{"2":3},{"1":6}]
[{"0":5},{"2":2},{"1":3}]
[{"0":3},{"2":3},{"1":4}]
.......
四、結果展示
接下來做的事是,利用Flask-SocketIO實時推送數據,socket.io.js實時獲取數據,highlights.js展示數據。
Flask-SocketIO實時推送數據
將介紹如何利用Flask-SocketIO將結果實時推送到瀏覽器。
下載代碼,用python3.5 運行 app.py即可:
python app.py