1.大數據處理的常用方法
大數據處理目前比較流行的是兩種方法,一種是離線處理,一種是在線處理,基本處理架構如下:

在互聯網應用中,不管是哪一種處理方式,其基本的數據來源都是日志數據,例如對於web應用來說,則可能是用戶的訪問日志、用戶的點擊日志等。
如果對於數據的分析結果在時間上有比較嚴格的要求,則可以采用在線處理的方式來對數據進行分析,如使用Flink進行處理。比較貼切的一個例子是天貓雙十一的成交額,在其展板上,我們看到交易額是實時動態進行更新的,對於這種情況,則需要采用在線處理。下面要介紹的是實時數據處理方式,即基於Flink的在線處理,在下面給出的完整案例中,我們將會完成下面的幾項工作:
-
1.如何一步步構建我們的實時處理系統(Flume+Kafka+Flink+Redis)
-
2.實時處理網站的用戶訪問日志,並統計出該網站的PV、UV
-
3.將實時分析出的PV、UV動態地展示在我們的前面頁面上
如果你對上面提及的大數據組件已經有所認識,或者對如何構建大數據實時處理系統感興趣,那么就可以盡情閱讀下面的內容了。
需要注意的是,核心在於如何構建實時處理系統,而這里給出的案例是實時統計某個網站的PV、UV,在實際中,基於每個人的工作環境不同,業務不同,因此業務系統的復雜度也不盡相同,相對來說,這里統計PV、UV的業務是比較簡單的,但也足夠讓我們對大數據實時處理系統有一個基本的、清晰的了解與認識,是的,它不再那么神秘了。
2.實時處理系統架構
我們的實時處理系統整體架構如下:

即從上面的架構中我們可以看出,其由下面的幾部分構成:
-
Flume集群
-
Kafka集群
-
Flink集群
從構建實時處理系統的角度出發,我們需要做的是,如何讓數據在各個不同的集群系統之間打通(從上面的圖示中也能很好地說明這一點),即需要做各個系統之前的整合,包括Flume與Kafka的整合,Kafka與Flink的整合。當然,各個環境是否使用集群,依個人的實際需要而定,在我們的環境中,Flume、Kafka、Flink都使用集群。
3.Flume+Kafka整合

3.1整合思路
對於Flume而言,關鍵在於如何采集數據,並且將其發送到Kafka上,並且由於我們這里了使用Flume集群的方式,Flume集群的配置也是十分關鍵的。而對於Kafka,關鍵就是如何接收來自Flume的數據。從整體上講,邏輯應該是比較簡單的,即可以在Kafka中創建一個用於我們實時處理系統的topic,然后Flume將其采集到的數據發送到該topic上即可。
3.2整合過程:Flume集群配置與Kafka Topic創建
在我們的場景中,兩個Flume Agent分別部署在兩台Web服務器上,用來采集Web服務器上的日志數據,然后其數據的下沉方式都為發送到另外一個Flume Agent上,所以這里我們需要配置三個Flume Agent.
3.2.1.1 Flume Agent01
該Flume Agent部署在一台Web服務器上,用來采集產生的Web日志,然后發送到Flume Consolidation Agent上,創建一個新的配置文件
flume-sink-avro.conf
,其配置內容如下:
######################################################### ## ##主要作用是監聽文件中的新增數據,采集到數據之后,輸出到avro ## 注意:Flume agent的運行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對於source的配置描述 監聽文件中的新增數據 exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #對於sink的配置描述 使用avro日志做數據的消費 a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #對於channel的配置描述 使用文件做數據的臨時緩存 這種的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #通過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成后, 啟動Flume Agent,即可對日志文件進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
3.2.1.2 Flume Agent02
該Flume Agent部署在一台Web服務器上,用來采集產生的Web日志,然后發送到Flume Consolidation Agent上,創建一個新的配置文件
flume-sink-avro.conf
,其配置內容如下:
######################################################### ## ##主要作用是監聽文件中的新增數據,采集到數據之后,輸出到avro ## 注意:Flume agent的運行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對於source的配置描述 監聽文件中的新增數據 exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #對於sink的配置描述 使用avro日志做數據的消費 a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #對於channel的配置描述 使用文件做數據的臨時緩存 這種的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #通過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成后, 啟動Flume Agent,即可對日志文件進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
3.2.1.2 Flume Consolidation Agent
該Flume Agent用於接收其它兩個Agent發送過來的數據,然后將其發送到Kafka上,創建一個新的配置文件
flume-source_avro-sink_kafka.conf
,配置內容如下:
######################################################### ## ##主要作用是監聽目錄中的新增文件,采集到數據之后,輸出到kafka ## 注意:Flume agent的運行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對於source的配置描述 監聽avro a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 #對於sink的配置描述 使用kafka做數據的消費 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = f-k-s a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 #對於channel的配置描述 使用內存緩沖區域做數據的臨時緩存 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #通過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成后, 啟動Flume Agent,即可對avro的數據進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/null 2>&1 &
3.2.2 Kafka配置
在我們的
Kafka
中,先創建一個
topic
,用於后面接收
Flume
采集過來的數據:
kafka-topics.sh --create --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
4.Kafka+Flink整合
Flink 提供了特殊的Kafka Connectors來從Kafka topic中讀取數據或者將數據寫入到Kafkatopic中,Flink的Kafka Consumer與Flink的檢查點機制相結合,提供exactly-once處理語義。為了做到這一點,Flink並不完全依賴於Kafka的consumer組的offset跟蹤,而是在自己的內部去跟蹤和檢查。
Kafka Consumer
Flink的kafka consumer叫做FlinkKafkaConsumer08(對於Kafka 0.9.0.X來說是09 等),它提供了對一個或者多個Kafka topic的訪問。
FlinkKafkaConsumer08、09等的構造函數接收以下參數: 1、topic名稱或者名稱列表 2、反序列化來自kafka的數據的DeserializationSchema/Keyed Deserialization Schema 3、Kafka consumer的一些配置,下面的配置是必需的: "bootstrap.servers"(以逗號分隔的Kafka brokers列表) "zookeeper.connect"(以逗號分隔的Zookeeper 服務器列表) "group.id"(consumer組的id)
例如: Java 代碼:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
Scala 代碼:
val properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); stream = env .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)) .print
當前FlinkKafkaConsumer的實現會建立一個到Kafka客戶端的連接來查詢topic的列表和分區。
為此,consumer需要能夠訪問到從提交Job任務的服務器到Flink服務器的consumer,如果你在客戶端遇到任何Kafka Consumer的問題,你都可以在客戶端日志中看到關於請求失敗的日志。
Kafka Consumers 和Fault Tolerance
Flink的checkpoint啟用之后,Flink Kafka Consumer將會從一個topic中消費記錄並以一致性的方式周期性地檢查所有Kafka偏移量以及其他操作的狀態。Flink將保存流程序到狀態的最新的checkpoint中,並重新從Kafka中讀取記錄,記錄從保存在checkpoint中的偏移位置開始讀取。
checkpoint的時間間隔定義了程序在發生故障時可以恢復多少。
同時需要注意的是Flink只能在有足夠的slots時才會去重啟topology,所以如果topology由於TaskManager丟失而失敗時,任然需要有足夠的slot可用。Flink on YARN支持YARN container丟失自動重啟。
5.Flink+Redis整合
其實所謂Flink和Redis的整合,指的是在我們的實時處理系統中的數據的落地方式,即在Flink中包含了我們處理數據的邏輯,而數據處理完畢后,產生的數據處理結果該保存到什么地方呢?顯然就有很多種方式了,關系型數據庫、NoSQL、HDFS、HBase等,這應該取決於具體的業務和數據量,在這里,我們使用Redis來進行最后分析數據的存儲。
所以實際上做這一步的整合,其實就是開始寫我們的業務處理代碼了,因為通過前面Flume-Kafka-FLink的整合,已經打通了整個數據的流通路徑,接下來關鍵要做的是,在Flink中,如何處理我們的數據並保存到Redis中。
Flink Redis Connector
Flink自帶的connector提供了一種簡潔的寫入Redis的方式,只需要在項目中加入下面的依賴即可實現。
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
兼容版本:Redis 2.8.5 注意:Flink的connector並不是Flink的安裝版本,需要寫入用戶的jar包並上傳才能使用。
6.數據可視化處理
數據可視化處理目前我們需要完成兩部分的工作:
-
1.開發一個Web項目,能夠查詢Redis中的數據,同時提供訪問的頁面
-
2.自行開發或找一個符合我們需求的前端UI,將Web項目中查詢到的數據展示出來
對於Web項目的開發,因個人的技術棧能力而異,選擇的語言和技術也有所不同,只要能夠達到我們最終數據可視化的目的,其實都行的。這個項目中我們要展示的是pv和uv數據,難度不大,因此可以選擇Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我個人非常喜歡,因為開發非常快,但因為前面一直用的是Java,因此這里我還是選擇使用SpringMVC來完成。
至於UI這一塊,我前端能力一般,普通的開發沒有問題,但是要做出像上面這種地圖類型的UI界面來展示數據的話,確實有點無能為力。好在現在第三方的UI框架比較多,對於圖表類展示的,比如就有highcharts和echarts,其中echarts是百度開源的,有豐富的中文文檔,非常容易上手,所以這里我選擇使用echarts來作為UI,並且其剛好就有能夠滿足我們需求的地圖類的UI組件。
對於頁面數據的動態刷新有兩種方案,一種是定時刷新頁面,另外一種則是定時向后端異步請求數據。
目前我采用的是第一種,頁面定時刷新,有興趣的同學也可以嘗試使用第二種方法,只需要在后端開發相關的返回JSON數據的API即可。
7.總結
那么至此,從整個大數據實時處理系統的構建到最后的數據可視化處理工作,我們都已經完成了。