
"Best Practices Outlook for Big Data + AI in the Health Sector" ---- A Fraud Detection DEMO Based on pyspark + the XGBoost Algorithm

Time: 2023-04-18 12:57:29


How should xgboost and pyspark be configured? Please refer to my earlier blog post:

A Simple Practice of pyspark + xgboost Classification and Feature Importance Using WSL

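Banks face a steadily rising number of fraud cases. As new technologies emerge, fraud incidents will multiply, and it is very hard for banks to inspect every transaction and identify fraud patterns manually. RPA (robotic process automation) uses an "if-then" approach to identify potential fraud and flag it to the relevant department. For example, if several transactions are made within a short period, RPA identifies the account and flags it as a potential threat. This helps the bank review the account carefully and investigate the fraud.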
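The "if-then" rule in the example above can be written in a few lines of Python. This is a minimal sketch, not how any particular RPA tool implements it: the function name `flag_burst_accounts` and the thresholds (three or more transactions within ten minutes) are hypothetical.

```python
from datetime import datetime, timedelta


def flag_burst_accounts(transactions, max_txn=3, window=timedelta(minutes=10)):
    """Return the accounts that have at least `max_txn` transactions inside
    any `window`-long span. `transactions` is an iterable of
    (account, datetime) pairs. A fixed if-then rule, not a learned model."""
    by_account = {}
    for account, ts in transactions:
        by_account.setdefault(account, []).append(ts)

    flagged = set()
    for account, times in by_account.items():
        times.sort()
        start = 0
        # Two-pointer sliding window over the sorted timestamps
        for end in range(len(times)):
            while times[end] - times[start] > window:
                start += 1
            if end - start + 1 >= max_txn:  # the "if" fires: flag the account
                flagged.add(account)
                break
    return flagged
```

A learned classifier such as the XGBoost model below replaces these hand-picked thresholds with patterns learned from labelled claims.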

Overview of the General Fraud Detection Workflow

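Flowchart description: As the flowchart shows, we receive our input, which consists of data on individual insurance claims within the financial data (claim features, customer features, and insurance features).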

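After some preprocessing and the addition of new features, we use the data to train an XGBoost classifier. Once the classifier is trained, it can be used to decide whether a new record is accepted (not fraudulent) or rejected (fraudulent). The steps of the process are described in more detail below.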

After discussing the data with the client, we need to understand every field; the client provides a table describing the data:

Input: Our input consists of a dataset with one line per claim. The claims contain data about the customers, the types of claims, claim amounts, and other relevant features.

Data Preparation (Preprocessing, Generation of Code Features, and Generation of Customer Segmentation Features)

We first do some initial preprocessing to convert the data fields to a suitable format. Then, based on the input, we generate features that characterize the customer using factors such as the number of previous claims, the number of previous occurrences of fraud, the total amount claimed, etc. These customer segmentation features are added to the existing dataset, along with features that record the presence (or absence) of Warning Codes, Diagnosis Codes, etc.


Preprocessing consists of:

  • Log Transformation on high magnitude numerical features
  • One-hot encoding of categorical features
  • Timestamp transforms on date-time features
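The three preprocessing steps can be sketched on a small pandas DataFrame. This is an illustration under assumptions: the helper `preprocess` is hypothetical and the column names are taken from the field lists in this post. In the demo code below, the log and timestamp steps run in Spark (`transform_log_fields`, `transform_ts_fields`) and the one-hot step uses `pd.get_dummies(..., prefix_sep="~~")`.

```python
import numpy as np
import pandas as pd


def preprocess(df, log_cols, cat_cols, ts_cols):
    out = df.copy()
    # Log transform: compress high-magnitude amounts; non-positive values
    # become NaN (Spark's F.log likewise returns null for them)
    for col in log_cols:
        out[col + "_log10"] = np.log10(out[col].where(out[col] > 0))
    # Timestamp transforms: day of week (Monday = 0) and hour of day
    for col in ts_cols:
        ts = pd.to_datetime(out[col])
        out[col + "_dow"] = ts.dt.dayofweek
        out[col + "_hod"] = ts.dt.hour
    # One-hot encoding with the "~~" separator used by the demo code
    return pd.get_dummies(out, columns=cat_cols, prefix_sep="~~", dtype=int)
```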

Customer Segmentation Features in the data include:

  • Summation of ORG_PRES_AMT_VALUE for a particular customer until a particular timestamp
  • Summation of TOTAL_RECEIPT_AMT for a particular customer until a particular timestamp
  • Summation of APP_AMT
  • Summation of CL_SOCIAL_PAY_AMT for a particular customer until a particular timestamp
  • Summation of CL_OWNER_PAY_AMT for a particular customer until a particular timestamp
  • Summation of REJECTED_AMT
  • Average of COPAY_PCT for a particular customer until a particular timestamp
  • Max value of NO_OF_YEAR for a particular customer until a particular timestamp
  • Number of CL_LINE_STATUS_RJ lines
  • Number of CL_LINE_STATUS_AC lines
  • Ratio of CL_LINE_STATUS_RJ lines to CL_LINE_STATUS_AC lines
  • Number of BEN_SPEND_Applicant lines
  • Number of BEN_SPEND_Spouse lines
  • Number of BEN_SPEND_Child lines
  • Number of BEN_SPEND_Parents lines
  • Number of lines
  • Ratio of Total Rejected Amount to Total ORG_PRES_AMT_VALUE
  • Ratio of CL_LINE_STATUS_RJ lines to all lines
  • Ratio of CL_LINE_STATUS_AC lines to all lines
  • Ratio of BEN_SPEND_Applicant lines to all lines
  • Ratio of BEN_SPEND_Spouse lines to all lines
  • Ratio of BEN_SPEND_Child lines to all lines
  • Ratio of BEN_SPEND_Parents lines to all lines
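Each "until a particular timestamp" aggregate looks only at a customer's earlier claim lines. Below is a pandas sketch of two of them, the running total of ORG_PRES_AMT_VALUE and the number of earlier lines. The function name and the choice of INCUR_DATE_FROM as the ordering column are assumptions. It mirrors the Spark window in `addFeatsTrain` (partition by customer, order by timestamp, `rangeBetween(Window.unboundedPreceding, -1)`), except that a customer's first claim gets 0 here, whereas Spark's window sum returns null.

```python
import pandas as pd


def prior_customer_features(df, cust="MBR_NO", ts="INCUR_DATE_FROM",
                            amt="ORG_PRES_AMT_VALUE"):
    """For each claim line, the customer's total `amt` and number of lines
    strictly before this line's timestamp (lines sharing it are excluded)."""
    # One row per (customer, timestamp), sorted so cumulative sums follow time
    per_ts = (df.groupby([cust, ts])[amt]
                .agg(["sum", "count"])
                .sort_index()
                .reset_index())
    # Running totals up to and including each timestamp, minus that timestamp
    prior_sum = per_ts.groupby(cust)["sum"].cumsum() - per_ts["sum"]
    prior_cnt = per_ts.groupby(cust)["count"].cumsum() - per_ts["count"]
    per_ts["TOT_" + amt] = prior_sum
    per_ts["NUM_LINES"] = prior_cnt
    # Attach the history features back onto every original line
    return df.merge(per_ts[[cust, ts, "TOT_" + amt, "NUM_LINES"]],
                    on=[cust, ts], how="left")
```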

Code Features are generated by selecting a particular code field, determining the 20 most frequently occurring items in that field, and then creating one feature for each of those 20 items that indicates whether it is present in the record. The code fields for which we generate Code Features in the data are:

  • Warning Code
  • DIAG_CODE
  • PROV_CODE
  • KIND_CODE
  • CLSH_HOSP_CODE
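A plain-Python sketch of the top-20 code features, assuming each code field holds a comma-separated string of codes. The helper names are hypothetical. Unlike the demo's `addCodes`, which tests membership with a SQL `LIKE '%code%'` substring match, this sketch matches whole codes, so a code such as `A1` does not also fire for `A12`. Like the demo, it adds an `OTHER_<field>` flag for codes outside the top list.

```python
from collections import Counter


def top_codes(code_strings, top_n=20):
    """The top_n most frequent codes across comma-separated strings."""
    counts = Counter()
    for s in code_strings:
        if s:
            counts.update(c.strip() for c in s.split(",") if c.strip())
    return [code for code, _ in counts.most_common(top_n)]


def code_features(code_string, top, field="DIAG_CODE"):
    """One 0/1 indicator per top code, plus a flag for any other code."""
    present = {c.strip() for c in (code_string or "").split(",") if c.strip()}
    feats = {code: int(code in present) for code in top}
    feats["OTHER_" + field] = int(bool(present - set(top)))
    return feats
```

The top list must be computed on training data only and reused unchanged at inference time, which is what the demo does by storing it in `param_dict["allCodes"]`.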

Classification Model:

XGBoost is an implementation of gradient boosted decision trees designed for speed and performance. The implementation of the algorithm was engineered for efficiency of compute time and memory resources. A design goal was to make the best use of available resources to train the model. We are using an XGBoost classifier to determine whether a claim is fraudulent or not.
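The demo builds the classifier as `XGBClassifier(scale_pos_weight=...)`, with the weight set to the number of non-fraud labels divided by the number of fraud labels, so that the rare fraud class is not drowned out. A small helper for that ratio follows (the name `pos_weight_from_labels` is hypothetical):

```python
import numpy as np


def pos_weight_from_labels(y):
    """Ratio of negative (0) to positive (1) labels, a common starting value
    for XGBoost's scale_pos_weight on imbalanced data."""
    y = np.asarray(y)
    n_pos = int((y == 1).sum())
    if n_pos == 0:
        raise ValueError("no positive (fraud) labels in y")
    return float((y == 0).sum()) / n_pos
```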


Output: The model classifies each claim as Rejected or Accepted, i.e., Fraudulent or Not Fraudulent.
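Turning predicted fraud probabilities into Rejected/Accepted decisions requires a cut-off. `run_xgboost` in the demo scans thresholds from 0.01 to 0.99 and keeps the one where the ratio of the false-negative rate to the false-positive rate is closest to 1. The sketch below is a variant of that idea, not the demo's exact rule: it minimises |FPR - FNR| instead of using the ratio, which avoids dividing by zero when a threshold produces no false positives. The function name is hypothetical.

```python
import numpy as np


def balanced_threshold(y_true, proba, grid=np.arange(0.01, 1.0, 0.01)):
    """Pick the probability cut-off at which the false-positive rate and the
    false-negative rate are closest to each other."""
    y_true = np.asarray(y_true)
    proba = np.asarray(proba)
    best_t, best_gap = 0.5, np.inf
    for t in grid:
        pred = (proba >= t).astype(int)
        fpr = np.mean(pred[y_true == 0] == 1)  # non-fraud flagged as fraud
        fnr = np.mean(pred[y_true == 1] == 0)  # fraud that slips through
        gap = abs(fpr - fnr)
        if gap < best_gap:
            best_t, best_gap = t, gap
    return best_t
```

Balancing the two error rates is only one possible business choice; if a missed fraud costs far more than a manual review, a lower threshold that trades extra false positives for fewer false negatives may be preferable.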

pyspark + xgboost DEMO

# Imports and Initialization

import itertools
import json
import math
import numbers
import os
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.storagelevel import StorageLevel
from sklearn import preprocessing
from sklearn.metrics import (accuracy_score, average_precision_score,
                             confusion_matrix, precision_recall_curve,
                             roc_auc_score, roc_curve)
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from xgboost import XGBClassifier, plot_importance

day_of_week_udf = F.udf(
    lambda ts: ts.weekday() if ts is not None else None,
    StringType())

def getSummary(df, param_dict):
    """
    Description : Aggregates each customer's full claim history into one row
                  holding the same NUM_LINES, TOT_*, MAX_*, MEAN_* and ratio
                  columns that addFeatsTrain builds over a window, so that
                  addFeatsTest can join them onto new records

    Input : df - claim-line dataframe used as the customer history
            param_dict - parameter dictionary

    Output : persisted dataframe with one row per param_dict["groupField"]
    """
    # 0/1 indicator columns (e.g. is_BEN_TYPE_Applicant) needed by sumFields
    for field, value in param_dict["isFields"]:
        df = isVal(df, field, value)

    aggs = [F.sum(F.lit(1)).alias("NUM_LINES")]
    aggs += [F.sum(f).alias("TOT_" + f) for f in param_dict["sumFields"]]
    aggs += [F.max(f).alias("MAX_" + f) for f in param_dict["maxFields"]]
    aggs += [F.mean(f).alias("MEAN_" + f) for f in param_dict["meanFields"]]

    summarydf = df.groupby(param_dict["groupField"]).agg(*aggs)
    for num, denom, name in param_dict["fracTuples"]:
        summarydf = fracVal(summarydf, num, denom, name)

    return summarydf.persist()


def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

def checkContain(baseFeatures, allFeatures, transformsList):
    """
    Description : Used to indicate that we want to use the transformed features
                  and not the original features if a transform has been done

    Input : baseFeatures - Features included in the dataset before any
                           transforms are applied
            allFeatures - All the features present in the dataframe after all
                          transforms and prep have completed
            transformsList - The features from the original dataset that should
                             have transforms applied to them

    Output : List of the features we're going to use for the ML model
    """
    resList = []

    for baseFeat in baseFeatures:
        if baseFeat not in transformsList:
            resList.append(baseFeat)
        else:
            for feat in allFeatures:
                if baseFeat in feat:
                    if "~~" in feat or "log10" in feat:
                        resList.append(feat)

    return resList


def transform_ts_fields(df, ts_cols):
    """
    Description : Casts each supplied field to a timestamp and adds its
                  day of week (<col>_dow, Monday = 0) and hour of day
                  (<col>_hod) as new columns

    Input : df - dataframe
            ts_cols - timestamp features that need to be formatted correctly

    Output : dataframe with appropriately formatted timestamp features
    """
    col_list = df.columns
    for col in ts_cols:
        if col in col_list:
            df = (df
                  .withColumn(col, F.col(col).cast("timestamp"))
                  .withColumn("{}_dow".format(col), day_of_week_udf(F.col(col)))
                  .withColumn("{}_hod".format(col), F.hour(F.col(col))))
    return df


def transform_numeric_fields(df, num_cols):
    """
    Description : Converts all numeric fields into float type

    Input : df - dataframe
            num_cols - numeric features that need to be converted to float type

    Output : dataframe with the numeric features converted to float type
    """
    col_list = df.columns
    for col in num_cols:
        if(col in col_list):
            df = (df
                  .withColumn(col, F.col(col).cast("float"))
                  )
    return df


def transform_log_fields(df, num_cols):
    """
    Description : Produces the log_10 of the fields passed to it

    Input : df - dataframe
            num_cols - numeric features whose log values need to be calculated

    Output : dataframe with added log values for the required numerical
             features
    """
    col_list = df.columns
    for col in num_cols:
        if(col in col_list):
            df = (df
                  .withColumn(col + "_log10", F.log(10.0, F.col(col)))
                  )
    return df


def with_transform(df, param_dict):
    """
    Description : Applies transforms on relevant data fields in the data
    Input : df - dataframe
            param_dict - parameter dictionary

    Output : dataframe with all appropriate transforms
    """

    df = transform_ts_fields(df, param_dict['BASE_FEATURES_TIMESTAMP'])
    df = transform_numeric_fields(df, param_dict['BASE_FEATURES_NUMERIC'])
    df = transform_log_fields(df, param_dict['LOG_TRANSFORM_FEATURES'])
    df = (
        df .withColumn(
            "INCUR_PERIOD_SECS",
            F.col("INCUR_DATE_TO").cast("long") -
            F.col("INCUR_DATE_FROM").cast("long")))

    return df


def run_xgboost(data,feats, scale_pos_weight=1.0, old_model = None):
    """
    Description : Trains an XGBoost classifier on a hold-out split, picks the
                  probability threshold at which the false-positive and
                  false-negative rates are closest, plots evaluation charts,
                  and finally refits a model on all of the data

    Input : data - Pandas DataFrame with the feature columns and a 'label'
                   column (1 = fraud, 0 = not fraud)
            feats - list of feature column names
            scale_pos_weight - currently unused; the weight is recomputed
                               from the label counts
            old_model - optional existing model to continue training from
                        (used for the hold-out model only)

    Output : (full_model, bestthresh) - XGBoost classifier trained on all of
             data, and the selected probability threshold
    """
    
    # Stratify so the rare fraud class appears in both splits
    X_train, X_test, y_train, y_test = train_test_split(
        data[feats], data['label'], test_size=0.33, stratify=data['label'])
    
    unique, counts = np.unique(y_train, return_counts=True)
    cdict = dict(zip(unique, counts))
    temp_pos_weight = cdict[0]/cdict[1]
    
    xgb_class = XGBClassifier(scale_pos_weight=temp_pos_weight)
    xgb_class.fit(X=X_train, y=y_train, xgb_model = old_model)
    
    y_pred_proba = xgb_class.predict_proba(X_test)
    
    threshs = np.arange(0.01,1,0.01)
    acc = 0
    prsum = 0
    abdist = 1
    bestthresh = 0
    for thresh in threshs:
        y_pred_temp = (y_pred_proba[:,1] >= thresh).astype(int)
        
        '''
        precision, recall, thresholds = precision_recall_curve(y_test, y_pred_temp)
        average_precision = average_precision_score(y_test, y_pred_temp)
        if ((precision[1]+recall[1])>prsum) and (recall[1]>precision[1]):
            prsum = precision[1]+recall[1]
            bestthresh = thresh
        '''
        
        '''
        temp_acc = accuracy_score(np.array(y_test), np.array(y_pred_temp))
        if temp_acc >acc:
            acc = temp_acc
            bestthresh = thresh
        '''
        
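        # Row-normalised confusion matrix: cm[0][1] is the false-positive
        # rate, cm[1][0] the false-negative rate. Keep the threshold at which
        # FNR/FPR is closest to 1.
        cnf_matrix_temp = confusion_matrix(y_test, y_pred_temp, labels=[0, 1])
        cm = cnf_matrix_temp.astype('float') / cnf_matrix_temp.sum(axis=1)[:, np.newaxis]

        fp = cm[0][1]
        fn = cm[1][0]
        if fp == 0:
            continue  # ratio undefined when there are no false positives

        dist = abs((fn/fp)-1)
        if dist<abdist:
            abdist = dist
            bestthresh = thresh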
        
        
        
    y_pred = (y_pred_proba[:,1] >= bestthresh).astype(int)
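    # Use the predicted probabilities, not the thresholded labels, so the
    # precision-recall curve and average precision cover all thresholds
    precision, recall, thresholds = precision_recall_curve(y_test, y_pred_proba[:, 1])
    average_precision = average_precision_score(y_test, y_pred_proba[:, 1])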
    
    # Compute confusion matrix
    cnf_matrix = confusion_matrix(y_test, y_pred)
    np.set_printoptions(precision=2)

    # Plot non-normalized confusion matrix
    plt.figure()
    plot_confusion_matrix(cnf_matrix, classes=[0,1],
                          title='Confusion matrix, without normalization')

    # Plot normalized confusion matrix
    plt.figure()
    plot_confusion_matrix(cnf_matrix, classes=[0,1], normalize=True,
                          title='Normalized confusion matrix')
    
    plt.show()
    
    
    plt.step(recall, precision, color='b', alpha=0.2,
             where='post')
    plt.fill_between(recall, precision, step='post', alpha=0.2,
                     color='b')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.ylim([0.0, 1.05])
    plt.xlim([0.0, 1.0])
    plt.title('2-class Precision-Recall curve: AP={0:0.5f}'.format(
              average_precision))
    
    plt.show()
    
    auc = roc_auc_score(y_test, y_pred_proba[:,1])
    print('AUC: %.3f' % auc)
    # calculate roc curve
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba[:,1])
    # plot no skill
    plt.plot([0, 1], [0, 1], linestyle='--')
    # plot the roc curve for the model
    plt.plot(fpr, tpr, marker='.')
    # show the plot
    plt.show()
    
    unique, counts = np.unique(data['label'], return_counts=True)
    cdict = dict(zip(unique, counts))
    pos_weight = cdict[0]/cdict[1]
    
    full_model = XGBClassifier(scale_pos_weight= pos_weight)
    full_model.fit(data[feats], data['label'])
    
    
    return full_model, bestthresh



def setup_spark_session(param_dict):
    """
    Description : Used to setup spark session

    Input : param_dict - parameter dictionary

    Output : Spark Session, Spark Context, and SQL Context
    """

    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)

    os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"

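    # Stop a session left over from a previous run. (Referencing `spark` here
    # would always raise, because `spark` is assigned later in this function.)
    # SparkSession.getActiveSession() requires PySpark >= 3.0.
    active = SparkSession.getActiveSession()
    if active is not None:
        active.stop()
        print("Stopped a SparkSession")
    else:
        print("No existing SparkSession")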

    SPARK_DRIVER_MEMORY = param_dict["SPARK_DRIVER_MEMORY"]  # "10G"
    SPARK_DRIVER_CORE = param_dict["SPARK_DRIVER_CORE"]  # "5"
    SPARK_EXECUTOR_MEMORY = param_dict["SPARK_EXECUTOR_MEMORY"]  # "3G"
    SPARK_EXECUTOR_CORE = param_dict["SPARK_EXECUTOR_CORE"]  # "1"
    AWS_ACCESS_KEY = param_dict["AWS_ACCESS_KEY"]  
    AWS_SECRET_KEY = param_dict["AWS_SECRET_KEY"]
    AWS_S3_ENDPOINT = param_dict["AWS_S3_ENDPOINT"]

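    # Method chains wrapped in parentheses (a trailing "." is a syntax error).
    # Master 'yarn' + client deploy mode replaces the deprecated 'yarn-client'.
    conf = (SparkConf()
            .setAppName(param_dict["APP_NAME"])
            .setMaster('yarn')
            .set('spark.submit.deployMode', 'client')
            .set('spark.executor.cores', SPARK_EXECUTOR_CORE)
            .set('spark.executor.memory', SPARK_EXECUTOR_MEMORY)
            .set('spark.driver.cores', SPARK_DRIVER_CORE)
            .set('spark.driver.memory', SPARK_DRIVER_MEMORY)
            .set('spark.driver.maxResultSize', '0'))  # 0 = unlimited
    spark = (SparkSession.builder
             .config(conf=conf)
             .getOrCreate())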

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
    hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    sqlContext = SQLContext(sc)

    return spark, sc, sqlContext


def loadDataset(vw_cl_lines_df, datefield, param_dict):
    """
    Description : Runs data through appropriate transforms to convert it
                  to a suitable format

    Input : vw_cl_lines_df - input dataframe
            datefield - field used to establish a window for the addFeats
                        function
            param_dict - parameter dictionary

    Output : Properly formatted dataframe
    """
    vw_cl_lines_df = (with_transform(vw_cl_lines_df, param_dict))

    vw_cl_lines_df = vw_cl_lines_df.withColumn(
        datefield + "_unix",
        (F.unix_timestamp(
            F.col(datefield),
            format='yyyy-MM-dd HH:mm:ss.000')))

    return vw_cl_lines_df


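def addOneHotsTest(df, oneHots):
    """
    Rebuilds the training-time one-hot columns ("<field>~~<value>", as
    produced by pd.get_dummies(..., prefix_sep="~~")) on new data, so the
    column set matches training and unseen categories map to all zeros
    """
    for item in oneHots:
        field, value = item.split('~~', 1)
        df[item] = np.where(df[field].astype(str) == value, 1, 0)
    return df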


def addCodes(res_df, codeField, topCodes, optUDF, knownPref):
    # Need to get same codes for testing
    for code in topCodes:
        likeCode = "%" + code + "%"
        res_df = res_df.withColumn(
            code,
            F.when(
                res_df[codeField].like(likeCode),
                1).otherwise(0))

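    def checkOtherCodes(x):
        # x is a comma-separated string of codes; flag the record if it holds
        # any code outside the top-code list (set(x) alone would split the
        # string into single characters)
        if not x:
            return 0
        codes = {c.strip() for c in x.split(",") if c.strip()}
        return 0 if codes.issubset(topCodes) else 1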

    otherCodesUDF = F.udf(checkOtherCodes, IntegerType())

    if knownPref is not None:
        otherlabel = knownPref + "_" + codeField
    else:
        otherlabel = codeField
    res_df = res_df.withColumn(
        "OTHER_" + otherlabel,
        otherCodesUDF(
            res_df[codeField]))

    codesAdded = topCodes

    return res_df, codesAdded


def codeExtract(df, codeField, topCount, optUDF=None, knownPref=None):
    """
    Description : Function to extract code features
    Input : df - input dataframe
            codeField - field holding the comma-separated codes to build
                        features from
            topCount - number of code features to be added
            optUDF - optional udf to apply to the field
            knownPref - prefix characterizing a field, if any

    Output : dataframe with code features added
    """
    codeEx_df = df

    if optUDF is not None:

        codeEx_df = codeEx_df.withColumn(
            codeField, optUDF(codeEx_df[codeField]))

    codeEx_df = codeEx_df.withColumn(
        codeField, F.explode(
            F.split(
                codeEx_df[codeField], ",")))
    code_counts = codeEx_df.groupBy(codeField).count().sort(F.desc("count"))

    if knownPref is not None:
        code_counts = code_counts.filter(
            code_counts[codeField].like(
                "%" + knownPref + "%"))

    # code_counts.show(10)
    xy = code_counts.toPandas()
    # Generating a list of the topCount most frequently occurring codes
    topCodes = xy[codeField].head(topCount).tolist()
    topCodes = [x.strip() for x in topCodes]

    res_df = df

    return addCodes(res_df, codeField, topCodes, optUDF, knownPref)

# checks for presence of values in a field


def isVal(df, field, value):
    return df.withColumn(
        "is_" + field + "_" + value,
        F.when(
            F.col(field) == value,
            F.lit(1)).otherwise(
            F.lit(0)))

# sums values of a field within a specified window


def sumVal(df, field, windowval):
    return df.withColumn("TOT_" + field, F.sum(field).over(windowval))

# finds the maximum value of a field within a specified window


def maxVal(df, field, windowval):
    return df.withColumn("MAX_" + field, F.max(field).over(windowval))

# finds the average value of a field within a specified window


def meanVal(df, field, windowval):
    return df.withColumn("MEAN_" + field, F.mean(field).over(windowval))

# finds the ratio between two fields of a record


def fracVal(df, numfield, denomfield, fracName):
    return df.withColumn(fracName, F.col(numfield) / F.col(denomfield))

# adds required fields to the dataframe


def addFeatsTrain(vw_cl_lines_df, param_dict):
    orig = vw_cl_lines_df
    windowval = (Window.partitionBy(param_dict["groupField"]).orderBy(
        param_dict["windowField"] + "_unix").rangeBetween(
                                                Window.unboundedPreceding, -1))
    codes_df = orig.withColumn("NUM_LINES", F.sum(F.lit(1)).over(windowval))

    for field in param_dict["isFields"]:
        codes_df = isVal(codes_df, field[0], field[1])

    for field in param_dict["sumFields"]:
        codes_df = sumVal(codes_df, field, windowval)

    for field in param_dict["maxFields"]:
        codes_df = maxVal(codes_df, field, windowval)

    for field in param_dict["meanFields"]:
        codes_df = meanVal(codes_df, field, windowval)

    for fracTuple in param_dict["fracTuples"]:
        codes_df = fracVal(codes_df, fracTuple[0], fracTuple[1], fracTuple[2])

    def remPref(x):
        if x is None:
            return ""
        x = x.split(",")
        y = []
        for item in x:
            if (('T' not in item) & ('M' not in item)):
                y.append(item.strip())
        y = ','.join(y)
        return y

    remPrefUDF = F.udf(remPref, StringType())
    allCodes = {}
    for code in param_dict["codeFields"]:
        if len(code) == 1:
            codes_df, toAdd = codeExtract(codes_df, code[0], 20)
            if code[0] in allCodes:
                allCodes[code[0]] = allCodes[code[0]] + toAdd
            else:
                allCodes[code[0]] = toAdd
        else:
            codes_df, toAdd = codeExtract(
                codes_df,
                code[0],
                20,
                optUDF=remPrefUDF,
                knownPref=code[1])
            if code[0] in allCodes:
                allCodes[code[0]] = allCodes[code[0]] + toAdd
            else:
                allCodes[code[0]] = toAdd
    addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))

    return codes_df, addedCols, allCodes


def addFeatsTest(vw_cl_lines_df, param_dict, summary_df):
    orig = vw_cl_lines_df

    joinfields = [param_dict['groupField'], "NUM_LINES"]
    for field in param_dict["sumFields"]:
        joinfields.append("TOT_"+field)
        
    for field in param_dict["maxFields"]:
        joinfields.append("MAX_"+field)

    for field in param_dict["meanFields"]:
        joinfields.append("MEAN_"+field)

    for fracTuple in param_dict["fracTuples"]:
        joinfields.append(fracTuple[2])
        
    codes_df = orig.join(summary_df[joinfields], param_dict['groupField'],how='left')
    
    for field in param_dict["isFields"]:
        codes_df = isVal(codes_df, field[0], field[1])
    
    def remPref(x):
        if x is None:
            return ""
        x = x.split(",")
        y = []
        for item in x:
            if (('T' not in item) & ('M' not in item)):
                y.append(item.strip())
        y = ','.join(y)
        return y

    remPrefUDF = F.udf(remPref, StringType())
    allCodes = {}
    for code in param_dict["codeFields"]:
        presentInTrain = param_dict["allCodes"][code[0]]
        if len(code) == 1:

            codes_df, added = addCodes(codes_df, code[0], presentInTrain, None,
                                       None)
        else:
            codes_df, added = addCodes(codes_df, code[0], presentInTrain,
                                       optUDF=remPrefUDF, knownPref=code[1])

    addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))

    return codes_df, addedCols


# prepares the data for use in a training or inference by adding features
# and appropriate labels


def prepTrainData(df, baseFeatures, param_dict):

    trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
    negCount = trainData.filter(trainData[param_dict["labelField"]] ==
                                param_dict["negativeLabel"]).count()
    posCount = trainData.filter(trainData[param_dict["labelField"]] ==
                                param_dict["positiveLabel"]).count()
    pos_weight = negCount/posCount
    trainData, extraCols, param_dict["allCodes"] = addFeatsTrain(trainData,
                                                                 param_dict)

    vw_cl_lines_pd = trainData.toPandas()
    prep_labelled_data_pd = pd.get_dummies(
        vw_cl_lines_pd,
        columns=param_dict["BASE_FEATURES_CATEGORICAL"],
        drop_first=False,
        prefix_sep="~~")

    featureCols = extraCols + checkContain(baseFeatures,
                                           prep_labelled_data_pd.columns
                                           .tolist(),
                                           param_dict
                                           ["LOG_TRANSFORM_FEATURES"] +
                                           param_dict
                                           ["BASE_FEATURES_CATEGORICAL"])
    param_dict["oneHots"] = [x for x in prep_labelled_data_pd.columns.tolist()
                             if "~~" in x]

    leakageFeats = ["is_"+str(x[0])+"_"+str(x[1]) for x in
                   param_dict["isFields"] if x[0] == param_dict["labelField"]]

    featureCols = [x for x in featureCols if x not in leakageFeats]
    return prep_labelled_data_pd, featureCols, pos_weight, param_dict


def prepTestData(df, summary, baseFeatures, param_dict):

    trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
    

    trainData, extraCols = addFeatsTest(trainData, param_dict , summary)
    

    vw_cl_lines_pd = trainData.toPandas()
    prep_labelled_data_pd = addOneHotsTest(vw_cl_lines_pd,
                                           param_dict["oneHots"])
    featureCols = extraCols + checkContain(baseFeatures,
                                           prep_labelled_data_pd.columns
                                           .tolist(),
                                           param_dict
                                           ["LOG_TRANSFORM_FEATURES"] +
                                           param_dict
                                           ["BASE_FEATURES_CATEGORICAL"])

    leakageFeats = ["is_"+str(x[0])+"_"+str(x[1]) for x in
                    param_dict["isFields"] if x[0] == param_dict["labelField"]]
    featureCols = [x for x in featureCols if x not in leakageFeats]
    return prep_labelled_data_pd, featureCols


# trains and returns an XGBoost Classifier


def trainXGBModel(df, param_dict):  # ,onlyWarn = False):

    pdf, feats, ratio, param_dict = prepTrainData(df, param_dict["baseFeatures"], param_dict)

    for col in param_dict["BASE_FEATURES_TIMESTAMP"]:
        pdf[col] = pd.to_datetime(pdf[col], errors='coerce')

    adf = pdf.replace([np.inf, -np.inf], 0)

    label = np.where(adf[param_dict["labelField"]] ==
                     param_dict["positiveLabel"], 1, 0)

    # Fit the scaler on the training features only and keep it, so that
    # prediction and incremental updates reuse exactly the same scaling
    standard_scaler = preprocessing.StandardScaler()
    x_scaled = standard_scaler.fit_transform(adf[feats].values)
    adf = pd.DataFrame(x_scaled, columns=feats)
    adf['label'] = label

    xgb_model, bestThresh = run_xgboost(adf[feats + ['label']], feats, scale_pos_weight=ratio)
    param_dict["trainedCols"] = list(feats)
    param_dict["scaler"] = standard_scaler

    return xgb_model, feats, param_dict, bestThresh


def updateXGBModel(df, summary, param_dict, model, chunk_size=100000):
    """
    Description : Continues training an existing model on new labelled data,
                  feeding it in chunks of at most chunk_size rows
    """
    pandas_df, featureCols = prepTestData(
        df, summary, param_dict["baseFeatures"], param_dict)
    y_all = np.where(pandas_df[param_dict["labelField"]] ==
                     param_dict["positiveLabel"], 1, 0)

    # Same columns (in training order) and same scaling as at training time
    X_all = (pandas_df.reindex(columns=param_dict["trainedCols"])
             .replace([np.inf, -np.inf], 0))
    X_all = pd.DataFrame(param_dict["scaler"].transform(X_all.values),
                         columns=param_dict["trainedCols"])

    n_pos = int(y_all.sum())
    pos_weight = (len(y_all) - n_pos) / n_pos if n_pos else 1.0

    for start in range(0, len(X_all), chunk_size):
        xgb_class = XGBClassifier(scale_pos_weight=pos_weight)
        model = xgb_class.fit(X_all.iloc[start:start + chunk_size],
                              y_all[start:start + chunk_size],
                              xgb_model=model)

    return model, featureCols, param_dict

# uses a model to predict values


def modelPredict(model, test_df, summary, param_dict, posThresh):
    test_pdf, feats1 = prepTestData(test_df, summary, param_dict["baseFeatures"], param_dict)

    for col in param_dict["BASE_FEATURES_TIMESTAMP"]:
        test_pdf[col] = pd.to_datetime(test_pdf[col], errors='coerce')

    # Select the training columns in training order and apply the scaler
    # fitted in trainXGBModel instead of refitting a new one on test data
    test_adf = (test_pdf.reindex(columns=param_dict["trainedCols"])
                .replace([np.inf, -np.inf], 0))
    X_test = pd.DataFrame(param_dict["scaler"].transform(test_adf.values),
                          columns=param_dict["trainedCols"])

    result_proba = model.predict_proba(X_test)
    result = (result_proba[:, 1] >= posThresh).astype(int)

    return result, result_proba
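The functions above are driven entirely by `param_dict`, but the post does not show one. The configuration below is a hypothetical example: the keys are the ones the code reads, while every value (field names, labels, Spark settings) is an assumption based on the field names mentioned earlier in this post. A small `check_param_dict` helper, also hypothetical, catches a few mismatches that would otherwise fail only deep inside a Spark job.

```python
# Hypothetical configuration: every value below is illustrative.
param_dict = {
    # Spark / storage settings read by setup_spark_session
    "APP_NAME": "claims-fraud-demo",
    "SPARK_DRIVER_MEMORY": "10G",
    "SPARK_DRIVER_CORE": "5",
    "SPARK_EXECUTOR_MEMORY": "3G",
    "SPARK_EXECUTOR_CORE": "1",
    "AWS_ACCESS_KEY": "<access-key>",
    "AWS_SECRET_KEY": "<secret-key>",
    "AWS_S3_ENDPOINT": "<s3-endpoint>",
    # Column groups used by with_transform / prepTrainData
    "BASE_FEATURES_TIMESTAMP": ["INCUR_DATE_FROM", "INCUR_DATE_TO"],
    "BASE_FEATURES_NUMERIC": ["ORG_PRES_AMT_VALUE", "TOTAL_RECEIPT_AMT",
                              "APP_AMT", "REJECTED_AMT", "COPAY_PCT", "NO_OF_YR"],
    "LOG_TRANSFORM_FEATURES": ["ORG_PRES_AMT_VALUE", "TOTAL_RECEIPT_AMT"],
    "BASE_FEATURES_CATEGORICAL": ["BEN_TYPE"],
    "baseFeatures": ["ORG_PRES_AMT_VALUE", "TOTAL_RECEIPT_AMT", "APP_AMT",
                     "COPAY_PCT", "NO_OF_YR", "BEN_TYPE"],
    # Label definition (rejected line = fraud)
    "labelField": "CL_LINE_STATUS",
    "positiveLabel": "RJ",
    "negativeLabel": "AC",
    # Customer-segmentation window: per customer, ordered by incurred date
    "groupField": "MBR_NO",
    "windowField": "INCUR_DATE_FROM",
    "custSegOrder": "INCUR_DATE_FROM",
    "isFields": [("CL_LINE_STATUS", "RJ"), ("CL_LINE_STATUS", "AC"),
                 ("BEN_TYPE", "Applicant")],
    "sumFields": ["ORG_PRES_AMT_VALUE", "REJECTED_AMT",
                  "is_CL_LINE_STATUS_RJ", "is_BEN_TYPE_Applicant"],
    "maxFields": ["NO_OF_YR"],
    "meanFields": ["COPAY_PCT"],
    "fracTuples": [("TOT_REJECTED_AMT", "TOT_ORG_PRES_AMT_VALUE", "FRAC_REJECTED_AMT"),
                   ("TOT_is_BEN_TYPE_Applicant", "NUM_LINES", "FRAC_BEN_TYPE_Applicant")],
    # Code fields: 1-tuples use the raw field, 2-tuples add a known prefix
    "codeFields": [("DIAG_CODE",), ("PROV_CODE",)],
}


def check_param_dict(p):
    """Cheap consistency checks on the config; raises ValueError on a problem."""
    if p["windowField"] != p["custSegOrder"]:
        # addFeatsTrain orders by windowField + "_unix", which loadDataset
        # only creates for custSegOrder
        raise ValueError("windowField must equal custSegOrder")
    if not set(p["LOG_TRANSFORM_FEATURES"]) <= set(p["BASE_FEATURES_NUMERIC"]):
        raise ValueError("log-transformed fields must also be numeric fields")
    if p["labelField"] in p["baseFeatures"]:
        raise ValueError("the label must not be a model feature")
    is_cols = {"is_{}_{}".format(f, v) for f, v in p["isFields"]}
    for f in p["sumFields"]:
        if f.startswith("is_") and f not in is_cols:
            raise ValueError("{} is not produced by isFields".format(f))
    produced = ({"NUM_LINES"}
                | {"TOT_" + f for f in p["sumFields"]}
                | {"MAX_" + f for f in p["maxFields"]}
                | {"MEAN_" + f for f in p["meanFields"]})
    for num, denom, _ in p["fracTuples"]:
        if num not in produced or denom not in produced:
            raise ValueError("unknown column in fracTuples: {}/{}".format(num, denom))
    return True


# Intended call order (needs a Spark cluster, so shown as comments only):
# spark, sc, sqlContext = setup_spark_session(param_dict)
# train_df = spark.read.parquet("s3a://<bucket>/<train-path>")  # hypothetical
# model, feats, param_dict, best_thresh = trainXGBModel(train_df, param_dict)
# summary = ...  # per-customer NUM_LINES/TOT_/MAX_/MEAN_/FRAC columns (getSummary)
# preds, proba = modelPredict(model, test_df, summary, param_dict, best_thresh)
```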
