《機器學習Python實現_03_二分類轉多分類的一般實現》


# Notebook setup cell: numeric / plotting imports plus a working-directory change.
import numpy as np
import os
# Move the working directory up one level. Presumably this makes the
# project-local ml_models package (imported in a later cell) resolvable
# from the repository root -- TODO confirm against the repo layout.
os.chdir('../')
import matplotlib.pyplot as plt
# IPython magic (not plain Python): render matplotlib figures inline.
%matplotlib inline

簡介

上一講我們實現了一個簡單的二元分類器:LogisticRegression,但通常情況下,我們面對的更多是多分類問題,而二分類轉多分類的通常做法也很樸素,一般分為兩種:one-vs-rest以及one-vs-one。顧名思義,one-vs-rest將多類別中的其中一類作為正類,剩餘其他所有類別作為負類,對於\(n\_class\)個類別的分類問題,需要構建\(n\_class\)個分類器;而one-vs-one是指進行兩兩分類,這樣將會構造\(n\_class*(n\_class-1)/2\)個分類器。由於實現思路很簡單,這裡就直接貼出代碼:將多分類實現封裝到MultiClassWrapper類,並放到ml_models.wrapper_models中。

# Wildcard imports from the project-local ml_models package. The cells below
# use LogisticRegression and MultiClassWrapper, which are not imported anywhere
# else, so they presumably come from these two modules.
from ml_models.linear_model import *
from ml_models.wrapper_models import *
# Prepare the handwritten-digits data
from sklearn.metrics import f1_score
from sklearn import model_selection
from sklearn import datasets
digits = datasets.load_digits()
data = digits['data']
target = digits['target']
# Hold out 30% of the samples for evaluation; random_state=0 fixes the split.
X_train, X_test, y_train, y_test = model_selection.train_test_split(data, target, test_size=0.3,
                                                                    random_state=0)
# Build the base (binary) model; the wrappers below are constructed from this instance
lr = LogisticRegression()
# Train with the one-vs-rest strategy and evaluate
ovr = MultiClassWrapper(lr, mode='ovr')
ovr.fit(X_train, y_train)

# Predict on the held-out split and report the macro-averaged F1 score
y = ovr.predict(X_test)
print('ovr:', f1_score(y_test, y, average='macro'))
ovr: 0.9492701335705958
# Train with the one-vs-one strategy and evaluate (reuses the same base model `lr`)
ovo = MultiClassWrapper(lr, mode='ovo')
ovo.fit(X_train, y_train)

# Predict on the held-out split and report the macro-averaged F1 score
y = ovo.predict(X_test)
print('ovo:', f1_score(y_test, y, average='macro'))
ovo: 0.959902103714483

MultiClassWrapper類實現細節

import threading
import copy
import numpy as np

"""
繼承Thread,獲取函數的返回值
"""


class MyThread(threading.Thread):
    """Thread subclass that keeps the return value of its target.

    The target is executed inside ``run`` (i.e. on the worker thread after
    ``start()``), not in the constructor, so several instances can actually
    run concurrently. The return value is stored in ``result``; an exception
    raised by the target is stored in ``exception`` and re-raised in the
    calling thread by ``join`` and ``get_result`` so that failures are not
    silently lost.

    :param target: callable to execute on the worker thread
    :param args: positional arguments for ``target`` (``None`` means none)
    :param kwargs: keyword arguments for ``target`` (``None`` means none)
    :param name: optional thread name; when empty the default generated
        thread name is kept
    """

    def __init__(self, target, args, kwargs, name=''):
        # Pass None for an empty name so Thread keeps its auto-generated name.
        threading.Thread.__init__(self, name=name or None)
        self.target = target
        self.args = args if args is not None else ()
        self.kwargs = kwargs if kwargs is not None else {}
        # Filled in by run(); both stay None until the target has finished.
        self.result = None
        self.exception = None

    def run(self):
        """Execute the target on the worker thread and record its outcome."""
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as exc:
            # Keep the error so the thread that joins us can see it.
            self.exception = exc

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception from the target."""
        threading.Thread.join(self, timeout)
        if self.exception is not None:
            raise self.exception

    def get_result(self):
        """Return the target's return value.

        :return: the value returned by ``target``, or ``None`` if the thread
            has not finished (or was never started)
        :raises Exception: whatever exception the target raised
        """
        if self.exception is not None:
            raise self.exception
        return self.result


class MultiClassWrapper(object):
    """Extend a binary probabilistic classifier to multi-class problems.

    Two decompositions are supported:

    * ``'ovr'`` (one-vs-rest): one classifier per label, trained with that
      label as the positive class and every other sample as negative.
    * ``'ovo'`` (one-vs-one): one classifier per label pair ``(a, b)`` with
      ``a < b``, trained only on the samples of those two labels, with ``a``
      as the positive class.

    Labels are expected to be the integers ``0 .. max(y)``; a classifier is
    created for every value (or pair of values) in that range. Each base
    classifier's ``predict_proba`` must return the positive-class
    probability with shape ``(n_samples, 1)``, because the per-classifier
    outputs are concatenated along axis 1.
    """

    def __init__(self, base_classifier, mode='ovr'):
        """
        :param base_classifier: an already instantiated binary classifier
            exposing ``fit(x, y, **kwargs)`` and ``predict_proba(x)``; it is
            deep-copied for every sub-problem and never fitted itself
        :param mode: ``'ovr'`` for one-vs-rest, ``'ovo'`` for one-vs-one
        """
        self.base_classifier = base_classifier
        self.mode = mode

    @staticmethod
    def fit_base_classifier(base_classifier, x, y, **kwargs):
        """Fit one binary sub-classifier in place."""
        base_classifier.fit(x, y, **kwargs)

    @staticmethod
    def predict_proba_base_classifier(base_classifier, x, **kwargs):
        """Return one sub-classifier's positive-class probabilities.

        Accepts and forwards ``**kwargs`` because ``predict_proba`` passes
        its own keyword arguments through to this helper.
        """
        return base_classifier.predict_proba(x, **kwargs)

    @staticmethod
    def _run_parallel(func, arg_tuples, kwargs):
        """Call ``func(*args, **kwargs)`` for every tuple on a thread pool.

        Results come back in the order of ``arg_tuples``; an exception raised
        by a call propagates to the caller.
        """
        # Function-scope import keeps the class usable on its own.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(func, *args, **kwargs) for args in arg_tuples]
            return [future.result() for future in futures]

    def _check_mode(self):
        """Fail early on an unsupported ``mode`` instead of silently doing nothing."""
        if self.mode not in ('ovr', 'ovo'):
            raise ValueError("mode must be 'ovr' or 'ovo', got %r" % (self.mode,))

    def _class_pairs(self):
        """Return every label pair ``(a, b)`` with ``a < b`` in a fixed order."""
        return [(first_cls, second_cls)
                for first_cls in range(self.n_class)
                for second_cls in range(first_cls + 1, self.n_class + 1)]

    def fit(self, x, y, **kwargs):
        """Train one binary classifier per label (ovr) or per label pair (ovo).

        :param x: training samples; must support ``x[index, :]`` row
            selection in ``'ovo'`` mode
        :param y: integer labels in ``0 .. max(y)``; any array-like
        :param kwargs: forwarded unchanged to every base classifier's ``fit``
        :raises ValueError: if ``mode`` is neither ``'ovr'`` nor ``'ovo'``
        """
        self._check_mode()
        # A plain list would make ``y == cls`` a scalar False, so normalise.
        y = np.asarray(y)
        # NOTE: despite its name this is the largest label, i.e. the number
        # of classes minus one; every range below therefore uses n_class + 1.
        self.n_class = int(np.max(y))

        if self.mode == 'ovr':
            self.classifiers = [copy.deepcopy(self.base_classifier)
                                for _ in range(self.n_class + 1)]
            jobs = [(classifier, x, (y == cls).astype('int'))
                    for cls, classifier in enumerate(self.classifiers)]
        else:
            pairs = self._class_pairs()
            self.classifiers = {pair: copy.deepcopy(self.base_classifier)
                                for pair in pairs}
            jobs = []
            for first_cls, second_cls in pairs:
                # Keep only the samples of the two labels in this pair.
                index = np.concatenate([np.where(y == first_cls)[0],
                                        np.where(y == second_cls)[0]])
                new_x = x[index, :]
                new_y = y[index]
                jobs.append((self.classifiers[(first_cls, second_cls)], new_x,
                             (new_y == first_cls).astype('int')))

        self._run_parallel(self.fit_base_classifier, jobs, kwargs)

    def predict_proba(self, x, **kwargs):
        """Return normalised per-class scores of shape ``(n_samples, max(y) + 1)``.

        In ``'ovr'`` mode column ``c`` is classifier ``c``'s positive
        probability. In ``'ovo'`` mode column ``c`` is the sum, over every
        other label ``d``, of the pairwise probability that ``c`` beats
        ``d``. In both modes each row is divided by its sum.

        :param x: samples to score
        :param kwargs: forwarded to every base classifier's ``predict_proba``
        :raises ValueError: if ``mode`` is neither ``'ovr'`` nor ``'ovo'``
        """
        self._check_mode()

        if self.mode == 'ovr':
            jobs = [(classifier, x) for classifier in self.classifiers]
            probas = self._run_parallel(self.predict_proba_base_classifier, jobs, kwargs)
            total_probas = np.concatenate(probas, axis=1)
            # Normalise each row to sum to one.
            return total_probas / total_probas.sum(axis=1, keepdims=True)

        pairs = self._class_pairs()
        jobs = [(self.classifiers[pair], x) for pair in pairs]
        results = self._run_parallel(self.predict_proba_base_classifier, jobs, kwargs)

        probas = {}
        for (first_cls, second_cls), proba in zip(pairs, results):
            probas[(first_cls, second_cls)] = proba
            # The reverse match-up is the complementary probability.
            probas[(second_cls, first_cls)] = 1.0 - proba

        # Accumulate, for every label, its probability against all the others.
        total_probas = []
        for first_cls in range(self.n_class + 1):
            versus_others = [probas[(first_cls, second_cls)]
                             for second_cls in range(self.n_class + 1)
                             if second_cls != first_cls]
            total_probas.append(
                np.concatenate(versus_others, axis=1).sum(axis=1, keepdims=True))

        # Normalise each row to sum to one.
        total_probas = np.concatenate(total_probas, axis=1)
        return total_probas / total_probas.sum(axis=1, keepdims=True)

    def predict(self, x):
        """Return the label with the highest score for every sample in ``x``."""
        return np.argmax(self.predict_proba(x), axis=1)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM