疫情echarts可视化(1) 数据爬取、存储以及取出来


 

参考:Python爬取疫情实战:Flask搭建web/Echarts可视化大屏/MySQL数据库/Linux项目部署与任务定时调度_哔哩哔哩_bilibili

由于这个工作量比较大,所以我将分为两个部分,今天就先实现,相关数据爬取,储存以及如何取出来。可视化的话得过一段时间才能完成,还请尽情期待吧。 

 

一、疫情数据来源分析

数据爬取连接:实时更新:新冠肺炎疫情最新动态 (qq.com)

 

1、分析数据,找到数据:

win+F12 开发者

首先先分析中国近几个月的每日数据:

找到数据所在位置

 

1.1、查看数据格式:下面可以看到有点类似于字典的格式,在python中可以利用json将其转化为字典的格式便于提取所需要的数据

 

 

 

 

2、当日数据:先找到数据所在位置

当日数据:

 

 2.1、数据格式:

下面可以看到:第一行并不类似于字典而是从第二行开始才类似于,所以得截取合适的范围,再利用python 的json模块转化为字典格式的数据,再提取所需要的数据。

将下面的蓝色部分删除后回车

 

 

 

完全变成了类似于字典的数据

 

 

 

 

 3、全球数据:先找到数据的所在位置

 

3.1、分析数据格式:下面看到可以直接利用json

 

 

 

4、百度疫情实时播报数据来源分析

 

 

4.1、数据格式:可以通过截取,使其完全满足json的要求,也可以

 

 

也可以把蓝色的部分删除,也可以完全满足

 

 

 

 

 

 

删除后就满足了

 

 

 

 二、利用python爬取数据

1、中国的数据分析

  

 

 

1.1、中国数据爬取代码:

import time
import requests
import json

# 获取中国每日情况,和各个省的数据
def get_china_data():
    # 每日情况的连接
    china_everyDay_url = 'https://api.inews.qq.com/newsqa/v1/query/inner/publish/modules/list?modules=chinaDayList,chinaDayAddList,nowConfirmStatis,provinceCompare'
    # 爬取当天的连接
    china_today_details_url = 'https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5&callback=&_=%d' % int(time.time() * 1000)

    # 设置请求头,防止爬虫失败
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36"
    }

    # 将得到对象转化为string的格式才能转化为json       获取该对象的文本内容,它的类型就是str
    response1 = requests.get(china_everyDay_url, headers).text

    # 将json格式转换为字典格式
    res1 = json.loads(response1)

    data_all1 = res1["data"]
    # 每天的情况
    history = {}
    # 把确诊的情况以字典的形式加到history字典中
    for i in data_all1["chinaDayList"]:
        # 时间 下面将其转化为 类似于2021.12.04
        ds = i['y']+'.'+ i["date"]
        tup = time.strptime(ds, "%Y.%m.%d")  # 匹配时间
        ds = time.strftime("%Y.%m.%d", tup)  # 改变时间格式
        # 确诊病例
        confirm = i["confirm"]
        # 疑似病例
        suspect = i["suspect"]
        # 自愈病例
        heal = i["heal"]
        # 死亡病例
        dead = i["dead"]

        history[ds] = {"confirm": confirm, "suspect": suspect, "heal": heal, "dead": dead}

    # 把新增的以字典的形式加到history中
    for i in data_all1["chinaDayAddList"]:
        ds = i["y"]+'.' + i["date"]
        # 转化格式
        tup = time.strptime(ds, "%Y.%m.%d")  # 匹配时间
        ds = time.strftime("%Y.%m.%d", tup)  # 改变时间格式
        # 新增确诊
        confirm = i["confirm"]
        # 新增疑似
        suspect = i["suspect"]
        # 新增自愈
        heal = i["heal"]
        # 新增死亡
        dead = i["dead"]
        history[ds].update({"confirm_add": confirm, "suspect_add": suspect, "heal_add": heal, "dead_add": dead})
    # 当日的详情数据
    details = []
    response2 = requests.get(china_today_details_url, headers)
    res = json.loads(response2.text)
    # json字符串转字典
    data_all1 = json.loads(res['data'])
    update_time = data_all1["lastUpdateTime"]
    data_country = data_all1["areaTree"]
    data_province = data_country[0]["children"]
    for pro_infos in data_province:
        # 省份
        province = pro_infos["name"]
        # 省份下的市区
        for city_infos in pro_infos["children"]:
            # 市名
            city = city_infos["name"]
            # 新增确诊
            confirm_add = city_infos["today"]["confirm"]
            # 确诊
            confirm = city_infos["total"]["confirm"]
            # 死亡
            dead = city_infos["total"]["dead"]
            # 自愈
            heal = city_infos["total"]["heal"]
            details.append([update_time, province, city, confirm, confirm_add, heal, dead])
    return history, details

 

 

2、全球疫情数据分析:

 

 2.1 、全球数据爬取代码

# 返回数据格式:国名 : {'date': '2021-12-28', 'confirm_add': 229016, 'confirm': 53791852, 'heal': 41203698, 'dead': 839605}
def get_foreign_data():
    start_time = time.time()
    urls = 'https://api.inews.qq.com/newsqa/v1/automation/modules/list?modules=FAutoCountryConfirmAdd,WomWorld,WomAboard'
    header = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36"
    }
    # value 格式{'date': '2021-02-01', 'confirm_add': 0, 'confirm': 2, 'heal': 0, 'dead': 0}
    response = requests.get(urls,header)
    data_all = json.loads(response.text)['data']["WomAboard"]
    country_data = {}
    for data in data_all:
        # 活得国家名
        country = data['name']
        value = {}
        # 时间 以及格式化: 2021-12-28
        value['date'] = data['y']+data['date']
        tup = time.strptime(value['date'], '%Y%m.%d')
        update_time = time.strftime('%Y-%m-%d', tup)  # 改变时间格式,不然插入数据库会报错
        value['date'] = update_time
        print(update_time)
        # 增加
        value['confirm_add'] = data['confirmAdd']
        # 确诊
        value['confirm'] = data['confirm']
        # 治愈
        value['heal'] = data['heal']
        # 死亡
        value['dead'] = data['dead']
        country_data[country] = value

    print("各国数据请求完毕:", time.time() - start_time, '')
    return country_data

 

 3、疫情实时播报数据分析

 

 

3.1、实时播报爬取代码

# 实时播报#time broadcast
def get_time_broadcast():
    url = 'https://opendata.baidu.com/data/inner?tn=reserved_all_res_tn&dspName=iphone&from_sf=1&dsp=iphone&resource_id=28565&alr=1&query=%E5%9B%BD%E5%86%85%E6%96%B0%E5%9E%8B%E8%82%BA%E7%82%8E%E6%9C%80%E6%96%B0%E5%8A%A8%E6%80%81&cb=jsonp_1640266664481_72760'
    # 设置请求头,防止爬虫失败
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36"
    }
    response = requests.get(url, headers)
    # 截取使其满足条件,可以转化为json
    data = response.text[26:-1]
    data_json = json.loads(data)['Result'][0]['DisplayData']['result']['items']
    context = []
    for ev in data_json:
        # 获取事件
        eventDescription = ev['eventDescription']
        # 获取时间参数
        eventTime = ev['eventTime']
        # 拼接在一起
        context.append(eventDescription + eventTime)

    return context

 

 

 

三、存储爬取的数据

这里是利用MySQL进行存储数据

这里需要提前安装mysql,自己去搜索吧!

 

1、创建history表,每日情况

CREATE TABLE `history`(
`ds` datetime NOT NULL COMMENT '日期',
`confirm` int(11) DEFAULT NULL COMMENT '累计确诊',
`confirm_add` int(11) DEFAULT NULL COMMENT '当日新增确诊',
`suspect` int(11) DEFAULT NULL COMMENT '剩余疑似',
`suspect_add` int(11) DEFAULT NULL COMMENT '当日新增疑似',
`heal` int(11) DEFAULT NULL COMMENT '累计治愈',
`heal_add` int(11) DEFAULT NULL COMMENT '当日新增治愈',
`dead` int(11) DEFAULT NULL COMMENT '累计死亡',
`dead_ add` int(11) DEFAULT NULL COMMENT '当日新增死亡',
PRIMARY KEY (`ds`) USING BTREE
) ENGINE= InnoDB DEFAULT CHARSET=utf8mb4;

创建成功:

 

 

 

 

 

 

其它三个表一样,就不赘述了,直接上代码:

 2、details

CREATE TABLE `details`(
`id` int(11)NOT NULL AUTO_INCREMENT,
`update_time` datetime DEFAULT NULL COMMENT '数据最后更新时间',
`province` varchar(50)DEFAULT NULL COMMENT '',
`city` varchar(50) DEFAULT NULL COMMENT '',
`confirm` int(11) DEFAULT NULL COMMENT '累计确诊',
`confirm_add` int(11) DEFAULT NULL COMMENT '新增确诊',
`heal` int(11) DEFAULT NULL COMMENT'累计治愈',
`dead` int(11) DEFAULT NULL COMMENT'累计死亡',
PRIMARY KEY ( `id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3、time_broadcast表,实时播报

CREATE TABLE `time_broadcast`(
`id` int(11)NOT NULL AUTO_INCREMENT,
`dt` datetime DEFAULT NULL ,
`content` varchar(255)DEFAULT NULL,
PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 4、global表、全球数据

create table global(
    id int(11) not null auto_increment,
    update_time datetime default null comment '数据最后更新时间',
    country varchar(50) not null comment'',
    confirm int(11) default null comment'累计确诊',
    confirm_add int(11) default null comment'新增确诊',
    heal int(11) default null comment'累计治愈',
    dead int(11) default null comment'累计死亡',
    primary key(id)
    )engine=InnoDB default charset=utf8mb4;

 

5、对应的数据库与表。

 

 

5.1、数据库的连接与关闭

# 链接数据库
def get_conn():
    # 建立连接, 因为是本地的mysql数据库,所以host可以用127.0.0.1
    # user就是安装配置mysql的用户名
    # password就是密码
    # db就是指定的数据库要有, 没有可以自己创建, create database test_db;  记住分号结尾 这句话就是创建test_db的数据库 
    conn = pymysql.connect(host="127.0.0.1", user="root", password="root", db="test_db", charset="utf8")
    # 创建游标
    cursor = conn.cursor()
    return conn, cursor


# 关闭数据库
def close_conn(conn, cursor):
    if cursor:
        cursor.close()
    if conn:
        conn.close()

 

 6、将history数据存储到MySQL数据库中history表中,并且可以实时更新,中国每日情况

 
# 将history数据存储到MySQL数据库中,并且可以实时更新,中国每日情况
def update_history():
    conn, cursor = get_conn()
    try:
        spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime())
        dic = get_china_data()[0]  # 0代表历史数据字典
        print(f"{spider_time}  开始更新历史数据")
        conn, cursor = get_conn()
        # 插入值
        sql = "insert into history value (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        # 根据爬虫得到的最新数据的时间(k)到history数据表中查询是否包含此条记录,如果包含则不必插入数据,如果不包含则插入
        sql_query = "select confirm from history where ds = %s"
        for k, v in dic.items():
            # k是时间只有年月日,v数据{'confirm': 125620, 'suspect': 4, 'heal': 117432, 'dead': 5696, 'confirm_add': 75, 'suspect_add': 1, 'heal_add': 17, 'dead_add': 1}
            # 如果不包含数据则cursor.execute(sql_query, k)返回None, 更新时间的数据,相当于每天爬一次
            if not cursor.execute(sql_query, k):
                # 插入数据
                cursor.execute(sql, [k, v.get("confirm"), v.get("confirm_add"), v.get("suspect"),
                                     v.get("suspect_add"), v.get("heal"), v.get("heal_add"),
                                     v.get("dead"), v.get("dead_add")])
        conn.commit()
        print(f"{spider_time}  历史数据更新完毕")
    except:
        traceback.print_exc()
    finally:
        close_conn(conn, cursor)

 

7、将details数据存储到数据库中details表中,并可以更新

# 跟新每个省当天具体的的数据
def update_details():
    conn, cursor = get_conn()
    try:
        det = get_china_data()[1]  # 1代表details最新数据
        conn, cursor = get_conn()
        # 插入数据
        sql = "insert into details(update_time,province,city,confirm,confirm_add,heal,dead) values(%s,%s,%s,%s,%s,%s,%s)"
        # 选择时间,从  表,排序,倒叙,限制输出一个
        sql_query = "select update_time from details order by id desc limit 1"
        # 如果是第一次更新details数据,数据库中该表为空表,所以执行sql_query的返回值为0
        val = cursor.execute(sql_query)
        # 如果数据库中details表中有数据,则获取details表中最新的时间数据并保存至form_time中
        spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime())
        if val != 0:
            # 获取表格中的时间数据第一个
            form_time = cursor.fetchone()[0]

        # 与爬取数据的时间数据进行对比,如果不同则更新数据,反之。
        if val == 0 or str(form_time) != det[0][0]:
            print(f"{spider_time}  开始更新数据")
            for item in det:
                cursor.execute(sql, item)
            conn.commit()
            print(f"{spider_time}  更新到最新数据")
        else:
            print(f"{spider_time}  已是最新数据!")
    except:
        traceback.print_exc()
    finally:
        close_conn(conn, cursor)

 

8、将实时播报存放到数据库中的time_broadcast表中

# 将实时播报存放到数据库中的time_broadcast表中
def update_time_broadcast():
    cursor = None
    conn = None
    try:
        context = get_time_broadcast()
        spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime())
        print(f"{spider_time   }开始更新数据")
        conn, cursor = get_conn()
        sql = "insert into time_broadcast(dt,content) values(%s,%s)"
        ts = time.strftime("%Y-%m-%d %X")
        for i in context:
            cursor.execute(sql, (ts, i))
        conn.commit()
        print(f"{spider_time   }数据更新完毕")
    except:
        traceback.print_exc()
    finally:
        close_conn(conn, cursor)

 

 9、更新测试代码,要更新哪一个就将哪一个更新函数的注释取消

# 这是总体需要的模块
import time
import requests
import json
import pymysql


# 测试只有在本函数中启动才会有用,其它地方导入的话,不会启动下面代码
if __name__=="__main__": # update_history() # update_details() # update_foreign() update_time_broadcast()

 

 到这里算是完成了,可视化要等一段时间,不过如果比较急的话就去看看上面的参考的东西。

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM