參考:Python爬取疫情實戰:Flask搭建web/Echarts可視化大屏/MySQL數據庫/Linux項目部署與任務定時調度_嗶哩嗶哩_bilibili
由於這個工作量比較大,所以我將分為兩個部分,今天就先實現,相關數據爬取,儲存以及如何取出來。可視化的話得過一段時間才能完成,還請盡情期待吧。
一、疫情數據來源分析
數據爬取連接:實時更新:新冠肺炎疫情最新動態 (qq.com)
1、分析數據,找到數據:
win+F12 開發者
首先先分析中國近幾個月的每日數據:
找到數據所在位置

1.1、查看數據格式:下面可以看到有點類似於字典的格式,在python中可以利用json將其轉化為字典的格式便於提取所需要的數據

2、當日數據:先找到數據所在位置
當日數據:

2.1、數據格式:
下面可以看到:第一行並不類似於字典而是從第二行開始才類似於,所以得截取合適的范圍,再利用python 的json模塊轉化為字典格式的數據,再提取所需要的數據。

將下面的藍色部分刪除后回車

完全變成了類似於字典的數據

3、全球數據:先找到數據的所在位置

3.1、分析數據格式:下面看到可以直接利用json

4、百度疫情實時播報數據來源分析

4.1、數據格式:可以通過截取,使其完全滿足json的要求,也可以

也可以把藍色的部分刪除,也可以完全滿足

刪除后就滿足了

二、利用python爬取數據
1、中國的數據分析


1.1、中國數據爬取代碼:
import time import requests import json # 獲取中國每日情況,和各個省的數據 def get_china_data(): # 每日情況的連接 china_everyDay_url = 'https://api.inews.qq.com/newsqa/v1/query/inner/publish/modules/list?modules=chinaDayList,chinaDayAddList,nowConfirmStatis,provinceCompare' # 爬取當天的連接 china_today_details_url = 'https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5&callback=&_=%d' % int(time.time() * 1000) # 設置請求頭,防止爬蟲失敗 headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36" } # 將得到對象轉化為string的格式才能轉化為json 獲取該對象的文本內容,它的類型就是str response1 = requests.get(china_everyDay_url, headers).text # 將json格式轉換為字典格式 res1 = json.loads(response1) data_all1 = res1["data"] # 每天的情況 history = {} # 把確診的情況以字典的形式加到history字典中 for i in data_all1["chinaDayList"]: # 時間 下面將其轉化為 類似於2021.12.04 ds = i['y']+'.'+ i["date"] tup = time.strptime(ds, "%Y.%m.%d") # 匹配時間 ds = time.strftime("%Y.%m.%d", tup) # 改變時間格式 # 確診病例 confirm = i["confirm"] # 疑似病例 suspect = i["suspect"] # 自愈病例 heal = i["heal"] # 死亡病例 dead = i["dead"] history[ds] = {"confirm": confirm, "suspect": suspect, "heal": heal, "dead": dead} # 把新增的以字典的形式加到history中 for i in data_all1["chinaDayAddList"]: ds = i["y"]+'.' + i["date"] # 轉化格式 tup = time.strptime(ds, "%Y.%m.%d") # 匹配時間 ds = time.strftime("%Y.%m.%d", tup) # 改變時間格式 # 新增確診 confirm = i["confirm"] # 新增疑似 suspect = i["suspect"] # 新增自愈 heal = i["heal"] # 新增死亡 dead = i["dead"] history[ds].update({"confirm_add": confirm, "suspect_add": suspect, "heal_add": heal, "dead_add": dead}) # 當日的詳情數據 details = [] response2 = requests.get(china_today_details_url, headers) res = json.loads(response2.text) # json字符串轉字典 data_all1 = json.loads(res['data']) update_time = data_all1["lastUpdateTime"] data_country = data_all1["areaTree"] data_province = data_country[0]["children"] for pro_infos in data_province: # 省份 province = pro_infos["name"] # 省份下的市區 for city_infos in pro_infos["children"]: # 市名 city = city_infos["name"] # 新增確診 confirm_add = city_infos["today"]["confirm"] # 確診 confirm = city_infos["total"]["confirm"] # 死亡 dead = city_infos["total"]["dead"] # 自愈 heal = city_infos["total"]["heal"] details.append([update_time, province, city, confirm, confirm_add, heal, dead]) return history, details
2、全球疫情數據分析:

2.1 、全球數據爬取代碼
# 返回數據格式:國名 : {'date': '2021-12-28', 'confirm_add': 229016, 'confirm': 53791852, 'heal': 41203698, 'dead': 839605} def get_foreign_data(): start_time = time.time() urls = 'https://api.inews.qq.com/newsqa/v1/automation/modules/list?modules=FAutoCountryConfirmAdd,WomWorld,WomAboard' header = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36" } # value 格式{'date': '2021-02-01', 'confirm_add': 0, 'confirm': 2, 'heal': 0, 'dead': 0} response = requests.get(urls,header) data_all = json.loads(response.text)['data']["WomAboard"] country_data = {} for data in data_all: # 活得國家名 country = data['name'] value = {} # 時間 以及格式化: 2021-12-28 value['date'] = data['y']+data['date'] tup = time.strptime(value['date'], '%Y%m.%d') update_time = time.strftime('%Y-%m-%d', tup) # 改變時間格式,不然插入數據庫會報錯 value['date'] = update_time print(update_time) # 增加 value['confirm_add'] = data['confirmAdd'] # 確診 value['confirm'] = data['confirm'] # 治愈 value['heal'] = data['heal'] # 死亡 value['dead'] = data['dead'] country_data[country] = value print("各國數據請求完畢:", time.time() - start_time, '秒') return country_data
3、疫情實時播報數據分析

3.1、實時播報爬取代碼
# 實時播報#time broadcast def get_time_broadcast(): url = 'https://opendata.baidu.com/data/inner?tn=reserved_all_res_tn&dspName=iphone&from_sf=1&dsp=iphone&resource_id=28565&alr=1&query=%E5%9B%BD%E5%86%85%E6%96%B0%E5%9E%8B%E8%82%BA%E7%82%8E%E6%9C%80%E6%96%B0%E5%8A%A8%E6%80%81&cb=jsonp_1640266664481_72760' # 設置請求頭,防止爬蟲失敗 headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36" } response = requests.get(url, headers) # 截取使其滿足條件,可以轉化為json data = response.text[26:-1] data_json = json.loads(data)['Result'][0]['DisplayData']['result']['items'] context = [] for ev in data_json: # 獲取事件 eventDescription = ev['eventDescription'] # 獲取時間參數 eventTime = ev['eventTime'] # 拼接在一起 context.append(eventDescription + eventTime) return context
三、存儲爬取的數據
這里是利用MySQL進行存儲數據
這里需要提前安裝mysql,自己去搜索吧!
1、創建history表,每日情況
CREATE TABLE `history`( `ds` datetime NOT NULL COMMENT '日期', `confirm` int(11) DEFAULT NULL COMMENT '累計確診', `confirm_add` int(11) DEFAULT NULL COMMENT '當日新增確診', `suspect` int(11) DEFAULT NULL COMMENT '剩余疑似', `suspect_add` int(11) DEFAULT NULL COMMENT '當日新增疑似', `heal` int(11) DEFAULT NULL COMMENT '累計治愈', `heal_add` int(11) DEFAULT NULL COMMENT '當日新增治愈', `dead` int(11) DEFAULT NULL COMMENT '累計死亡', `dead_ add` int(11) DEFAULT NULL COMMENT '當日新增死亡', PRIMARY KEY (`ds`) USING BTREE ) ENGINE= InnoDB DEFAULT CHARSET=utf8mb4;
創建成功:

其它三個表一樣,就不贅述了,直接上代碼:
2、details
CREATE TABLE `details`( `id` int(11)NOT NULL AUTO_INCREMENT, `update_time` datetime DEFAULT NULL COMMENT '數據最后更新時間', `province` varchar(50)DEFAULT NULL COMMENT '省', `city` varchar(50) DEFAULT NULL COMMENT '市', `confirm` int(11) DEFAULT NULL COMMENT '累計確診', `confirm_add` int(11) DEFAULT NULL COMMENT '新增確診', `heal` int(11) DEFAULT NULL COMMENT'累計治愈', `dead` int(11) DEFAULT NULL COMMENT'累計死亡', PRIMARY KEY ( `id`) )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3、time_broadcast表,實時播報
CREATE TABLE `time_broadcast`( `id` int(11)NOT NULL AUTO_INCREMENT, `dt` datetime DEFAULT NULL , `content` varchar(255)DEFAULT NULL, PRIMARY KEY (`id`) )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
4、global表、全球數據
create table global( id int(11) not null auto_increment, update_time datetime default null comment '數據最后更新時間', country varchar(50) not null comment'國', confirm int(11) default null comment'累計確診', confirm_add int(11) default null comment'新增確診', heal int(11) default null comment'累計治愈', dead int(11) default null comment'累計死亡', primary key(id) )engine=InnoDB default charset=utf8mb4;
5、對應的數據庫與表。

5.1、數據庫的連接與關閉
# 鏈接數據庫 def get_conn(): # 建立連接, 因為是本地的mysql數據庫,所以host可以用127.0.0.1 # user就是安裝配置mysql的用戶名 # password就是密碼 # db就是指定的數據庫要有, 沒有可以自己創建, create database test_db; 記住分號結尾 這句話就是創建test_db的數據庫 conn = pymysql.connect(host="127.0.0.1", user="root", password="root", db="test_db", charset="utf8") # 創建游標 cursor = conn.cursor() return conn, cursor # 關閉數據庫 def close_conn(conn, cursor): if cursor: cursor.close() if conn: conn.close()
6、將history數據存儲到MySQL數據庫中history表中,並且可以實時更新,中國每日情況
# 將history數據存儲到MySQL數據庫中,並且可以實時更新,中國每日情況 def update_history(): conn, cursor = get_conn() try: spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime()) dic = get_china_data()[0] # 0代表歷史數據字典 print(f"{spider_time} 開始更新歷史數據") conn, cursor = get_conn() # 插入值 sql = "insert into history value (%s,%s,%s,%s,%s,%s,%s,%s,%s)" # 根據爬蟲得到的最新數據的時間(k)到history數據表中查詢是否包含此條記錄,如果包含則不必插入數據,如果不包含則插入 sql_query = "select confirm from history where ds = %s" for k, v in dic.items(): # k是時間只有年月日,v數據{'confirm': 125620, 'suspect': 4, 'heal': 117432, 'dead': 5696, 'confirm_add': 75, 'suspect_add': 1, 'heal_add': 17, 'dead_add': 1} # 如果不包含數據則cursor.execute(sql_query, k)返回None, 更新時間的數據,相當於每天爬一次 if not cursor.execute(sql_query, k): # 插入數據 cursor.execute(sql, [k, v.get("confirm"), v.get("confirm_add"), v.get("suspect"), v.get("suspect_add"), v.get("heal"), v.get("heal_add"), v.get("dead"), v.get("dead_add")]) conn.commit() print(f"{spider_time} 歷史數據更新完畢") except: traceback.print_exc() finally: close_conn(conn, cursor)
7、將details數據存儲到數據庫中details表中,並可以更新
# 跟新每個省當天具體的的數據 def update_details(): conn, cursor = get_conn() try: det = get_china_data()[1] # 1代表details最新數據 conn, cursor = get_conn() # 插入數據 sql = "insert into details(update_time,province,city,confirm,confirm_add,heal,dead) values(%s,%s,%s,%s,%s,%s,%s)" # 選擇時間,從 表,排序,倒敘,限制輸出一個 sql_query = "select update_time from details order by id desc limit 1" # 如果是第一次更新details數據,數據庫中該表為空表,所以執行sql_query的返回值為0 val = cursor.execute(sql_query) # 如果數據庫中details表中有數據,則獲取details表中最新的時間數據並保存至form_time中 spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime()) if val != 0: # 獲取表格中的時間數據第一個 form_time = cursor.fetchone()[0] # 與爬取數據的時間數據進行對比,如果不同則更新數據,反之。 if val == 0 or str(form_time) != det[0][0]: print(f"{spider_time} 開始更新數據") for item in det: cursor.execute(sql, item) conn.commit() print(f"{spider_time} 更新到最新數據") else: print(f"{spider_time} 已是最新數據!") except: traceback.print_exc() finally: close_conn(conn, cursor)
8、將實時播報存放到數據庫中的time_broadcast表中
# 將實時播報存放到數據庫中的time_broadcast表中 def update_time_broadcast(): cursor = None conn = None try: context = get_time_broadcast() spider_time = time.strftime(f"%Y{'年'}%m{'月'}%d{'日'} %H:%M:%S", time.localtime()) print(f"{spider_time }開始更新數據") conn, cursor = get_conn() sql = "insert into time_broadcast(dt,content) values(%s,%s)" ts = time.strftime("%Y-%m-%d %X") for i in context: cursor.execute(sql, (ts, i)) conn.commit() print(f"{spider_time }數據更新完畢") except: traceback.print_exc() finally: close_conn(conn, cursor)
9、更新測試代碼,要更新哪一個就將哪一個更新函數的注釋取消
# 這是總體需要的模塊 import time import requests import json import pymysql
# 測試只有在本函數中啟動才會有用,其它地方導入的話,不會啟動下面代碼 if __name__=="__main__": # update_history() # update_details() # update_foreign() update_time_broadcast()
到這里算是完成了,可視化要等一段時間,不過如果比較急的話就去看看上面的參考的東西。
