参考:Python爬取疫情实战:Flask搭建web/Echarts可视化大屏/MySQL数据库/Linux项目部署与任务定时调度_哔哩哔哩_bilibili
由于这个工作量比较大,所以我将分为两个部分,今天就先实现,相关数据爬取,储存以及如何取出来。可视化的话得过一段时间才能完成,还请尽情期待吧。
一、疫情数据来源分析
数据爬取连接:实时更新:新冠肺炎疫情最新动态 (qq.com)
1、分析数据,找到数据:
win+F12 开发者
首先先分析中国近几个月的每日数据:
找到数据所在位置
1.1、查看数据格式:下面可以看到有点类似于字典的格式,在python中可以利用json将其转化为字典的格式便于提取所需要的数据
2、当日数据:先找到数据所在位置
当日数据:
2.1、数据格式:
下面可以看到:第一行并不类似于字典而是从第二行开始才类似于,所以得截取合适的范围,再利用python 的json模块转化为字典格式的数据,再提取所需要的数据。
将下面的蓝色部分删除后回车
完全变成了类似于字典的数据
3、全球数据:先找到数据的所在位置
3.1、分析数据格式:下面看到可以直接利用json
4、百度疫情实时播报数据来源分析
4.1、数据格式:可以通过截取,使其完全满足json的要求,也可以
也可以把蓝色的部分删除,也可以完全满足
删除后就满足了
二、利用python爬取数据
1、中国的数据分析
1.1、中国数据爬取代码:
import time import requests import json # 获取中国每日情况,和各个省的数据 def get_china_data(): # 每日情况的连接 china_everyDay_url = 'https://api.inews.qq.com/newsqa/v1/query/inner/publish/modules/list?modules=chinaDayList,chinaDayAddList,nowConfirmStatis,provinceCompare' # 爬取当天的连接 china_today_details_url = 'https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5&callback=&_=%d' % int(time.time() * 1000) # 设置请求头,防止爬虫失败 headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36" } # 将得到对象转化为string的格式才能转化为json 获取该对象的文本内容,它的类型就是str response1 = requests.get(china_everyDay_url, headers).text # 将json格式转换为字典格式 res1 = json.loads(response1) data_all1 = res1["data"] # 每天的情况 history = {} # 把确诊的情况以字典的形式加到history字典中 for i in data_all1["chinaDayList"]: # 时间 下面将其转化为 类似于2021.12.04 ds = i['y']+'.'+ i["date"] tup = time.strptime(ds, "%Y.%m.%d") # 匹配时间 ds = time.strftime("%Y.%m.%d", tup) # 改变时间格式 # 确诊病例 confirm = i["confirm"] # 疑似病例 suspect = i["suspect"] # 自愈病例 heal = i["heal"] # 死亡病例 dead = i["dead"] history[ds] = {"confirm": confirm, "suspect": suspect, "heal": heal, "dead": dead} # 把新增的以字典的形式加到history中 for i in data_all1["chinaDayAddList"]: ds = i["y"]+'.' + i["date"] # 转化格式 tup = time.strptime(ds, "%Y.%m.%d") # 匹配时间 ds = time.strftime("%Y.%m.%d", tup) # 改变时间格式 # 新增确诊 confirm = i["confirm"] # 新增疑似 suspect = i["suspect"] # 新增自愈 heal = i["heal"] # 新增死亡 dead = i["dead"] history[ds].update({"confirm_add": confirm, "suspect_add": suspect, "heal_add": heal, "dead_add": dead}) # 当日的详情数据 details = [] response2 = requests.get(china_today_details_url, headers) res = json.loads(response2.text) # json字符串转字典 data_all1 = json.loads(res['data']) update_time = data_all1["lastUpdateTime"] data_country = data_all1["areaTree"] data_province = data_country[0]["children"] for pro_infos in data_province: # 省份 province = pro_infos["name"] # 省份下的市区 for city_infos in pro_infos["children"]: # 市名 city = city_infos["name"] # 新增确诊 confirm_add = city_infos["today"]["confirm"] # 确诊 confirm = city_infos["total"]["confirm"] # 死亡 dead = city_infos["total"]["dead"] # 自愈 heal = city_infos["total"]["heal"] details.append([update_time, province, city, confirm, confirm_add, heal, dead]) return history, details
2、全球疫情数据分析:
2.1 、全球数据爬取代码
# 返回数据格式:国名 : {'date': '2021-12-28', 'confirm_add': 229016, 'confirm': 53791852, 'heal': 41203698, 'dead': 839605} def get_foreign_data(): start_time = time.time() urls = 'https://api.inews.qq.com/newsqa/v1/automation/modules/list?modules=FAutoCountryConfirmAdd,WomWorld,WomAboard' header = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36" } # value 格式{'date': '2021-02-01', 'confirm_add': 0, 'confirm': 2, 'heal': 0, 'dead': 0} response = requests.get(urls,header) data_all = json.loads(response.text)['data']["WomAboard"] country_data = {} for data in data_all: # 活得国家名 country = data['name'] value = {} # 时间 以及格式化: 2021-12-28 value['date'] = data['y']+data['date'] tup = time.strptime(value['date'], '%Y%m.%d') update_time = time.strftime('%Y-%m-%d', tup) # 改变时间格式,不然插入数据库会报错 value['date'] = update_time print(update_time) # 增加 value['confirm_add'] = data['confirmAdd'] # 确诊 value['confirm'] = data['confirm'] # 治愈 value['heal'] = data['heal'] # 死亡 value['dead'] = data['dead'] country_data[country] = value print("各国数据请求完毕:", time.time() - start_time, '秒') return country_data
3、疫情实时播报数据分析
3.1、实时播报爬取代码
# 实时播报#time broadcast def get_time_broadcast(): url = 'https://opendata.baidu.com/data/inner?tn=reserved_all_res_tn&dspName=iphone&from_sf=1&dsp=iphone&resource_id=28565&alr=1&query=%E5%9B%BD%E5%86%85%E6%96%B0%E5%9E%8B%E8%82%BA%E7%82%8E%E6%9C%80%E6%96%B0%E5%8A%A8%E6%80%81&cb=jsonp_1640266664481_72760' # 设置请求头,防止爬虫失败 headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36" } response = requests.get(url, headers) # 截取使其满足条件,可以转化为json data = response.text[26:-1] data_json = json.loads(data)['Result'][0]['DisplayData']['result']['items'] context = [] for ev in data_json: # 获取事件 eventDescription = ev['eventDescription'] # 获取时间参数 eventTime = ev['eventTime'] # 拼接在一起 context.append(eventDescription + eventTime) return context
三、存储爬取的数据
这里是利用MySQL进行存储数据
这里需要提前安装mysql,自己去搜索吧!
1、创建history表,每日情况
CREATE TABLE `history`( `ds` datetime NOT NULL COMMENT '日期', `confirm` int(11) DEFAULT NULL COMMENT '累计确诊', `confirm_add` int(11) DEFAULT NULL COMMENT '当日新增确诊', `suspect` int(11) DEFAULT NULL COMMENT '剩余疑似', `suspect_add` int(11) DEFAULT NULL COMMENT '当日新增疑似', `heal` int(11) DEFAULT NULL COMMENT '累计治愈', `heal_add` int(11) DEFAULT NULL COMMENT '当日新增治愈', `dead` int(11) DEFAULT NULL COMMENT '累计死亡', `dead_ add` int(11) DEFAULT NULL COMMENT '当日新增死亡', PRIMARY KEY (`ds`) USING BTREE ) ENGINE= InnoDB DEFAULT CHARSET=utf8mb4;
创建成功:

其它三个表一样,就不赘述了,直接上代码:
2、details
CREATE TABLE `details`( `id` int(11)NOT NULL AUTO_INCREMENT, `update_time` datetime DEFAULT NULL COMMENT '数据最后更新时间', `province` varchar(50)DEFAULT NULL COMMENT '省', `city` varchar(50) DEFAULT NULL COMMENT '市', `confirm` int(11) DEFAULT NULL COMMENT '累计确诊', `confirm_add` int(11) DEFAULT NULL COMMENT '新增确诊', `heal` int(11) DEFAULT NULL COMMENT'累计治愈', `dead` int(11) DEFAULT NULL COMMENT'累计死亡', PRIMARY KEY ( `id`) )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3、time_broadcast表,实时播报
CREATE TABLE `time_broadcast`( `id` int(11)NOT NULL AUTO_INCREMENT, `dt` datetime DEFAULT NULL , `content` varchar(255)DEFAULT NULL, PRIMARY KEY (`id`) )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
4、global表、全球数据
create table global( id int(11) not null auto_increment, update_time datetime default null comment '数据最后更新时间', country varchar(50) not null comment'国', confirm int(11) default null comment'累计确诊', confirm_add int(11) default null comment'新增确诊', heal int(11) default null comment'累计治愈', dead int(11) default null comment'累计死亡', primary key(id) )engine=InnoDB default charset=utf8mb4;
5、对应的数据库与表。
5.1、数据库的连接与关闭
# 链接数据库 def get_conn(): # 建立连接, 因为是本地的mysql数据库,所以host可以用127.0.0.1 # user就是安装配置mysql的用户名 # password就是密码 # db就是指定的数据库要有, 没有可以自己创建, create database test_db; 记住分号结尾 这句话就是创建test_db的数据库 conn = pymysql.connect(host="127.0.0.1", user="root", password="root", db="test_db", charset="utf8") # 创建游标 cursor = conn.cursor() return conn, cursor # 关闭数据库 def close_conn(conn, cursor): if cursor: cursor.close() if conn: conn.close()
6、将history数据存储到MySQL数据库中history表中,并且可以实时更新,中国每日情况
# 将history数据存储到MySQL数据库中,并且可以实时更新,中国每日情况 def update_history(): conn, cursor = get_conn() try: spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime()) dic = get_china_data()[0] # 0代表历史数据字典 print(f"{spider_time} 开始更新历史数据") conn, cursor = get_conn() # 插入值 sql = "insert into history value (%s,%s,%s,%s,%s,%s,%s,%s,%s)" # 根据爬虫得到的最新数据的时间(k)到history数据表中查询是否包含此条记录,如果包含则不必插入数据,如果不包含则插入 sql_query = "select confirm from history where ds = %s" for k, v in dic.items(): # k是时间只有年月日,v数据{'confirm': 125620, 'suspect': 4, 'heal': 117432, 'dead': 5696, 'confirm_add': 75, 'suspect_add': 1, 'heal_add': 17, 'dead_add': 1} # 如果不包含数据则cursor.execute(sql_query, k)返回None, 更新时间的数据,相当于每天爬一次 if not cursor.execute(sql_query, k): # 插入数据 cursor.execute(sql, [k, v.get("confirm"), v.get("confirm_add"), v.get("suspect"), v.get("suspect_add"), v.get("heal"), v.get("heal_add"), v.get("dead"), v.get("dead_add")]) conn.commit() print(f"{spider_time} 历史数据更新完毕") except: traceback.print_exc() finally: close_conn(conn, cursor)
7、将details数据存储到数据库中details表中,并可以更新
# 跟新每个省当天具体的的数据 def update_details(): conn, cursor = get_conn() try: det = get_china_data()[1] # 1代表details最新数据 conn, cursor = get_conn() # 插入数据 sql = "insert into details(update_time,province,city,confirm,confirm_add,heal,dead) values(%s,%s,%s,%s,%s,%s,%s)" # 选择时间,从 表,排序,倒叙,限制输出一个 sql_query = "select update_time from details order by id desc limit 1" # 如果是第一次更新details数据,数据库中该表为空表,所以执行sql_query的返回值为0 val = cursor.execute(sql_query) # 如果数据库中details表中有数据,则获取details表中最新的时间数据并保存至form_time中 spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime()) if val != 0: # 获取表格中的时间数据第一个 form_time = cursor.fetchone()[0] # 与爬取数据的时间数据进行对比,如果不同则更新数据,反之。 if val == 0 or str(form_time) != det[0][0]: print(f"{spider_time} 开始更新数据") for item in det: cursor.execute(sql, item) conn.commit() print(f"{spider_time} 更新到最新数据") else: print(f"{spider_time} 已是最新数据!") except: traceback.print_exc() finally: close_conn(conn, cursor)
8、将实时播报存放到数据库中的time_broadcast表中
# 将实时播报存放到数据库中的time_broadcast表中 def update_time_broadcast(): cursor = None conn = None try: context = get_time_broadcast() spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime()) print(f"{spider_time }开始更新数据") conn, cursor = get_conn() sql = "insert into time_broadcast(dt,content) values(%s,%s)" ts = time.strftime("%Y-%m-%d %X") for i in context: cursor.execute(sql, (ts, i)) conn.commit() print(f"{spider_time }数据更新完毕") except: traceback.print_exc() finally: close_conn(conn, cursor)
9、更新测试代码,要更新哪一个就将哪一个更新函数的注释取消
# 这是总体需要的模块 import time import requests import json import pymysql
# 测试只有在本函数中启动才会有用,其它地方导入的话,不会启动下面代码 if __name__=="__main__": # update_history() # update_details() # update_foreign() update_time_broadcast()
到这里算是完成了,可视化要等一段时间,不过如果比较急的话就去看看上面的参考的东西。