Source code for optimalflow.funcPP

#!/usr/bin/env python

import math
import pandas as pd
import numpy as np
from numpy import array
from numpy import count_nonzero
from sklearn import preprocessing
from sklearn.impute import SimpleImputer
from sklearn.feature_extraction import DictVectorizer
from scipy.stats.mstats import winsorize
from sklearn.preprocessing import LabelEncoder
import category_encoders as ce

class PPtools:
    """This class stores feature preprocessing transform tools.

    Parameters
    ----------
    data : df, default = None
        Pre-cleaned dataset for feature preprocessing.

    label_col : str, default = None
        Name of the label column.

    model_type : str, default = "reg"
        Value in ["reg","cls"]. Use "reg" for a regression problem, and "cls" for a classification problem.

    Example
    -------

    .. [Example] https://optimal-flow.readthedocs.io/en/latest/demos.html#build-pipeline-cluster-traveral-experiments-using-autopipe

    References
    ----------
    None
    """
    def __init__(self, data = None, label_col = None, model_type = 'reg'):
        self.snapshots = {}
        self.log = []
        self.high_cardinal_cols = []
        self.configure(data = data, label_col = label_col)
        self.model_type = model_type
        self.sparsity = 0

    def configure(self, data = None, label_col = None):
        if (type(label_col) is str or type(label_col) is int):
            self.label_col = label_col
        if (type(data) is str):
            self.initial_data = pd.read_csv(data)
            self.data = self.initial_data.copy()
            self.log = []
        if (type(data) is pd.DataFrame):
            self.initial_data = data
            self.data = self.initial_data.copy()
            self.log = []
        # Label-encode a non-numeric label column so downstream models get a numeric target.
        if (self.data[self.label_col].dtypes != np.float64 and self.data[self.label_col].dtypes != np.int64):
            self.data[self.label_col] = self.data[self.label_col].fillna("NaN")
            self.data[self.label_col] = pd.DataFrame(LabelEncoder().fit_transform(self.data[self.label_col]))
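
    # Hedged construction sketch ("train_df" and "target" are assumed, illustrative names):
    #   pp_tools = PPtools(data = train_df, label_col = 'target', model_type = 'cls')
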
    def split_category_cols(self):
        """Split the input dataset into a numeric dataset and a category dataset.

        Parameters
        ----------
        None

        Returns
        -------
        None
        """
        non_label_list = self.data.columns.difference([self.label_col])
        self.non_label_data = self.data[non_label_list]
        self.cat_df = self.non_label_data.select_dtypes(exclude = ['number'])
        self.num_df = self.non_label_data.select_dtypes(include = 'number')
        self._log("PPtools.split_category_cols(): Split input to category df and numeric df.")
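
    # Illustrative call, continuing the sketch above; populates self.num_df and self.cat_df:
    #   pp_tools.split_category_cols()
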
    def remove_feature(self, feature_name):
        """Remove a feature column.

        Parameters
        ----------
        feature_name : str, default = None
            Name of the column to remove.

        Returns
        -------
        None
        """
        del self.data[feature_name]
        self._log("PPtools.remove_feature('{0}')".format(feature_name))
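
    # Illustrative call ('unused_col' is an assumed column name):
    #   pp_tools.remove_feature('unused_col')
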
    def extract_feature(self, old_feature, new_feature, mapper = None):
        # Derive a new feature column by applying the mapper function to each value of an existing column.
        new_feature_column = map(mapper, self.data[old_feature])
        self.data[new_feature] = list(new_feature_column)
        self._log("PPtools.extract_feature({0}, {1}, {2})".format(old_feature, new_feature, mapper))
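
    # Illustrative sketch with a hypothetical mapper: derive a length feature from a text column.
    #   pp_tools.extract_feature('name', 'name_length', mapper = lambda s: len(str(s)))
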
    def impute_tool(self):
        """Imputation of the missing values. Numeric columns are imputed with
        SimpleImputer (mean strategy by default); missing values in category
        columns are filled with the string "NaN".

        Parameters
        ----------
        None

        Returns
        -------
        None
        """
        column_names = self.num_df.columns
        imp = SimpleImputer()
        imp.fit(self.num_df[column_names])
        self.num_df[column_names] = imp.transform(self.num_df[column_names])
        self.cat_df = self.cat_df.fillna("NaN")
        self._log("PPtools.impute_tool()")
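
    # Illustrative call; run split_category_cols() first so num_df and cat_df exist:
    #   pp_tools.impute_tool()
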
    def scale_tool(self, df = None, sc_type = None):
        """Feature scaling.

        Parameters
        ----------
        df : df, default = None
            Dataset to be scaled.

        sc_type : str, default = None
            Value in ["None","standard","minmax","maxabs","robust"]. Selects the scaling algorithm:
            "None" - no scaling applied;
            "standard" - StandardScaler algorithm;
            "minmax" - MinMaxScaler algorithm;
            "maxabs" - MaxAbsScaler algorithm;
            "robust" - RobustScaler algorithm.

        Returns
        -------
        Scaled dataset
        """
        if sc_type == "None":
            self._log("PPtools.scale_tool() - None")
            return(df)
        if sc_type == "standard":
            self._log("PPtools.scale_tool() - StandardScaler")
            return(pd.DataFrame(preprocessing.StandardScaler().fit_transform(df), columns = df.columns))
        if sc_type == "minmax":
            self._log("PPtools.scale_tool() - MinMaxScaler")
            return(pd.DataFrame(preprocessing.MinMaxScaler().fit_transform(df), columns = df.columns))
        if sc_type == "maxabs":
            self._log("PPtools.scale_tool() - MaxAbsScaler")
            return(pd.DataFrame(preprocessing.MaxAbsScaler().fit_transform(df), columns = df.columns))
        if sc_type == "robust":
            self._log("PPtools.scale_tool() - RobustScaler")
            return(pd.DataFrame(preprocessing.RobustScaler().fit_transform(df), columns = df.columns))
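
    # Illustrative call, returning a scaled copy of the numeric frame:
    #   scaled_num_df = pp_tools.scale_tool(df = pp_tools.num_df, sc_type = 'minmax')
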
    def winsorize_tool(self, lower_ban = None, upper_ban = None):
        """Exclude feature outliers by winsorization, which clips each numeric
        column to the given lower and upper percentile bounds.

        Parameters
        ----------
        lower_ban : float, default = None
            Fraction of the lowest values to clip.

        upper_ban : float, default = None
            Fraction of the highest values to clip.

        Returns
        -------
        None
        """
        for i in list(self.num_df.columns):
            self.num_df[i] = winsorize(self.num_df[i], limits = [lower_ban, upper_ban])
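
    # Illustrative call: clip the bottom 1% and top 1% of every numeric column in place.
    #   pp_tools.winsorize_tool(lower_ban = 0.01, upper_ban = 0.01)
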
    def remove_zero_col_tool(self, data = None):
        """Remove the columns whose values are all zero.

        Parameters
        ----------
        data : pandas dataset, default = None
            Dataset from which the all-zero columns are removed.

        Returns
        -------
        Dataset without the all-zero columns
        """
        return(data.loc[:, (data != 0).any(axis = 0)])
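
    # Illustrative call on any frame ("encoded_df" is an assumed name):
    #   trimmed_df = pp_tools.remove_zero_col_tool(data = encoded_df)
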
    def encode_tool(self, en_type = None, category_col = None):
        """Category feature encoding, including:
        "onehot" - OneHot algorithm;
        "label" - LabelEncoder algorithm;
        "frequency" - Frequency Encoding algorithm;
        "mean" - Mean Encoding algorithm.

        Parameters
        ----------
        en_type : str, default = None
            Value in ["onehot","label","frequency","mean"]. Selects the encoding algorithm.

        category_col : str, default = None
            Name of the category column to encode. For "onehot", the first encoded
            column is dropped to cope with the dummy trap issue when model_type is "reg".

        Returns
        -------
        Encoded column/dataset for each category feature
        """
        if en_type == 'onehot':
            if (self.model_type == 'reg'):
                temp_df = pd.get_dummies(self.cat_df, prefix = ['onehot_' + category_col], columns = [category_col], drop_first = True)
                return(temp_df.loc[:, temp_df.columns.str.startswith('onehot_')])
            elif (self.model_type == 'cls'):
                temp_df = pd.get_dummies(self.cat_df, prefix = ['onehot_' + category_col], columns = [category_col], drop_first = False)
                return(temp_df.loc[:, temp_df.columns.str.startswith('onehot_')])
        if en_type == 'label':
            return(pd.DataFrame(LabelEncoder().fit_transform(self.cat_df[category_col]), columns = ['Label_' + category_col]))
        if en_type == 'frequency':
            # Map each category to its relative frequency in the dataset.
            fe = self.cat_df.groupby(category_col).size() / len(self.cat_df)
            return(self.cat_df[category_col].map(fe).to_frame().rename(columns = {category_col: 'Frequency_' + category_col}))
        if en_type == 'mean':
            # Map each category to the mean of the label column within that category.
            mean_encoder = self.data.groupby(category_col)[self.label_col].mean()
            self.cat_df["Mean_" + category_col] = self.cat_df[category_col].map(mean_encoder)
            return(pd.DataFrame(self.cat_df["Mean_" + category_col]))
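
    # Illustrative calls, assuming 'city' is a column present in cat_df:
    #   onehot_df = pp_tools.encode_tool(en_type = 'onehot', category_col = 'city')
    #   freq_df = pp_tools.encode_tool(en_type = 'frequency', category_col = 'city')
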
    def sparsity_tool(self, data = None):
        """Calculate the sparsity of the dataset, i.e. the fraction of zero entries.

        Parameters
        ----------
        data : df, default = None
            Dataset whose sparsity is measured.

        Returns
        -------
        Value of sparsity
        """
        return(1.0 - (count_nonzero(data) / float(data.size)))
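
    # Illustrative call; a one-hot encoded frame is typically highly sparse:
    #   pp_tools.sparsity = pp_tools.sparsity_tool(data = onehot_df)
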
    def use_snapshot(self, name):
        # Restore a previously stored snapshot of the data and its log.
        self.data = self.snapshots[name]["data"]
        self.log = self.snapshots[name]["log"]

    def _log(self, string):
        self.log.append(string)
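
# Minimal end-to-end sketch of the PPtools workflow. The toy DataFrame, column
# names, and parameter choices below are illustrative assumptions, not part of
# the library; the block is guarded so it only runs when this module is executed directly.
if __name__ == "__main__":
    toy_df = pd.DataFrame({
        "age": [25, 32, np.nan, 41],        # numeric feature with a missing value
        "city": ["NY", "LA", "NY", None],   # category feature with a missing value
        "target": [0, 1, 0, 1],             # label column
    })
    pp_tools = PPtools(data = toy_df, label_col = "target", model_type = "cls")
    pp_tools.split_category_cols()          # separate numeric and category frames
    pp_tools.impute_tool()                  # fill missing values in both frames
    pp_tools.winsorize_tool(lower_ban = 0.01, upper_ban = 0.01)
    scaled = pp_tools.scale_tool(df = pp_tools.num_df, sc_type = "standard")
    encoded = pp_tools.encode_tool(en_type = "onehot", category_col = "city")
    print("sparsity of one-hot frame:", pp_tools.sparsity_tool(data = encoded))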