# Source code for solve_regression

import logging
from typing import Any, Callable, Dict, List, Optional, Tuple

import joblib
import matplotlib.pyplot as plt
import numpy as np
import optuna  # Added for Bayesian optimization
import pandas as pd
import seaborn as sns
from catboost import CatBoostRegressor
from lightgbm import LGBMRegressor
from sklearn.base import BaseEstimator, clone
from sklearn.ensemble import (
    RandomForestRegressor,
    GradientBoostingRegressor,
    AdaBoostRegressor,
    StackingRegressor,  # Added for stacking
    BaggingRegressor,
    VotingRegressor
)
from sklearn.linear_model import (
    LinearRegression,
    Ridge,
    Lasso,
    ElasticNet,
)
from sklearn.metrics import (
    mean_absolute_error,
    mean_absolute_percentage_error,
    mean_squared_error,
    median_absolute_error,
    r2_score,
    explained_variance_score,
)
from sklearn.model_selection import (
    GridSearchCV,
    RandomizedSearchCV,
    KFold,
    cross_val_score,
    learning_curve,
    train_test_split,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor


class RegressionSolver:
    """
    A comprehensive class for solving regression problems using various machine learning models.

    Includes methods for data splitting, model training, evaluation,
    hyperparameter tuning, cross-validation, model merging (ensembling),
    plotting, and model persistence.

    Attributes set by ``__init__``:
        logger: Shared ``logging.Logger`` named "RegressionSolver".
        random_state: Seed forwarded to splitters, searches and default models.
        models: Mapping of model name -> untuned estimator.
        tuned_models: Mapping of model name -> estimator produced by
            ``hyperparameter_tuning`` or ``model_merging``.
    """

    def __init__(
        self,
        models: Optional[Dict[str, BaseEstimator]] = None,
        random_state: int = 42,
    ) -> None:
        """
        Initializes the RegressionSolver with a dictionary of models to use.

        Args:
            models (Optional[Dict[str, BaseEstimator]]): A dictionary mapping model
                names to model instances. ``None`` (or an empty dict) selects the
                default model zoo.
            random_state (int): Random seed for reproducibility.
        """
        self.logger = self._setup_logger()
        self.random_state = random_state
        # `or` is intentional: an empty mapping also falls back to the defaults.
        self.models = models or self._default_models()
        self.tuned_models: Dict[str, BaseEstimator] = {}

    def _default_models(self) -> Dict[str, BaseEstimator]:
        """
        Provides default models for regression tasks.

        Returns:
            Dict[str, BaseEstimator]: A dictionary of default models.
        """
        seed = self.random_state
        return {
            "Linear Regression": LinearRegression(),
            "Ridge Regression": Ridge(random_state=seed),
            "Lasso Regression": Lasso(random_state=seed),
            "ElasticNet Regression": ElasticNet(random_state=seed),
            "Decision Tree": DecisionTreeRegressor(random_state=seed),
            "Random Forest": RandomForestRegressor(random_state=seed),
            "Gradient Boosting": GradientBoostingRegressor(random_state=seed),
            "AdaBoost": AdaBoostRegressor(random_state=seed),
            "Support Vector Regressor": SVR(),
            # `use_label_encoder` is a classifier-only option; passing it to a
            # regressor only produces "parameter not used" warnings.
            "XGBoost": XGBRegressor(
                random_state=seed,
                objective="reg:squarederror",
                eval_metric='rmse',
            ),
            "LightGBM": LGBMRegressor(random_state=seed),
            "CatBoost": CatBoostRegressor(verbose=0, random_state=seed),
        }

    def _default_param_grids(self) -> Dict[str, Dict[str, List[Any]]]:
        """
        Provides default hyperparameter grids for common regression models.

        Returns:
            Dict[str, Dict[str, List[Any]]]: A dictionary of hyperparameter grids,
            keyed by the same names used in ``_default_models``.
        """
        return {
            "Ridge Regression": {
                "alpha": [0.1, 1.0, 10.0, 100.0],
                "solver": ["auto", "svd", "cholesky", "lsqr", "sparse_cg", "sag", "saga"],
            },
            "Lasso Regression": {
                "alpha": [0.001, 0.01, 0.1, 1.0],
                "selection": ["cyclic", "random"],
            },
            "ElasticNet Regression": {
                "alpha": [0.001, 0.01, 0.1, 1.0],
                "l1_ratio": [0.1, 0.5, 0.7, 0.9],
            },
            "Decision Tree": {
                "max_depth": [None, 5, 10, 20, 30],
                "min_samples_split": [2, 5, 10],
                "min_samples_leaf": [1, 2, 4],
                "criterion": ["squared_error", "friedman_mse", "absolute_error", "poisson"],
            },
            "Random Forest": {
                "n_estimators": [50, 100, 200],
                "max_depth": [None, 5, 10, 20, 30],
                "min_samples_split": [2, 5, 10],
                "min_samples_leaf": [1, 2, 4],
                "bootstrap": [True, False],
            },
            "Gradient Boosting": {
                "n_estimators": [50, 100, 200],
                "learning_rate": [0.01, 0.05, 0.1, 0.2],
                "max_depth": [3, 5, 7],
                "subsample": [0.6, 0.8, 1.0],
            },
            "AdaBoost": {
                "n_estimators": [50, 100, 200],
                "learning_rate": [0.01, 0.05, 0.1, 0.2, 1.0],
                "loss": ["linear", "square", "exponential"],
            },
            "Support Vector Regressor": {
                "C": [0.1, 1, 10],
                "kernel": ["linear", "rbf", "poly", "sigmoid"],
                "gamma": ["scale", "auto"],
                "epsilon": [0.1, 0.2, 0.5],
            },
            "XGBoost": {
                "n_estimators": [50, 100, 200],
                "learning_rate": [0.01, 0.05, 0.1],
                "max_depth": [3, 5, 7],
                "subsample": [0.6, 0.8, 1.0],
                "colsample_bytree": [0.6, 0.8, 1.0],
            },
            "LightGBM": {
                "n_estimators": [50, 100, 200],
                "learning_rate": [0.01, 0.05, 0.1],
                "num_leaves": [31, 50, 100],
                "max_depth": [-1, 5, 10],
            },
            "CatBoost": {
                "iterations": [100, 200, 500],
                "learning_rate": [0.01, 0.05, 0.1],
                "depth": [3, 5, 7],
            },
        }

    def _default_bayesian_search_spaces(self) -> Dict[str, Dict[str, Any]]:
        """
        Provides fine-grained default search spaces for Bayesian optimization
        for each regression model.

        Returns:
            Dict[str, Dict[str, Any]]: A dictionary of parameter distributions
            suitable for Optuna.
        """
        # Short aliases keep the table below readable.
        float_dist = optuna.distributions.FloatDistribution
        int_dist = optuna.distributions.IntDistribution
        cat_dist = optuna.distributions.CategoricalDistribution

        return {
            "Ridge Regression": {
                "alpha": float_dist(low=1e-4, high=1e4, log=True),
                "solver": cat_dist(
                    choices=["auto", "svd", "cholesky", "lsqr", "sparse_cg", "sag", "saga"]
                ),
            },
            "Lasso Regression": {
                "alpha": float_dist(low=1e-4, high=1e1, log=True),
                "selection": cat_dist(choices=["cyclic", "random"]),
            },
            "ElasticNet Regression": {
                "alpha": float_dist(low=1e-4, high=1e1, log=True),
                "l1_ratio": float_dist(low=0.0, high=1.0, log=False),
            },
            "Decision Tree": {
                "max_depth": int_dist(low=1, high=100, log=False),
                "min_samples_split": int_dist(low=2, high=20, log=False),
                "min_samples_leaf": int_dist(low=1, high=20, log=False),
                "criterion": cat_dist(
                    choices=["squared_error", "friedman_mse", "absolute_error", "poisson"]
                ),
            },
            "Random Forest": {
                "n_estimators": int_dist(low=50, high=1000, log=False),
                "max_depth": int_dist(low=1, high=100, log=False),
                "min_samples_split": int_dist(low=2, high=20, log=False),
                "min_samples_leaf": int_dist(low=1, high=20, log=False),
                "bootstrap": cat_dist(choices=[True, False]),
            },
            "Gradient Boosting": {
                "n_estimators": int_dist(low=50, high=1000, log=False),
                "learning_rate": float_dist(low=1e-4, high=1.0, log=True),
                "max_depth": int_dist(low=3, high=20, log=False),
                "subsample": float_dist(low=0.5, high=1.0, log=False),
            },
            "AdaBoost": {
                "n_estimators": int_dist(low=50, high=1000, log=False),
                "learning_rate": float_dist(low=1e-4, high=1.0, log=True),
                "loss": cat_dist(choices=["linear", "square", "exponential"]),
            },
            "Support Vector Regressor": {
                "C": float_dist(low=1e-3, high=1e3, log=True),
                "kernel": cat_dist(choices=["linear", "rbf", "poly", "sigmoid"]),
                "gamma": cat_dist(choices=["scale", "auto"]),
                "epsilon": float_dist(low=0.0, high=1.0, log=False),
            },
            "XGBoost": {
                "n_estimators": int_dist(low=50, high=1000, log=False),
                "learning_rate": float_dist(low=1e-4, high=1.0, log=True),
                "max_depth": int_dist(low=3, high=20, log=False),
                "subsample": float_dist(low=0.5, high=1.0, log=False),
                "colsample_bytree": float_dist(low=0.5, high=1.0, log=False),
            },
            "LightGBM": {
                "n_estimators": int_dist(low=50, high=1000, log=False),
                "learning_rate": float_dist(low=1e-4, high=1.0, log=True),
                "num_leaves": int_dist(low=20, high=150, log=False),
                "max_depth": int_dist(low=1, high=100, log=False),
            },
            "CatBoost": {
                "iterations": int_dist(low=100, high=1000, log=False),
                "learning_rate": float_dist(low=1e-4, high=1.0, log=True),
                "depth": int_dist(low=3, high=16, log=False),
                "l2_leaf_reg": int_dist(low=1, high=10, log=False),
                "border_count": int_dist(low=32, high=256, log=False),
                "bagging_temperature": float_dist(low=0.0, high=5.0, log=False),
            },
        }

    @staticmethod
    def _setup_logger() -> logging.Logger:
        """
        Sets up a logger for tracking model training and evaluation.

        The handler is attached only once, so creating several solvers does not
        duplicate log lines.

        Returns:
            logging.Logger: Configured logger instance.
        """
        logger = logging.getLogger("RegressionSolver")
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            )
            logger.addHandler(handler)
        return logger

    def _resolve_random_state(self, random_state: Optional[int]) -> int:
        """
        Returns the seed to use for one call.

        Only ``None`` falls back to the instance seed; ``0`` is a valid seed and
        must not be replaced.

        Args:
            random_state (Optional[int]): Per-call seed, or None.

        Returns:
            int: ``random_state`` if given, otherwise ``self.random_state``.
        """
        return self.random_state if random_state is None else random_state

    @staticmethod
    def _best_by_score(mean_scores: Dict[str, float]) -> Tuple[str, float]:
        """
        Picks the entry with the highest score.

        scikit-learn scorer strings follow a greater-is-better convention
        (error metrics are negated via the ``neg_`` prefix), so the maximum is
        always the best model.

        Args:
            mean_scores (Dict[str, float]): Model name -> mean cross-validated score.

        Returns:
            Tuple[str, float]: Best name and score; ``("", -inf)`` when the
            mapping is empty or contains only NaN scores. Ties keep the first entry.
        """
        best_name, best_score = "", float("-inf")
        for name, score in mean_scores.items():
            # NaN compares False, so failed evaluations are never selected.
            if score > best_score:
                best_name, best_score = name, score
        return best_name, best_score

    def split_data(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        test_size: float = 0.2,
        random_state: Optional[int] = None,
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """
        Splits the data into training and testing sets.

        Args:
            X (pd.DataFrame): Feature matrix.
            y (pd.Series): Target variable.
            test_size (float): Proportion of the dataset to include in the test split.
            random_state (Optional[int]): Random seed. ``None`` uses the solver's seed.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
            X_train, X_test, y_train, y_test (the order returned by
            ``train_test_split``).
        """
        seed = self._resolve_random_state(random_state)
        self.logger.info("Splitting data into training and testing sets...")
        return train_test_split(  # type: ignore
            X, y, test_size=test_size, random_state=seed
        )

    def train_model(
        self,
        model_name: str,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        use_pipeline: bool = False,
    ) -> BaseEstimator:
        """
        Trains a given regression model.

        A tuned model stored under ``model_name`` takes precedence over the
        untuned one. The stored estimator instance itself is fitted (it is not
        copied).

        Args:
            model_name (str): The name of the model to train.
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.
            use_pipeline (bool): Whether to use a pipeline with scaling.

        Returns:
            BaseEstimator: The trained model, or the fitted Pipeline when
            ``use_pipeline`` is True.

        Raises:
            KeyError: If ``model_name`` is in neither ``tuned_models`` nor ``models``.
        """
        if model_name in self.tuned_models:
            model = self.tuned_models[model_name]
            self.logger.info(f"Using tuned model: {model_name}")
        else:
            model = self.models[model_name]
            self.logger.info(f"Training model: {model_name}")

        if not use_pipeline:
            model.fit(X_train, y_train)
            return model

        self.logger.info("Using pipeline with StandardScaler.")
        pipeline = Pipeline([("scaler", StandardScaler()), ("model", model)])
        pipeline.fit(X_train, y_train)
        return pipeline

    def evaluate_model(
        self,
        model: BaseEstimator,
        X_test: pd.DataFrame,
        y_test: pd.Series,
    ) -> Dict[str, Any]:
        """
        Evaluates the regression model on test data.

        Args:
            model (BaseEstimator): The trained model.
            X_test (pd.DataFrame): Testing features.
            y_test (pd.Series): Testing target.

        Returns:
            Dict[str, Any]: A dictionary containing evaluation metrics
            (see ``_get_evaluation_metrics`` for the keys).
        """
        self.logger.info("Evaluating model performance...")
        predictions = model.predict(X_test)
        return self._get_evaluation_metrics(y_test, predictions)

    def _get_evaluation_metrics(
        self,
        y_true: pd.Series,
        y_pred: np.ndarray,
    ) -> Dict[str, Any]:
        """
        Computes evaluation metrics for the regression model.

        Args:
            y_true (pd.Series): True values.
            y_pred (np.ndarray): Predicted values.

        Returns:
            Dict[str, Any]: Dictionary with MSE, RMSE, MAE, median AE, MAPE,
            R2 and explained variance.
        """
        self.logger.info("Computing evaluation metrics...")
        mse = mean_squared_error(y_true, y_pred)
        return {
            "mean_squared_error": mse,
            "root_mean_squared_error": np.sqrt(mse),
            "mean_absolute_error": mean_absolute_error(y_true, y_pred),
            "median_absolute_error": median_absolute_error(y_true, y_pred),
            "mean_absolute_percentage_error": mean_absolute_percentage_error(y_true, y_pred),
            "r2_score": r2_score(y_true, y_pred),
            "explained_variance_score": explained_variance_score(y_true, y_pred),
        }

    def hyperparameter_tuning(
        self,
        model_name: str,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        param_grid: Optional[Dict[str, List[Any]]] = None,
        cv: int = 5,
        search_type: str = "grid",
        n_iter: int = 50,
        scoring: str = "neg_mean_squared_error",
    ) -> None:
        """
        Performs hyperparameter tuning using GridSearchCV, RandomizedSearchCV, or
        Bayesian Optimization for one or all models and stores the best models.

        Args:
            model_name (str): The name of the model to tune. If 'all', tunes all
                models in self.models.
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.
            param_grid (Optional[Dict[str, List[Any]]]): Parameter grid for
                hyperparameter tuning. If None, uses default. With 'all', the same
                grid is passed to every model. Not used by the 'bayesian' search,
                which always uses the default search spaces.
            cv (int): Number of cross-validation folds.
            search_type (str): Type of search ('grid', 'random', or 'bayesian').
            n_iter (int): Number of iterations for RandomizedSearchCV or
                Bayesian Optimization.
            scoring (str): Scoring metric for evaluation.

        Returns:
            None: The best models are stored in self.tuned_models.
        """
        if model_name == "all":
            self.logger.info("Performing hyperparameter tuning for all models...")
            targets = list(self.models)
        else:
            targets = [model_name]

        for name in targets:
            self._tune_single_model(
                name, X_train, y_train, param_grid, cv, search_type, n_iter, scoring
            )

    def _tune_single_model(
        self,
        model_name: str,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        param_grid: Optional[Dict[str, List[Any]]],
        cv: int,
        search_type: str,
        n_iter: int,
        scoring: str,
    ) -> None:
        """
        Helper method to perform hyperparameter tuning for a single model.

        Args:
            model_name (str): The name of the model to tune.
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.
            param_grid (Optional[Dict[str, List[Any]]]): Parameter grid for
                'grid'/'random' search. If None, uses default.
            cv (int): Number of cross-validation folds.
            search_type (str): Type of search ('grid', 'random', or 'bayesian').
            n_iter (int): Number of iterations for RandomizedSearchCV or
                Bayesian Optimization.
            scoring (str): Scoring metric for evaluation.

        Returns:
            None: The best model is stored in self.tuned_models. Nothing is
            stored (a warning is logged) when no search space is available.

        Raises:
            KeyError: If ``model_name`` is not in ``self.models``.
            ValueError: If ``search_type`` is not recognised.
        """
        model = self.models[model_name]
        self.logger.info(f"Performing hyperparameter tuning for {model_name}...")

        if search_type not in ("grid", "random", "bayesian"):
            raise ValueError("search_type must be either 'grid', 'random', or 'bayesian'.")

        if search_type in ("grid", "random"):
            if param_grid is None:
                param_grid = self._default_param_grids().get(model_name, {})
            if not param_grid:
                self.logger.warning(
                    f"No parameter grid available for {model_name}. Skipping tuning."
                )
                return

            if search_type == "grid":
                search = GridSearchCV(
                    model,
                    param_grid,
                    cv=cv,
                    scoring=scoring,
                    n_jobs=-1,
                    verbose=1,
                )
            else:
                search = RandomizedSearchCV(
                    model,
                    param_grid,
                    n_iter=n_iter,
                    cv=cv,
                    scoring=scoring,
                    n_jobs=-1,
                    verbose=1,
                    random_state=self.random_state,
                )
            search.fit(X_train, y_train)
            self.logger.info(
                f"Best parameters found for {model_name}: {search.best_params_}"
            )
            # Store the tuned model for future use
            self.tuned_models[model_name] = search.best_estimator_
            return

        # --- Bayesian optimization via Optuna ---
        self.logger.info(f"Using Bayesian Optimization for {model_name}...")
        param_distributions = self._default_bayesian_search_spaces().get(model_name, {})
        if not param_distributions:
            self.logger.warning(
                f"No Bayesian parameter distribution available for {model_name}. Skipping tuning."
            )
            return

        # scikit-learn scorers are greater-is-better (errors are negated), so
        # the study must always maximize; minimizing a `neg_*` score would
        # select the worst hyperparameters.
        study = optuna.create_study(
            direction="maximize",
            sampler=optuna.samplers.TPESampler(seed=self.random_state),
        )
        objective = self._create_objective(
            model, param_distributions, X_train, y_train, cv, scoring
        )
        study.optimize(objective, n_trials=n_iter, show_progress_bar=True)

        best_params = study.best_params
        self.logger.info(f"Best parameters found for {model_name}: {best_params}")

        # Fit a copy so the untuned estimator in self.models stays untouched,
        # matching what the grid/random searches do.
        tuned_model = clone(model).set_params(**best_params)
        tuned_model.fit(X_train, y_train)
        self.tuned_models[model_name] = tuned_model

    @staticmethod
    def _suggest_param(trial: optuna.trial.Trial, name: str, distribution: Any) -> Any:
        """
        Samples one hyperparameter value from an Optuna distribution.

        Args:
            trial (optuna.trial.Trial): Trial used to draw the value.
            name (str): Hyperparameter name.
            distribution (Any): A Categorical/Float/Int Optuna distribution.

        Returns:
            Any: The suggested value.

        Raises:
            TypeError: If the distribution type is not supported.
        """
        distributions = optuna.distributions
        if isinstance(distribution, distributions.CategoricalDistribution):
            return trial.suggest_categorical(name, distribution.choices)
        if isinstance(distribution, distributions.FloatDistribution):
            # suggest_float replaces the deprecated suggest_uniform/loguniform.
            return trial.suggest_float(
                name,
                distribution.low,
                distribution.high,
                step=distribution.step,
                log=distribution.log,
            )
        if isinstance(distribution, distributions.IntDistribution):
            return trial.suggest_int(
                name,
                distribution.low,
                distribution.high,
                step=distribution.step,
                log=distribution.log,
            )
        raise TypeError(
            f"Unsupported distribution for parameter '{name}': "
            f"{type(distribution).__name__}"
        )

    def _create_objective(
        self,
        model: BaseEstimator,
        param_distributions: Dict[str, Any],
        X: pd.DataFrame,
        y: pd.Series,
        cv: int,
        scoring: str,
    ) -> Callable[[optuna.trial.Trial], float]:
        """
        Creates an objective function for Optuna to optimize model hyperparameters.

        Args:
            model (BaseEstimator): The machine learning model to be optimized.
                It is cloned for every trial and never mutated.
            param_distributions (Dict[str, Any]): The distributions of
                hyperparameters to sample from.
            X (pd.DataFrame): Training data for features.
            y (pd.Series): Target values for training data.
            cv (int): The number of cross-validation folds.
            scoring (str): The scoring metric to evaluate model performance.

        Returns:
            Callable[[optuna.trial.Trial], float]: Objective returning the mean
            cross-validated score (greater is better).
        """
        # Same shuffled folds for every trial so scores are comparable.
        cv_strategy = KFold(n_splits=cv, shuffle=True, random_state=self.random_state)

        def objective(trial: optuna.trial.Trial) -> float:
            """Scores one sampled hyperparameter set with cross-validation."""
            params = {
                name: self._suggest_param(trial, name, distribution)
                for name, distribution in param_distributions.items()
            }
            candidate = clone(model).set_params(**params)
            scores = cross_val_score(
                candidate, X, y, cv=cv_strategy, scoring=scoring, n_jobs=-1
            )
            return float(scores.mean())

        return objective

    def auto_select_best_model(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        cv: int = 5,
        scoring: str = "neg_mean_squared_error",
    ) -> Tuple[str, float]:
        """
        Automatically selects the best model based on cross-validated score.

        A hyperparameter-tuned version of a model is used when available.
        The model with the highest mean score wins, because scikit-learn
        scorers are greater-is-better (e.g. ``neg_mean_squared_error``).

        Args:
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.
            cv (int): Number of cross-validation folds (default: 5).
            scoring (str): Scoring metric for evaluation.

        Returns:
            Tuple[str, float]: The name of the best performing model and its
            mean cross-validated score. The name is "" if no model produced a
            comparable score.
        """
        self.logger.info(
            "Automatically selecting the best model based on cross-validated score..."
        )
        cv_strategy = KFold(n_splits=cv, shuffle=True, random_state=self.random_state)

        mean_scores: Dict[str, float] = {}
        for model_name, default_model in self.models.items():
            self.logger.info(f"Evaluating model: {model_name}")
            # Use the tuned model if available
            model = self.tuned_models.get(model_name, default_model)
            scores = cross_val_score(
                model, X_train, y_train, cv=cv_strategy, scoring=scoring, n_jobs=-1
            )
            mean_score = scores.mean()
            std_score = scores.std()
            self.logger.info(
                f"{model_name} - Mean Score: {mean_score:.4f}, Std: {std_score:.4f}"
            )
            mean_scores[model_name] = float(mean_score)

        best_model_name, best_score = self._best_by_score(mean_scores)
        self.logger.info(
            f"Best model selected: {best_model_name} with cross-validated score: {best_score:.4f}"
        )
        return best_model_name, best_score

    def compare_models(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        cv: int = 5,
        scoring: str = "neg_mean_squared_error",
    ) -> pd.DataFrame:
        """
        Compares multiple models based on cross-validation scores.

        Args:
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.
            cv (int): Number of cross-validation folds.
            scoring (str): Scoring metric for evaluation.

        Returns:
            pd.DataFrame: Columns "Model", "Mean Score", "Std Score", sorted
            best-first (highest mean score first).
        """
        self.logger.info("Comparing models...")
        cv_strategy = KFold(n_splits=cv, shuffle=True, random_state=self.random_state)

        results = []
        for model_name, default_model in self.models.items():
            self.logger.info(f"Evaluating model: {model_name}")
            model = self.tuned_models.get(model_name, default_model)
            scores = cross_val_score(
                model, X_train, y_train, cv=cv_strategy, scoring=scoring, n_jobs=-1
            )
            results.append(
                {
                    "Model": model_name,
                    "Mean Score": scores.mean(),
                    "Std Score": scores.std(),
                }
            )

        results_df = pd.DataFrame(results)
        if not results_df.empty:
            # Scorers are greater-is-better (negated errors included), so
            # descending order always lists the best model first.
            results_df.sort_values(by="Mean Score", ascending=False, inplace=True)
        self.logger.info("Model comparison results:\n" + results_df.to_string(index=False))
        return results_df

    def plot_residuals(
        self, model: BaseEstimator, X_test: pd.DataFrame, y_test: pd.Series
    ) -> None:
        """
        Plots residuals of the regression model.

        Args:
            model (BaseEstimator): The trained model.
            X_test (pd.DataFrame): Testing features.
            y_test (pd.Series): Testing target.
        """
        predictions = model.predict(X_test)
        residuals = y_test - predictions
        plt.figure(figsize=(10, 6))
        # NOTE(review): seaborn's residplot regresses y on x and plots the
        # residuals of *that* fit; passing pre-computed residuals looks
        # unintended - confirm whether a plain scatter was meant.
        sns.residplot(x=predictions, y=residuals, lowess=True, color="g")
        plt.title("Residual Plot")
        plt.xlabel("Predicted Values")
        plt.ylabel("Residuals")
        plt.show()

    def plot_residual_distribution(
        self, model: BaseEstimator, X_test: pd.DataFrame, y_test: pd.Series
    ) -> None:
        """
        Plots the distribution of residuals (prediction errors).

        Args:
            model (BaseEstimator): The trained model.
            X_test (pd.DataFrame): Testing features.
            y_test (pd.Series): Testing target.
        """
        residuals = y_test - model.predict(X_test)
        plt.figure(figsize=(10, 6))
        sns.histplot(residuals, kde=True, color="blue")
        plt.title("Residual Distribution")
        plt.xlabel("Residuals")
        plt.ylabel("Frequency")
        plt.show()

    def plot_feature_importance(
        self, model: BaseEstimator, feature_names: List[str]
    ) -> None:
        """
        Plots feature importance for models that support it.

        Uses ``feature_importances_`` when present, otherwise the absolute
        value of ``coef_``. Models exposing neither only trigger a warning.

        Args:
            model (BaseEstimator): The trained model.
            feature_names (List[str]): List of feature names, in the column
                order the model was trained with.
        """
        if hasattr(model, "feature_importances_"):
            importances = np.asarray(model.feature_importances_)
        elif hasattr(model, "coef_"):
            if not isinstance(model.coef_, np.ndarray):
                self.logger.warning(
                    f"Model {model.__class__.__name__} has non-array coefficients."
                )
                return
            importances = np.abs(model.coef_)
            if importances.ndim > 1:
                # Multi-target coefficients: average over targets so there is
                # exactly one value per feature (a plain ravel would produce
                # more bars than feature names).
                importances = importances.reshape(-1, importances.shape[-1]).mean(axis=0)
        else:
            self.logger.warning(
                f"Model {model.__class__.__name__} does not support feature importances."
            )
            return

        order = np.argsort(importances)[::-1]  # most important first
        positions = range(len(importances))
        plt.figure(figsize=(12, 6))
        plt.title("Feature Importance")
        plt.bar(positions, importances[order], align="center", color="skyblue")
        plt.xticks(positions, [feature_names[i] for i in order], rotation=90)
        plt.tight_layout()
        plt.show()

    def plot_learning_curve(
        self,
        model: BaseEstimator,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        cv: int = 5,
        scoring: str = "neg_mean_squared_error",
    ) -> None:
        """
        Plots the learning curve of the model.

        Args:
            model (BaseEstimator): The model to plot learning curve for.
            X_train (pd.DataFrame): Feature matrix.
            y_train (pd.Series): Target variable.
            cv (int): Number of cross-validation folds.
            scoring (str): Scoring metric. ``neg_*`` metrics are sign-flipped
                and plotted as errors.
        """
        self.logger.info("Plotting learning curve...")
        train_sizes, train_scores, test_scores = learning_curve(
            model,
            X_train,
            y_train,
            cv=cv,
            scoring=scoring,
            n_jobs=-1,
            train_sizes=np.linspace(0.1, 1.0, 5),
            shuffle=True,
            random_state=self.random_state,
        )

        # For negative scoring metrics, convert to positive errors.
        is_error_metric = 'neg_' in scoring
        sign = -1.0 if is_error_metric else 1.0
        train_scores_mean = sign * np.mean(train_scores, axis=1)
        test_scores_mean = sign * np.mean(test_scores, axis=1)

        if is_error_metric:
            ylabel = "Error"
            train_label, test_label = "Training error", "Cross-validation error"
        else:
            ylabel = scoring.capitalize()
            train_label, test_label = "Training score", "Cross-validation score"

        plt.figure(figsize=(10, 6))
        plt.plot(train_sizes, train_scores_mean, "o-", color="r", label=train_label)
        plt.plot(train_sizes, test_scores_mean, "o-", color="g", label=test_label)
        plt.title("Learning Curve")
        plt.xlabel("Training Examples")
        plt.ylabel(ylabel)
        plt.legend(loc="best")
        plt.grid()
        plt.show()

    def cross_validate_model(
        self,
        model: BaseEstimator,
        X: pd.DataFrame,
        y: pd.Series,
        cv: int = 5,
        scoring: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Cross-validates the model using the specified number of folds and
        returns detailed metrics.

        Args:
            model (BaseEstimator): The model to cross-validate.
            X (pd.DataFrame): Feature matrix.
            y (pd.Series): Target variable.
            cv (int): Number of cross-validation folds.
            scoring (Optional[List[str]]): List of scoring metrics to evaluate
                (default: None uses MAE, MSE, RMSE and R2).

        Returns:
            Dict[str, Any]: Metric name -> {"mean": ..., "std": ...}. Means of
            ``neg_*`` metrics are reported as positive values. Metrics that
            raise ``ValueError`` are skipped with a warning.
        """
        self.logger.info("Cross-validating the provided model...")
        cv_strategy = KFold(n_splits=cv, shuffle=True, random_state=self.random_state)

        if scoring is None:
            scoring = [
                'neg_mean_absolute_error',
                'neg_mean_squared_error',
                'neg_root_mean_squared_error',
                'r2',
            ]

        scores: Dict[str, Any] = {}
        for metric in scoring:
            self.logger.info(f"Calculating {metric}...")
            try:
                fold_scores = cross_val_score(
                    model, X, y, cv=cv_strategy, scoring=metric, n_jobs=-1
                )
            except ValueError as e:
                self.logger.warning(f"Skipping metric {metric} due to error: {e}")
                continue

            mean_value = float(np.mean(fold_scores))
            scores[metric] = {
                # For negative metrics, convert back to positive values.
                "mean": -mean_value if 'neg_' in metric else mean_value,
                "std": float(np.std(fold_scores)),
            }
        return scores

    def save_model(self, model: BaseEstimator, filename: str) -> None:
        """
        Saves the trained model to disk.

        Args:
            model (BaseEstimator): The trained model.
            filename (str): The path and filename to save the model.
        """
        joblib.dump(model, filename)
        self.logger.info(f"Model saved to {filename}")

    def load_model(self, filename: str) -> BaseEstimator:
        """
        Loads a trained model from disk.

        Args:
            filename (str): The path and filename to load the model from.

        Returns:
            BaseEstimator: The loaded model.
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code - only load files from trusted sources.
        model = joblib.load(filename)
        self.logger.info(f"Model loaded from {filename}")
        return model

    def model_merging(
        self,
        base_models: List[str],
        X_train: pd.DataFrame,
        y_train: pd.Series,
        method: str = "stacking",  # Selects the ensemble method
        final_estimator: Optional[BaseEstimator] = None,
        passthrough: bool = False,
        cv: int = 5,
        n_estimators: int = 10,  # For bagging and boosting
    ) -> BaseEstimator:
        """
        Creates an ensemble model by merging multiple base models using
        different ensemble techniques. Supports stacking, bagging, boosting,
        and voting.

        Bagging and AdaBoost wrap a single estimator: the given base model is
        used only when exactly one valid name is supplied; otherwise a fresh
        RandomForestRegressor (bagging) or DecisionTreeRegressor (boosting) is
        used. If "Gradient Boosting" is among ``base_models``, boosting builds
        a fresh GradientBoostingRegressor instead.

        Args:
            base_models (List[str]): List of model names to be used as base
                models. Tuned versions are preferred; unknown names are skipped.
            X_train (pd.DataFrame): Training features.
            y_train (pd.Series): Training target.
            method (str): The ensemble method to use ('stacking', 'bagging',
                'boosting', or 'voting').
            final_estimator (Optional[BaseEstimator]): The final estimator to
                combine base models for stacking. Defaults to Ridge.
            passthrough (bool): If True, pass the original features to the
                final estimator (only for stacking).
            cv (int): Number of cross-validation folds for stacking.
            n_estimators (int): Number of estimators for bagging or boosting.

        Returns:
            BaseEstimator: The fitted ensemble model. It is also stored in
            ``self.tuned_models`` under "<Method> Regressor".

        Raises:
            ValueError: If no valid base model is found or ``method`` is unknown.
        """
        self.logger.info(f"Creating {method.capitalize()} Regressor for model merging...")

        # Collect base models
        estimators = []
        for model_name in base_models:
            if model_name in self.tuned_models:
                model = self.tuned_models[model_name]
                self.logger.info(f"Using tuned model: {model_name} for {method}.")
            else:
                model = self.models.get(model_name)
                if model is None:
                    self.logger.warning(f"Model {model_name} not found. Skipping.")
                    continue
                self.logger.info(f"Using default model: {model_name} for {method}.")
            estimators.append((model_name, model))

        if not estimators:
            self.logger.error(f"No valid base models provided for {method}.")
            raise ValueError(f"No valid base models provided for {method}.")

        single_estimator = estimators[0][1] if len(estimators) == 1 else None

        if method == "stacking":
            if final_estimator is None:
                final_estimator = Ridge(random_state=self.random_state)
            ensemble_model = StackingRegressor(
                estimators=estimators,
                final_estimator=final_estimator,
                passthrough=passthrough,
                cv=cv,
                n_jobs=-1,
            )
        elif method == "bagging":
            base_estimator = (
                single_estimator
                if single_estimator is not None
                else RandomForestRegressor(random_state=self.random_state)
            )
            # Passed positionally: the keyword was renamed from
            # `base_estimator` to `estimator` in scikit-learn 1.2, but it has
            # always been the first parameter.
            ensemble_model = BaggingRegressor(
                base_estimator,
                n_estimators=n_estimators,
                random_state=self.random_state,
                n_jobs=-1,
            )
        elif method == "boosting":
            if "Gradient Boosting" in base_models:
                ensemble_model = GradientBoostingRegressor(
                    n_estimators=n_estimators,
                    random_state=self.random_state,
                )
            else:
                # Default to AdaBoost if Gradient Boosting is not in base models
                base_estimator = (
                    single_estimator
                    if single_estimator is not None
                    else DecisionTreeRegressor(random_state=self.random_state)
                )
                # Positional for the same rename reason as bagging above.
                ensemble_model = AdaBoostRegressor(
                    base_estimator,
                    n_estimators=n_estimators,
                    random_state=self.random_state,
                )
        elif method == "voting":
            ensemble_model = VotingRegressor(estimators=estimators, n_jobs=-1)
        else:
            self.logger.error(f"Unknown ensemble method: {method}")
            raise ValueError(f"Unknown ensemble method: {method}")

        # Fit the ensemble model
        self.logger.info(f"Training {method.capitalize()} Regressor...")
        ensemble_model.fit(X_train, y_train)

        # Store the ensemble model in tuned_models for future use
        self.tuned_models[f"{method.capitalize()} Regressor"] = ensemble_model
        return ensemble_model