本文對應代碼、數據及文獻資料已上傳至我的
Github
倉庫https://github.com/CNFeffery/DataScienceStudyNotes,對代碼不感興趣的朋友可以直接跳至2.2 探索新冠肺炎疫情數據查看疫情拐點分析結果
1 簡介
拐點檢測(Knee point detection),指的是在具有上升或下降趨勢的曲線中,在某一點之后整體趨勢明顯發生變化,這樣的點就稱為拐點(如圖1所示,在藍色標記出的點之后曲線陡然上升):

本文就將針對Python
中用於拐點檢測的第三方包kneed
進行介紹,並以新型冠狀肺炎數據為例,找出各指標數學意義上的拐點。
2 基於kneed的拐點檢測
2.1 kneed基礎
許多算法都需要利用肘部法則來確定某些關鍵參數,如K-means
中聚類個數k、DBSCAN
中的搜索半徑eps
等,在面對需要確定所謂肘部,即拐點時,人為通過觀察來確定位置的方式不嚴謹,需要一套有數學原理支撐的檢測方法,Jeannie Albrecht等人在Finding a “Kneedle” in a Haystack: Detecting Knee Points in System Behavior(你可以在文章開頭的Github
倉庫中找到)中從曲率的思想出發,針對離散型數據,結合離線、在線的不同應用場景以及Angle-based、Menger Curvature、EWMA等算法,提出了一套拐點檢測方法,kneed
就是對這篇論文所提出算法的實現。
使用pip install kneed
完成安裝之后,下面我們來了解其主要用法:
2.1.1 KneeLocator
KneeLocator
是kneed
中用於檢測拐點的模塊,其主要參數如下:
x:待檢測數據對應的橫軸數據序列,如時間點、日期等
y:待檢測數據序列,在x條件下對應的值,如x為星期一,對應的y為降水量
S:float型,默認為1,敏感度參數,越小對應拐點被檢測出得越快
curve:str型,指明曲線之上區域是凸集還是凹集,concave
代表凹,convex
代表凸
direction:str型,指明曲線初始趨勢是增還是減,increasing
表示增,decreasing
表示減
online:bool型,用於設置在線/離線識別模式,True表示在線,False表示離線;在線模式下會沿着x軸從右向左識別出每一個局部拐點,並在其中選擇最優的拐點;離線模式下會返回從右向左檢測到的第一個局部拐點
KneeLocator
在傳入參數實例化完成計算后,可返回的我們主要關注的屬性如下:
knee及elbow:返回檢測到的最優拐點對應的x
knee_y及elbow_y:返回檢測到的最優拐點對應的y
all_elbows及all_knees:返回檢測到的所有局部拐點對應的x
all_elbows_y及all_knees_y:返回檢測到的所有局部拐點對應的y
curve
與direction
參數非常重要,用它們組合出想要識別出的拐點模式,以正弦函數為例,在oonline
設置為True時,分別在curve='concave'
+direction='increasing'
、curve='concave'
+direction='decreasing'
、curve='convex'
+direction='increasing'
和curve='convex'
+direction='decreasing'
參數組合下對同一段余弦曲線進行拐點計算:
import matplotlib.pyplot as plt
from matplotlib import style
import numpy as np
from kneed import KneeLocator
style.use('seaborn-whitegrid')
x = np.arange(1, 3, 0.01)*np.pi
y = np.cos(x)
# 計算各種參數組合下的拐點
kneedle_cov_inc = KneeLocator(x,
y,
curve='convex',
direction='increasing',
online=True)
kneedle_cov_dec = KneeLocator(x,
y,
curve='convex',
direction='decreasing',
online=True)
kneedle_con_inc = KneeLocator(x,
y,
curve='concave',
direction='increasing',
online=True)
kneedle_con_dec = KneeLocator(x,
y,
curve='concave',
direction='decreasing',
online=True)
fig, axe = plt.subplots(2, 2, figsize=[12, 12])
axe[0, 0].plot(x, y, 'k--')
axe[0, 0].annotate(s='Knee Point', xy=(kneedle_cov_inc.knee+0.2, kneedle_cov_inc.knee_y), fontsize=10)
axe[0, 0].scatter(x=kneedle_cov_inc.knee, y=kneedle_cov_inc.knee_y, c='b', s=200, marker='^', alpha=1)
axe[0, 0].set_title('convex+increasing')
axe[0, 0].fill_between(np.arange(1, 1.5, 0.01)*np.pi, np.cos(np.arange(1, 1.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[0, 0].set_ylim(-1, 1)
axe[0, 1].plot(x, y, 'k--')
axe[0, 1].annotate(s='Knee Point', xy=(kneedle_cov_dec.knee+0.2, kneedle_cov_dec.knee_y), fontsize=10)
axe[0, 1].scatter(x=kneedle_cov_dec.knee, y=kneedle_cov_dec.knee_y, c='b', s=200, marker='^', alpha=1)
axe[0, 1].fill_between(np.arange(2.5, 3, 0.01)*np.pi, np.cos(np.arange(2.5, 3, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[0, 1].set_title('convex+decreasing')
axe[0, 1].set_ylim(-1, 1)
axe[1, 0].plot(x, y, 'k--')
axe[1, 0].annotate(s='Knee Point', xy=(kneedle_con_inc.knee+0.2, kneedle_con_inc.knee_y), fontsize=10)
axe[1, 0].scatter(x=kneedle_con_inc.knee, y=kneedle_con_inc.knee_y, c='b', s=200, marker='^', alpha=1)
axe[1, 0].fill_between(np.arange(1.5, 2, 0.01)*np.pi, np.cos(np.arange(1.5, 2, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[1, 0].set_title('concave+increasing')
axe[1, 0].set_ylim(-1, 1)
axe[1, 1].plot(x, y, 'k--')
axe[1, 1].annotate(s='Knee Point', xy=(kneedle_con_dec.knee+0.2, kneedle_con_dec.knee_y), fontsize=10)
axe[1, 1].scatter(x=kneedle_con_dec.knee, y=kneedle_con_dec.knee_y, c='b', s=200, marker='^', alpha=1)
axe[1, 1].fill_between(np.arange(2, 2.5, 0.01)*np.pi, np.cos(np.arange(2, 2.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[1, 1].set_title('concave+decreasing')
axe[1, 1].set_ylim(-1, 1)
# 導出圖像
plt.savefig('圖2.png', dpi=300)
圖中紅色區域分別對應符合參數條件的搜索區域,藍色三角形為每種參數組合下由kneed
檢測到的最優拐點:

下面我們擴大余弦函數中x的范圍,繪制出提取到的所有局部拐點:
x = np.arange(0, 6, 0.01)*np.pi
y = np.cos(x)
# 計算convex+increasing參數組合下的拐點
kneedle = KneeLocator(x,
y,
curve='convex',
direction='increasing',
online=True)
fig, axe = plt.subplots(figsize=[8, 4])
axe.plot(x, y, 'k--')
axe.annotate(s='Knee Point', xy=(kneedle.knee+0.2, kneedle.knee_y), fontsize=10)
axe.set_title('convex+increasing')
axe.fill_between(np.arange(1, 1.5, 0.01)*np.pi, np.cos(np.arange(1, 1.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe.fill_between(np.arange(3, 3.5, 0.01)*np.pi, np.cos(np.arange(3, 3.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe.fill_between(np.arange(5, 5.5, 0.01)*np.pi, np.cos(np.arange(5, 5.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe.scatter(x=list(kneedle.all_knees), y=np.cos(list(kneedle.all_knees)), c='b', s=200, marker='^', alpha=1)
axe.set_ylim(-1, 1)
# 導出圖像
plt.savefig('圖3.png', dpi=300)
得到的結果如圖3所示,其中注意,在使用kneed
檢測拐點時,落在最左或最右的拐點是無效拐點:

2.2 探索新冠肺炎疫情數據
接下來我們嘗試將上文介紹的kneed
應用到新冠肺炎數據上,來探究各個指標數學意義上的拐點是否已經出現。使用到的原始數據來自https://github.com/BlankerL/DXY-COVID-19-Data ,這個Github
倉庫以丁香園數據為數據源,實時同步更新粒度到城市級別的疫情發展數據,你可以在本文開頭提到的我的Github
倉庫對應本文路徑下找到下文使用到的數據,更新時間為2020-02-18 22:55:07,下面開始我們的分析。
首先我們讀入DXYArea.csv
文件,並查看其信息,為了后面方便處理我們在讀入時將updateTime列提前解析為時間格式:
import pandas as pd
raw = pd.read_csv('DXYArea.csv', parse_dates=['updateTime'])
raw.info()

查看其第一行信息:

可以看到,原始數據中包含了省、市信息,以及對應省及市的最新累計確診人數、累計疑似人數、累計治愈人數和累計死亡人數信息,我們的目的是檢測全國范圍內,累計確診人數、日新增確診人數、治愈率、死亡率隨時間(單位:天)變化下的曲線,是否已經出現數學意義上的拐點(由於武漢市數據變化的復雜性和特殊性,下面的分析只圍繞除武漢市之外的其他地區進行),首先我們對所有市取每天最晚一次更新的數據作為當天正式的記錄值:
# 抽取updateTime列中的年、月、日信息分別保存到新列中
raw['year'], raw['month'], raw['day'] = list(zip(*raw['updateTime'].apply(lambda d: (d.year, d.month, d.day))))
# 得到每天每個市最晚一次更新的疫情數據
temp = raw.sort_values(['provinceName', 'cityName', 'year', 'month', 'day', 'updateTime'],
ascending=False,
ignore_index=True).groupby(['provinceName', 'cityName', 'year', 'month', 'day']) \
.agg({'province_confirmedCount': 'first',
'province_curedCount': 'first',
'province_deadCount': 'first',
'city_confirmedCount': 'first',
'city_curedCount': 'first',
'city_deadCount': 'first'}) \
.reset_index(drop=False)
# 查看前5行
temp.head()

有了上面處理好的數據,接下來我們針對全國(除武漢市外)的相關指標的拐點進行分析。
首先我們來對截止到今天(2020-2-18)我們關心的指標進行計算並做一個基本的可視化:
# 計算各指標時序結果
# 全國(除武漢市外)累計確診人數
nationwide_confirmed_count = temp[temp['cityName'] != '武漢'].groupby(['year', 'month', 'day']) \
.agg({'city_confirmedCount': 'sum'}) \
.reset_index(drop=False)
# 全國(除武漢市外)累計治愈人數
nationwide_cured_count = temp[temp['cityName'] != '武漢'].groupby(['year', 'month', 'day']) \
.agg({'city_curedCount': 'sum'}) \
.reset_index(drop=False)
# 全國(除武漢市外)累計死亡人數
nationwide_dead_count = temp[temp['cityName'] != '武漢'].groupby(['year', 'month', 'day']) \
.agg({'city_deadCount': 'sum'}) \
.reset_index(drop=False)
# 全國(除武漢市外)每日新增確診人數,即為nationwide_confirmed_count的一階差分
nationwide_confirmed_inc_count = nationwide_confirmed_count['city_confirmedCount'].diff()[1:]
# 全國(除武漢市外)治愈率
nationwide_cured_ratio = nationwide_cured_count['city_curedCount'] / nationwide_confirmed_count['city_confirmedCount']
# 全國(除武漢市外)死亡率
nationwide_died_ratio = nationwide_dead_count['city_deadCount'] / nationwide_confirmed_count['city_confirmedCount']
# 繪圖
#解決中文顯示問題
plt.rcParams['font.sans-serif'] = ['KaiTi']
plt.rcParams['axes.unicode_minus'] = False
fig, axes = plt.subplots(3, 2, figsize=[12, 18])
axes[0, 0].plot(nationwide_confirmed_count.index, nationwide_confirmed_count['city_confirmedCount'], 'k--')
axes[0, 0].set_title('累計確診人數', fontsize=20)
axes[0, 0].set_xticks(nationwide_confirmed_count.index)
axes[0, 0].set_xticklabels([f"{nationwide_confirmed_count.loc[i, 'month']}-{nationwide_confirmed_count.loc[i, 'day']}"
for i in nationwide_confirmed_count.index], rotation=60)
axes[0, 1].plot(nationwide_cured_count.index, nationwide_cured_count['city_curedCount'], 'k--')
axes[0, 1].set_title('累計治愈人數', fontsize=20)
axes[0, 1].set_xticks(nationwide_cured_count.index)
axes[0, 1].set_xticklabels([f"{nationwide_cured_count.loc[i, 'month']}-{nationwide_cured_count.loc[i, 'day']}"
for i in nationwide_cured_count.index], rotation=60)
axes[1, 0].plot(nationwide_dead_count.index, nationwide_dead_count['city_deadCount'], 'k--')
axes[1, 0].set_title('累計死亡人數', fontsize=20)
axes[1, 0].set_xticks(nationwide_dead_count.index)
axes[1, 0].set_xticklabels([f"{nationwide_dead_count.loc[i, 'month']}-{nationwide_dead_count.loc[i, 'day']}"
for i in nationwide_dead_count.index], rotation=60)
axes[1, 1].plot(nationwide_confirmed_inc_count.index, nationwide_confirmed_inc_count, 'k--')
axes[1, 1].set_title('每日新增確診人數', fontsize=20)
axes[1, 1].set_xticks(nationwide_confirmed_inc_count.index)
axes[1, 1].set_xticklabels([f"{nationwide_confirmed_count.loc[i, 'month']}-{nationwide_confirmed_count.loc[i, 'day']}"
for i in nationwide_confirmed_inc_count.index], rotation=60)
axes[2, 0].plot(nationwide_cured_ratio.index, nationwide_cured_ratio, 'k--')
axes[2, 0].set_title('治愈率', fontsize=20)
axes[2, 0].set_xticks(nationwide_cured_ratio.index)
axes[2, 0].set_xticklabels([f"{nationwide_cured_count.loc[i, 'month']}-{nationwide_cured_count.loc[i, 'day']}"
for i in nationwide_cured_ratio.index], rotation=60)
axes[2, 1].plot(nationwide_died_ratio.index, nationwide_died_ratio, 'k--')
axes[2, 1].set_title('死亡率', fontsize=20)
axes[2, 1].set_xticks(nationwide_died_ratio.index)
axes[2, 1].set_xticklabels([f"{nationwide_dead_count.loc[i, 'month']}-{nationwide_dead_count.loc[i, 'day']}"
for i in nationwide_died_ratio.index], rotation=60)
fig.suptitle('全國范圍(除武漢外)', fontsize=30)
# 導出圖像
plt.savefig('圖7.png', dpi=300)

接着就到了檢測拐點的時候了,為了簡化代碼,我們先編寫自定義函數,用於從KneeLocator
中curve
和direction
參數的全部組合中搜索合法的拐點輸出值及對應拐點的趨勢變化類型,若無則返回None:
def knee_point_search(x, y):
# 轉為list以支持負號索引
x, y = x.tolist(), y.tolist()
output_knees = []
for curve in ['convex', 'concave']:
for direction in ['increasing', 'decreasing']:
model = KneeLocator(x=x, y=y, curve=curve, direction=direction, online=False)
if model.knee != x[0] and model.knee != x[-1]:
output_knees.append((model.knee, model.knee_y, curve, direction))
if output_knees.__len__() != 0:
print('發現拐點!')
return output_knees
else:
print('未發現拐點!')
下面我們對每個指標進行拐點搜索,先來看看累計確診數,經過程序的搜索,並未發現有效拐點:

接着檢測累計治愈數,發現了有效拐點:

在曲線圖上標記出拐點:
knee_info = knee_point_search(x=nationwide_cured_count.index,
y=nationwide_cured_count['city_curedCount'])
fig, axe = plt.subplots(figsize=[8, 6])
axe.plot(nationwide_cured_count.index, nationwide_cured_count['city_curedCount'], 'k--')
axe.set_title('累計治愈人數', fontsize=20)
axe.set_xticks(nationwide_cured_count.index)
axe.set_xticklabels([f"{nationwide_cured_count.loc[i, 'month']}-{nationwide_cured_count.loc[i, 'day']}"
for i in nationwide_cured_count.index], rotation=60)
for point in knee_info:
axe.scatter(x=point[0], y=point[1], c='b', s=200, marker='^')
axe.annotate(s=f'{point[2]} {point[3]}', xy=(point[0]+1, point[1]), fontsize=14)
# 導出圖像
plt.savefig('圖10.png', dpi=300)

結合其convex
+increasing
的特點,可以說明從2月5日開始,累計治愈人數有了明顯的加速上升趨勢。
再來看看累計死亡人數:

繪制出其拐點:

同樣在2月5日開始,累計死亡人數跟累計治愈人數同步,有了較為明顯的加速上升趨勢。
對於日新增確診數則找到了兩個拐點,雖然這個指標在變化趨勢上看波動較為明顯,但結合其參數信息還是可以推斷出其在第一個拐點處增速放緩,在第二個拐點出加速下降,說明全國除武漢之外的地區抗疫工作已經有了明顯的成果:

治愈率和死亡率同樣出現了拐點,其中治愈率出現加速上升的拐點,伴隨着廣大醫療工作者的辛勤付出,更好的療法加速了治愈率的上升:

死亡率雖然最新一次的拐點代表着加速上升,但通過比較其與治愈率的變化幅度比較可以看出,死亡率的絕對增長量十分微弱:

通過上面的分析,可以看出在這場針對新冠肺炎的特殊戰役中,到目前為止,除武漢外其他地區已取得階段性的進步,但仍然需要付出更大的努力來鞏固來之不易的改變,相信只要大家都能從自己做起,不給病毒留可趁之機,更加明顯的勝利拐點一定會出現。