作者:韓信子@ShowMeAI
教程地址:https://www.showmeai.tech/tutorials/84
本文地址:https://www.showmeai.tech/article-detail/176
聲明:版權所有,轉載請聯系平台與作者並注明出處
引言
2020以來新冠疫情改變了全世界,影響着大家的生活,本案例結合大數據分析技術,使用pyspark對2020年美國新冠肺炎疫情進行數據分析,並結合可視化方法進行結果呈現。
1.實驗環境
- (1)Linux: Ubuntu 16.04
- (2)Hadoop3.1.3
- (3)Python: 3.8
- (4)Spark: 2.4.0
- (5)Jupyter Notebook
2.數據集
1)數據集下載
本案例使用的數據集來自Kaggle平台的美國新冠肺炎疫情數據集,數據名稱us-counties.csv,為csv文件,它包含了美國發現首例新冠肺炎確診病例至2020-05-19的相關數據。
數據集下載(百度網盤)
鏈接:https://pan.baidu.com/s/1YNY2UREm5lXsNkHM3DZFmA
提取碼:show
數據一覽如下:
2)格式轉換
原始數據為csv格式文件,我們首先做一點數據格式轉換,方便spark讀取數據生成RDD或者DataFrame,具體數據轉換代碼如下:
import pandas as pd
#.csv->.txt
data = pd.read_csv('/home/hadoop/us-counties.csv')
with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
for line in data.values:
f.write((str(line[0])+'\t'+str(line[1])+'\t'
+str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))
3)數據上傳至HDFS
然后上傳“/home/hadoop/us-counties.txt”至HDFS文件系統中,具體路徑為“/user/hadoop/us-counties.txt”。操作命令如下:
./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop
3.使用Spark對數據進行分析
這里采用Python作為編程語言,結合pyspark進行數據分析。
1)數據讀取與DataFrame構建
首先我們讀取數據文件,生成Spark DataFrame。
本案例中使用的數據為結構化數據,因此可以使用spark讀取源文件生成DataFrame以方便進行后續分析實現。
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
def toDate(inputStr):
newStr = ""
if len(inputStr) == 8:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7]
newStr = s1+"-"+"0"+s2+"-"+"0"+s3
else:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7:]
newStr = s1+"-"+"0"+s2+"-"+s3
date = datetime.strptime(newStr, "%Y-%m-%d")
return date
#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)
rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
shemaUsInfo = spark.createDataFrame(rdd1,schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
2)數據分析
本案例主要進行了以下統計分析,分析的目標和方法如下:
- 獲取數據集與代碼 → ShowMeAI的官方GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
- 運行代碼段與學習 → 在線編程環境 http://blog.showmeai.tech/python3-compiler
(1)統計美國截止每日的累計確診人數和累計死亡人數。
- 以date作為分組字段,對cases和deaths字段進行匯總統計。
(2)統計美國每日的新增確診人數。
- 因為「新增數=今日數-昨日數」,這里使用自連接,連接條件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases計算該日新增。
(3)統計美國每日的新增確診人數新增死亡人數。
- 因為「新增數=今日數-昨日數」,這里使用自連接,連接條件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases計算該日新增。
(4)統計截止5.19日,美國各州的累計確診人數和死亡人數。
- 首先篩選出5.19日的數據,然后以state作為分組字段,對cases和deaths字段進行匯總統計。
(5)統計截止5.19日,美國確診人數最多的十個州。
- 對3)的結果DataFrame注冊臨時表,然后按確診人數降序排列,並取前10個州。
(6)統計截止5.19日,美國死亡人數最多的十個州。
- 對3)的結果DataFrame注冊臨時表,然后按死亡人數降序排列,並取前10個州。
(7)統計截止5.19日,美國確診人數最少的十個州。
- 對3)的結果DataFrame注冊臨時表,然后按確診人數升序排列,並取前10個州。
(8)統計截止5.19日,美國死亡人數最少的十個州。
- 對3)的結果DataFrame注冊臨時表,然后按死亡人數升序排列,並取前10個州。
(9)統計截止5.19日,全美和各州的病死率。
- 病死率 = 死亡數/確診數,對3)的結果DataFrame注冊臨時表,然后按公式計算。
我們下面基於Spark DataFrame和Spark sql進行統計分析。
# 1.計算每日的累計確診病例數和死亡數
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
# 列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json") #寫入hdfs
# 注冊為臨時表供下一步使用
df1.createOrReplaceTempView("ustotal")
# 2.計算每日較昨日的新增確診病例數和死亡病例數
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json") #寫入hdfs
# 3.統計截止5.19日 美國各州的累計確診人數和死亡人數
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #寫入hdfs
df3.createOrReplaceTempView("eachStateInfo")
# 4.找出美國確診最多的10個州
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")
# 5.找出美國死亡最多的10個州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")
# 6.找出美國確診最少的10個州
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")
# 7.找出美國死亡最少的10個州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")
# 8.統計截止5.19全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")
3)結果文件
上述Spark計算結果保存.json文件,方便后續可視化處理。由於使用Python讀取HDFS文件系統不太方便,故將HDFS上結果文件轉儲到本地文件系統中,使用以下命:
./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1
...
對於result2等結果文件,使用相同命令,只需要改一下路徑即可。下載過程如下圖所示:
4.數據可視化
1)可視化工具選擇與代碼
選擇使用python第三方庫pyecharts作為可視化工具。
- 獲取數據集與代碼 → ShowMeAI的官方GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
- 運行代碼段與學習 → 在線編程環境 http://blog.showmeai.tech/python3-compiler
在使用前,需要安裝pyecharts,安裝代碼如下:
pip install pyecharts
具體可視化實現代碼如下:
from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json
每日的累計確診病例數和死亡數 → 雙柱狀圖
#1.畫出每日的累計確診病例數和死亡數 → 雙柱狀圖
def drawChart_1(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['cases']))
deaths.append(int(js['deaths']))
d = (
Bar()
.add_xaxis(date)
.add_yaxis("累計確診人數", cases, stack="stack1")
.add_yaxis("累計死亡人數", deaths, stack="stack1")
.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
.set_global_opts(title_opts=opts.TitleOpts(title="美國每日累計確診和死亡人數"))
.render("/home/hadoop/result/result1/result1.html")
)
每日的新增確診病例數和死亡數 → 折線圖
#2.畫出每日的新增確診病例數和死亡數 → 折線圖
def drawChart_2(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['caseIncrease']))
deaths.append(int(js['deathIncrease']))
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增確診",
y_axis=cases,
markpoint_opts=opts.MarkPointOpts(
data=[
opts.MarkPointItem(type_="max", name="最大值")
]
),
markline_opts=opts.MarkLineOpts(
data=[opts.MarkLineItem(type_="average", name="平均值")]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美國每日新增確診折線圖", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("/home/hadoop/result/result2/result1.html")
)
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增死亡",
y_axis=deaths,
markpoint_opts=opts.MarkPointOpts(
data=[opts.MarkPointItem(type_="max", name="最大值")]
),
markline_opts=opts.MarkLineOpts(
data=[
opts.MarkLineItem(type_="average", name="平均值"),
opts.MarkLineItem(symbol="none", x="90%", y="max"),
opts.MarkLineItem(symbol="circle", type_="max", name="最高點"),
]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美國每日新增死亡折線圖", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("/home/hadoop/result/result2/result2.html")
)
截止5.19,美國各州累計確診、死亡人數和病死率—->表格
#3.畫出截止5.19,美國各州累計確診、死亡人數和病死率--->表格
def drawChart_3(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
allState = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
row = []
row.append(str(js['state']))
row.append(int(js['totalCases']))
row.append(int(js['totalDeaths']))
row.append(float(js['deathRate']))
allState.append(row)
table = Table()
headers = ["State name", "Total cases", "Total deaths", "Death rate"]
rows = allState
table.add(headers, rows)
table.set_global_opts(
title_opts=ComponentTitleOpts(title="美國各州疫情一覽", subtitle="")
)
table.render("/home/hadoop/result/result3/result1.html")
美國確診最多的10個州 → 詞雲圖
#4.畫出美國確診最多的10個州 → 詞雲圖
def drawChart_4(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
row=(str(js['state']),int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診Top10"))
.render("/home/hadoop/result/result4/result1.html")
)
美國死亡最多的10個州 → 柱狀圖
#5.畫出美國死亡最多的10個州 → 柱狀圖
def drawChart_5(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
state = []
totalDeath = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
state.insert(0,str(js['state']))
totalDeath.insert(0,int(js['totalDeaths']))
c = (
PictorialBar()
.add_xaxis(state)
.add_yaxis(
"",
totalDeath,
label_opts=opts.LabelOpts(is_show=False),
symbol_size=18,
symbol_repeat="fixed",
symbol_offset=[0, 0],
is_symbol_clip=True,
symbol=SymbolType.ROUND_RECT,
)
.reversal_axis()
.set_global_opts(
title_opts=opts.TitleOpts(title="PictorialBar-美國各州死亡人數Top10"),
xaxis_opts=opts.AxisOpts(is_show=False),
yaxis_opts=opts.AxisOpts(
axistick_opts=opts.AxisTickOpts(is_show=False),
axisline_opts=opts.AxisLineOpts(
linestyle_opts=opts.LineStyleOpts(opacity=0)
),
),
)
.render("/home/hadoop/result/result5/result1.html")
)
找出美國確診最少的10個州 → 詞雲圖
#6.找出美國確診最少的10個州 → 詞雲圖
def drawChart_6(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
row=(str(js['state']),int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診最少的10個州"))
.render("/home/hadoop/result/result6/result1.html")
)
美國死亡最少的10個州 → 漏斗圖
#7.找出美國死亡最少的10個州 → 漏斗圖
def drawChart_7(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
data.insert(0,[str(js['state']),int(js['totalDeaths'])])
c = (
Funnel()
.add(
"State",
data,
sort_="ascending",
label_opts=opts.LabelOpts(position="inside"),
)
.set_global_opts(title_opts=opts.TitleOpts(title=""))
.render("/home/hadoop/result/result7/result1.html")
)
美國的病死率—->餅狀圖
#8.美國的病死率--->餅狀圖
def drawChart_8(index):
root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
values = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,返回空字符串,則終止循環
break
js = json.loads(line)
if str(js['state'])=="USA":
values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
c = (
Pie()
.add("", values)
.set_colors(["blcak","orange"])
.set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
.render("/home/hadoop/result/result8/result1.html")
)
#可視化
index = 1
while index<9:
funcStr = "drawChart_" + str(index)
eval(funcStr)(index)
index+=1
2)結果圖標展示
可視化結果是.html格式的,reslut1的結果展示圖保存路徑為“/home/hadoop/result/result1/result1.html”,reslut2的結果展示圖保存路徑為“/home/hadoop/result/result2/result1.html”,其余類似遞推。具體截圖如下:
(1)美國每日的累計確診病例數和死亡數 → 雙柱狀圖
(2)美國每日的新增確診病例數 → 折線圖
(3)美國每日的新增死亡病例數 → 折線圖
(4)截止5.19,美國各州累計確診、死亡人數和病死率 → 表格
(5)截止5.19,美國累計確診人數前10的州 → 詞雲圖
(6)截止5.19,美國累計死亡人數前10的州 → 柱狀圖
(7)截止5.19,美國累計確診人數最少的10個州 → 詞雲圖
(8)截止5.19,美國累計死亡人數最少的10個州 → 漏斗圖
(9)截止5.19,美國的病死率 → 餅狀圖
5.參考資料
- 數據科學工具速查 | Spark使用指南(RDD版) https://www.showmeai.tech/article-detail/106
- 數據科學工具速查 | Spark使用指南(SQL版) https://www.showmeai.tech/article-detail/107
【大數據技術與處理】推薦閱讀
- 圖解大數據 | 大數據生態與應用導論
- 圖解大數據 | 分布式平台Hadoop與Map-Reduce詳解
- 圖解大數據 | Hadoop系統搭建與環境配置@實操案例
- 圖解大數據 | 應用Map-Reduce進行大數據統計@實操案例
- 圖解大數據 | Hive搭建與應用@實操案例
- 圖解大數據 | Hive與HBase詳解@海量數據庫查詢
- 圖解大數據 | 大數據分析挖掘框架@Spark初步
- 圖解大數據 | 基於RDD大數據處理分析@Spark操作
- 圖解大數據 | 基於Dataframe / SQL大數據處理分析@Spark操作
- 圖解大數據 | 使用Spark分析新冠肺炎疫情數據@綜合案例
- 圖解大數據 | 使用Spark分析挖掘零售交易數據@綜合案例
- 圖解大數據 | 使用Spark分析挖掘音樂專輯數據@綜合案例
- 圖解大數據 | Spark Streaming @流式數據處理
- 圖解大數據 | 工作流與特征工程@Spark機器學習
- 圖解大數據 | 建模與超參調優@Spark機器學習
- 圖解大數據 | GraphFrames @基於圖的數據分析挖掘
ShowMeAI系列教程推薦
- 大廠技術實現方案系列
- 圖解Python編程:從入門到精通系列教程
- 圖解數據分析:從入門到精通系列教程
- 圖解AI數學基礎:從入門到精通系列教程
- 圖解大數據技術:從入門到精通系列教程
- 圖解機器學習算法:從入門到精通系列教程
- 機器學習實戰:手把手教你玩轉機器學習系列
- 深度學習教程:吳恩達專項課程 · 全套筆記解讀
- 自然語言處理教程:斯坦福CS224n課程 · 課程帶學與全套筆記解讀
- 深度學習與計算機視覺教程:斯坦福CS231n · 全套筆記解讀