Real-time data collection, analysis, and display with crawled/simulated data + Flume + Kafka + Spark Streaming + MySQL + ECharts
[To get the full source code, follow the WeChat official account 靠譜楊閱讀人生 and reply "kafka" for the download link]
The main workflow is as follows:
- Generate simulated random data and write it in real time to a file on the Linux virtual machine.
- Use Flume to monitor that file; whenever the file content changes, Flume captures the new data and forwards it to a Kafka message queue.
- Spark Streaming then processes the Kafka data in real time and writes the results to a MySQL database on the local Windows machine; finally, a Python backend reads the MySQL data and displays it dynamically with an ECharts chart.
Start the Hadoop cluster: myhadoop.sh start [script reference: https://www.cnblogs.com/rainbow-1/p/16774523.html]
Start the ZooKeeper cluster: myzk.sh start [script reference: https://www.cnblogs.com/rainbow-1/p/15319226.html]
Start the Kafka cluster: kf.sh start [script reference: https://www.cnblogs.com/rainbow-1/p/16015749.html]
1. Simulating Real-Time Data
This walkthrough simplifies the first step and uses simulated data for testing. The code is as follows:
import datetime
import random
import time

import paramiko

# SSH connection details for the VM that Flume will monitor
hostname = "hadoop102"
port = 22
username = "root"
password = "000429"

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port, username, password, compress=True)
sftp_client = client.open_sftp()

# Append a timestamped random number to the remote file every 1 to 3 seconds
num1 = 3000
for i in range(1000):
    remote_file = sftp_client.open("/opt/module/data/test1.csv", 'a')  # remote file path
    time1 = datetime.datetime.now()
    time1_str = datetime.datetime.strftime(time1, '%Y-%m-%d %H:%M:%S')
    print("Current time: " + time1_str)
    time.sleep(random.randint(1, 3))
    num1_str = str(num1 + random.randint(-1300, 1700))
    print("Current random value: " + num1_str)
    remote_file.write(time1_str + "," + num1_str + "\n")
    remote_file.close()

sftp_client.close()
client.close()
Main steps:
- Create the file test1.csv under /opt/module/data/ on the VM.
- The script connects to the hadoop102 VM remotely, logs in as root, and opens the target file over SFTP.
- A for loop appends a record to the file every 1 to 3 seconds (a quick way to verify the writes is sketched below).
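As an optional check that is not part of the original workflow, the same paramiko connection details can be reused to read back the last few appended records and confirm that the writes are arriving on the VM:

import paramiko

# Reuse the same connection details as the generator script above
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect("hadoop102", 22, "root", "000429")

# Print the last five records appended to the monitored file
stdin, stdout, stderr = client.exec_command("tail -n 5 /opt/module/data/test1.csv")
print(stdout.read().decode("utf-8"))

client.close()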
2. Monitoring the File with Flume
- Go to /opt/module/flume/job and create the configuration file myflume.conf with the content below. It specifies the path of the monitored file, the Kafka broker addresses, the Kafka topic, and the serializer.
# Name the agent's three components: source, sink and channel (a1 is the agent's alias)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration: tail the monitored file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/data/test1.csv

# Sink configuration: forward events to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# Channel configuration: in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start Flume from /opt/module/flume; it then begins monitoring the target file defined in job/myflume.conf:
bin/flume-ng agent -c conf/ -n a1 -f job/myflume.conf -Dflume.root.logger=INFO,console
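At this point it is worth confirming that Flume is actually delivering events to the first topic, for example with Kafka's own console consumer, or with a short Python check like the sketch below. The sketch assumes the kafka-python package is installed (pip install kafka-python); it is not part of the original setup:

from kafka import KafkaConsumer

# Consume new messages from the 'first' topic that Flume writes to
consumer = KafkaConsumer(
    "first",
    bootstrap_servers=["hadoop102:9092", "hadoop103:9092", "hadoop104:9092"],
    auto_offset_reset="latest",
    value_deserializer=lambda v: v.decode("utf-8"),
)

for message in consumer:
    # Each record should look like "2022-10-16 20:15:01,3217"
    print(message.value)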
3. Processing the Data with Spark Streaming
- Create a topic named first for the consumer:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 1 --topic first
- Create a Maven project and write the consumer for the Kafka topic.
The pom.xml is shown below. Note that the dependency versions must match the Scala version used by your IDE (IDEA); this project uses Scala 2.12.11.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.reliable.ycw</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.11</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.12.11</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.12.11</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The consumer class is as follows:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.Date

/**
 * Counts how many records arrive within each batch interval,
 * e.g. 4 records within a 5-second window.
 */
object StreamingExamples extends App {

  val conf = new SparkConf().setMaster("local").setAppName("jm")
    .set("spark.streaming.kafka.maxRatePerPartition", "3")
    .set("spark.local.dir", "./tmp")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  // Create the streaming context with a 5-second batch interval
  val ssc = new StreamingContext(conf, Seconds(5))

  // Kafka parameters for a direct (receiver-less) connection
  val KafkaParams = Map[String, Object](
    // Broker addresses
    "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    // Deserializer types
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "MyGroupId",
    // Commit consumer offsets manually (the default is true)
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Subscribe to the "first" topic and obtain the Kafka DStream
  val kafkaDirectStream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](List("first"), KafkaParams))
  kafkaDirectStream.print()

  // Count the records of each batch and write (timestamp, count) into MySQL
  val num = kafkaDirectStream.count()
  num foreachRDD (x => {
    val connection = getCon()
    val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
    val sql = "insert into content_num values('" + time + "'," + x.collect()(0) + ")"
    connection.createStatement().execute(sql)
    connection.close()
  })

  // Start the computation
  ssc.start()
  ssc.awaitTermination()

  def getCon() = {
    Class.forName("com.mysql.cj.jdbc.Driver")
    DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/spark?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8",
      "root", "reliable")
  }
}
This code subscribes to the Kafka topic on the VMs and, for each batch interval (5 seconds here), counts how many records arrived during that window, then stores the local machine's timestamp together with that count in the local MySQL database (database spark, table content_num, columns type and num).
- Remember to specify the time zone and encoding in the JDBC URL:
jdbc:mysql://localhost:3306/spark?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
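The post only names the target table and its columns (content_num with type and num), so the exact schema is not given; the snippet below is a minimal sketch for creating the table with pymysql, with assumed column types:

import pymysql

conn = pymysql.connect(host="127.0.0.1", user="root", password="reliable",
                       db="spark", charset="utf8")
with conn.cursor() as cursor:
    # Hypothetical schema: 'type' holds the timestamp string, 'num' the record count
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS content_num (
            type VARCHAR(32),
            num  INT
        )
    """)
conn.commit()
conn.close()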
4. Visualization
An ECharts line chart is used to display the data.
- Backend code that reads the MySQL data (spark_sql.py):
import pymysql


def get_conn():
    """
    Get a connection and a cursor.
    :return:
    """
    conn = pymysql.connect(host="127.0.0.1",
                           user="root",
                           password="reliable",
                           db="spark",
                           charset="utf8")
    cursor = conn.cursor()
    return conn, cursor


def close_conn(conn, cursor):
    """
    Close the connection and the cursor.
    :param conn:
    :param cursor:
    :return:
    """
    if cursor:
        cursor.close()
    if conn:
        conn.close()


def query(sql, *args):
    """
    Generic query helper.
    :param sql:
    :param args:
    :return: the query result as a tuple of tuples ((), ())
    """
    conn, cursor = get_conn()
    print(sql)
    cursor.execute(sql)
    res = cursor.fetchall()
    close_conn(conn, cursor)
    return res


def dynamic_bar():
    # Check that the database is reachable
    conn, cursor = get_conn()
    if conn is not None:
        print("Database connection succeeded!")
    close_conn(conn, cursor)
    typenumsql = "select * from content_num order by num desc limit 11;"
    res_title = query(typenumsql)
    type_num = []  # stores the (time, count) rows
    for item1 in res_title:
        type_num.append(item1)
    return type_num
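A quick sanity check of this helper, assuming the MySQL instance above is reachable, is to run the module directly:

if __name__ == "__main__":
    # Should print up to 11 (time, count) rows from content_num
    for row in dynamic_bar():
        print(row)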
- Flask route that returns the backend data:
@app.route('/dynamic_bar')
def dynamic_bar():
    res_list = spark_sql.dynamic_bar()
    my_list = []
    list_0 = []  # x axis: timestamps
    list_1 = []  # y axis: record counts
    for item in res_list:
        list_0.append(item[0])
        list_1.append(item[1])
    my_list.append(list_0)
    my_list.append(list_1)
    return {"data": my_list}
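The route assumes an existing Flask application object and the spark_sql module from the previous step; the surrounding scaffold is not shown in the original post, but a minimal sketch (file layout assumed: line.html under templates/, the JS libraries under static/js/) could look like this:

from flask import Flask, render_template

import spark_sql  # the query helpers shown above

app = Flask(__name__)


@app.route('/')
def index():
    # line.html is expected under the templates/ directory,
    # with echarts.min.js and jquery-3.3.1.min.js under static/js/
    return render_template('line.html')


# ... the /dynamic_bar route shown above is registered on this app ...

if __name__ == '__main__':
    app.run(debug=True)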
- Front end that draws the line chart (line.html):
<!DOCTYPE html>
<html style="height: 100%">
<head>
<meta charset="utf-8">
</head>
<body style="height: 100%; margin: 0">
<div id="container" style="height: 100%"></div>
<script type="text/javascript" src="../static/js/echarts.min.js"></script>
<script src="../static/js/jquery-3.3.1.min.js"></script>
</body>
</html>
<script>
var dom = document.getElementById("container");
var myChart = echarts.init(dom);
var app = {};
var option;
</script>
<script type="text/javascript">
option = {
tooltip: {
trigger: 'axis',
axisPointer: {
type: 'shadow'
}
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
xAxis: [
{
type: 'category',
data: [],
axisTick: {
alignWithLabel: true
}
}
],
yAxis: [
{
type: 'value'
}
],
series: [
{
name: 'Direct',
type: 'line',
barWidth: '60%',
data: []
}
]
};
if (option && typeof option === 'object') {
myChart.setOption(option);
}
function update(){
$.ajax({
url:"/dynamic_bar",
async:true,
success:function (data) {
option.xAxis[0].data=data.data[0]
option.series[0].data=data.data[1]
myChart.setOption(option);
},
error:function (xhr,type,errorThrown) {
alert("Request failed!")
}
})
}
setInterval("update()",100)
</script>
Things to watch out for in the visualization:
- Include echarts.min.js before jquery-3.3.1.min.js.
- Give the div that hosts the chart an explicit size.
- Assign the data to the option only after the chart initialization code.
- Poll for new data periodically: setInterval("update()",100).
Summary: the key to the whole pipeline is monitoring and displaying the data in real time. First, the data must keep flowing into the source file; second, Flume must pick up every change as it happens. Kafka is used because, once the data volume grows large enough, collection can no longer keep up with the rate of change, so the message queue buffers the records and lets the pipeline handle high volumes. Spark Streaming then consumes and processes the messages and stores the results in the local MySQL database. Finally, the backend reads the database and renders the counts as a line chart; because the table is updated continuously, each poll must fetch the latest rows so that the chart visibly updates. (The chart below changes dynamically as the data changes!)