Source code for temporal_features

import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from typing import Optional, List, Union, Dict


class DatetimeConverter(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that parses the configured columns into datetimes.

    Every configured column is run through ``pd.to_datetime`` on a copy of
    the input, so the caller's DataFrame is never modified.
    """

    def __init__(self, columns: Union[str, List[str]], format: Optional[str] = None, errors: str = 'raise'):
        """
        Store the conversion settings.

        Args:
            columns (str or List[str]): Name, or list of names, of the columns
                to convert. A single string is wrapped in a one-element list.
            format (str, optional): Parsing format forwarded to
                ``pd.to_datetime``. Default is None.
            errors (str): Error-handling mode forwarded to ``pd.to_datetime``.
                Default is 'raise'.
        """
        if isinstance(columns, str):
            columns = [columns]
        self.columns = columns
        self.format = format
        self.errors = errors

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'DatetimeConverter':
        """
        No-op fit; this transformer learns nothing from the data.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            DatetimeConverter: The transformer itself.
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of ``X`` with the configured columns parsed as datetimes.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Copy of ``X`` whose configured columns hold the
            result of ``pd.to_datetime``.

        Raises:
            ValueError: If a configured column is missing from ``X``.
        """
        converted = X.copy()
        for name in self.columns:
            # Columns are handled in order, so the first missing one aborts.
            if name not in converted.columns:
                raise ValueError(f"Column '{name}' not found in input DataFrame.")
            converted[name] = pd.to_datetime(
                converted[name], format=self.format, errors=self.errors
            )
        return converted
class DatePartExtractor(BaseEstimator, TransformerMixin):
    """
    Extracts calendar/clock components from a datetime column.

    Each requested part becomes a new column named ``f"{prefix}{part}"`` on a
    copy of the input DataFrame.
    """

    # Single source of truth for how each supported part is computed from a
    # datetime Series. ``weekday_name`` goes through ``day_name()`` because
    # the ``Series.dt.weekday_name`` attribute is gone in pandas >= 1.0.
    _PART_EXTRACTORS = {
        'year': lambda s: s.dt.year,
        'month': lambda s: s.dt.month,
        'day': lambda s: s.dt.day,
        'hour': lambda s: s.dt.hour,
        'minute': lambda s: s.dt.minute,
        'second': lambda s: s.dt.second,
        'dayofweek': lambda s: s.dt.dayofweek,
        'weekday_name': lambda s: s.dt.day_name(),
        # pandas numbers Monday as 0, so 5 and 6 are Saturday and Sunday.
        'is_weekend': lambda s: s.dt.dayofweek >= 5,
        'quarter': lambda s: s.dt.quarter,
        'dayofyear': lambda s: s.dt.dayofyear,
        'weekofyear': lambda s: s.dt.isocalendar().week,
    }

    def __init__(self, column: str, parts: Optional[List[str]] = None, prefix: Optional[str] = None):
        """
        Initializes the DatePartExtractor.

        Args:
            column (str): Name of the datetime column.
            parts (List[str], optional): Date parts to extract. When omitted
                (or empty) the default is 'year', 'month', 'day', 'hour',
                'minute', 'second', 'dayofweek', 'is_weekend'.
                Supported parts: 'year', 'month', 'day', 'hour', 'minute',
                'second', 'dayofweek', 'weekday_name', 'is_weekend',
                'quarter', 'dayofyear', 'weekofyear'.
            prefix (str, optional): Prefix prepended to every generated
                feature name. Default is no prefix.
        """
        self.column = column
        self.parts = parts or ['year', 'month', 'day', 'hour', 'minute', 'second', 'dayofweek', 'is_weekend']
        self.prefix = prefix or ''
        # Kept as a public name -> name mapping; ``fit`` validates against
        # its keys.
        self.supported_parts = {
            'year': 'year',
            'month': 'month',
            'day': 'day',
            'hour': 'hour',
            'minute': 'minute',
            'second': 'second',
            'dayofweek': 'dayofweek',
            'weekday_name': 'weekday_name',
            'is_weekend': 'is_weekend',
            'quarter': 'quarter',
            'dayofyear': 'dayofyear',
            'weekofyear': 'weekofyear'
        }

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'DatePartExtractor':
        """
        Validates the requested parts; nothing is learned from the data.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            DatePartExtractor: Returns self.

        Raises:
            ValueError: If any requested part is not in ``supported_parts``.
        """
        invalid_parts = set(self.parts) - set(self.supported_parts.keys())
        if invalid_parts:
            raise ValueError(f"Unsupported date parts: {invalid_parts}")
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Extracts the configured date parts from the datetime column.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Copy of ``X`` with one additional column per
            requested part.

        Raises:
            ValueError: If the datetime column is missing, or a requested
                part is not supported.
            TypeError: If the column is not of a datetime dtype.
        """
        X_transformed = X.copy()

        if self.column not in X_transformed.columns:
            raise ValueError(f"Column '{self.column}' not found in input DataFrame.")
        if not pd.api.types.is_datetime64_any_dtype(X_transformed[self.column]):
            raise TypeError(f"Column '{self.column}' is not of datetime dtype.")

        dt_series = X_transformed[self.column]
        for part in self.parts:
            extractor = self._PART_EXTRACTORS.get(part)
            # ``transform`` can be called without ``fit``, so unsupported
            # parts must still be rejected here.
            if extractor is None:
                raise ValueError(f"Unsupported date part: {part}")
            X_transformed[f"{self.prefix}{part}"] = extractor(dt_series)

        return X_transformed
class TimeDifferenceTransformer(BaseEstimator, TransformerMixin):
    """
    Adds a column holding the difference between rows of a datetime column.

    The difference is taken ``periods`` rows apart using ``Series.diff``.
    """

    def __init__(self, column: str, new_column_name: Optional[str] = None, periods: int = 1):
        """
        Store the configuration.

        Args:
            column (str): Name of the datetime column to difference.
            new_column_name (str, optional): Name of the output column.
                Falls back to 'time_diff' when not given.
            periods (int): Row offset passed to ``Series.diff``. Default is 1.
        """
        self.column = column
        self.new_column_name = new_column_name if new_column_name else 'time_diff'
        self.periods = periods

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'TimeDifferenceTransformer':
        """
        No-op fit; this transformer learns nothing from the data.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            TimeDifferenceTransformer: The transformer itself.
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of ``X`` with the time-difference column appended.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Copy of ``X`` including the new difference column.

        Raises:
            ValueError: If the datetime column is missing from ``X``.
            TypeError: If the column is not of a datetime dtype.
        """
        result = X.copy()
        source = self.column

        if source not in result.columns:
            raise ValueError(f"Column '{source}' not found in input DataFrame.")

        timestamps = result[source]
        if not pd.api.types.is_datetime64_any_dtype(timestamps):
            raise TypeError(f"Column '{source}' is not of datetime dtype.")

        deltas = timestamps.diff(periods=self.periods)
        result[self.new_column_name] = deltas
        return result
class LagFeatureCreator(BaseEstimator, TransformerMixin):
    """
    Adds shifted (lagged) copies of the configured columns.

    For each column and each lag, a column named ``f'{col}_lag_{lag}'`` is
    created with ``Series.shift(lag)``.
    """

    def __init__(self, columns: Union[str, List[str]], lags: List[int]):
        """
        Store the configuration.

        Args:
            columns (str or List[str]): Column name(s) to lag. A single
                string is wrapped in a one-element list.
            lags (List[int]): Shift amounts, one new column per value.
        """
        if isinstance(columns, str):
            columns = [columns]
        self.columns = columns
        self.lags = lags

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'LagFeatureCreator':
        """
        No-op fit; this transformer learns nothing from the data.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            LagFeatureCreator: The transformer itself.
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of ``X`` with the lag columns appended.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Copy of ``X`` including one column per
            (column, lag) pair.

        Raises:
            ValueError: If a configured column is missing from ``X``.
        """
        lagged = X.copy()
        for name in self.columns:
            if name not in lagged.columns:
                raise ValueError(f"Column '{name}' not found in input DataFrame.")
            source = lagged[name]
            for offset in self.lags:
                target = f'{name}_lag_{offset}'
                lagged[target] = source.shift(offset)
        return lagged
class RollingFeatureCreator(BaseEstimator, TransformerMixin):
    """
    Creates rolling-window statistics for specified columns.

    For each column and statistic, a column named
    ``f'{col}_rolling_{stat}_{window_size}'`` is added to a copy of the input.
    """

    # Statistics that ``transform`` knows how to compute; each name is also
    # the name of the matching method on a pandas rolling window.
    _ROLLING_STATISTICS = ('mean', 'sum', 'std', 'min', 'max')

    def __init__(self, columns: Union[str, List[str]], window_size: int, statistics: Optional[List[str]] = None):
        """
        Initializes the RollingFeatureCreator.

        Args:
            columns (str or List[str]): Column name(s) for which to calculate
                rolling statistics. A single string is wrapped in a list.
            window_size (int): Size of the rolling window.
            statistics (List[str], optional): Rolling statistics to calculate
                ('mean', 'sum', 'std', 'min', 'max'). Defaults to ['mean'].
        """
        self.columns = [columns] if isinstance(columns, str) else columns
        self.window_size = window_size
        # A fresh list per instance: a list literal as the parameter default
        # would be shared (and mutable) across every instance.
        self.statistics = ['mean'] if statistics is None else statistics
        self.supported_statistics = list(self._ROLLING_STATISTICS)

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'RollingFeatureCreator':
        """
        Validates the requested statistics; nothing is learned from the data.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            RollingFeatureCreator: Returns self.

        Raises:
            ValueError: If any requested statistic is not supported.
        """
        invalid_stats = set(self.statistics) - set(self.supported_statistics)
        if invalid_stats:
            raise ValueError(f"Unsupported statistics: {invalid_stats}")
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Creates rolling features.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Copy of ``X`` with the new rolling feature columns.

        Raises:
            ValueError: If a configured column is missing from ``X`` or a
                requested statistic is not supported.
        """
        X_transformed = X.copy()
        for col in self.columns:
            if col not in X_transformed.columns:
                raise ValueError(f"Column '{col}' not found in input DataFrame.")
            for stat in self.statistics:
                # ``transform`` can run without ``fit``, so reject unknown
                # names here before they are used as a method name.
                if stat not in self._ROLLING_STATISTICS:
                    raise ValueError(f"Unsupported statistic: {stat}")
                feature_name = f'{col}_rolling_{stat}_{self.window_size}'
                window = X_transformed[col].rolling(window=self.window_size)
                X_transformed[feature_name] = getattr(window, stat)()
        return X_transformed
class CyclicalFeaturesEncoder(BaseEstimator, TransformerMixin):
    """
    Encodes cyclical features using sine and cosine transformations.

    For each column, ``f'{col}_sin'`` and ``f'{col}_cos'`` are added, computed
    from the angle ``2 * pi * value / max_value``.
    """

    def __init__(self, columns: Union[str, List[str]], max_values: Union[int, float, List[Union[int, float]]]):
        """
        Initializes the CyclicalFeaturesEncoder.

        Args:
            columns (str or List[str]): Column name(s) to encode. A single
                string is wrapped in a one-element list.
            max_values (int, float or list of those): Maximum value(s) (cycle
                length) of the cyclical features. A single number is wrapped
                in a one-element list. If columns is a list, max_values should
                be a list of the same length.

        Raises:
            ValueError: If ``columns`` and ``max_values`` differ in length.
        """
        self.columns = [columns] if isinstance(columns, str) else columns
        # Treat any real scalar as a single period. Checking only ``int``
        # would let a float (e.g. 365.25) or a NumPy scalar fall through to
        # ``len()`` below and fail with a TypeError.
        scalar_types = (int, float, np.integer, np.floating)
        self.max_values = [max_values] if isinstance(max_values, scalar_types) else max_values
        if len(self.columns) != len(self.max_values):
            raise ValueError("The length of 'columns' and 'max_values' must be equal.")

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'CyclicalFeaturesEncoder':
        """
        Fit method does nothing as no fitting is required.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            CyclicalFeaturesEncoder: Returns self.
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Encodes cyclical features.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Copy of ``X`` with new sine and cosine encoded
            columns.

        Raises:
            ValueError: If a configured column is missing from ``X``.
        """
        X_transformed = X.copy()
        for col, max_val in zip(self.columns, self.max_values):
            if col not in X_transformed.columns:
                raise ValueError(f"Column '{col}' not found in input DataFrame.")
            # The angle is shared by both encodings, so compute it once.
            angle = 2 * np.pi * X_transformed[col] / max_val
            X_transformed[f'{col}_sin'] = np.sin(angle)
            X_transformed[f'{col}_cos'] = np.cos(angle)
        return X_transformed
class DataResampler(BaseEstimator, TransformerMixin):
    """
    Resamples a DataFrame on a datetime column and aggregates each bucket.

    The datetime column becomes the index, the frame is resampled at
    ``rule``, aggregated, and the datetime column is restored as a regular
    column.
    """

    def __init__(self, datetime_column: str, rule: str, aggregation_methods: Union[str, Dict[str, str]] = 'sum'):
        """
        Store the configuration.

        Args:
            datetime_column (str): Name of the datetime column to resample on.
            rule (str): Resampling frequency handed to ``DataFrame.resample``
                (e.g., 'W' for weekly, 'M' for monthly).
            aggregation_methods (str or Dict[str, str]): Aggregation handed to
                ``.agg``. A string applies one method to all columns; a dict
                maps column names to methods. Default is 'sum'.
        """
        self.datetime_column = datetime_column
        self.rule = rule
        self.aggregation_methods = aggregation_methods

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> 'DataResampler':
        """
        No-op fit; this transformer learns nothing from the data.

        Args:
            X (pd.DataFrame): Input DataFrame (unused).
            y (pd.Series, optional): Target variable (unused).

        Returns:
            DataResampler: The transformer itself.
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Return the resampled and aggregated version of ``X``.

        Args:
            X (pd.DataFrame): Input DataFrame.

        Returns:
            pd.DataFrame: Aggregated frame with the datetime column restored
            as a regular column.

        Raises:
            ValueError: If the datetime column is missing from ``X``.
            TypeError: If the column is not of a datetime dtype.
        """
        time_col = self.datetime_column
        frame = X.copy()

        if time_col not in frame.columns:
            raise ValueError(f"Column '{time_col}' not found in input DataFrame.")
        if not pd.api.types.is_datetime64_any_dtype(frame[time_col]):
            raise TypeError(f"Column '{time_col}' is not of datetime dtype.")

        indexed = frame.set_index(time_col)
        aggregated = indexed.resample(self.rule).agg(self.aggregation_methods)
        return aggregated.reset_index()