Flume+Kafka+SparkStreaming打造實時流處理框架
1-1 實時流處理產生背景
時效性高,數據量大
1-2 實時流處理概述
實時計算(秒,毫秒級別)
流式計算(在不斷產生的實時數據流計算)7*24
1-3 離線計算與實時計算的對比
1-3-1 數據來源
離線:HDFS,歷史數據,數據量比較大
實時:消息隊列(kafka)實時新增,修改記錄過來的某一筆數據
1-3-2 處理過程
離線:MR Spark
實時:Spark Streaming
1-3-3 處理速度
離線:慢
實時:快
1-3-4 進程
離線:啟動+銷毀
實時:7*24
1-4 實時流處理框架對比
storm
spark streaming :按照你設置的時間間隔拆成小的批處理
flink
1-5 實時流處理架構與技術選型
web/app ---> WebServers ---> Flume ---> Kafka ---> Spark Streaming
---> RDBMS/HBASE ---> 可視化展示
1-6 實時流處理在企業中的應用
電信行業:流量短信提醒
電商行業:分布式日志收集框架
2.Flume:分布式日志收集框架
2-1 業務現狀分析
大量的日志數據如何從其他的Server上移動到hadoop之上
要考慮網絡開銷,io開銷
server ---> Flume --->Hadoop集群
2-2 flume概述
webServer ---> Source ---> channel ---> sink ---> HDFS
2-3 flume 核心組件和架構
source :收集
channel: 聚集
sink: 輸出
2-4 安裝
2-4-1 安裝JDK,配置環境變量,source 生效
2-4-2 安裝flume,配置flume-env.sh文件,引入JAVA_HOME
2-5 flume的使用案例
使用flume的關鍵就是寫配置文件
2-5-1 配置source
2-5-2 配置channel
2-5-3 配置sink
2-5-4 把三個組件串起來
*****其中一個source可以對應多個channel
*****但一個sink只能對應一個channel
需求1:從指定的網絡端口采集數據,輸出到控制台
啟動flume
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec_memory_avro.conf \
-Dflume.root.logger=INFO,console
使用telnet進行測試
telnet master 44444
****Event 是Flume數據傳輸的基本單元
需求2:監控一個文件實時采集新增的數據輸出到控制台
Agent選型:exec.source + memory.channel + logger.sink
type=exec command=tail -F /home/data/data.log
shell=/bin/sh -C
需求3:將A服務器上的日志實時采集到B服務器 (最常用的方式)
Agent1:exec.source+memory.channel+avrp.sink
Agent2: arvo.source+memory.channel+logger.sink
配置好兩個conf文件,啟動兩個flume
2-6 日志收集過程
2-6-1 機器A上監控一個文件,當我們訪問主站時,會有用戶行為日志記錄到access.log中
2-6-2 avro sink 把新產生的日志輸出到相應的avro.source指定的hostname和port上
2-6-3 通過avro.source對應的agent將我們的日志輸出到控制台
3.Kafka :分布式消息隊列
3-1 kafka概述
消息中間件,生產者和消費者
3-2 kafka的核心架構
producer 生產者,生產饅頭
consumer 消費者,吃饅頭
broker 籃子 ,一個broker就是一個kafka
topic 主題,給饅頭打一個標簽,topic1的饅頭是給你吃的,topic2的饅頭是給弟弟吃的
3-3 kafka的部署和使用
分為三種,單節點單broker,單節點多broker,多節點多broker
這里以單節點單broker為例,其他兩種類似
3-3-1 安裝zookeeper 配置環境變量並source生效
3-3-2 配置zoo.cfg dataDir=/home/master/tmp/zookeeper
3-3-3 啟動zk ./bin/zkServer.sh start
3-3-4 安裝kafka 配置環境變量並source生效
3-3-5 配置server.properties中的hostname,brokerid,log.dirs,listeners,
zookeeper.connect=master:2181
3-3-6 啟動kafka
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
3-3-7 創建topic
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic hello_topic
***replication-factor代表着副本系數,有幾個從節點就設置為幾***
3-3-8 查看topic是否創建成功
kafka-topics.sh --list --zookeeper master:2181
3-3-9 查看所有topic的詳細信息
kafka.topics.sh --describe --zookeeper master:2181
3-3-10 producer端發送消息
kafka-console-producer.sh --broker-list master:9092 --topic hello_topic
3-3-11 consumer端消費消息
kafka-console-consumer.sh --zookeeper master:2181 --topic hello_topic \
--from beginning ***這個代表着每次從頭消費,可以不寫***
3-4 kafka的容錯性測試
不管是kill掉leader還是任意一個flower,只要還有一個副本存在,就不會影響到kafka的正常使用
4.Spark Streaming
4-1 Spark Streaming 概述
基於spark core,講不同的數據源的數據經過spark streaming的處理之后,將結果輸出到外部文件系統
特點
低延遲
能從錯誤中高效的恢復:fault-tolerant
能夠運行在成百上千的節點
能夠將批處理,機器學習,圖計算等子框架和spark streaming結合起來使用
俗稱:One stack to rule them all ----- 一棧式解決
4-2 應用場景
電商推薦系統-----最最常用
實時監控系統
4-3 從詞頻統計功能着手入門 spark streaming
4-3-1 先啟動 nc -lk 9999
4-3-2 使用spark-submit方式來提交我們的spark應用程序運行的腳本
spark-submit --master local[2] \
--class org.apache.examples.streaming.NetWorkWordCount \
--name NetWorkWordCount \
--jars $SPARK_HOME/examples/jars/spark-examples_xxx.jar master 9999
4-3-2 使用spark-shell方式來提交(僅測試代碼時用)
spark-shell --master local[2] 來啟動
將代碼copy到shell,import相應的包,直接運行
4-4 spark streaming 工作原理
4-4-1 粗粒度
spark streaming接收到實時的數據流,把數據流按照指定的時間段切成一片片小的數據塊,然后把小的數據塊
傳給spark Engine來處理
4-4-2 細粒度
首先,spark stremaing 應用程序運行在Driver端,
Driver中有一個StreamingContext和SparkContext
Driver會要求在executor中啟動一個Receiver,當有數據輸入后,Receiver會將數據拆分成一些blocks
存放在內存中,如果設置了多副本,則也會copy這些blocks到其他機器
之后,receiver會把blocks的一些元數據信息告訴StreamingContext,當每隔幾秒的周期后,
StreamingContext會通知SparkContext去啟動jobs,並分發到executors中執行去處理數據
4-5 SparkStreaming 核心
4-5-1 StreamingContext 有兩個副結構體
4-5-1-1 def this(SparkContext,Duration)
4-5-1-2 def this(SparkConf,Duration) -----這個用的多
bathch interval可以根據你應用程序需要的延遲要求,以及集群的可用資源來配置
4-5-2 DStream (Discretized Streams)
一個DStream 代表着一系列不間斷的RDD
每一個RDD包含着這一個批次匯總的所有數據
對DStream操作算子,比如map/flatmap。其實底層會被翻譯為對Dstream 中的每一個RDD都做相同的
操作,因為一個DStream是由不同批次的RDD所構成
4-5-3 Input DStreams and Receiverss
Input DStream (從輸入數據流的源頭過來的DStream)
每一個Input DStream都需要關聯一個Receiver (文件系統除外)
local模式下不要使用local和local[1],因為若為1,則代表着只有一個核心可以用來接收數據,
但沒用核心去處理數據了,常用local[2]
local[n]中的 n > Receivers的個數
4-5-4 Transformation on DStreams
map,flatmap,filter...
4-5-5 Output Operations on DStreams (輸出結果)
print,saveAsTextFile,saveAsHadoopFile,foreachRDD...
4-6 實戰案例
spark streaming處理socket數據
spark streaming處理hdfs文件數據
4-7 spark streaming進階
4-7-1 updateStateByKey算子
截止到目前為止xxx的統計
使用這個時必須設置checkpoint
4-7-2 計算目前為止累積出現的單詞個數寫入到mysql中
建表:create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
4-7-3 基於windows的統計
window:定時的進行一個時間段內的數據處理
window length:窗口長度 -----10m
sliding interval:窗口的間隔 -----10s
代表着:每隔10s統計前10分鍾的數據
4-7-4 黑名單過濾
4-8 spark streaming整合kafka實戰
分為兩種
Receiver方式和Direct Approch (容錯性,性能更高)
采用Direct方式
優點:簡化並行度,性能更高,0數據丟失,滿足只執行一次 Exactly-once
缺點: 不能跟新offset到zk中,需要手動加入更新
direct方式會直接讀取kafka底層的元數據信息
kafka就相當於底層的文件系統,
direct 直接作用於Driver端
5.使用log4j來模擬生產日志,flume采集到后傳遞給kafka,再交由spark streaming來處理數據
5-1 配置flume
streaming.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
啟動 flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
streaming2.conf
#flume 1.6版本使用此方法
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = master:9092
agent1.sinks.kafka-sink.batchSize = 20
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
啟動 flume
flume-ng agent \
--name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
-Dflume.root.logger=INFO,console
我們現在是在本地進行測試的,在IDEA中運行LoggerGenerator,然后使用Flume,kafka以及Spark Streaming 進行處理操作
在生產上肯定不是這么干的,怎么干呢?
1)打包jar,執行LoggerGenerator類
2)Flume,kafka和測試的是一樣的
3)Spark Streaming的代碼也需要打成jar包,然后使用spark-submit方式運行
在生產上,整個流處理的流程都是一樣的,區別在於業務邏輯的復雜性
crontab -e
*/1 * * * * /home/master/文檔/code/log_generator.sh
每隔一分鍾執行一次sh文件,即每隔一分鍾會產生一批日志寫到log里
service crond stop 停止定時文件運行
service crond start 開始定時文件運行
選型:access.log ---> 控制台輸出
exec + memory + logger
1.對接python日志產生器輸出的日志到Flume
streaming_project.conf
exec-memory-logger.sources=exec-source
exec-memory-logger.sinks=logger-sink
exec-memory-logger.channels=memory-channel
exec-memory-logger.sources.exec-source.type=exec
exec-memory-logger.sources.exec-source.command=tail -F /home/master/文檔/data/access.log
exec-memory-logger.sources.exec-source.shell=/bin/sh -c
exec-memory-logger.channels.memory-channel.type=memory
exec-memory-logger.sinks.logger-sink.type=logger
exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.logger-sink.channel=memory-channel
啟動flume:
flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project.conf \
-Dflume.root.logger=INFO,console
2.日志===>Flume===>kafka
啟動zk ./zkServer.sh start
啟動kafka kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
修改flume配置文件,使得flume的日志到kafka
選型:exec + memory + kafka sink
streaming_project2.conf
exec-memory-kafka.sources=exec-source
exec-memory-kafka.sinks=kafka-sink
exec-memory-kafka.channels=memory-channel
exec-memory-kafka.sources.exec-source.type=exec
exec-memory-kafka.sources.exec-source.command=tail -F /home/master/文檔/data/access.log
exec-memory-kafka.sources.exec-source.shell=/bin/sh -c
exec-memory-kafka.channels.memory-channel.type=memory
exec-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList=master:9092
exec-memory-kafka.sinks.kafka-sink.topic=streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize=5
exec-memory-kafka.sinks.kafka-sink.requireAcks=5
exec-memory-kafka.sources.exec-source.channels=memory-channel
exec-memory-kafka.sinks.kafka-sink.channel=memory-channel
啟動flume:
flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project2.conf \
-Dflume.root.logger=INFO,console
啟動消費者
kafka-console-consumer.sh --zookeeper master:2181 --topic streamingtopic
idea寫spark streaming對kafka中的數據進行處理
數據清洗操作:從原始日志中取出我們所需要的字段信息
結果如下:
ClickLog(46.30.143.10,20200113032901,145,200,-)
ClickLog(29.87.10.156,20200113032901,131,404,-)
ClickLog(87.10.72.30,20200113032901,145,200,-)
將結果寫入到外部數據庫中,前端頁面調取結果,以供圖形化展示
選擇HBase來作為結果存儲數據庫
HBase表設計
create 'course_clickcount','info'
Rowkey設計:根據業務需求
day_courseid
create 'course_search_clickcount',"info"
Rowkey設計:
day_searche_course
清空表
truncate 'course_search_clickcount'
如何使用scala操作hbase
項目打包: mvn clean package -DskipTests
spark-submit --master local[5] \
--class scala/org/example/project/spark/StreamingApp.scala \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 \
/home/master/文檔/code/KafkaTest-1.0-SNAPSHOT.jar \
192.168.187.10:2181 test streamingtopic 1
--jars $(echo /usr/local/src/hbase-1.3.1/lib/*.jar | tr ' ' ',') \
數據可視化
將抽象的科學或者商業數據,用圖像表示出來,幫助理解數據的意義的過程。
Spring Boot構建Web項目
Echarts