圖解大數據 | 綜合案例-使用spark分析新冠肺炎疫情數據


ShowMeAI研究中心

作者:韓信子@ShowMeAI
教程地址https://www.showmeai.tech/tutorials/84
本文地址https://www.showmeai.tech/article-detail/176
聲明:版權所有,轉載請聯系平台與作者並注明出處

引言

2020以來新冠疫情改變了全世界,影響着大家的生活,本案例結合大數據分析技術,使用pyspark對2020年美國新冠肺炎疫情進行數據分析,並結合可視化方法進行結果呈現。

1.實驗環境

  • (1)Linux: Ubuntu 16.04
  • (2)Hadoop3.1.3
  • (3)Python: 3.8
  • (4)Spark: 2.4.0
  • (5)Jupyter Notebook

2.數據集

1)數據集下載

本案例使用的數據集來自Kaggle平台的美國新冠肺炎疫情數據集,數據名稱us-counties.csv,為csv文件,它包含了美國發現首例新冠肺炎確診病例至2020-05-19的相關數據。

數據集下載(百度網盤)
鏈接:https://pan.baidu.com/s/1YNY2UREm5lXsNkHM3DZFmA
提取碼:show

數據一覽如下:

2)格式轉換

原始數據為csv格式文件,我們首先做一點數據格式轉換,方便spark讀取數據生成RDD或者DataFrame,具體數據轉換代碼如下:

import pandas as pd
#.csv->.txt
data = pd.read_csv('/home/hadoop/us-counties.csv')
with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0])+'\t'+str(line[1])+'\t'
                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

3)數據上傳至HDFS

然后上傳“/home/hadoop/us-counties.txt”至HDFS文件系統中,具體路徑為“/user/hadoop/us-counties.txt”。操作命令如下:

./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

3.使用Spark對數據進行分析

這里采用Python作為編程語言,結合pyspark進行數據分析。

1)數據讀取與DataFrame構建

首先我們讀取數據文件,生成Spark DataFrame。
本案例中使用的數據為結構化數據,因此可以使用spark讀取源文件生成DataFrame以方便進行后續分析實現。

from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func

def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date


#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)

rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))

shemaUsInfo = spark.createDataFrame(rdd1,schema)

shemaUsInfo.createOrReplaceTempView("usInfo")

2)數據分析

本案例主要進行了以下統計分析,分析的目標和方法如下:

(1)統計美國截止每日的累計確診人數和累計死亡人數。

  • 以date作為分組字段,對cases和deaths字段進行匯總統計。

(2)統計美國每日的新增確診人數。

  • 因為「新增數=今日數-昨日數」,這里使用自連接,連接條件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases計算該日新增。

(3)統計美國每日的新增確診人數新增死亡人數。

  • 因為「新增數=今日數-昨日數」,這里使用自連接,連接條件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases計算該日新增。

(4)統計截止5.19日,美國各州的累計確診人數和死亡人數。

  • 首先篩選出5.19日的數據,然后以state作為分組字段,對cases和deaths字段進行匯總統計。

(5)統計截止5.19日,美國確診人數最多的十個州。

  • 對3)的結果DataFrame注冊臨時表,然后按確診人數降序排列,並取前10個州。

(6)統計截止5.19日,美國死亡人數最多的十個州。

  • 對3)的結果DataFrame注冊臨時表,然后按死亡人數降序排列,並取前10個州。

(7)統計截止5.19日,美國確診人數最少的十個州。

  • 對3)的結果DataFrame注冊臨時表,然后按確診人數升序排列,並取前10個州。

(8)統計截止5.19日,美國死亡人數最少的十個州。

  • 對3)的結果DataFrame注冊臨時表,然后按死亡人數升序排列,並取前10個州。

(9)統計截止5.19日,全美和各州的病死率。

  • 病死率 = 死亡數/確診數,對3)的結果DataFrame注冊臨時表,然后按公式計算。

我們下面基於Spark DataFrame和Spark sql進行統計分析。

# 1.計算每日的累計確診病例數和死亡數
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())

# 列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json")                               #寫入hdfs

# 注冊為臨時表供下一步使用
df1.createOrReplaceTempView("ustotal")

# 2.計算每日較昨日的新增確診病例數和死亡病例數
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")

df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           #寫入hdfs

# 3.統計截止5.19日 美國各州的累計確診人數和死亡人數
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")

df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #寫入hdfs

df3.createOrReplaceTempView("eachStateInfo")

# 4.找出美國確診最多的10個州
df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")

# 5.找出美國死亡最多的10個州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")

# 6.找出美國確診最少的10個州
df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")

# 7.找出美國死亡最少的10個州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")

# 8.統計截止5.19全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")

3)結果文件

上述Spark計算結果保存.json文件,方便后續可視化處理。由於使用Python讀取HDFS文件系統不太方便,故將HDFS上結果文件轉儲到本地文件系統中,使用以下命:

./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1
...

對於result2等結果文件,使用相同命令,只需要改一下路徑即可。下載過程如下圖所示:

4.數據可視化

1)可視化工具選擇與代碼

選擇使用python第三方庫pyecharts作為可視化工具。

在使用前,需要安裝pyecharts,安裝代碼如下:

pip install pyecharts

具體可視化實現代碼如下:

from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json

每日的累計確診病例數和死亡數 → 雙柱狀圖

#1.畫出每日的累計確診病例數和死亡數 → 雙柱狀圖

def drawChart_1(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['cases']))
            deaths.append(int(js['deaths']))

    d = (
    Bar()
    .add_xaxis(date)
    .add_yaxis("累計確診人數", cases, stack="stack1")
    .add_yaxis("累計死亡人數", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="美國每日累計確診和死亡人數"))
    .render("/home/hadoop/result/result1/result1.html")
    )

每日的新增確診病例數和死亡數 → 折線圖

#2.畫出每日的新增確診病例數和死亡數 → 折線圖
def drawChart_2(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['caseIncrease']))
            deaths.append(int(js['deathIncrease']))

    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增確診",
        y_axis=cases,
        markpoint_opts=opts.MarkPointOpts(
            data=[
                opts.MarkPointItem(type_="max", name="最大值")

            ]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="平均值")]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美國每日新增確診折線圖", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result1.html")
    )
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增死亡",
        y_axis=deaths,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="最大值")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[
                opts.MarkLineItem(type_="average", name="平均值"),
                opts.MarkLineItem(symbol="none", x="90%", y="max"),
                opts.MarkLineItem(symbol="circle", type_="max", name="最高點"),
            ]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="美國每日新增死亡折線圖", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result2.html")
    )

截止5.19,美國各州累計確診、死亡人數和病死率—->表格

#3.畫出截止5.19,美國各州累計確診、死亡人數和病死率--->表格
def drawChart_3(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    allState = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            row = []
            row.append(str(js['state']))
            row.append(int(js['totalCases']))
            row.append(int(js['totalDeaths']))
            row.append(float(js['deathRate']))
            allState.append(row)

    table = Table()

    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(
        title_opts=ComponentTitleOpts(title="美國各州疫情一覽", subtitle="")
    )
    table.render("/home/hadoop/result/result3/result1.html")

美國確診最多的10個州 → 詞雲圖

#4.畫出美國確診最多的10個州 → 詞雲圖
def drawChart_4(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)

    c = (
    WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診Top10"))
    .render("/home/hadoop/result/result4/result1.html")
    )

美國死亡最多的10個州 → 柱狀圖

#5.畫出美國死亡最多的10個州 → 柱狀圖
def drawChart_5(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    state = []
    totalDeath = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            state.insert(0,str(js['state']))
            totalDeath.insert(0,int(js['totalDeaths']))

    c = (
    PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
        "",
        totalDeath,
        label_opts=opts.LabelOpts(is_show=False),
        symbol_size=18,
        symbol_repeat="fixed",
        symbol_offset=[0, 0],
        is_symbol_clip=True,
        symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(
        title_opts=opts.TitleOpts(title="PictorialBar-美國各州死亡人數Top10"),
        xaxis_opts=opts.AxisOpts(is_show=False),
        yaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_show=False),
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(opacity=0)
            ),
        ),
    )
    .render("/home/hadoop/result/result5/result1.html")
    )

找出美國確診最少的10個州 → 詞雲圖

#6.找出美國確診最少的10個州 → 詞雲圖
def drawChart_6(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)

    c = (
    WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診最少的10個州"))
    .render("/home/hadoop/result/result6/result1.html")
    )

美國死亡最少的10個州 → 漏斗圖

#7.找出美國死亡最少的10個州 → 漏斗圖
def drawChart_7(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            data.insert(0,[str(js['state']),int(js['totalDeaths'])])

    c = (
    Funnel()
    .add(
        "State",
        data,
        sort_="ascending",
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
    .render("/home/hadoop/result/result7/result1.html")
    )

美國的病死率—->餅狀圖

#8.美國的病死率--->餅狀圖
def drawChart_8(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    values = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,則終止循環
                break
            js = json.loads(line)
            if str(js['state'])=="USA":
                values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
                values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
    c = (
    Pie()
    .add("", values)
    .set_colors(["blcak","orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    .render("/home/hadoop/result/result8/result1.html")
    )

#可視化
index = 1
while index<9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index+=1

2)結果圖標展示

可視化結果是.html格式的,reslut1的結果展示圖保存路徑為“/home/hadoop/result/result1/result1.html”,reslut2的結果展示圖保存路徑為“/home/hadoop/result/result2/result1.html”,其余類似遞推。具體截圖如下:

(1)美國每日的累計確診病例數和死亡數 → 雙柱狀圖

(2)美國每日的新增確診病例數 → 折線圖

(3)美國每日的新增死亡病例數 → 折線圖

(4)截止5.19,美國各州累計確診、死亡人數和病死率 → 表格

(5)截止5.19,美國累計確診人數前10的州 → 詞雲圖

(6)截止5.19,美國累計死亡人數前10的州 → 柱狀圖

(7)截止5.19,美國累計確診人數最少的10個州 → 詞雲圖

(8)截止5.19,美國累計死亡人數最少的10個州 → 漏斗圖

(9)截止5.19,美國的病死率 → 餅狀圖

5.參考資料

【大數據技術與處理】推薦閱讀

ShowMeAI系列教程推薦


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM