# Source code for encode_category

import pandas as pd
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
from sklearn.base import BaseEstimator, TransformerMixin
import category_encoders as ce
from typing import List, Optional, Dict, Any, Union


class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """
    Encode categorical DataFrame columns with a per-column choice of method.

    Supported methods are label, one-hot, ordinal, binary, target and
    frequency encoding. The class follows the scikit-learn transformer
    protocol (``fit`` / ``transform`` / ``get_params`` / ``set_params``) so it
    can be used inside pipelines.

    Attributes set by ``fit``:
        encoders: Mapping ``column -> (method, fitted_encoder)``. For the
            ``'frequency'`` method the "encoder" is a Series of relative
            frequencies indexed by category.
        columns_: The list of columns that were actually encoded.
    """

    # Names of the explicit constructor parameters. Any other key handed to
    # ``set_params`` is treated as an extra keyword for the underlying
    # category_encoders objects (i.e. it belongs in ``self.kwargs``).
    _PARAM_NAMES = ('encoding_methods', 'columns', 'categories', 'target', 'drop_first')

    def __init__(
        self,
        encoding_methods: Optional[Union[str, Dict[str, str]]] = 'one_hot',
        columns: Optional[List[str]] = None,
        categories: Optional[Dict[str, List[str]]] = None,
        target: Optional[str] = None,
        drop_first: bool = False,
        **kwargs: Any
    ):
        """
        Initializes the CategoricalEncoder.

        Args:
            encoding_methods (Union[str, Dict[str, str]], optional): The encoding
                method(s) to use. Either a single method name applied to every
                column, or a dictionary mapping column names to method names
                (columns missing from the dictionary fall back to 'one_hot').
                Supported methods:
                - 'label': Label Encoding
                - 'one_hot': One-Hot Encoding
                - 'ordinal': Ordinal Encoding
                - 'binary': Binary Encoding
                - 'target': Target Encoding
                - 'frequency': Frequency Encoding
            columns (List[str], optional): Columns to encode. If None, all
                object/category-typed columns of the frame passed to ``fit``
                are encoded.
            categories (Dict[str, List[str]], optional): For ordinal encoding,
                maps a column name to its ordered list of categories. Columns
                without an entry use the encoder's automatic ordering.
            target (str, optional): Name of the target column. Used by ``fit``
                as the target for target encoding when ``y`` is not passed and
                the column is present in ``X``.
            drop_first (bool, optional): Whether to drop the first category in
                one-hot encoding to avoid multicollinearity.
            **kwargs: Additional keyword arguments forwarded to the
                category_encoders objects (binary and target encoding).
        """
        self.encoding_methods = encoding_methods
        self.columns = columns
        self.categories = categories
        self.target = target
        self.drop_first = drop_first
        self.kwargs = kwargs
        self.encoders: Dict[str, Any] = {}

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'CategoricalEncoder':
        """
        Fits one encoder per selected column.

        Every call starts from a clean state: previously fitted encoders are
        discarded, and the constructor parameter ``columns`` is never
        overwritten (the resolved list is stored in ``columns_`` instead), so
        the estimator can be refitted on a differently shaped frame.

        Args:
            X (pd.DataFrame): Input DataFrame.
            y (pd.Series, optional): Target variable. Required for target
                encoding unless ``target`` names a column of ``X``.

        Returns:
            CategoricalEncoder: Fitted encoder.

        Raises:
            ValueError: If ``encoding_methods`` is neither a string nor a dict,
                if a method name is unsupported, or if target encoding is
                requested and no target is available.
        """
        if self.columns is None:
            columns = X.select_dtypes(include=['object', 'category']).columns.tolist()
        else:
            columns = list(self.columns)

        methods = self._resolve_methods(columns)

        # Honour the documented ``target`` parameter when y is not supplied.
        if y is None and self.target is not None and self.target in X.columns:
            y = X[self.target]

        fitted: Dict[str, Any] = {}
        for column in columns:
            method = methods.get(column, 'one_hot')
            fitted[column] = (method, self._fit_column(X, y, column, method))

        # Publish the state only once every column was fitted successfully.
        self.encoders = fitted
        self.columns_ = columns
        return self

    def _resolve_methods(self, columns: List[str]) -> Dict[str, str]:
        """Return the column -> method mapping implied by ``encoding_methods``."""
        if isinstance(self.encoding_methods, str):
            return {column: self.encoding_methods for column in columns}
        if isinstance(self.encoding_methods, dict):
            return self.encoding_methods
        raise ValueError("encoding_methods should be a string or a dictionary mapping columns to methods.")

    def _fit_column(self, X: pd.DataFrame, y: Optional[pd.Series], column: str, method: str) -> Any:
        """Fit and return the encoder object for a single column."""
        if method == 'label':
            encoder = LabelEncoder()
            encoder.fit(X[column].astype(str))
            return encoder

        if method == 'one_hot':
            encoder = OneHotEncoder(
                sparse_output=False,
                drop='first' if self.drop_first else None,
                handle_unknown='ignore'
            )
            encoder.fit(X[[column]])
            return encoder

        if method == 'ordinal':
            # A categories dict that simply lacks this column must fall back to
            # 'auto' rather than passing [None] to the encoder.
            declared = (self.categories or {}).get(column)
            encoder = OrdinalEncoder(
                categories='auto' if declared is None else [list(declared)],
                handle_unknown='use_encoded_value',
                unknown_value=-1
            )
            encoder.fit(X[[column]])
            return encoder

        if method == 'binary':
            # Fit on the single column only: transform() feeds the encoder a
            # frame whose other columns may already have been replaced by
            # earlier encoders, so the fitted input must not depend on them.
            encoder = ce.BinaryEncoder(cols=[column], **self.kwargs)
            encoder.fit(X[[column]])
            return encoder

        if method == 'target':
            if y is None:
                raise ValueError("y cannot be None for target encoding.")
            encoder = ce.TargetEncoder(cols=[column], **self.kwargs)
            encoder.fit(X[[column]], y)
            return encoder

        if method == 'frequency':
            frequencies = X[column].value_counts(normalize=True)
            # Use a plain object index so lookups behave the same for object
            # and category dtyped columns.
            frequencies.index = frequencies.index.astype(object)
            return frequencies

        raise ValueError(f"Unsupported encoding method: {method}")

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Transforms the data using the fitted encoders.

        Label, ordinal, target and frequency encodings replace the column in
        place. One-hot and binary encodings remove the column and append the
        generated columns at the end of the frame. Categories unseen during
        ``fit`` become all-zero rows (one-hot), -1 (ordinal) or 0 (frequency).

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Transformed DataFrame (a copy; ``X`` is not modified).
        """
        frame = X.copy()

        for column, (method, encoder) in self.encoders.items():
            if method == 'label':
                frame[column] = encoder.transform(frame[column].astype(str))

            elif method == 'one_hot':
                encoded = encoder.transform(frame[[column]])
                encoded_df = pd.DataFrame(
                    encoded,
                    columns=encoder.get_feature_names_out([column]),
                    index=frame.index
                )
                frame = pd.concat([frame.drop(columns=[column]), encoded_df], axis=1)

            elif method == 'ordinal':
                encoded = encoder.transform(frame[[column]])
                # The encoder returns an (n, 1) block; assign it as a 1-D column.
                frame[column] = pd.DataFrame(encoded).iloc[:, 0].to_numpy()

            elif method == 'binary':
                # Only the encoded column is passed in, so the result holds
                # just the generated bit columns and nothing gets duplicated.
                encoded_df = encoder.transform(frame[[column]])
                frame = pd.concat([frame.drop(columns=[column]), encoded_df], axis=1)

            elif method == 'target':
                frame[column] = encoder.transform(frame[[column]])[column]

            elif method == 'frequency':
                # Go through object dtype so category-typed columns yield a
                # plain float column instead of a categorical.
                frame[column] = frame[column].astype(object).map(encoder).fillna(0).astype(float)

            else:
                raise ValueError(f"Unsupported encoding method: {method}")

        return frame

    def fit_transform(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> pd.DataFrame:
        """
        Fits the encoders and transforms the data.

        Args:
            X (pd.DataFrame): Input DataFrame.
            y (pd.Series, optional): Target variable. Required for target encoding.

        Returns:
            pd.DataFrame: Transformed DataFrame.
        """
        return self.fit(X, y).transform(X)

    def inverse_transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Inverse transforms the data back to original categories.

        Only label, one-hot and ordinal encodings are reversed; binary, target
        and frequency encoded columns are returned unchanged. Ordinal codes
        outside the fitted range (including the -1 used for unknowns) become
        None.

        For one-hot columns, an all-zero row maps to the dropped first category
        when the encoder was fitted with ``drop_first`` and to None otherwise.
        With ``drop_first``, categories unseen at fit time are also encoded as
        all zeros and are therefore indistinguishable from the dropped one.

        Args:
            X (pd.DataFrame): Encoded DataFrame.

        Returns:
            pd.DataFrame: DataFrame with original categories.
        """
        X_inv = X.copy()

        for column, (method, encoder) in self.encoders.items():
            if method == 'label':
                X_inv[column] = encoder.inverse_transform(X_inv[column].astype(int))

            elif method == 'one_hot':
                feature_names = list(encoder.get_feature_names_out([column]))
                present = [name for name in feature_names if name in X_inv.columns]
                if not present:
                    continue  # No encoded columns present

                all_categories = list(encoder.categories_[0])
                # Read the drop setting from the fitted encoder, not from
                # self.drop_first, which may have been changed after fit.
                first_dropped = encoder.drop == 'first'
                kept = all_categories[1:] if first_dropped else all_categories
                # Map by column name so a partially present set of dummy
                # columns still resolves to the right category.
                category_for = dict(zip(feature_names, kept))
                fallback = all_categories[0] if first_dropped else None

                block = X_inv[present].to_numpy()
                winners = block.argmax(axis=1)
                has_hit = block.sum(axis=1) > 0
                X_inv[column] = [
                    category_for[present[winner]] if hit else fallback
                    for winner, hit in zip(winners, has_hit)
                ]
                X_inv = X_inv.drop(columns=present)

            elif method == 'ordinal':
                categories = encoder.categories_[0]
                X_inv[column] = X_inv[column].apply(
                    lambda code: categories[int(code)] if 0 <= code < len(categories) else None
                )

            elif method in ('binary', 'target', 'frequency'):
                # These encodings cannot be inversely transformed.
                pass

            else:
                raise ValueError(f"Unsupported encoding method: {method}")

        return X_inv

    def get_feature_names_out(self, input_features: Optional[List[str]] = None) -> List[str]:
        """
        Get the names of the columns produced for the encoded columns.

        Only encoded columns are reported, in the order the encoders were
        fitted; untouched passthrough columns are not included.

        Args:
            input_features (List[str], optional): Accepted for scikit-learn API
                compatibility; currently unused.

        Returns:
            List[str]: List of output feature names.
        """
        output_features: List[str] = []

        for column, (method, encoder) in self.encoders.items():
            if method == 'one_hot':
                output_features.extend(encoder.get_feature_names_out([column]))
            elif method == 'binary':
                # Newer category_encoders releases expose get_feature_names_out;
                # fall back to the older get_feature_names when it is absent.
                names_getter = getattr(encoder, 'get_feature_names_out', None)
                if names_getter is None:
                    names_getter = encoder.get_feature_names
                output_features.extend(names_getter())
            else:
                output_features.append(column)

        return output_features

    def get_params(self, deep: bool = True) -> Dict[str, Any]:
        """
        Get parameters for this estimator.

        The extra keyword arguments captured by the constructor are returned
        flattened next to the named parameters, so ``type(self)(**params)``
        rebuilds an equivalent estimator. Parameters of the fitted
        sub-encoders are deliberately not merged in: they are not constructor
        arguments of this class and their names (e.g. ``categories``) would
        clash with its own.

        Args:
            deep (bool): Accepted for scikit-learn API compatibility; the
                result is the same for both values.

        Returns:
            Dict[str, Any]: Parameter names mapped to their values.
        """
        params: Dict[str, Any] = {name: getattr(self, name) for name in self._PARAM_NAMES}
        params.update(self.kwargs)
        return params

    def set_params(self, **params: Any) -> 'CategoricalEncoder':
        """
        Set the parameters of this estimator.

        Named constructor parameters are stored as attributes; any other key
        is stored in ``self.kwargs`` so it shows up in ``get_params`` and is
        forwarded to the underlying encoders on the next ``fit``.

        Args:
            **params: Estimator parameters.

        Returns:
            CategoricalEncoder: Returns self.
        """
        for key, value in params.items():
            if key in self._PARAM_NAMES:
                setattr(self, key, value)
            else:
                self.kwargs[key] = value
        return self