Reg. Linear Regression

Train a Linear Regression model.

Processing

This brick trains a Linear Regression model, a fundamental statistical method used to predict a continuous numerical value (like sales revenue, temperature, or price) based on a set of input features.

It works by finding the "line of best fit" through your data points that minimizes the error between predicted and actual values. This brick handles the entire modeling pipeline, including:

  1. Data Validation: Ensures inputs are numerical.
  2. Splitting: Automatically separates data into training and testing sets to evaluate performance fairly.
  3. Scaling: Optionally standardizes your data (highly recommended for Linear Regression) so features with large numbers don't dominate the model.
  4. Evaluation: Calculates key performance metrics like RMSE and R² Score.

Inputs

X
The input features (independent variables) used to make predictions. This must be a DataFrame where every column is numerical (integers, floats) or boolean. Text columns must be encoded into numbers before using this brick.
y
The target variable (dependent variable) you want to predict. This contains the actual historical values corresponding to the features in X.
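Since X must be entirely numerical, text columns need to be encoded first. One common approach is one-hot encoding with pandas (a hypothetical example; the column names are illustrative and not part of the brick):

```python
import pandas as pd

raw = pd.DataFrame({
    "price": [10.0, 12.5, 9.9],
    "region": ["north", "south", "north"],  # text column: must be encoded
})

# One-hot encode the text column so every column in X is numerical/boolean
X = pd.get_dummies(raw, columns=["region"])
```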

Input Types

  Input   Types
  X       DataFrame
  y       DataSeries, NDArray, List

You can check the list of supported types here: Available Type Hints.

Outputs

Model
The trained Linear Regression model object (Scikit-Learn). This object can be saved or passed to other bricks to make predictions on new, unseen data.
SHAP
A SHAP explainer object used to interpret the model. It helps understand which features contributed most to a specific prediction. (Returns None if the SHAP option is disabled).
Scaler
The fitted Standard Scaler used to normalize the data. This is needed to scale future data exactly the same way before making predictions.
Metrics
A summary of the model's performance on the test set. Depending on the options selected, this is returned as either a DataFrame or a Dictionary containing scores like R² (goodness of fit) and RMSE (error magnitude).
CV Metrics
The results of Cross-Validation (if enabled). This provides the mean and standard deviation of error metrics across multiple folds, offering a more robust measure of model stability.
Prediction Set
A DataFrame combining the test data features, the actual target values (y_true), and the model's predictions (y_pred). This allows you to inspect specific errors row-by-row.

The Prediction Set output contains the following specific data fields:

  • {original_feature_columns}: All columns from your input X data.
  • y_true: The actual value from your input y (the ground truth).
  • y_pred: The value predicted by the model.
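A minimal sketch of how the Model and Scaler outputs are typically reused on new, unseen data: `model` and `scaler` below stand in for the brick's outputs, and the tiny dataset is an illustrative assumption.

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

# Stand-ins for the brick's Model and Scaler outputs
X_hist = np.array([[1.0, 0.5], [2.0, 1.5], [3.0, 2.0], [4.0, 4.0]])
y_hist = np.array([1.0, 2.0, 3.0, 4.0])
scaler = StandardScaler().fit(X_hist)
model = LinearRegression().fit(scaler.transform(X_hist), y_hist)

# New data must be scaled with the SAME fitted scaler before predicting
X_new = np.array([[5.0, 3.0]])
pred = model.predict(scaler.transform(X_new))
```

Applying a freshly fitted scaler to new data instead of the saved one would shift the features and silently corrupt the predictions, which is why the brick returns the fitted Scaler alongside the Model.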

Output Types

  Output          Types
  Model           Any
  SHAP            Any
  Scaler          Any
  Metrics         DataFrame, Dict
  CV Metrics      DataFrame
  Prediction Set  DataFrame

You can check the list of supported types here: Available Type Hints.

Options

The Reg. Linear Regression brick exposes the following configurable options:

Include Intercept
Decides whether to calculate the y-intercept (the value of the target when all features are 0). If disabled, the model forces the line to pass through the origin (0,0). Usually kept True.
Standard Scaling
When enabled, the brick automatically scales all features to have a mean of 0 and a variance of 1.
  • True (Default): Highly recommended. Essential if features have different units (e.g., "Age" vs "Salary").
  • False: Use only if your data is already scaled.
Auto Split Data
Automatically determines the size of the test set based on the total number of rows. Larger datasets get smaller percentage splits for testing.
Shuffle Split
Randomly shuffles the data before splitting it into train/test sets. This ensures the model isn't biased by the order of the data.
Test/Validation Set %
(Only active if "Auto Split Data" is disabled) Manually defines the percentage of data to hold back for testing.
Retrain On Full Data
If enabled, after evaluating the model on the split test data, the brick re-trains the model on 100% of the provided data. Use this when you are ready to deploy the model to production.
Enable Cross-Validation
Performs K-Fold Cross-Validation to test model stability. It splits the data into multiple parts (folds) and trains/tests the model multiple times.
Number of CV Folds
The number of times to split and retrain during cross-validation (e.g., 5 folds means training 5 times).
Metrics as
Defines the format of the Metrics output: DataFrame or Dictionary.
SHAP Explainer
If enabled, calculates and returns the SHAP explainer object.
SHAP Sampler
Uses a sampling strategy for SHAP background data to speed up explanation generation on large datasets.
Number of Jobs
Controls how many CPU cores are used for parallel processing. Select "All" to use every available core.
Random State
A seed number to ensure results are reproducible. Using the same number ensures the data splits the same way every time.
Brick Caching
If enabled, saves the results of this run. If you run the workflow again with the exact same inputs and settings, it loads the saved results instead of calculating everything again.
Verbose Logging
If enabled, prints detailed progress updates, metric calculations, and optimization steps to the execution logs. Useful for debugging or monitoring long-running tasks.
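For reference, the options above correspond to keys in the options dict consumed by the brick's entry point `train_reg_ols` (key names are taken from the brick's source; the values below are example choices, not the defaults):

```python
# Illustrative options dict for train_reg_ols (example values, not defaults)
options = {
    "intercept": True,             # Include Intercept
    "standard_scaling": True,      # Standard Scaling
    "auto_split": False,           # Auto Split Data disabled...
    "test_val_size": 20,           # ...so hold back 20% for testing
    "shuffle_split": True,         # Shuffle Split
    "retrain_on_full": False,      # Retrain On Full Data
    "use_cross_validation": True,  # Enable Cross-Validation
    "cv_folds": 5,                 # Number of CV Folds
    "metrics_as": "Dataframe",     # Metrics as
    "return_shap_explainer": False,  # SHAP Explainer
    "use_shap_sampler": False,     # SHAP Sampler
    "n_jobs": "All",               # Number of Jobs ("All" = every core)
    "random_state": 42,            # Random State
    "activate_caching": False,     # Brick Caching
    "verbose": True,               # Verbose Logging
}
```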
import logging
import warnings
import shap
import json
import xxhash
import hashlib
import tempfile
import sklearn
import scipy
import joblib
import numpy as np
import pandas as pd
import polars as pl
from pathlib import Path
from scipy import sparse
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    root_mean_squared_error,
    mean_absolute_percentage_error,
    make_scorer,
)
from dataclasses import dataclass
from datetime import datetime
from coded_flows.types import (
    Union,
    Dict,
    List,
    Tuple,
    NDArray,
    DataFrame,
    DataSeries,
    Any,
)
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Reg. Linear Regression", level=logging.INFO)
METRICS_DICT = {
    "Forecast Accuracy": "fa",
    "Weighted Absolute Percentage Error (WAPE)": "wape",
    "Mean Absolute Error (MAE)": "mae",
    "Mean Squared Error (MSE)": "mse",
    "Root Mean Squared Error (RMSE)": "rmse",
    "R2 Score": "r2",
    "Mean Absolute Percentage Error (MAPE)": "mape",
}
METRICS_OPT = {
    "fa": "maximize",
    "wape": "minimize",
    "mae": "minimize",
    "mse": "minimize",
    "rmse": "minimize",
    "r2": "maximize",
    "mape": "minimize",
}
DataType = Union[
    pd.DataFrame, pl.DataFrame, np.ndarray, sparse.spmatrix, pd.Series, pl.Series
]


@dataclass
class _DatasetFingerprint:
    """Lightweight fingerprint of a dataset."""

    hash: str
    shape: tuple
    computed_at: str
    data_type: str
    method: str


class _UniversalDatasetHasher:
    """
    High-performance dataset hasher optimizing for zero-copy operations
    and native backend execution (C/Rust).
    """

    def __init__(
        self,
        data_size: int,
        method: str = "auto",
        sample_size: int = 100000,
        verbose: bool = False,
    ):
        self.method = method
        self.sample_size = sample_size
        self.data_size = data_size
        self.verbose = verbose

    def hash_data(self, data: DataType) -> _DatasetFingerprint:
        """
        Main entry point: hash any supported data format.
        Auto-detects format and applies optimal strategy.
        """
        if isinstance(data, pd.DataFrame):
            return self._hash_pandas(data)
        elif isinstance(data, pl.DataFrame):
            return self._hash_polars(data)
        elif isinstance(data, pd.Series):
            return self._hash_pandas_series(data)
        elif isinstance(data, pl.Series):
            return self._hash_polars_series(data)
        elif isinstance(data, np.ndarray):
            return self._hash_numpy(data)
        elif sparse.issparse(data):
            return self._hash_sparse(data)
        else:
            raise TypeError(f"Unsupported data type: {type(data)}")

    def _hash_pandas(self, df: pd.DataFrame) -> _DatasetFingerprint:
        """
        Optimized Pandas hashing using pd.util.hash_pandas_object.
        Avoids object-to-string conversion overhead.
        """
        method = self._determine_method(self.data_size, self.method)
        self.verbose and logger.info(
            f"Hashing Pandas: {self.data_size:,} rows - {method}"
        )
        target_df = df
        if method == "sampled":
            target_df = self._get_pandas_sample(df)
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "columns": df.columns.tolist(),
                "dtypes": {k: str(v) for (k, v) in df.dtypes.items()},
                "shape": df.shape,
            },
        )
        try:
            row_hashes = pd.util.hash_pandas_object(target_df, index=False)
            hasher.update(memoryview(row_hashes.values))
        except Exception as e:
            self.verbose and logger.warning(
                f"Fast hash failed, falling back to slow hash: {e}"
            )
            self._hash_pandas_fallback(hasher, target_df)
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=df.shape,
            computed_at=datetime.now().isoformat(),
            data_type="pandas",
            method=method,
        )

    def _get_pandas_sample(self, df: pd.DataFrame) -> pd.DataFrame:
        """Deterministic slicing for sampling (Zero randomness)."""
        if self.data_size <= self.sample_size:
            return df
        chunk = self.sample_size // 3
        head = df.iloc[:chunk]
        mid_idx = self.data_size // 2
        mid = df.iloc[mid_idx : mid_idx + chunk]
        tail = df.iloc[-chunk:]
        return pd.concat([head, mid, tail])

    def _hash_pandas_fallback(self, hasher, df: pd.DataFrame):
        """Legacy fallback for complex object types."""
        for col in df.columns:
            val = df[col].astype(str).values
            hasher.update(val.astype(np.bytes_).tobytes())

    def _hash_polars(self, df: pl.DataFrame) -> _DatasetFingerprint:
        """
        Optimized Polars hashing using native Rust execution.
        """
        method = self._determine_method(self.data_size, self.method)
        self.verbose and logger.info(
            f"Hashing Polars: {self.data_size:,} rows - {method}"
        )
        target_df = df
        if method == "sampled" and self.data_size > self.sample_size:
            indices = self._get_sample_indices(self.data_size, self.sample_size)
            target_df = df.gather(indices)
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "columns": df.columns,
                "dtypes": [str(t) for t in df.dtypes],
                "shape": df.shape,
            },
        )
        row_hashes = target_df.hash_rows()
        hasher.update(memoryview(row_hashes.to_numpy()))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=df.shape,
            computed_at=datetime.now().isoformat(),
            data_type="polars",
            method=method,
        )

    def _hash_pandas_series(self, series: pd.Series) -> _DatasetFingerprint:
        """Hash Pandas Series using the fastest vectorized method."""
        self.verbose and logger.info(f"Hashing Pandas Series: {self.data_size:,} rows")
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "name": series.name if series.name else "None",
                "dtype": str(series.dtype),
                "shape": series.shape,
            },
        )
        try:
            row_hashes = pd.util.hash_pandas_object(series, index=False)
            hasher.update(memoryview(row_hashes.values))
        except Exception as e:
            self.verbose and logger.warning(f"Series hash failed, falling back: {e}")
            hasher.update(memoryview(series.astype(str).values.tobytes()))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=series.shape,
            computed_at=datetime.now().isoformat(),
            data_type="pandas_series",
            method="full",
        )

    def _hash_polars_series(self, series: pl.Series) -> _DatasetFingerprint:
        """Hash Polars Series using native Polars expressions."""
        self.verbose and logger.info(f"Hashing Polars Series: {self.data_size:,} rows")
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {"name": series.name, "dtype": str(series.dtype), "shape": series.shape},
        )
        try:
            row_hashes = series.hash()
            hasher.update(memoryview(row_hashes.to_numpy()))
        except Exception as e:
            self.verbose and logger.warning(
                f"Polars series native hash failed, falling back: {e}"
            )
            hasher.update(str(series.to_list()).encode())
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=series.shape,
            computed_at=datetime.now().isoformat(),
            data_type="polars_series",
            method="full",
        )

    def _hash_numpy(self, arr: np.ndarray) -> _DatasetFingerprint:
        """
        Optimized NumPy hashing using Buffer Protocol (Zero-Copy).
        """
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {"shape": arr.shape, "dtype": str(arr.dtype), "strides": arr.strides},
        )
        if arr.flags["C_CONTIGUOUS"] or arr.flags["F_CONTIGUOUS"]:
            hasher.update(memoryview(arr))
        else:
            hasher.update(memoryview(np.ascontiguousarray(arr)))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=arr.shape,
            computed_at=datetime.now().isoformat(),
            data_type="numpy",
            method="full",
        )

    def _hash_sparse(self, matrix: sparse.spmatrix) -> _DatasetFingerprint:
        """
        Optimized sparse hashing. Hashes underlying data arrays directly.
        """
        if not (sparse.isspmatrix_csr(matrix) or sparse.isspmatrix_csc(matrix)):
            matrix = matrix.tocsr()
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher, {"shape": matrix.shape, "format": matrix.format, "nnz": matrix.nnz}
        )
        hasher.update(memoryview(matrix.data))
        hasher.update(memoryview(matrix.indices))
        hasher.update(memoryview(matrix.indptr))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=matrix.shape,
            computed_at=datetime.now().isoformat(),
            data_type=f"sparse_{matrix.format}",
            method="sparse",
        )

    def _determine_method(self, rows: int, requested: str) -> str:
        if requested != "auto":
            return requested
        if rows < 5000000:
            return "full"
        return "sampled"

    def _hash_schema(self, hasher, schema: Dict[str, Any]):
        """Compact schema hashing."""
        hasher.update(
            json.dumps(schema, sort_keys=True, separators=(",", ":")).encode()
        )

    def _get_sample_indices(self, total_rows: int, sample_size: int) -> list:
        """Calculate indices for sampling without generating full range lists."""
        chunk = sample_size // 3
        indices = list(range(min(chunk, total_rows)))
        mid_start = max(0, total_rows // 2 - chunk // 2)
        mid_end = min(mid_start + chunk, total_rows)
        indices.extend(range(mid_start, mid_end))
        last_start = max(0, total_rows - chunk)
        indices.extend(range(last_start, total_rows))
        return sorted(list(set(indices)))


def wape_score(y_true, y_pred):
    """
    Calculates Weighted Absolute Percentage Error (WAPE).

    WAPE = sum(|Error|) / sum(|Groundtruth|)
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    eps = np.finfo(np.float64).eps
    sum_abs_error = np.sum(np.abs(y_true - y_pred))
    sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
    return sum_abs_error / sum_abs_truth


def forecast_accuracy(y_true, y_pred):
    """
    Calculates Forecast Accuracy.

    FA = 1 - (sum(|Error|) / sum(|Groundtruth|))
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    eps = np.finfo(np.float64).eps
    sum_abs_error = np.sum(np.abs(y_true - y_pred))
    sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
    return 1 - sum_abs_error / sum_abs_truth


def _normalize_hpo_df(df):
    df = df.copy()
    param_cols = [c for c in df.columns if c.startswith("params_")]
    df[param_cols] = df[param_cols].astype("string[pyarrow]")
    return df


def _validate_numerical_data(data):
    """
    Validates if the input data (NumPy array, Pandas DataFrame/Series,
    Polars DataFrame/Series, or SciPy sparse matrix) contains only
    numerical (integer, float) or boolean values.

    Args:
        data: The input data structure to check.

    Raises:
        TypeError: If the input data contains non-numerical and non-boolean types.
        ValueError: If the input data is of an unsupported type.
    """
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"Sparse matrix contains unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
            )
        return
    elif isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"NumPy array contains unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
            )
        return
    elif isinstance(data, (pd.DataFrame, pd.Series)):
        d_types = data.dtypes.apply(lambda x: x.kind)
        non_numerical_mask = ~d_types.isin(["i", "f", "b"])
        if non_numerical_mask.any():
            non_numerical_columns = (
                data.columns[non_numerical_mask].tolist()
                if isinstance(data, pd.DataFrame)
                else [data.name]
            )
            raise TypeError(
                f"Pandas {('DataFrame' if isinstance(data, pd.DataFrame) else 'Series')} contains non-numerical/boolean data. Offending column(s) and types: {data.dtypes[non_numerical_mask].to_dict()}"
            )
        return
    elif isinstance(data, (pl.DataFrame, pl.Series)):
        pl_numerical_types = [
            pl.Int8,
            pl.Int16,
            pl.Int32,
            pl.Int64,
            pl.UInt8,
            pl.UInt16,
            pl.UInt32,
            pl.UInt64,
            pl.Float32,
            pl.Float64,
            pl.Boolean,
        ]
        if isinstance(data, pl.DataFrame):
            for col, dtype in data.schema.items():
                if dtype not in pl_numerical_types:
                    raise TypeError(
                        f"Polars DataFrame column '{col}' has unsupported data type: {dtype}. Only numerical or boolean types are allowed."
                    )
        elif isinstance(data, pl.Series):
            if data.dtype not in pl_numerical_types:
                raise TypeError(
                    f"Polars Series has unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
                )
        return
    else:
        raise ValueError(
            f"Unsupported data type provided: {type(data)}. Function supports NumPy, Pandas, Polars, and SciPy sparse matrices."
        )


def _smart_split(
    n_samples,
    X,
    y,
    *,
    random_state=42,
    shuffle=True,
    stratify=None,
    fixed_test_split=None,
    verbose=True,
):
    """
    Parameters
    ----------
    n_samples : int
        Number of samples in the dataset (len(X) or len(y))
    X : array-like
        Features
    y : array-like
        Target
    random_state : int
    shuffle : bool
    stratify : array-like or None
        For stratified splitting (recommended for classification)
    fixed_test_split : float or None
        Fixed test-set ratio; when None, the ratio is chosen automatically
        from n_samples
    verbose : bool

    Returns
    -------
    X_train, X_test, y_train, y_test, val_size_in_train
    """
    if fixed_test_split:
        test_ratio = fixed_test_split
        val_ratio = fixed_test_split
    elif n_samples <= 1000:
        test_ratio = 0.2
        val_ratio = 0.1
    elif n_samples < 10000:
        test_ratio = 0.15
        val_ratio = 0.15
    elif n_samples < 100000:
        test_ratio = 0.1
        val_ratio = 0.1
    elif n_samples < 1000000:
        test_ratio = 0.05
        val_ratio = 0.05
    else:
        test_ratio = 0.01
        val_ratio = 0.01
    (X_train, X_test, y_train, y_test) = train_test_split(
        X,
        y,
        test_size=test_ratio,
        random_state=random_state,
        shuffle=shuffle,
        stratify=stratify,
    )
    val_size_in_train = val_ratio / (1 - test_ratio)
    verbose and logger.info(
        f"Split → Train: {1 - test_ratio:.2%} | Test: {test_ratio:.2%} (no validation set)"
    )
    return (X_train, X_test, y_train, y_test, val_size_in_train)


def _ensure_feature_names(X, feature_names=None):
    if isinstance(X, pd.DataFrame):
        return list(X.columns)
    if isinstance(X, np.ndarray):
        if feature_names is None:
            feature_names = [f"feature_{i}" for i in range(X.shape[1])]
        return feature_names
    raise TypeError("X must be a pandas DataFrame or numpy ndarray")


def _perform_cross_validation(
    model, X, y, cv_folds, shuffle, random_state, n_jobs, verbose
) -> pd.DataFrame:
    """Perform cross-validation on the regression model."""
    verbose and logger.info(f"Performing {cv_folds}-fold cross-validation...")
    cv = KFold(n_splits=cv_folds, shuffle=shuffle, random_state=random_state)
    scoring = {
        "MAE": "neg_mean_absolute_error",
        "MSE": "neg_mean_squared_error",
        "RMSE": "neg_root_mean_squared_error",
        "MAPE": "neg_mean_absolute_percentage_error",
        "R2": "r2",
        "WAPE": make_scorer(wape_score, greater_is_better=False),
        "Forecast_Accuracy": make_scorer(forecast_accuracy, greater_is_better=True),
    }
    cv_results = cross_validate(
        model, X, y, cv=cv, scoring=scoring, return_train_score=False, n_jobs=n_jobs
    )

    def get_score_stats(metric_key, invert_sign=False):
        key = f"test_{metric_key}"
        if key in cv_results:
            scores = cv_results[key]
            if invert_sign:
                scores = -scores
            return (scores.mean(), scores.std())
        return (0.0, 0.0)

    (mae_mean, mae_std) = get_score_stats("MAE", invert_sign=True)
    (mse_mean, mse_std) = get_score_stats("MSE", invert_sign=True)
    (rmse_mean, rmse_std) = get_score_stats("RMSE", invert_sign=True)
    (mape_mean, mape_std) = get_score_stats("MAPE", invert_sign=True)
    (wape_mean, wape_std) = get_score_stats("WAPE", invert_sign=True)
    (r2_mean, r2_std) = get_score_stats("R2", invert_sign=False)
    (fa_mean, fa_std) = get_score_stats("Forecast_Accuracy", invert_sign=False)
    verbose and logger.info(f"CV MAE          : {mae_mean:.4f} (+/- {mae_std:.4f})")
    verbose and logger.info(f"CV MSE          : {mse_mean:.4f} (+/- {mse_std:.4f})")
    verbose and logger.info(f"CV RMSE         : {rmse_mean:.4f} (+/- {rmse_std:.4f})")
    verbose and logger.info(f"CV MAPE         : {mape_mean:.4f} (+/- {mape_std:.4f})")
    verbose and logger.info(f"CV WAPE         : {wape_mean:.4f} (+/- {wape_std:.4f})")
    verbose and logger.info(f"CV R2 Score     : {r2_mean:.4f} (+/- {r2_std:.4f})")
    verbose and logger.info(f"CV Forecast Acc : {fa_mean:.4f} (+/- {fa_std:.4f})")
    CV_metrics = pd.DataFrame(
        {
            "Metric": [
                "Mean Absolute Error (MAE)",
                "Mean Squared Error (MSE)",
                "Root Mean Squared Error (RMSE)",
                "Mean Absolute Percentage Error (MAPE)",
                "Weighted Absolute Percentage Error (WAPE)",
                "R2 Score",
                "Forecast Accuracy",
            ],
            "Mean": [
                mae_mean,
                mse_mean,
                rmse_mean,
                mape_mean,
                wape_mean,
                r2_mean,
                fa_mean,
            ],
            "Std": [mae_std, mse_std, rmse_std, mape_std, wape_std, r2_std, fa_std],
        }
    )
    return CV_metrics


def _combine_test_data(X_test, y_true, y_pred, features_names=None):
    """
    Combine X_test, y_true, y_pred into a single DataFrame.

    Parameters:
    -----------
    X_test : pandas/polars DataFrame, numpy array, or scipy sparse matrix
        Test features
    y_true : pandas/polars Series, numpy array, or list
        True labels
    y_pred : pandas/polars Series, numpy array, or list
        Predicted labels

    Returns:
    --------
    pandas.DataFrame
        Combined DataFrame with features, y_true, and y_pred
    """
    if sparse.issparse(X_test):
        X_df = pd.DataFrame(X_test.toarray())
    elif isinstance(X_test, np.ndarray):
        X_df = pd.DataFrame(X_test)
    elif hasattr(X_test, "to_pandas"):
        X_df = X_test.to_pandas()
    elif isinstance(X_test, pd.DataFrame):
        X_df = X_test.copy()
    else:
        raise TypeError(f"Unsupported type for X_test: {type(X_test)}")
    if X_df.columns.tolist() == list(range(len(X_df.columns))):
        X_df.columns = (
            [f"feature_{i}" for i in range(len(X_df.columns))]
            if features_names is None
            else features_names
        )
    if isinstance(y_true, list):
        y_true_series = pd.Series(y_true, name="y_true")
    elif isinstance(y_true, np.ndarray):
        y_true_series = pd.Series(y_true, name="y_true")
    elif hasattr(y_true, "to_pandas"):
        y_true_series = y_true.to_pandas()
        y_true_series.name = "y_true"
    elif isinstance(y_true, pd.Series):
        y_true_series = y_true.copy()
        y_true_series.name = "y_true"
    else:
        raise TypeError(f"Unsupported type for y_true: {type(y_true)}")
    if isinstance(y_pred, list):
        y_pred_series = pd.Series(y_pred, name="y_pred")
    elif isinstance(y_pred, np.ndarray):
        y_pred_series = pd.Series(y_pred, name="y_pred")
    elif hasattr(y_pred, "to_pandas"):
        y_pred_series = y_pred.to_pandas()
        y_pred_series.name = "y_pred"
    elif isinstance(y_pred, pd.Series):
        y_pred_series = y_pred.copy()
        y_pred_series.name = "y_pred"
    else:
        raise TypeError(f"Unsupported type for y_pred: {type(y_pred)}")
    X_df = X_df.reset_index(drop=True)
    y_true_series = y_true_series.reset_index(drop=True)
    y_pred_series = y_pred_series.reset_index(drop=True)
    result_df = pd.concat([X_df, y_true_series, y_pred_series], axis=1)
    return result_df


def _smart_shap_background(
    X: Union[np.ndarray, pd.DataFrame],
    model_type: str = "tree",
    seed: int = 42,
    verbose: bool = True,
) -> Union[np.ndarray, pd.DataFrame, object]:
    """
    Intelligently prepares a background dataset for SHAP based on model type.

    Strategies:
    - Tree: Higher sample cap (1000), uses Random Sampling (preserves data structure).
    - Other: Lower sample cap (100), uses K-Means (maximizes info density).
    """
    (n_rows, n_features) = X.shape
    if model_type == "tree":
        max_samples = 1000
        use_kmeans = False
    else:
        max_samples = 100
        use_kmeans = True
    if n_rows <= max_samples:
        verbose and logger.info(
            f"✓ Dataset small ({n_rows} <= {max_samples}). Using full data."
        )
        return X
    verbose and logger.info(
        f"⚡ Large dataset detected ({n_rows} rows). Optimization Strategy: {('K-Means' if use_kmeans else 'Random Sampling')}"
    )
    if use_kmeans:
        try:
            verbose and logger.info(
                f"   Summarizing to {max_samples} weighted centroids..."
            )
            return shap.kmeans(X, max_samples)
        except Exception as e:
            logger.warning(
                f"   K-Means failed ({str(e)}). Falling back to random sampling."
            )
            return shap.sample(X, max_samples, random_state=seed)
    else:
        verbose and logger.info(f"   Sampling {max_samples} random rows...")
        return shap.sample(X, max_samples, random_state=seed)


def _class_index_df(model):
    columns = {"index": pd.Series(dtype="int64"), "class": pd.Series(dtype="object")}
    if model is None:
        return pd.DataFrame(columns)
    classes = getattr(model, "classes_", None)
    if classes is None:
        return pd.DataFrame(columns)
    return pd.DataFrame({"index": range(len(classes)), "class": classes})


def train_reg_ols(
    X: DataFrame, y: Union[DataSeries, NDArray, List], options=None
) -> Tuple[Any, Any, Any, Union[DataFrame, Dict], DataFrame, DataFrame]:
    options = options or {}
    intercept = options.get("intercept", True)
    standard_scaling = options.get("standard_scaling", True)
    auto_split = options.get("auto_split", True)
    test_val_size = options.get("test_val_size", 15) / 100
    shuffle_split = options.get("shuffle_split", True)
    retrain_on_full = options.get("retrain_on_full", False)
    use_cross_validation = options.get("use_cross_validation", False)
    cv_folds = options.get("cv_folds", 5)
    use_hpo = options.get("use_hyperparameter_optimization", False)
    optimization_metric = options.get(
        "optimization_metric", "Root Mean Squared Error (RMSE)"
    )
    optimization_metric = METRICS_DICT[optimization_metric]
    optimization_method = options.get("optimization_method", "Tree-structured Parzen")
    optimization_iterations = options.get("optimization_iterations", 50)
    return_shap_explainer = options.get("return_shap_explainer", False)
    use_shap_sampler = options.get("use_shap_sampler", False)
    metrics_as = options.get("metrics_as", "Dataframe")
    n_jobs_str = options.get("n_jobs", "1")
    random_state = options.get("random_state", 42)
    activate_caching = options.get("activate_caching", False)
    verbose = options.get("verbose", True)
    n_jobs_int = -1 if n_jobs_str == "All" else int(n_jobs_str)
    skip_computation = False
    Scaler = None
    Model = None
    Metrics = pd.DataFrame()
    CV_Metrics = pd.DataFrame()
    SHAP = None
    fa = None
    wape = None
    mae = None
    mse = None
    rmse = None
    r2 = None
    mape = None
    (n_samples, _) = X.shape
    shap_feature_names = _ensure_feature_names(X)
    if standard_scaling:
        verbose and logger.info("Standard scaling is activated")
    if activate_caching:
        verbose and logger.info("Caching is activated")
        data_hasher = _UniversalDatasetHasher(n_samples, verbose=verbose)
        X_hash = data_hasher.hash_data(X).hash
        y_hash = data_hasher.hash_data(y).hash
        all_hash_base_text = f"HASH BASE TEXTPandas Version {pd.__version__}POLARS Version {pl.__version__}Numpy Version {np.__version__}Scikit Learn Version {sklearn.__version__}Scipy Version {scipy.__version__}{('SHAP Version ' + shap.__version__ if return_shap_explainer else 'NO SHAP Version')}{X_hash}{y_hash}{intercept}{('Use HPO' if use_hpo else 'No HPO')}{(optimization_metric if use_hpo else 'No HPO Metric')}{(optimization_method if use_hpo else 'No HPO Method')}{(optimization_iterations if use_hpo else 'No HPO Iter')}{(cv_folds if use_cross_validation else 'No CV')}{standard_scaling}{('Auto Split' if auto_split else test_val_size)}{shuffle_split}{return_shap_explainer}{use_shap_sampler}{random_state}"
        all_hash = hashlib.sha256(all_hash_base_text.encode("utf-8")).hexdigest()
        verbose and logger.info(f"Hash was computed: {all_hash}")
        temp_folder = Path(tempfile.gettempdir())
        cache_folder = temp_folder / "coded-flows-cache"
        cache_folder.mkdir(parents=True, exist_ok=True)
        model_path = cache_folder / f"{all_hash}.model"
        metrics_dict_path = cache_folder / f"metrics_{all_hash}.json"
        metrics_df_path = cache_folder / f"metrics_{all_hash}.parquet"
        cv_metrics_path = cache_folder / f"cv_metrics_{all_hash}.parquet"
        hpo_trials_path = cache_folder / f"hpo_trials_{all_hash}.parquet"
        hpo_best_params_path = cache_folder / f"hpo_best_params_{all_hash}.json"
        prediction_set_path = cache_folder / f"prediction_set_{all_hash}.parquet"
        shap_path = cache_folder / f"{all_hash}.shap"
        scaler_path = cache_folder / f"{all_hash}.scaler"
        skip_computation = model_path.is_file()
    if not skip_computation:
        try:
            _validate_numerical_data(X)
        except Exception as e:
            verbose and logger.error(
                "Only numerical or boolean types are allowed for 'X' input!"
            )
            raise
        features_names = X.columns if hasattr(X, "columns") else None
        fixed_test_split = None if auto_split else test_val_size
        (X_train, X_test, y_train, y_test, val_ratio) = _smart_split(
            n_samples,
            X,
            y,
            random_state=random_state,
            shuffle=shuffle_split,
            fixed_test_split=fixed_test_split,
            verbose=verbose,
        )
        if standard_scaling:
            Scaler = StandardScaler().set_output(transform="pandas")
            X_train = Scaler.fit_transform(X_train)
        Model = LinearRegression(
            fit_intercept=intercept, copy_X=True, n_jobs=n_jobs_int
        )
        if use_cross_validation:
            verbose and logger.info(
                "Using Cross-Validation to measure performance metrics"
            )
            CV_Metrics = _perform_cross_validation(
                Model,
                X_train,
                y_train,
                cv_folds,
                shuffle_split,
                random_state,
                n_jobs_int,
                verbose,
            )
        Model.fit(X_train, y_train)
        y_pred = Model.predict(Scaler.transform(X_test) if standard_scaling else X_test)
        fa = forecast_accuracy(y_test, y_pred)
        wape = wape_score(y_test, y_pred)
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        rmse = root_mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        mape = mean_absolute_percentage_error(y_test, y_pred)
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame(
                {
                    "Metric": [
                        "Forecast Accuracy",
                        "Weighted Absolute Percentage Error",
                        "Mean Absolute Error",
                        "Mean Squared Error",
                        "Root Mean Squared Error",
                        "R2 Score",
                        "Mean Absolute Percentage Error",
                    ],
                    "Value": [fa, wape, mae, mse, rmse, r2, mape],
                }
            )
        else:
            Metrics = {
                "forecast_accuracy": fa,
                "weighted_absolute_percentage_error": wape,
                "mean_absolute_error": mae,
                "mean_squared_error": mse,
                "root_mean_squared_error": rmse,
                "r2_score": r2,
                "mean_absolute_percentage_error": mape,
            }
        verbose and logger.info(f"Forecast Accuracy                  : {fa:.2%}")
        verbose and logger.info(f"Weighted Absolute Percentage Error : {wape:.2%}")
        verbose and logger.info(f"Mean Absolute Error                : {mae:.4f}")
        verbose and logger.info(f"Mean Squared Error                 : {mse:.4f}")
        verbose and logger.info(f"Root Mean Squared Error            : {rmse:.4f}")
        verbose and logger.info(f"R2 Score                           : {r2:.4f}")
        verbose and logger.info(f"Mean Absolute Percentage Error     : {mape:.2%}")
        Prediction_Set = _combine_test_data(X_test, y_test, y_pred, features_names)
        verbose and logger.info("Prediction Set created")
        if retrain_on_full:
            verbose and logger.info(
                "Retraining model on full dataset for production deployment"
            )
            if standard_scaling:
                Scaler = StandardScaler().set_output(transform="pandas")
                X = Scaler.fit_transform(X)
            Model.fit(X, y)
            verbose and logger.info(
                "Model successfully retrained on full dataset. Reported metrics remain from original held-out test set."
            )
        if return_shap_explainer:
            SHAP = shap.LinearExplainer(
                Model,
                (
                    _smart_shap_background(
                        X if retrain_on_full else X_train,
                        model_type="other",
                        seed=random_state,
                        verbose=verbose,
                    )
                    if use_shap_sampler
                    else X if retrain_on_full else X_train
                ),
                feature_names=shap_feature_names,
            )
            verbose and logger.info("SHAP explainer generated")
        if activate_caching:
            verbose and logger.info("Caching output elements")
            joblib.dump(Model, model_path)
            if isinstance(Metrics, dict):
                with metrics_dict_path.open("w", encoding="utf-8") as f:
                    json.dump(Metrics, f, ensure_ascii=False, indent=4)
            else:
                Metrics.to_parquet(metrics_df_path)
            if use_cross_validation and (not use_hpo):
                CV_Metrics.to_parquet(cv_metrics_path)
            if use_hpo:
                HPO_Trials.to_parquet(hpo_trials_path)
                with hpo_best_params_path.open("w", encoding="utf-8") as f:
                    json.dump(HPO_Best, f, ensure_ascii=False, indent=4)
            Prediction_Set.to_parquet(prediction_set_path)
            if return_shap_explainer:
                with shap_path.open("wb") as f:
                    joblib.dump(SHAP, f)
            joblib.dump(Scaler, scaler_path)
            verbose and logger.info("Caching done")
    else:
        verbose and logger.info("Skipping computations and loading cached elements")
        Model = joblib.load(model_path)
        verbose and logger.info("Model loaded")
        if metrics_dict_path.is_file():
            with metrics_dict_path.open("r", encoding="utf-8") as f:
                Metrics = json.load(f)
        else:
            Metrics = pd.read_parquet(metrics_df_path)
        verbose and logger.info("Metrics loaded")
        if use_cross_validation and (not use_hpo):
            CV_Metrics = pd.read_parquet(cv_metrics_path)
            verbose and logger.info("Cross Validation metrics loaded")
        if use_hpo:
            HPO_Trials = pd.read_parquet(hpo_trials_path)
            with hpo_best_params_path.open("r", encoding="utf-8") as f:
                HPO_Best = json.load(f)
            verbose and logger.info(
                "Hyperparameters Optimization trials and best params loaded"
            )
        Prediction_Set = pd.read_parquet(prediction_set_path)
        verbose and logger.info("Prediction Set loaded")
        if return_shap_explainer:
            with shap_path.open("rb") as f:
                SHAP = joblib.load(f)
            verbose and logger.info("SHAP Explainer loaded")
        Scaler = joblib.load(scaler_path)
        verbose and logger.info("Standard Scaler loaded")
    return (Model, SHAP, Scaler, Metrics, CV_Metrics, Prediction_Set)

Brick Info

version v0.1.4
python 3.11, 3.12, 3.13
requirements
  • shap>=0.47.0
  • scikit-learn
  • pandas
  • numpy
  • torch
  • numba>=0.56.0
  • cmaes
  • optuna
  • scipy
  • polars
  • xxhash
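The caching logic in this brick derives every artifact path from a single SHA-256 digest over library versions, data hashes, and all relevant options, so any change to inputs or configuration produces a fresh key and a cache miss. The idea can be sketched independently of the brick (the `cache_key` helper is illustrative, not part of the brick's API):

```python
import hashlib

def cache_key(*parts) -> str:
    # Concatenate the string form of every ingredient and hash the result;
    # identical ingredients always yield the identical 64-char hex digest.
    base = "".join(str(p) for p in parts)
    return hashlib.sha256(base.encode("utf-8")).hexdigest()

key = cache_key("HASH BASE TEXT", "pandas 2.2", True, 42)
```

Because the options are folded into the key, toggling any of them (e.g. `standard_scaling` or `random_state`) invalidates the cache automatically instead of serving a stale model.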