Flume+Kafka+SparkStreaming打造實時流處理框架


Flume+Kafka+SparkStreaming打造實時流處理框架

1-1 實時流處理產生背景
時效性高,數據量大

1-2 實時流處理概述
實時計算(秒,毫秒級別)
流式計算(在不斷產生的實時數據流計算)7*24

1-3 離線計算與實時計算的對比
1-3-1 數據來源
離線:HDFS,歷史數據,數據量比較大
實時:消息隊列(kafka)實時新增,修改記錄過來的某一筆數據
1-3-2 處理過程
離線:MR Spark
實時:Spark Streaming
1-3-3 處理速度
離線:慢
實時:快
1-3-4 進程
離線:啟動+銷毀
實時:7*24

1-4 實時流處理框架對比
storm
spark streaming :按照你設置的時間間隔拆成小的批處理
flink

1-5 實時流處理架構與技術選型
web/app ---> WebServers ---> Flume ---> Kafka ---> Spark Streaming
---> RDBMS/HBASE ---> 可視化展示

1-6 實時流處理在企業中的應用
電信行業:流量短信提醒
電商行業:分布式日志收集框架

2.Flume:分布式日志收集框架
2-1 業務現狀分析
大量的日志數據如何從其他的Server上移動到hadoop之上
要考慮網絡開銷,io開銷
server ---> Flume --->Hadoop集群

2-2 flume概述

webServer ---> Source ---> channel ---> sink ---> HDFS

2-3 flume 核心組件和架構
source :收集
channel: 聚集
sink: 輸出

2-4 安裝
2-4-1 安裝JDK,配置環境變量,source 生效
2-4-2 安裝flume,配置flume-env.sh文件,引入JAVA_HOME

2-5 flume的使用案例
使用flume的關鍵就是寫配置文件
2-5-1 配置source
2-5-2 配置channel
2-5-3 配置sink
2-5-4 把三個組件串起來
*****其中一個source可以對應多個channel
*****但一個sink只能對應一個channel

需求1:從指定的網絡端口采集數據,輸出到控制台

啟動flume
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec_memory_avro.conf \
-Dflume.root.logger=INFO,console

使用telnet進行測試
telnet master 44444
****Event 是Flume數據傳輸的基本單元


需求2:監控一個文件實時采集新增的數據輸出到控制台
Agent選型:exec.source + memory.channel + logger.sink
type=exec command=tail -F /home/data/data.log
shell=/bin/sh -C

需求3:將A服務器上的日志實時采集到B服務器 (最常用的方式)
Agent1:exec.source+memory.channel+avrp.sink
Agent2: arvo.source+memory.channel+logger.sink

配置好兩個conf文件,啟動兩個flume

2-6 日志收集過程
2-6-1 機器A上監控一個文件,當我們訪問主站時,會有用戶行為日志記錄到access.log中
2-6-2 avro sink 把新產生的日志輸出到相應的avro.source指定的hostname和port上
2-6-3 通過avro.source對應的agent將我們的日志輸出到控制台


3.Kafka :分布式消息隊列

3-1 kafka概述
消息中間件,生產者和消費者

3-2 kafka的核心架構
producer 生產者,生產饅頭
consumer 消費者,吃饅頭
broker 籃子 ,一個broker就是一個kafka
topic 主題,給饅頭打一個標簽,topic1的饅頭是給你吃的,topic2的饅頭是給弟弟吃的

3-3 kafka的部署和使用
分為三種,單節點單broker,單節點多broker,多節點多broker
這里以單節點單broker為例,其他兩種類似

3-3-1 安裝zookeeper 配置環境變量並source生效

3-3-2 配置zoo.cfg dataDir=/home/master/tmp/zookeeper

3-3-3 啟動zk ./bin/zkServer.sh start

3-3-4 安裝kafka 配置環境變量並source生效

3-3-5 配置server.properties中的hostname,brokerid,log.dirs,listeners,
zookeeper.connect=master:2181

3-3-6 啟動kafka
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

3-3-7 創建topic
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic hello_topic

***replication-factor代表着副本系數,有幾個從節點就設置為幾***

3-3-8 查看topic是否創建成功
kafka-topics.sh --list --zookeeper master:2181

3-3-9 查看所有topic的詳細信息
kafka.topics.sh --describe --zookeeper master:2181

3-3-10 producer端發送消息
kafka-console-producer.sh --broker-list master:9092 --topic hello_topic

3-3-11 consumer端消費消息
kafka-console-consumer.sh --zookeeper master:2181 --topic hello_topic \

--from beginning ***這個代表着每次從頭消費,可以不寫***

3-4 kafka的容錯性測試

不管是kill掉leader還是任意一個flower,只要還有一個副本存在,就不會影響到kafka的正常使用


4.Spark Streaming
4-1 Spark Streaming 概述
基於spark core,講不同的數據源的數據經過spark streaming的處理之后,將結果輸出到外部文件系統

特點
低延遲
能從錯誤中高效的恢復:fault-tolerant
能夠運行在成百上千的節點
能夠將批處理,機器學習,圖計算等子框架和spark streaming結合起來使用

俗稱:One stack to rule them all ----- 一棧式解決

4-2 應用場景
電商推薦系統-----最最常用
實時監控系統

4-3 從詞頻統計功能着手入門 spark streaming
4-3-1 先啟動 nc -lk 9999

4-3-2 使用spark-submit方式來提交我們的spark應用程序運行的腳本
spark-submit --master local[2] \
--class org.apache.examples.streaming.NetWorkWordCount \
--name NetWorkWordCount \
--jars $SPARK_HOME/examples/jars/spark-examples_xxx.jar master 9999

4-3-2 使用spark-shell方式來提交(僅測試代碼時用)
spark-shell --master local[2] 來啟動
將代碼copy到shell,import相應的包,直接運行

4-4 spark streaming 工作原理
4-4-1 粗粒度
spark streaming接收到實時的數據流,把數據流按照指定的時間段切成一片片小的數據塊,然后把小的數據塊
傳給spark Engine來處理
4-4-2 細粒度
首先,spark stremaing 應用程序運行在Driver端,
Driver中有一個StreamingContext和SparkContext
Driver會要求在executor中啟動一個Receiver,當有數據輸入后,Receiver會將數據拆分成一些blocks
存放在內存中,如果設置了多副本,則也會copy這些blocks到其他機器
之后,receiver會把blocks的一些元數據信息告訴StreamingContext,當每隔幾秒的周期后,
StreamingContext會通知SparkContext去啟動jobs,並分發到executors中執行去處理數據

4-5 SparkStreaming 核心
4-5-1 StreamingContext 有兩個副結構體
4-5-1-1 def this(SparkContext,Duration)
4-5-1-2 def this(SparkConf,Duration) -----這個用的多
bathch interval可以根據你應用程序需要的延遲要求,以及集群的可用資源來配置

4-5-2 DStream (Discretized Streams)
一個DStream 代表着一系列不間斷的RDD
每一個RDD包含着這一個批次匯總的所有數據
對DStream操作算子,比如map/flatmap。其實底層會被翻譯為對Dstream 中的每一個RDD都做相同的
操作,因為一個DStream是由不同批次的RDD所構成

4-5-3 Input DStreams and Receiverss
Input DStream (從輸入數據流的源頭過來的DStream)
每一個Input DStream都需要關聯一個Receiver (文件系統除外)

local模式下不要使用local和local[1],因為若為1,則代表着只有一個核心可以用來接收數據,
但沒用核心去處理數據了,常用local[2]

local[n]中的 n > Receivers的個數

4-5-4 Transformation on DStreams
map,flatmap,filter...

4-5-5 Output Operations on DStreams (輸出結果)
print,saveAsTextFile,saveAsHadoopFile,foreachRDD...

4-6 實戰案例
spark streaming處理socket數據
spark streaming處理hdfs文件數據

4-7 spark streaming進階
4-7-1 updateStateByKey算子
截止到目前為止xxx的統計
使用這個時必須設置checkpoint
4-7-2 計算目前為止累積出現的單詞個數寫入到mysql中

建表:create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

4-7-3 基於windows的統計
window:定時的進行一個時間段內的數據處理
window length:窗口長度 -----10m
sliding interval:窗口的間隔 -----10s
代表着:每隔10s統計前10分鍾的數據

4-7-4 黑名單過濾

4-8 spark streaming整合kafka實戰
分為兩種
Receiver方式和Direct Approch (容錯性,性能更高)

采用Direct方式
優點:簡化並行度,性能更高,0數據丟失,滿足只執行一次 Exactly-once
缺點: 不能跟新offset到zk中,需要手動加入更新

direct方式會直接讀取kafka底層的元數據信息
kafka就相當於底層的文件系統,
direct 直接作用於Driver端

 

5.使用log4j來模擬生產日志,flume采集到后傳遞給kafka,再交由spark streaming來處理數據

5-1 配置flume
streaming.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

啟動 flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console


streaming2.conf
#flume 1.6版本使用此方法

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = master:9092
agent1.sinks.kafka-sink.batchSize = 20
agent1.sinks.kafka-sink.requiredAcks = 1

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

啟動 flume
flume-ng agent \
--name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
-Dflume.root.logger=INFO,console


我們現在是在本地進行測試的,在IDEA中運行LoggerGenerator,然后使用Flume,kafka以及Spark Streaming 進行處理操作

在生產上肯定不是這么干的,怎么干呢?
1)打包jar,執行LoggerGenerator類
2)Flume,kafka和測試的是一樣的
3)Spark Streaming的代碼也需要打成jar包,然后使用spark-submit方式運行

在生產上,整個流處理的流程都是一樣的,區別在於業務邏輯的復雜性


crontab -e

*/1 * * * * /home/master/文檔/code/log_generator.sh

每隔一分鍾執行一次sh文件,即每隔一分鍾會產生一批日志寫到log里

service crond stop 停止定時文件運行

service crond start 開始定時文件運行

選型:access.log ---> 控制台輸出
exec + memory + logger

1.對接python日志產生器輸出的日志到Flume
streaming_project.conf

exec-memory-logger.sources=exec-source
exec-memory-logger.sinks=logger-sink
exec-memory-logger.channels=memory-channel

exec-memory-logger.sources.exec-source.type=exec
exec-memory-logger.sources.exec-source.command=tail -F /home/master/文檔/data/access.log
exec-memory-logger.sources.exec-source.shell=/bin/sh -c


exec-memory-logger.channels.memory-channel.type=memory

exec-memory-logger.sinks.logger-sink.type=logger


exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.logger-sink.channel=memory-channel

啟動flume:

flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project.conf \
-Dflume.root.logger=INFO,console


2.日志===>Flume===>kafka

啟動zk ./zkServer.sh start
啟動kafka kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

修改flume配置文件,使得flume的日志到kafka
選型:exec + memory + kafka sink

streaming_project2.conf

exec-memory-kafka.sources=exec-source
exec-memory-kafka.sinks=kafka-sink
exec-memory-kafka.channels=memory-channel

exec-memory-kafka.sources.exec-source.type=exec
exec-memory-kafka.sources.exec-source.command=tail -F /home/master/文檔/data/access.log
exec-memory-kafka.sources.exec-source.shell=/bin/sh -c


exec-memory-kafka.channels.memory-channel.type=memory


exec-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList=master:9092
exec-memory-kafka.sinks.kafka-sink.topic=streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize=5
exec-memory-kafka.sinks.kafka-sink.requireAcks=5

exec-memory-kafka.sources.exec-source.channels=memory-channel
exec-memory-kafka.sinks.kafka-sink.channel=memory-channel

啟動flume:

flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project2.conf \
-Dflume.root.logger=INFO,console


啟動消費者
kafka-console-consumer.sh --zookeeper master:2181 --topic streamingtopic


idea寫spark streaming對kafka中的數據進行處理

數據清洗操作:從原始日志中取出我們所需要的字段信息
結果如下:
ClickLog(46.30.143.10,20200113032901,145,200,-)
ClickLog(29.87.10.156,20200113032901,131,404,-)
ClickLog(87.10.72.30,20200113032901,145,200,-)

將結果寫入到外部數據庫中,前端頁面調取結果,以供圖形化展示

選擇HBase來作為結果存儲數據庫

HBase表設計

create 'course_clickcount','info'
Rowkey設計:根據業務需求
day_courseid


create 'course_search_clickcount',"info"
Rowkey設計:
day_searche_course

清空表
truncate 'course_search_clickcount'

如何使用scala操作hbase

項目打包: mvn clean package -DskipTests


spark-submit --master local[5] \
--class scala/org/example/project/spark/StreamingApp.scala \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 \
/home/master/文檔/code/KafkaTest-1.0-SNAPSHOT.jar \
192.168.187.10:2181 test streamingtopic 1


--jars $(echo /usr/local/src/hbase-1.3.1/lib/*.jar | tr ' ' ',') \

 

數據可視化

將抽象的科學或者商業數據,用圖像表示出來,幫助理解數據的意義的過程。

Spring Boot構建Web項目

Echarts

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM