
Section 7: The Boosting Method AdaBoost and a NumPy Implementation

2023-04-18 16:18:32

Boosting

Bagging and boosting are the two main ways to build ensemble models. An ensemble model is composed of multiple base models, and its predictions are usually better than those of any single base model.
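The bootstrap resampling behind bagging can be sketched in a few lines of NumPy (a minimal illustration; the seed and data array are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(seed=0)
data = np.arange(10)

# Bagging: each base model is trained on a bootstrap sample,
# i.e. a draw of the same size made with replacement.
bootstrap_sample = rng.choice(data, size=data.size, replace=True)
print(bootstrap_sample)  # typically contains duplicates and omits some points
```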

  • Bagging: each base model is trained on a different dataset obtained by random sampling from the full sample set; this process of producing different training sets by resampling is called bagging.
  • Boosting: each base model is trained on the data with different sample weights; the weights of samples misclassified by the previous base model are increased, so that the new model focuses on the misclassified samples.
  1. Boosting is a widely used statistical learning method.
  2. Strongly learnable: in the probably approximately correct (PAC) learning framework, a concept is strongly learnable if there exists a polynomial-time learning algorithm that learns it with high accuracy.
  3. Weakly learnable: a concept is weakly learnable if it can be learned only slightly better than random guessing.
  4. A boosting method combines many weak classifiers into one strong classifier.
  5. How a boosting method learns: (1) how to change the weights (or the probability distribution) of the training data in each round: increase the weights of the samples misclassified by the previous weak classifier, and decrease the weights of the correctly classified samples; (2) how to combine the weak classifiers into one strong classifier: AdaBoost uses a weighted majority vote.
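The weighted majority vote in point (2) above can be illustrated numerically (the coefficients and predictions below are made up for the example):

```python
import numpy as np

# Hypothetical votes of three weak classifiers on one sample, each in {-1, +1}
predictions = np.array([1, -1, 1])
# Hypothetical coefficients alpha_m; more accurate classifiers get larger weight
alphas = np.array([0.75, 0.25, 0.5])

# f(x) = sum_m alpha_m * G_m(x); the final label is its sign
f_x = float(np.dot(alphas, predictions))   # 0.75 - 0.25 + 0.5 = 1.0
final_label = 1 if f_x > 0 else -1
print(f_x, final_label)  # 1.0 1
```

Even though the second classifier votes -1, the larger weights of the other two carry the vote.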

Algorithm Steps

1) Assign a weight to each training sample $x_1, x_2, \ldots, x_N$; the initial weights $w_{1i}$ are all $1/N$.

2) Train a model on the weighted samples to obtain $G_m$ (the initial model is $G_1$).

3) Compute the weighted misclassification rate of $G_m$: $e_m = \sum_{i=1}^{N} w_{mi} I(y_i \neq G_m(x_i))$

4) Compute the model coefficient $\alpha_m = \frac{1}{2}\log\frac{1-e_m}{e_m}$

5) Update the weight vector $w_{m+1}$ from the misclassification rate $e_m$ and the current weight vector $w_m$:
$D_{m+1} = (w_{m+1,1}, \ldots, w_{m+1,i}, \ldots, w_{m+1,N})$
$w_{m+1,i} = \frac{w_{mi}}{Z_m}\exp(-\alpha_m y_i G_m(x_i))$
$Z_m = \sum_{i=1}^{N} w_{mi}\exp(-\alpha_m y_i G_m(x_i))$

6) Compute the misclassification rate of the combined model $f(x) = \sum_{m=1}^{M}\alpha_m G_m(x)$. The final classifier is $G(x) = \mathrm{sign}(f(x)) = \mathrm{sign}\left(\sum_{m=1}^{M}\alpha_m G_m(x)\right)$

7) If the combined model's misclassification rate falls below a threshold or the iteration limit is reached, stop; otherwise return to step 2).
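Steps 3) to 5) can be checked numerically on a toy example (the labels and stump predictions are made up): with four equally weighted samples and one mistake, $e_m = 0.25$, and after the update the misclassified sample carries half of the total weight:

```python
import numpy as np

w = np.full(4, 0.25)                   # step 1: uniform initial weights
y = np.array([1, 1, -1, -1])           # true labels
g = np.array([1, 1, -1, 1])            # hypothetical stump output; last sample is wrong

e = float(np.sum(w * (g != y)))        # step 3: weighted error = 0.25
alpha = 0.5 * np.log((1 - e) / e)      # step 4: alpha = 0.5 * ln(3)

unnorm = w * np.exp(-alpha * y * g)    # step 5: re-weight ...
w_next = unnorm / unnorm.sum()         # ... and normalize by Z_m
print(e, w_next)                       # misclassified weight grows from 0.25 to 0.5
```

Correctly classified samples each end up with weight 1/6, so the next weak classifier is pushed toward fixing the remaining mistake.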

NumPy Implementation

# -*- coding: utf-8 -*-

import numpy as np

class AdaBoost():

    def __init__(self, nEstimators=50, learningRate=0.1):
        '''
        :param nEstimators: number of boosting rounds (weak classifiers)
        :param learningRate: step size used to scan candidate split thresholds
        '''
        self.nEstimators = nEstimators
        self.learningRate = learningRate

    def initArgs(self, datasets, labels):
        '''
        Initialize parameters.
        :param datasets: training data, shape (M, N)
        :param labels: labels in {-1, +1}
        '''
        self.X = datasets
        self.Y = labels
        self.M, self.N = self.X.shape

        # weak classifiers: (feature index, threshold, direction)
        self.clfSets = []

        # initial sample weights, all 1/M
        self.weights = [1.0 / self.M] * self.M

        # coefficients alpha_m of the weak classifiers
        self.alpha = []

    def _G(self, features, labels, weights):
        '''
        Train a decision stump on a single feature: scan candidate
        thresholds and keep the one with the lowest weighted error.
        '''
        m = len(features)
        error = float('inf')
        bestV = 0
        features_min = min(features)
        features_max = max(features)
        n_step = (features_max - features_min + self.learningRate) // self.learningRate
        direct, compare_array = None, None
        for i in range(1, int(n_step)):
            v = features_min + self.learningRate * i

            if v not in features:
                # predict +1 above the threshold, -1 below
                compare_array_positive = np.array([1 if features[k] > v else -1 for k in range(m)])
                weight_error_positive = sum([weights[k] for k in range(m) if compare_array_positive[k] != labels[k]])

                # predict -1 above the threshold, +1 below
                compare_array_negative = np.array([-1 if features[k] > v else 1 for k in range(m)])
                weight_error_negative = sum([weights[k] for k in range(m) if compare_array_negative[k] != labels[k]])

                if weight_error_positive < weight_error_negative:
                    weight_error = weight_error_positive
                    _compare_array = compare_array_positive
                    direct = 'positive'
                else:
                    weight_error = weight_error_negative
                    _compare_array = compare_array_negative
                    direct = 'negative'

                if weight_error < error:
                    error = weight_error
                    compare_array = _compare_array
                    bestV = v
        return bestV, direct, error, compare_array

    def _alpha(self, error):
        '''
        Classifier coefficient alpha_m = 0.5 * log((1 - e_m) / e_m).
        :param error: weighted misclassification rate e_m
        '''
        # guard against division by zero when the stump classifies perfectly
        return 0.5 * np.log((1 - error) / max(error, 1e-12))

    def _Z(self, weights, a, clf):
        '''
        Normalization factor Z_m.
        :param weights: current sample weights
        :param a: coefficient alpha_m
        :param clf: predictions of the weak classifier
        '''
        return sum([weights[i] * np.exp(-1 * a * self.Y[i] * clf[i]) for i in range(self.M)])

    def _w(self, a, clf, Z):
        '''
        Update the sample weights: w_{m+1,i} = w_{m,i} * exp(-a * y_i * G_m(x_i)) / Z.
        :param a: coefficient alpha_m
        :param clf: predictions of the weak classifier
        :param Z: normalization factor
        '''
        self.weights = self.weights * np.exp(-1 * a * self.Y * clf) / Z

    def G(self, x, v, direct):
        '''
        Decision stump prediction.
        :param x: input feature value
        :param v: threshold
        :param direct: 'positive' predicts +1 above the threshold, otherwise -1
        '''
        if direct == 'positive':
            return 1 if x > v else -1
        else:
            return -1 if x > v else 1

    def fit(self, X, y):
        '''
        Train the ensemble.
        :param X: features, shape (M, N)
        :param y: labels in {-1, +1}
        '''
        self.initArgs(X, y)
        for epoch in range(self.nEstimators):
            best_clf_error, bestV, clf_result = float('inf'), None, None

            # pick the best stump over all features
            for j in range(self.N):
                features = self.X[:, j]
                # threshold, direction, weighted error, predictions
                v, direct, error, compare_array = self._G(features, self.Y, self.weights)

                if error < best_clf_error:
                    best_clf_error = error
                    bestV = v
                    final_direct = direct
                    clf_result = compare_array
                    axis = j
                if best_clf_error == 0:
                    break

            # coefficient alpha_m of G(x)
            a = self._alpha(best_clf_error)
            self.alpha.append(a)
            # record the weak classifier
            self.clfSets.append((axis, bestV, final_direct))
            # normalization factor
            Z = self._Z(self.weights, a, clf_result)
            # weight update
            self._w(a, clf_result, Z)

    def predict(self, feature):
        '''
        Predict the label of a single sample.
        :param feature: feature vector
        '''
        result = 0.0
        for i in range(len(self.clfSets)):
            axis, clf_v, direct = self.clfSets[i]
            f_input = feature[axis]
            result += self.alpha[i] * self.G(f_input, clf_v, direct)
        # sign(f(x))
        return 1 if result > 0 else -1

    def score(self, X_test, y_test):
        '''
        Accuracy on a test set.
        :param X_test: test features
        :param y_test: test labels
        '''
        right_count = 0
        for i in range(len(X_test)):
            feature = X_test[i]
            if self.predict(feature) == y_test[i]:
                right_count += 1

        return right_count / len(X_test)

X = np.arange(10).reshape(10, 1)
y = np.array([1, 1, 1, -1, -1, -1, 1, 1, 1, -1])

clf = AdaBoost(nEstimators=10, learningRate=0.9)
clf.fit(X, y)

testX = np.array([[0], [9], [2]])
testY = np.array([1, -1, 1])
print(clf.score(testX, testY))