Source code for handle_missing_values

import pandas as pd
from typing import List, Optional, Union, Any, Dict
from sklearn.experimental import enable_iterative_imputer  # noqa F401
from sklearn.impute import SimpleImputer, KNNImputer, IterativeImputer
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.base import RegressorMixin, ClassifierMixin
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV


class MissingValueHandler:
    """
    A collection of static helpers for handling missing values in a DataFrame.

    Covers inspection (masks and per-column summaries), dropping, simple /
    KNN / iterative imputation, model-based imputation of a single column,
    missing-value indicator columns, directional fills and interpolation.

    Unless stated otherwise, every method works on a copy of ``data`` and
    returns it; with ``inplace=True`` the caller's DataFrame is modified and
    that same object is returned.
    """

    @staticmethod
    def identify_missing(data: pd.DataFrame) -> pd.DataFrame:
        """
        Identifies missing values in the dataset.

        Args:
            data (pd.DataFrame): The input DataFrame.

        Returns:
            pd.DataFrame: A boolean DataFrame of the same shape as the input,
            True where a value is missing.
        """
        return data.isnull()

    @staticmethod
    def missing_summary(data: pd.DataFrame) -> pd.DataFrame:
        """
        Provides a summary of missing values for each column in the dataset.

        Args:
            data (pd.DataFrame): The input DataFrame.

        Returns:
            pd.DataFrame: One row per input column, with the columns
            'missing_count' and 'missing_percentage' (relative to the number
            of rows).
        """
        missing_count = data.isnull().sum()
        missing_percentage = 100 * missing_count / len(data)
        return pd.DataFrame({
            'missing_count': missing_count,
            'missing_percentage': missing_percentage,
        })

    @staticmethod
    def drop_missing(
        data: pd.DataFrame,
        axis: int = 0,
        how: str = 'any',
        thresh: Optional[int] = None,
        subset: Optional[List[str]] = None,
        inplace: bool = False
    ) -> pd.DataFrame:
        """
        Drops rows or columns with missing values.

        Args:
            data (pd.DataFrame): The input DataFrame.
            axis (int, optional): Drop rows (0) or columns (1). Default is 0.
            how (str, optional): 'any' drops when any value is NA, 'all' only
                when every value is NA. Ignored when ``thresh`` is given.
            thresh (int, optional): Require that many non-NA values.
                Overrides ``how``.
            subset (List[str], optional): Labels along the other axis to
                consider.
            inplace (bool, optional): If True, modify ``data`` itself.

        Returns:
            pd.DataFrame: The DataFrame with missing rows or columns dropped
            (``data`` itself when ``inplace`` is True).
        """
        # Recent pandas versions reject `how` and `thresh` being passed
        # together, even when one of them is just the default, so only the
        # argument that is actually in effect is forwarded.
        options: Dict[str, Any] = {'axis': axis, 'subset': subset}
        if thresh is not None:
            options['thresh'] = thresh
        else:
            options['how'] = how

        if inplace:
            # dropna(inplace=True) returns None; hand back the mutated frame.
            data.dropna(inplace=True, **options)
            return data
        return data.dropna(**options)

    @staticmethod
    def drop_missing_threshold(
        data: pd.DataFrame,
        threshold: float = 0.5,
        axis: int = 0,
        inplace: bool = False
    ) -> pd.DataFrame:
        """
        Drops rows or columns whose share of missing values exceeds a threshold.

        Args:
            data (pd.DataFrame): The input DataFrame.
            threshold (float): The maximum allowed proportion of missing
                values (between 0 and 1). Default is 0.5.
            axis (int): Drop rows (0) or columns (1). Default is 0.
            inplace (bool, optional): If True, modify ``data`` itself.

        Returns:
            pd.DataFrame: The DataFrame without the rows or columns whose
            missing proportion is greater than ``threshold``.

        Raises:
            ValueError: If ``threshold`` is outside [0, 1].
        """
        if not 0 <= threshold <= 1:
            raise ValueError("Threshold must be between 0 and 1.")

        # A row is as long as the number of columns (and vice versa), so the
        # proportion is measured against the *other* axis.
        span = data.shape[1 - axis]
        # Largest count of missing entries still within the threshold; the
        # small epsilon guards against float error such as 0.29 * 100.
        max_missing = int(threshold * span + 1e-9)
        min_present = span - max_missing

        if inplace:
            # dropna(inplace=True) returns None; hand back the mutated frame.
            data.dropna(axis=axis, thresh=min_present, inplace=True)
            return data
        return data.dropna(axis=axis, thresh=min_present)

    @staticmethod
    def fill_missing(
        data: pd.DataFrame,
        strategy: Union[str, Dict[str, str]] = 'mean',
        fill_value: Optional[Any] = None,
        columns: Optional[List[str]] = None,
        inplace: bool = False
    ) -> pd.DataFrame:
        """
        Fills missing values using scikit-learn's SimpleImputer.

        Args:
            data (pd.DataFrame): The input DataFrame.
            strategy (Union[str, Dict[str, str]]): The imputation strategy
                ('mean', 'median', 'most_frequent', 'constant') or a
                dictionary mapping column names to strategies. Columns absent
                from the dictionary fall back to 'mean'.
            fill_value (Any, optional): Passed to the imputer as its
                ``fill_value``.
            columns (List[str], optional): Columns to impute. If None, all
                columns are imputed.
            inplace (bool, optional): If True, modify ``data`` itself.

        Returns:
            pd.DataFrame: The DataFrame with missing values filled.

        Raises:
            ValueError: If ``strategy`` is neither a string nor a dictionary.
        """
        if not inplace:
            data = data.copy()
        if columns is None:
            columns = data.columns.tolist()

        if isinstance(strategy, str):
            imputer = SimpleImputer(strategy=strategy, fill_value=fill_value)
            data[columns] = imputer.fit_transform(data[columns])
        elif isinstance(strategy, dict):
            # One imputer per column so each can use its own strategy.
            for col in columns:
                col_strategy = strategy.get(col, 'mean')
                imputer = SimpleImputer(strategy=col_strategy, fill_value=fill_value)
                data[[col]] = imputer.fit_transform(data[[col]])
        else:
            raise ValueError("Strategy must be a string or a dictionary mapping columns to strategies.")

        return data

    @staticmethod
    def fill_missing_knn(
        data: pd.DataFrame,
        n_neighbors: int = 5,
        weights: str = 'uniform',
        metric: str = 'nan_euclidean',
        columns: Optional[List[str]] = None,
        inplace: bool = False
    ) -> pd.DataFrame:
        """
        Fills missing values using K-Nearest Neighbors (KNN) imputation.

        Args:
            data (pd.DataFrame): The input DataFrame.
            n_neighbors (int): Number of neighboring samples to use.
            weights (str): Weight function ('uniform' or 'distance').
            metric (str): Distance metric for searching neighbors.
            columns (List[str], optional): Columns to impute. If None, all
                columns are imputed.
            inplace (bool, optional): If True, modify ``data`` itself.

        Returns:
            pd.DataFrame: The DataFrame with missing values filled.
        """
        if not inplace:
            data = data.copy()
        if columns is None:
            columns = data.columns.tolist()

        imputer = KNNImputer(n_neighbors=n_neighbors, weights=weights, metric=metric)
        data[columns] = imputer.fit_transform(data[columns])
        return data

    @staticmethod
    def fill_missing_iterative(
        data: pd.DataFrame,
        # Quoted so the annotation is not evaluated when the class is defined.
        estimator: Optional["RegressorMixin"] = None,
        columns: Optional[List[str]] = None,
        inplace: bool = False,
        **kwargs: Any
    ) -> pd.DataFrame:
        """
        Fills missing values using scikit-learn's IterativeImputer.

        Args:
            data (pd.DataFrame): The input DataFrame.
            estimator (RegressorMixin, optional): The estimator used at each
                imputation step. None leaves the choice to IterativeImputer.
            columns (List[str], optional): Columns to impute. If None, all
                columns are imputed.
            inplace (bool, optional): If True, modify ``data`` itself.
            **kwargs: Additional keyword arguments for IterativeImputer.

        Returns:
            pd.DataFrame: The DataFrame with missing values filled.
        """
        if not inplace:
            data = data.copy()
        if columns is None:
            columns = data.columns.tolist()

        imputer = IterativeImputer(estimator=estimator, **kwargs)
        data[columns] = imputer.fit_transform(data[columns])
        return data

    @staticmethod
    def fill_missing_ml(
        data: pd.DataFrame,
        target_column: str,
        # Quoted so the annotation is not evaluated when the class is defined.
        model: Optional[Union["RegressorMixin", "ClassifierMixin"]] = None,
        search_type: str = 'grid',  # 'grid' or 'random' for hyperparameter tuning
        param_grid: Optional[Dict[str, List[Any]]] = None,  # parameters for tuning
        cv: int = 5,  # number of cross-validation folds
        inplace: bool = False,
        **kwargs: Any
    ) -> pd.DataFrame:
        """
        Fills missing values in one column with predictions from a model
        trained on the remaining columns, optionally tuned by cross-validation.

        Args:
            data (pd.DataFrame): The input DataFrame.
            target_column (str): The column whose missing values are imputed.
            model (Union[RegressorMixin, ClassifierMixin], optional): The
                model to use. If None, a RandomForestRegressor is built for
                boolean/numeric targets and a RandomForestClassifier otherwise.
            search_type (str): Hyperparameter search ('grid' or 'random').
                Only consulted when ``param_grid`` is given.
            param_grid (Dict[str, List[Any]], optional): The hyperparameter
                grid. When empty or None the model is fitted as is.
            cv (int): Number of cross-validation folds for the search.
            inplace (bool): If True, modify ``data`` itself.
            **kwargs: Keyword arguments for the default model. ``n_iter``
                (default 10) is reserved for the randomized search and is not
                forwarded to the model.

        Returns:
            pd.DataFrame: The DataFrame with the target column's missing
            values replaced by model predictions.

        Raises:
            ValueError: If ``param_grid`` is given and ``search_type`` is
                neither 'grid' nor 'random'.
        """
        if not inplace:
            data = data.copy()

        df_complete = data.dropna(subset=[target_column])
        df_missing = data[data[target_column].isnull()]
        if df_missing.empty:
            return data

        # `n_iter` belongs to the randomized search; leaving it in kwargs
        # would make the default random-forest constructor reject it.
        n_iter = kwargs.pop('n_iter', 10)

        X_train = df_complete.drop(columns=[target_column])
        y_train = df_complete[target_column]
        X_predict = df_missing.drop(columns=[target_column])

        # Encode categorical features on the combined frame so the training
        # and prediction matrices share exactly the same dummy columns.
        X_full = pd.concat([X_train, X_predict])
        X_full_encoded = pd.get_dummies(X_full, drop_first=True)
        X_train_encoded = X_full_encoded.iloc[:len(X_train)]
        X_predict_encoded = X_full_encoded.iloc[len(X_train):]

        # Compare against None explicitly: estimators that define __len__
        # (ensembles, pipelines) must not be truth-tested.
        if model is None:
            if y_train.dtype.kind in 'biufc':
                model = RandomForestRegressor(**kwargs)
            else:
                model = RandomForestClassifier(**kwargs)

        if param_grid:
            if search_type == 'grid':
                search = GridSearchCV(model, param_grid, cv=cv)
            elif search_type == 'random':
                search = RandomizedSearchCV(model, param_grid, cv=cv, n_iter=n_iter)
            else:
                raise ValueError("search_type must be either 'grid' or 'random'")
            search.fit(X_train_encoded, y_train)
            best_model = search.best_estimator_
        else:
            best_model = model
            best_model.fit(X_train_encoded, y_train)

        # Predictions come back in the row order of the missing entries.
        predicted_values = best_model.predict(X_predict_encoded)
        data.loc[data[target_column].isnull(), target_column] = predicted_values
        return data

    @staticmethod
    def add_missing_indicator(
        data: pd.DataFrame,
        columns: Optional[List[str]] = None,
        inplace: bool = False
    ) -> pd.DataFrame:
        """
        Adds a 0/1 indicator column marking where each feature was missing.

        Args:
            data (pd.DataFrame): The input DataFrame.
            columns (List[str], optional): Columns to create indicators for.
                If None, only columns containing missing values are used.
            inplace (bool, optional): If True, modify ``data`` itself.

        Returns:
            pd.DataFrame: The DataFrame with an extra ``<column>_missing``
            column per selected column.
        """
        if not inplace:
            data = data.copy()

        if columns is None:
            columns = data.columns[data.isnull().any()].tolist()

        for column in columns:
            data[f"{column}_missing"] = data[column].isnull().astype(int)
        return data

    @staticmethod
    def _fill_directional(
        data: pd.DataFrame,
        forward: bool,
        columns: Optional[List[str]],
        inplace: bool,
        limit: Optional[int]
    ) -> pd.DataFrame:
        """Forward- or backward-fills the selected columns along the rows."""
        if not inplace:
            data = data.copy()

        if columns is None:
            if forward:
                data.ffill(axis=0, limit=limit, inplace=True)
            else:
                data.bfill(axis=0, limit=limit, inplace=True)
            return data

        selected = data[columns]
        if forward:
            data[columns] = selected.ffill(axis=0, limit=limit)
        else:
            data[columns] = selected.bfill(axis=0, limit=limit)
        return data

    @staticmethod
    def fill_missing_ffill(
        data: pd.DataFrame,
        columns: Optional[List[str]] = None,
        inplace: bool = False,
        limit: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Fills missing values using forward fill (last valid value carried down).

        Args:
            data (pd.DataFrame): The input DataFrame.
            columns (List[str], optional): Columns to forward fill. If None,
                all columns are used.
            inplace (bool, optional): If True, modify ``data`` itself.
            limit (int, optional): The maximum number of consecutive NaNs to
                fill.

        Returns:
            pd.DataFrame: The DataFrame with the selected columns forward
            filled.
        """
        return MissingValueHandler._fill_directional(data, True, columns, inplace, limit)

    @staticmethod
    def fill_missing_bfill(
        data: pd.DataFrame,
        columns: Optional[List[str]] = None,
        inplace: bool = False,
        limit: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Fills missing values using backward fill (next valid value carried up).

        Args:
            data (pd.DataFrame): The input DataFrame.
            columns (List[str], optional): Columns to backward fill. If None,
                all columns are used.
            inplace (bool, optional): If True, modify ``data`` itself.
            limit (int, optional): The maximum number of consecutive NaNs to
                fill.

        Returns:
            pd.DataFrame: The DataFrame with the selected columns backward
            filled.
        """
        return MissingValueHandler._fill_directional(data, False, columns, inplace, limit)

    @staticmethod
    def interpolate_missing(
        data: pd.DataFrame,
        method: str = 'linear',
        axis: int = 0,
        limit: Optional[int] = None,
        inplace: bool = False,
        **kwargs: Any
    ) -> pd.DataFrame:
        """
        Fills missing values using interpolation.

        Args:
            data (pd.DataFrame): The input DataFrame.
            method (str, optional): Interpolation method. Defaults to 'linear'.
            axis (int, optional): Axis along which to interpolate. Defaults
                to 0.
            limit (int, optional): Maximum number of consecutive NaNs to fill.
            inplace (bool, optional): If True, modify ``data`` itself.
            **kwargs: Additional keyword arguments for
                ``DataFrame.interpolate``.

        Returns:
            pd.DataFrame: The DataFrame with missing values interpolated.
        """
        if not inplace:
            data = data.copy()
        data.interpolate(method=method, axis=axis, limit=limit, inplace=True, **kwargs)
        return data