import numpy as np
import os
# Move the working directory up one level before the project imports below.
# NOTE(review): presumably so that the `ml_models` package resolves from the
# repository root when this notebook lives in a sub-directory — confirm.
os.chdir('../')
import matplotlib.pyplot as plt
%matplotlib inline
簡介
上一講我們實現了一個簡單的二元分類器:LogisticRegression,但通常情況下,我們面對的更多是多分類問題,而二分類轉多分類的通常做法也很樸素,一般分為兩種:one-vs-rest 以及 one-vs-one。顧名思義,one-vs-rest 將多類別中的其中一類作為正類,剩餘其他所有類別作為負類,對於 \(n\_class\) 個類別的分類問題,需要構建 \(n\_class\) 個分類器;而 one-vs-one 是指進行兩兩分類,這樣將會構造 \(n\_class*(n\_class-1)/2\) 個分類器。由於實現思路很簡單,就直接貼出代碼,將多分類實現封裝到 `MultiClassWrapper` 類,並放到 `ml_models.wrapper_models` 包。
from ml_models.linear_model import *
from ml_models.wrapper_models import *
# Prepare the handwritten-digits data set
from sklearn.metrics import f1_score
from sklearn import model_selection
from sklearn import datasets
digits = datasets.load_digits()
data = digits['data']
target = digits['target']
# Hold out 30% of the samples as the test set; random_state=0 makes the split reproducible.
X_train, X_test, y_train, y_test = model_selection.train_test_split(data, target, test_size=0.3,
random_state=0)
# Build the base (binary) model that the wrapper will copy once per sub-problem
lr = LogisticRegression()
# Train with the one-vs-rest strategy and evaluate on the held-out set
ovr = MultiClassWrapper(lr, mode='ovr')
ovr.fit(X_train, y_train)
y = ovr.predict(X_test)
# Macro-averaged F1: the unweighted mean of the per-class F1 scores
print('ovr:', f1_score(y_test, y, average='macro'))
ovr: 0.9492701335705958
# Train with the one-vs-one strategy and evaluate on the held-out set
ovo = MultiClassWrapper(lr, mode='ovo')
ovo.fit(X_train, y_train)
y = ovo.predict(X_test)
# Macro-averaged F1: the unweighted mean of the per-class F1 scores
print('ovo:', f1_score(y_test, y, average='macro'))
ovo: 0.959902103714483
`MultiClassWrapper` 類實現細節
import threading
import copy
import numpy as np
"""
繼承Thread,獲取函數的返回值
"""
class MyThread(threading.Thread):
    """Thread that remembers the return value of its target callable.

    The target is executed in ``run()``, i.e. on the new thread after
    ``start()`` is called, not in the constructor. Calling the target inside
    ``__init__`` would execute it synchronously in the creating thread and
    turn ``start()``/``join()`` into no-ops.

    :param target: callable to execute on the thread
    :param args: positional arguments passed to ``target``
    :param kwargs: keyword arguments (a mapping) passed to ``target``
    :param name: thread name
    """

    def __init__(self, target, args, kwargs, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.target = target
        self.args = args
        self.kwargs = kwargs
        # Filled in by run(); stays None until the target has finished.
        self.result = None
        # Exception raised by the target, if any; surfaced again by join().
        self._target_error = None

    def run(self):
        """Execute the target on this thread and store its outcome."""
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as exc:
            # Keep the failure so it can be re-raised in the thread that
            # joins, instead of being lost on the worker thread.
            self._target_error = exc

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception from the target.

        :param timeout: optional timeout in seconds, as for ``Thread.join``
        :raises Exception: whatever the target raised while running
        """
        threading.Thread.join(self, timeout)
        if self._target_error is not None:
            raise self._target_error

    def get_result(self):
        """Return the target's return value.

        :return: the stored result, or ``None`` if the target has not
            completed (or returned ``None``)
        """
        return self.result
class MultiClassWrapper(object):
    """Build a multi-class classifier out of copies of a binary classifier.

    Two strategies are supported:

    * ``'ovr'`` (one-vs-rest): one classifier per class, trained with that
      class as positive and every other sample as negative.
    * ``'ovo'`` (one-vs-one): one classifier per unordered pair of classes,
      trained only on the samples of those two classes, with the
      lower-numbered class as positive.

    Class labels are taken to be the integers ``0 .. max(y)``; ``predict``
    returns the column index of the highest probability, which is the label
    under that assumption.

    NOTE(review): the probabilities returned by the base classifier are
    concatenated along axis 1 and ``1.0 - proba`` is used as the complement,
    so ``base_classifier.predict_proba`` presumably returns a single
    positive-class column of shape ``(n_samples, 1)`` — confirm against the
    base classifier in use.
    """

    # Strategies understood by fit() / predict_proba().
    _MODES = ('ovr', 'ovo')

    def __init__(self, base_classifier, mode='ovr'):
        """
        :param base_classifier: an already-instantiated binary classifier; it
            is deep-copied for every sub-problem and never fitted itself
        :param mode: 'ovr' for one-vs-rest, 'ovo' for one-vs-one
        """
        self.base_classifier = base_classifier
        self.mode = mode

    @staticmethod
    def fit_base_classifier(base_classifier, x, y, **kwargs):
        """Fit one binary classifier; extra keyword arguments go to its ``fit``."""
        base_classifier.fit(x, y, **kwargs)

    @staticmethod
    def predict_proba_base_classifier(base_classifier, x, **kwargs):
        """Return ``base_classifier.predict_proba(x, **kwargs)``.

        Accepts keyword arguments because ``predict_proba`` forwards its own
        ``**kwargs`` here.
        """
        return base_classifier.predict_proba(x, **kwargs)

    def _check_mode(self):
        """Raise ``ValueError`` when ``self.mode`` is not a supported strategy."""
        if self.mode not in self._MODES:
            raise ValueError(
                "mode must be one of %r, got %r" % (self._MODES, self.mode))

    def _class_pairs(self):
        """Return every ``(first_cls, second_cls)`` pair with first < second."""
        return [(first_cls, second_cls)
                for first_cls in range(0, self.n_class)
                for second_cls in range(first_cls + 1, self.n_class + 1)]

    @staticmethod
    def _run_tasks(target, args_list, kwargs):
        """Run ``target(*args, **kwargs)`` on one thread per entry of
        ``args_list`` and return the results in the same order."""
        tasks = []
        for args in args_list:
            task = MyThread(target=target, args=args, kwargs=kwargs)
            task.start()
            tasks.append(task)
        for task in tasks:
            task.join()
        return [task.get_result() for task in tasks]

    def fit(self, x, y, **kwargs):
        """Train one copy of the base classifier per sub-problem.

        :param x: training samples; in 'ovo' mode it is indexed as
            ``x[index, :]`` so it must support 2-D row indexing
        :param y: integer class labels in ``0 .. max(y)``
        :param kwargs: forwarded to the base classifier's ``fit``
        :raises ValueError: if ``self.mode`` is neither 'ovr' nor 'ovo'
        """
        self._check_mode()
        # Allow plain sequences: the element-wise comparisons below need an array.
        y = np.asarray(y)
        # NOTE: despite its name this holds the largest label, i.e. the
        # number of classes minus one.
        self.n_class = np.max(y)
        if self.mode == 'ovr':
            self.classifiers = [copy.deepcopy(self.base_classifier)
                                for _ in range(0, self.n_class + 1)]
            # Binary target per classifier: 1 for its own class, 0 otherwise.
            jobs = [(classifier, x, (y == cls).astype('int'))
                    for cls, classifier in enumerate(self.classifiers)]
        else:
            pairs = self._class_pairs()
            self.classifiers = {pair: copy.deepcopy(self.base_classifier)
                                for pair in pairs}
            jobs = []
            for first_cls, second_cls in pairs:
                # Keep only the samples belonging to either class of the pair.
                index = (np.where(y == first_cls)[0].tolist()
                         + np.where(y == second_cls)[0].tolist())
                new_x = x[index, :]
                new_y = y[index]
                jobs.append((self.classifiers[(first_cls, second_cls)], new_x,
                             (new_y == first_cls).astype('int')))
        self._run_tasks(self.fit_base_classifier, jobs, kwargs)

    def predict_proba(self, x, **kwargs):
        """Return per-class scores normalised so that every row sums to 1.

        :param x: samples to score
        :param kwargs: forwarded to the base classifier's ``predict_proba``
        :return: array with one column per class ``0 .. max(y)``
        :raises ValueError: if ``self.mode`` is neither 'ovr' nor 'ovo'
        """
        self._check_mode()
        if self.mode == 'ovr':
            probas = self._run_tasks(
                self.predict_proba_base_classifier,
                [(classifier, x) for classifier in self.classifiers], kwargs)
            total_probas = np.concatenate(probas, axis=1)
        else:
            pairs = self._class_pairs()
            results = self._run_tasks(
                self.predict_proba_base_classifier,
                [(self.classifiers[pair], x) for pair in pairs], kwargs)
            probas = {}
            for (first_cls, second_cls), proba in zip(pairs, results):
                probas[(first_cls, second_cls)] = proba
                # The reversed pair gets the complementary probability.
                probas[(second_cls, first_cls)] = 1.0 - proba
            # Score of a class = sum of its probabilities against every other class.
            columns = []
            for first_cls in range(0, self.n_class + 1):
                versus = [probas[(first_cls, second_cls)]
                          for second_cls in range(0, self.n_class + 1)
                          if first_cls != second_cls]
                columns.append(
                    np.concatenate(versus, axis=1).sum(axis=1, keepdims=True))
            total_probas = np.concatenate(columns, axis=1)
        # Normalise each row.
        return total_probas / total_probas.sum(axis=1, keepdims=True)

    def predict(self, x):
        """Return, for each sample, the index of the highest-scoring class."""
        return np.argmax(self.predict_proba(x), axis=1)