Source code for optimalflow.autoPP

"""dynaPreprocessing Class"""

#!/usr/bin/env python

import itertools
from optimalflow.funcPP import PPtools
import pandas as pd
import joblib
import datetime
import numpy as np
from time import time
from optimalflow.utilis_func import update_progress, delete_old_log_files
import warnings
import os

path = os.getcwd()

# Silence all library warnings by replacing warnings.warn with a no-op,
# keeping the console progress output readable during the permutation loops.
def warn(*args, **kwargs):
    pass
warnings.warn = warn

import logging

LOG_TS = datetime.datetime.now().strftime("%Y.%m.%d.%H.%M.%S")
logs_folder = os.path.join(os.getcwd(),'logs')
if not os.path.exists(logs_folder):
    os.makedirs(logs_folder)
log_name = os.path.join(logs_folder, f'{os.path.basename(__file__).split(".")[0]}_log_{LOG_TS}.log')

LOG_LEVEL = logging.DEBUG
DELETE_FLAG = True
TS = time()
logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s','%d/%m %H:%M:%S')
fh = logging.FileHandler(filename = log_name)
fh.setLevel(LOG_LEVEL)
fh.setFormatter(formatter)
logger.addHandler(fh)
Test_case = f'Optimal Flow - autoPP - Auto PreProcessing :: {LOG_TS}'
Test_comment = '-' * len(Test_case) * 3
Start_log = '#' * len(Test_case) * 3
logger.info(Start_log)
logger.info(Test_case)
logger.info(Start_log)
delete_old_log_files(directory = logs_folder, delete_flag = DELETE_FLAG, logger = logger, extension_list = ['.log'], filename_list = ['autoPP_log'], log_ts = LOG_TS)
logger.info(Test_comment)

class dynaPreprocessing:
    """Automated feature preprocessing, including imputation, winsorization,
    encoding, and scaling by ensembles of algorithms, to generate permuted
    input datasets for the downstream pipeline components.

    Parameters
    ----------
    custom_parameters: dictionary, default = None
        Custom parameter settings.
        NOTE: default_parameters = {
            "scaler" : ["standard", "minmax", "maxabs", "robust"],
            "encode_band" : [10],
            "low_encode" : ["onehot", "label"],
            "high_encode" : ["frequency", "mean"],
            "winsorizer" : [(0.01, 0.01), (0.05, 0.05)],
            "sparsity" : [0.50],
            "cols" : [100]
        }
    label_col: str, default = None
        Name of the label column.
    model_type: str, default = "reg"
        "reg" for a regression problem, or "cls" for a classification problem.
    export_output_files: bool, default = False
        Export the qualified permuted datasets to the ./dfs folder.

    Example
    -------

    .. [Example] https://Optimal-Flow.readthedocs.io/en/latest/demos.html#feature-preprocessing-for-a-regression-problem-using-autopp

    References
    ----------
    None
    """
    def __init__(self, custom_parameters = None, label_col = None, model_type = "reg", export_output_files = False):
        default_parameters = {
            "scaler" : ["standard", "minmax", "maxabs", "robust"],
            "encode_band" : [10],
            "low_encode" : ["onehot", "label"],
            "high_encode" : ["frequency", "mean"],
            "winsorizer" : [(0.01, 0.01), (0.05, 0.05)],
            "sparsity" : [0.50],
            "cols" : [100]
        }
        if custom_parameters is None:
            self.parameters = default_parameters
        else:
            self.parameters = custom_parameters
        self.model_type = model_type
        self.export_output_files = export_output_files
        self.label_col = label_col
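    # The number of candidate datasets that fit() can produce grows roughly as
    # len("winsorizer") * prod(encoders applicable to each categorical column)
    # * len("scaler"), before the "sparsity" and "cols" thresholds filter the
    # results, so trimming these parameter lists is the main lever on runtime.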
    def fit(self, input_data = None):
        """Fits and transforms a pandas dataframe into datasets with missing
        values imputed, outliers excluded, categories encoded, and features
        scaled, permuting all of the configured algorithms.

        Parameters
        ----------
        input_data : pandas dataframe, shape = [n_samples, n_features]
            NOTE: input_data should already have had basic data cleaning and
            sensible feature reduction applied; the more features involved,
            the more column permutations will be output.

        Returns
        -------
        DICT_PREP_DF : dictionary
            Each key is the number of an output preprocessed dataset; each
            value stores the dataset.
        DICT_PREP_INFO : dictionary
            Dictionary for reference. Each key is the number of an output
            preprocessed dataset; each value stores the column names of that
            dataset.

        NOTE - Log records are generated and saved to the ./logs folder
        automatically.
        """
        if (self.export_output_files):
            df_folder = os.path.join(os.getcwd(), 'dfs')
            if not os.path.exists(df_folder):
                os.makedirs(df_folder)
            for l in os.listdir(df_folder):
                os.remove(os.path.join(df_folder, l))
        DICT_DFS = {}
        for i in range(len(self.parameters.get("winsorizer"))):
            pp = PPtools(label_col = self.label_col, data = input_data, model_type = self.model_type)
            pp.split_category_cols()
            initial_num_cols = pp.num_df.columns
            pp.impute_tool()
            pp.winsorize_tool(lower_ban = self.parameters.get("winsorizer")[i][0], upper_ban = self.parameters.get("winsorizer")[i][1])
            winsorized_df_cols_list = list(pp.num_df.columns)
            # Collect every applicable encoding of each categorical column:
            # low-cardinality columns get the "low_encode" strategies, the
            # rest get the "high_encode" strategies.
            encoded_cols_list = {}
            for col in pp.cat_df.columns:
                encoded_cols_list[col] = []
                if pp.cat_df[col].nunique() < self.parameters.get("encode_band")[0]:
                    for en_type in self.parameters.get("low_encode"):
                        encoded_col = pp.encode_tool(en_type = en_type, category_col = col)
                        encoded_cols_list[col].append(list(encoded_col.columns))
                        pp.num_df = pd.concat([pp.num_df, encoded_col], axis = 1)
                if pp.cat_df[col].nunique() >= self.parameters.get("encode_band")[0]:
                    for en_type in self.parameters.get("high_encode"):
                        encoded_col = pp.encode_tool(en_type = en_type, category_col = col)
                        encoded_cols_list[col].append(list(encoded_col.columns))
                        pp.num_df = pd.concat([pp.num_df, encoded_col], axis = 1)
            args_list = []
            for key in encoded_cols_list.keys():
                args_list.append(encoded_cols_list[key])
            # Materialize the product once, only to count the iterations.
            iters_combined = itertools.product(*args_list)
            loop_num = 1
            total_loop = len(list(iters_combined))
            for number, combination in enumerate(itertools.product(*args_list)):
                start_time = time()
                combined_cols_list = []
                combined_cols_list.append(winsorized_df_cols_list)
                for ele in list(combination):
                    combined_cols_list.append(ele)
                combined_cols_list = [item for sublist in combined_cols_list for item in sublist]
                encoded_df = pp.num_df[pp.num_df.columns.intersection(combined_cols_list)]
                encoded_df = pp.remove_zero_col_tool(encoded_df)
                category_sparsity_score = pp.sparsity_tool(encoded_df[encoded_df.columns.difference(list(initial_num_cols))])
                # Keep only combinations that are dense enough and not too wide.
                if (category_sparsity_score > self.parameters["sparsity"][0]) and ((len(encoded_df.columns) + 1) <= self.parameters["cols"][0]):
                    logger.info(Test_comment)
                    logger.info(f"Current Running Dataset No. {number} :")
                    if (self.export_output_files):
                        temp_dfs = os.path.join(df_folder, f"winsor_{i}_{number}.csv")
                        encoded_df.to_csv(temp_dfs, index = False)
                    for sca in self.parameters["scaler"]:
                        DICT_DFS[f"winsor_{i}-Scaler_{sca}-Dataset_{number}"] = pd.concat([pp.data[self.label_col], pp.scale_tool(df = encoded_df, sc_type = sca)], axis = 1)
                        logger.info(f">>> Winsorization strategy is {i}")
                        logger.info(f">>> Scaler strategy is {sca}")
                        logger.info(f">>> Encoding strategy: {list(combination)}")
                        logger.info(f">>> Total columns including the label column: {len(list(encoded_df.columns)) + 1}")
                        logger.info(f">>> Encoded category columns' sparsity score: {str(category_sparsity_score)}")
                time_est = round(((time() - start_time) / 60) * (total_loop - loop_num), 4)
                update_progress(loop_num / total_loop, clear_flag = True, process_name = "Data Preprocessing Ensemble Iteration", time_est = time_est)
                loop_num += 1
        DICT_PREP_INFO = {}
        DICT_PREP_DF = {}
        for number, key in enumerate(DICT_DFS.keys()):
            DICT_PREP_INFO["Dataset_" + str(number)] = key.split("Dataset_", 1)[0] + "- Encoded Features:" + str(list(DICT_DFS[key].columns))
        for number, key in enumerate(DICT_DFS.keys()):
            DICT_PREP_DF["Dataset_" + str(number)] = DICT_DFS[key]
        return DICT_PREP_DF, DICT_PREP_INFO