Reg. Random Forest

Train a Random Forest regression model.

Reg. Random Forest

Processing

This brick trains a Random Forest regression model to predict continuous numerical values. Random Forest is an ensemble method that creates multiple decision trees and combines their predictions to produce more accurate and stable results than a single tree.

The brick handles the complete training workflow: splitting your data into training and test sets, fitting the model, evaluating performance with multiple metrics, and optionally performing hyperparameter optimization to find the best model settings. It supports advanced features like cross-validation for robust performance measurement, SHAP explainers for understanding predictions, and intelligent caching to avoid retraining when using the same data and settings.
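For orientation, the core training-and-evaluation loop that this brick automates is roughly equivalent to the following scikit-learn sketch (illustrative only; the brick adds the full metric report, optional cross-validation, hyperparameter optimization, SHAP, and caching on top):

from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score

# Synthetic regression data standing in for your own X / y
X, y = make_regression(n_samples=500, n_features=8, noise=10.0, random_state=42)

# Hold out a test set, fit the forest, then score on the held-out data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestRegressor(n_estimators=100, random_state=42).fit(X_train, y_train)
y_pred = model.predict(X_test)
print("MAE:", mean_absolute_error(y_test, y_pred), "R2:", r2_score(y_test, y_pred))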

Inputs

X
The feature data used to train the model. This should contain all the input variables (predictors) that the model will use to make predictions. Each row represents one observation, and each column represents one feature. Only numerical or boolean data types are accepted.
y
The target values you want the model to predict. This should be a continuous numerical variable (like sales amounts, temperatures, prices). Each value corresponds to one row in your X data.

Inputs Types

Input Types
X DataFrame
y DataSeries, NDArray, List

You can check the list of supported types here: Available Type Hints.
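Because X only accepts numerical or boolean columns, any categorical columns should be encoded before they reach this brick. A minimal sketch using one-hot encoding (the column names are hypothetical):

import pandas as pd

# Hypothetical raw data with one categorical column
raw = pd.DataFrame(
    {
        "store": ["A", "B", "A", "C"],        # categorical -> must be encoded
        "promo": [True, False, True, True],   # boolean is accepted as-is
        "price": [9.99, 12.5, 9.49, 11.0],    # numerical is accepted as-is
    }
)

# One-hot encode the categorical column so every column is numeric or boolean
X = pd.get_dummies(raw, columns=["store"])
print(X.dtypes)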

Outputs

Model
The trained Random Forest regression model, ready to make predictions on new data. This model can be passed to prediction bricks or saved for later use (a short usage sketch follows these output descriptions).
SHAP
A SHAP (SHapley Additive exPlanations) explainer object that helps you understand which features contributed most to specific predictions. Only returned if the SHAP Explainer option is enabled, otherwise returns None.
Metrics
Performance measurements showing how well the model predicts on the test data. Contains metrics like MAE (Mean Absolute Error), RMSE (Root Mean Squared Error), R2 Score, and others. Format depends on the "Metrics as" setting.
CV Metrics
Cross-validation metrics showing model performance across multiple data splits. This provides a more robust assessment of model quality. Only populated if "Enable Cross-Validation" is turned on and hyperparameter optimization is disabled; otherwise returns an empty table.
Features Importance
A ranking showing which input features have the most influence on the model's predictions. Features with higher importance values contribute more to the model's decision-making.
Prediction Set
A dataset containing the test data along with both the actual values (y_true) and the model's predictions (y_pred). Useful for analyzing where the model performs well or poorly.
HPO Trials
A detailed log of all hyperparameter optimization attempts, showing which settings were tested and their performance. Only populated if "Hyperparameter Optim." is enabled, otherwise returns an empty table.
HPO Best
The best hyperparameter settings found during optimization. Returns the optimal values for criterion, number of trees, max depth, and other model settings. Only populated if "Hyperparameter Optim." is enabled, otherwise returns None.
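As a rough sketch of how the Model and SHAP outputs described above might be consumed downstream (here new_data is a hypothetical DataFrame with the same feature columns used for training):

# Illustrative sketch: consuming the Model and SHAP outputs of this brick
predictions = Model.predict(new_data)   # new_data: same feature columns as X

if SHAP is not None:
    explanation = SHAP(new_data)        # per-row, per-feature contributions
    # shap.plots.beeswarm(explanation)  # optional global summary plot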

The Metrics output contains the following performance measurements (when returned as Dataframe):

  • Forecast Accuracy: Overall prediction accuracy calculated as 1 - WAPE. Values closer to 1.0 (100%) indicate better predictions.
  • Weighted Absolute Percentage Error: Prediction errors expressed as a percentage of actual values, weighted by magnitude. Lower is better (a small numeric sketch follows this list).
  • Mean Absolute Error: Average absolute difference between predictions and actual values, in the same units as your target variable.
  • Mean Squared Error: Average of squared differences. Emphasizes larger errors more than MAE.
  • Root Mean Squared Error: Square root of MSE, returning to the original units. More interpretable than MSE.
  • R2 Score: Proportion of variance in the target explained by the model (0-1 scale, where 1 is perfect prediction).
  • Mean Absolute Percentage Error: Average percentage error across all predictions.
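To make the first two definitions concrete, here is a small numeric sketch of WAPE and Forecast Accuracy (the remaining metrics come directly from scikit-learn):

import numpy as np

y_true = np.array([100.0, 200.0, 300.0])
y_pred = np.array([110.0, 190.0, 330.0])

abs_err = np.abs(y_true - y_pred)            # [10, 10, 30]
wape = abs_err.sum() / np.abs(y_true).sum()  # 50 / 600 ≈ 0.0833 (8.33%)
forecast_accuracy = 1.0 - wape               # ≈ 0.9167 (91.67%)
mae = abs_err.mean()                         # ≈ 16.67, in the target's units
print(wape, forecast_accuracy, mae)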

The Features Importance output contains:

  • feature: The name of each input feature.
  • importance: A score (0-1) indicating how much that feature contributed to the model's predictions. Higher scores mean more important features.

The Prediction Set output contains the following columns (a short inspection sketch follows the list):

  • All original feature columns from your test data
  • y_true: The actual target values
  • y_pred: The model's predicted values
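A short, illustrative sketch for inspecting the Prediction Set (a pandas DataFrame) and locating the rows where the model is furthest off:

# Illustrative sketch: inspecting the Prediction Set output
residuals = Prediction_Set["y_true"] - Prediction_Set["y_pred"]
Prediction_Set["abs_error"] = residuals.abs()

# Largest absolute errors first - often the most informative rows to review
worst = Prediction_Set.sort_values("abs_error", ascending=False).head(10)
print(worst)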

The CV Metrics output (when cross-validation is enabled) contains:

  • Metric: The name of each performance metric
  • Mean: The average value across all cross-validation folds
  • Std: The standard deviation showing how much the metric varied across folds

The HPO Trials output (when hyperparameter optimization is enabled) contains the following columns (a plotting sketch follows the list):

  • number: Trial number
  • value: The optimization metric score achieved
  • params_criterion, params_n_estimators, params_max_depth, etc.: The hyperparameter values tested in that trial
  • best_value: The best score achieved up to that trial
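If hyperparameter optimization is enabled, a quick way to visualize the search is to plot best_value against the trial number (illustrative sketch; matplotlib is assumed to be available):

# Illustrative sketch: plotting optimization progress from the HPO Trials output
import matplotlib.pyplot as plt

ax = HPO_Trials.plot(x="number", y="best_value", drawstyle="steps-post", legend=False)
ax.set_xlabel("Trial")
ax.set_ylabel("Best score so far")
plt.show()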

The HPO Best output (when hyperparameter optimization is enabled) contains the following keys (a reuse sketch follows the list):

  • criterion: Best splitting criterion found
  • n_estimators: Optimal number of trees
  • max_depth: Optimal maximum tree depth
  • max_features: Optimal feature sampling strategy
  • min_samples_leaf: Optimal minimum samples per leaf
  • min_samples_split: Optimal minimum samples to split a node
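Because these keys match the constructor arguments of scikit-learn's RandomForestRegressor, the dictionary can be reused directly to rebuild the tuned model elsewhere (illustrative sketch; the example values are hypothetical):

from sklearn.ensemble import RandomForestRegressor

# HPO_Best is the dictionary returned by this brick, e.g.
# {"criterion": "squared_error", "n_estimators": 480, "max_depth": 35,
#  "max_features": "sqrt", "min_samples_leaf": 2, "min_samples_split": 4}
tuned_model = RandomForestRegressor(**HPO_Best, random_state=42, n_jobs=-1)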

Outputs Types

Output Types
Model Any
SHAP Any
Metrics DataFrame, Dict
CV Metrics DataFrame
Features Importance DataFrame
Prediction Set DataFrame
HPO Trials DataFrame
HPO Best Dict

You can check the list of supported types here: Available Type Hints.

Options

The Reg. Random Forest brick provides the following configurable options (an example options dictionary follows the list):

Criterion
The function used to measure the quality of each split in the decision trees. Different criteria optimize for different aspects of prediction quality.
  • Squared Error: Minimizes the squared differences between predictions and actual values. Standard choice for most regression problems.
  • Friedman MSE: A variant that uses mean squared error with Friedman's improvement score. Can lead to better splits in some cases.
  • Absolute Error: Minimizes the absolute differences, making the model more robust to outliers.
  • Poisson: Optimized for count data or rates. Use when your target variable represents counts or events.
Number of Trees
How many individual decision trees to create in the forest (25-3000). More trees generally improve accuracy but increase training time and memory usage. The predictions are averaged across all trees.
Max Depth (0 = Unlimited)
The maximum number of levels each tree can grow (0-100). Setting this to 0 allows trees to expand until all leaves are pure. Lower values prevent overfitting but may underfit; higher values capture more complex patterns but risk memorizing the training data.
Feature Sampling
Controls how many features each tree considers when making splits. This adds randomness to make trees more diverse.
  • Automatic 30%: Each tree randomly uses 30% of available features.
  • Automatic 50%: Each tree randomly uses 50% of available features.
  • Square root: Each tree uses the square root of the total number of features (recommended for most cases).
  • Logarithm: Each tree uses the logarithm (base 2) of the total number of features.
  • None: Each tree considers all features (less diversity, may overfit).
Min Samples per Leaf
The minimum number of data points required in a leaf node (1-100). Higher values create simpler trees that generalize better but may miss subtle patterns. Lower values allow the model to fit the training data more closely.
Auto Split Data
When enabled, automatically divides your data into training and test sets using intelligent ratios based on dataset size. When disabled, uses your manually specified Test/Validation Set % value.
Shuffle Split
Whether to randomly shuffle the data before splitting into training and test sets. Recommended to keep enabled unless your data has a time-based order that must be preserved.
Test/Validation Set %
The percentage of data to hold out for testing (0.1-50%). Only used when "Auto Split Data" is disabled. For example, 15% means 15% is used for testing and 85% for training.
Retrain On Full Data
When enabled, after evaluation, retrains the final model on the entire dataset (both training and test sets). This produces a stronger production model but means the reported metrics reflect the earlier train/test split, not the final model.
Enable Cross-Validation
When enabled, evaluates the model using k-fold cross-validation, which splits the data multiple ways to get a more reliable performance estimate. This provides robust metrics but increases computation time.
Number of CV Folds
How many times to split the data for cross-validation (2-10). More folds give more reliable metrics but take longer to compute. 5 folds is a common choice balancing accuracy and speed.
Hyperparameter Optim.
When enabled, automatically searches for the best model settings (like number of trees, max depth, etc.) by testing many combinations. This can significantly improve model performance but requires much more computation time.
Optimization Metric
Which performance measure to optimize when tuning hyperparameters. The algorithm will try to find settings that give the best score on this metric.
  • Forecast Accuracy: Maximizes overall prediction accuracy (1 - WAPE). Higher is better.
  • Weighted Absolute Percentage Error (WAPE): Minimizes prediction errors as a percentage of actual values.
  • Mean Absolute Error (MAE): Minimizes the average absolute difference between predictions and actual values.
  • Mean Squared Error (MSE): Minimizes the average squared difference. Penalizes large errors more heavily.
  • Root Mean Squared Error (RMSE): Like MSE but in the same units as your target variable. Common default choice.
  • R2 Score: Maximizes the proportion of variance explained by the model (0-1 scale, higher is better).
  • Mean Absolute Percentage Error (MAPE): Like WAPE but can be problematic with values near zero.
Optimization Method
The algorithm used to search for optimal hyperparameters. Different methods explore the parameter space in different ways.
  • Tree-structured Parzen: An intelligent Bayesian method that learns from previous trials to suggest better parameters. Good default choice.
  • Gaussian Process: Uses probabilistic modeling to predict which parameters will work best. Effective but slower.
  • CMA-ES: Evolution-based strategy that adapts its search based on the distribution of good results.
  • Random Sobol Search: Quasi-random sampling that covers the parameter space more evenly than pure random.
  • Random Search: Tests completely random parameter combinations. Simple but often effective baseline.
Optimization Iterations
How many different hyperparameter combinations to test (15-1000). More iterations improve your chances of finding optimal settings but take longer. 50 is a reasonable starting point.
Metrics as
How to format the performance metrics output: either a DataFrame (a two-column table of metric names and values) or a dictionary keyed by metric name.
SHAP Explainer
When enabled, creates a SHAP explainer object that can show how each feature contributes to individual predictions. Useful for understanding and explaining model decisions but adds computation overhead.
SHAP Sampler
When enabled, uses a representative sample of the training data as background for SHAP calculations instead of the full dataset. Speeds up SHAP computations for large datasets with minimal accuracy loss.
SHAP Feature Perturbation
The method SHAP uses to measure feature importance.
  • Interventional: Simulates what happens when features are changed by replacing them with values from other samples. Standard approach.
  • Tree Path Dependent: Uses the tree structure itself to compute contributions. Faster and specific to tree-based models.
Number of Jobs
How many CPU cores to use for parallel processing. More cores speed up training but use more system resources.
Random State
A seed number (0-10000) that controls randomness in the model. Using the same number ensures you get identical results each time you run with the same data and settings. Useful for reproducibility.
Brick Caching
When enabled, saves trained models and results to disk. If you run the brick again with identical data and settings, it loads the cached results instantly instead of retraining. Greatly speeds up repeated runs but uses disk space.
Verbose Logging
When enabled, prints detailed progress messages during training, showing metrics, optimization steps, and other diagnostic information. Helpful for monitoring progress and debugging.
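For reference, the implementation below reads its settings from an options dictionary. An illustrative example using the keys consumed by the code (the values shown are just examples; omitted keys fall back to their defaults):

# Example options dictionary using the keys read by the implementation below
options = {
    "criterion": "Squared Error",
    "n_estimators": 300,
    "feature_strategy": "Square root",
    "max_depth": 0,                      # 0 = unlimited
    "min_samples_leaf": 1,
    "auto_split": True,
    "shuffle_split": True,
    "use_cross_validation": False,
    "cv_folds": 5,
    "use_hyperparameter_optimization": False,
    "optimization_metric": "Root Mean Squared Error (RMSE)",
    "optimization_method": "Tree-structured Parzen",
    "optimization_iterations": 50,
    "return_shap_explainer": False,
    "metrics_as": "Dataframe",
    "n_jobs": "All",
    "random_state": 42,
    "activate_caching": False,
    "verbose": True,
}
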
import logging
import warnings
import shap
import json
import xxhash
import hashlib
import tempfile
import sklearn
import scipy
import joblib
import numpy as np
import pandas as pd
import polars as pl
from pathlib import Path
from scipy import sparse
from optuna.samplers import (
    TPESampler,
    RandomSampler,
    GPSampler,
    CmaEsSampler,
    QMCSampler,
)
import optuna
from optuna import Study
from optuna.trial import FrozenTrial
from optuna.pruners import HyperbandPruner
from optuna import create_study
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    root_mean_squared_error,
    mean_absolute_percentage_error,
    make_scorer,
)
from dataclasses import dataclass
from datetime import datetime
from coded_flows.types import (
    Union,
    Dict,
    List,
    Tuple,
    NDArray,
    DataFrame,
    DataSeries,
    Any,
)
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Reg. Random Forest", level=logging.INFO)
optuna.logging.set_verbosity(optuna.logging.ERROR)
warnings.filterwarnings("ignore", category=optuna.exceptions.ExperimentalWarning)
METRICS_DICT = {
    "Forecast Accuracy": "fa",
    "Weighted Absolute Percentage Error (WAPE)": "wape",
    "Mean Absolute Error (MAE)": "mae",
    "Mean Squared Error (MSE)": "mse",
    "Root Mean Squared Error (RMSE)": "rmse",
    "R2 Score": "r2",
    "Mean Absolute Percentage Error (MAPE)": "mape",
}
METRICS_OPT = {
    "fa": "maximize",
    "wape": "minimize",
    "mae": "minimize",
    "mse": "minimize",
    "rmse": "minimize",
    "r2": "maximize",
    "mape": "minimize",
}
DataType = Union[
    pd.DataFrame, pl.DataFrame, np.ndarray, sparse.spmatrix, pd.Series, pl.Series
]


@dataclass
class _DatasetFingerprint:
    """Lightweight fingerprint of a dataset."""

    hash: str
    shape: tuple
    computed_at: str
    data_type: str
    method: str


class _UniversalDatasetHasher:
    """
    High-performance dataset hasher optimizing for zero-copy operations
    and native backend execution (C/Rust).
    """

    def __init__(
        self,
        data_size: int,
        method: str = "auto",
        sample_size: int = 100000,
        verbose: bool = False,
    ):
        self.method = method
        self.sample_size = sample_size
        self.data_size = data_size
        self.verbose = verbose

    def hash_data(self, data: DataType) -> _DatasetFingerprint:
        """
        Main entry point: hash any supported data format.
        Auto-detects format and applies optimal strategy.
        """
        if isinstance(data, pd.DataFrame):
            return self._hash_pandas(data)
        elif isinstance(data, pl.DataFrame):
            return self._hash_polars(data)
        elif isinstance(data, pd.Series):
            return self._hash_pandas_series(data)
        elif isinstance(data, pl.Series):
            return self._hash_polars_series(data)
        elif isinstance(data, np.ndarray):
            return self._hash_numpy(data)
        elif sparse.issparse(data):
            return self._hash_sparse(data)
        else:
            raise TypeError(f"Unsupported data type: {type(data)}")

    def _hash_pandas(self, df: pd.DataFrame) -> _DatasetFingerprint:
        """
        Optimized Pandas hashing using pd.util.hash_pandas_object.
        Avoids object-to-string conversion overhead.
        """
        method = self._determine_method(self.data_size, self.method)
        self.verbose and logger.info(
            f"Hashing Pandas: {self.data_size:,} rows - {method}"
        )
        target_df = df
        if method == "sampled":
            target_df = self._get_pandas_sample(df)
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "columns": df.columns.tolist(),
                "dtypes": {k: str(v) for (k, v) in df.dtypes.items()},
                "shape": df.shape,
            },
        )
        try:
            row_hashes = pd.util.hash_pandas_object(target_df, index=False)
            hasher.update(memoryview(row_hashes.values))
        except Exception as e:
            self.verbose and logger.warning(
                f"Fast hash failed, falling back to slow hash: {e}"
            )
            self._hash_pandas_fallback(hasher, target_df)
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=df.shape,
            computed_at=datetime.now().isoformat(),
            data_type="pandas",
            method=method,
        )

    def _get_pandas_sample(self, df: pd.DataFrame) -> pd.DataFrame:
        """Deterministic slicing for sampling (Zero randomness)."""
        if self.data_size <= self.sample_size:
            return df
        chunk = self.sample_size // 3
        head = df.iloc[:chunk]
        mid_idx = self.data_size // 2
        mid = df.iloc[mid_idx : mid_idx + chunk]
        tail = df.iloc[-chunk:]
        return pd.concat([head, mid, tail])

    def _hash_pandas_fallback(self, hasher, df: pd.DataFrame):
        """Legacy fallback for complex object types."""
        for col in df.columns:
            val = df[col].astype(str).values
            hasher.update(val.astype(np.bytes_).tobytes())

    def _hash_polars(self, df: pl.DataFrame) -> _DatasetFingerprint:
        """
        Optimized Polars hashing using native Rust execution.
        """
        method = self._determine_method(self.data_size, self.method)
        self.verbose and logger.info(
            f"Hashing Polars: {self.data_size:,} rows - {method}"
        )
        target_df = df
        if method == "sampled" and self.data_size > self.sample_size:
            indices = self._get_sample_indices(self.data_size, self.sample_size)
            target_df = df.gather(indices)
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "columns": df.columns,
                "dtypes": [str(t) for t in df.dtypes],
                "shape": df.shape,
            },
        )
        row_hashes = target_df.hash_rows()
        hasher.update(memoryview(row_hashes.to_numpy()))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=df.shape,
            computed_at=datetime.now().isoformat(),
            data_type="polars",
            method=method,
        )

    def _hash_pandas_series(self, series: pd.Series) -> _DatasetFingerprint:
        """Hash Pandas Series using the fastest vectorized method."""
        self.verbose and logger.info(f"Hashing Pandas Series: {self.data_size:,} rows")
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "name": series.name if series.name else "None",
                "dtype": str(series.dtype),
                "shape": series.shape,
            },
        )
        try:
            row_hashes = pd.util.hash_pandas_object(series, index=False)
            hasher.update(memoryview(row_hashes.values))
        except Exception as e:
            self.verbose and logger.warning(f"Series hash failed, falling back: {e}")
            hasher.update(memoryview(series.astype(str).values.tobytes()))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=series.shape,
            computed_at=datetime.now().isoformat(),
            data_type="pandas_series",
            method="full",
        )

    def _hash_polars_series(self, series: pl.Series) -> _DatasetFingerprint:
        """Hash Polars Series using native Polars expressions."""
        self.verbose and logger.info(f"Hashing Polars Series: {self.data_size:,} rows")
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {"name": series.name, "dtype": str(series.dtype), "shape": series.shape},
        )
        try:
            row_hashes = series.hash()
            hasher.update(memoryview(row_hashes.to_numpy()))
        except Exception as e:
            self.verbose and logger.warning(
                f"Polars series native hash failed. Falling back."
            )
            hasher.update(str(series.to_list()).encode())
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=series.shape,
            computed_at=datetime.now().isoformat(),
            data_type="polars_series",
            method="full",
        )

    def _hash_numpy(self, arr: np.ndarray) -> _DatasetFingerprint:
        """
        Optimized NumPy hashing using Buffer Protocol (Zero-Copy).
        """
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {"shape": arr.shape, "dtype": str(arr.dtype), "strides": arr.strides},
        )
        if arr.flags["C_CONTIGUOUS"] or arr.flags["F_CONTIGUOUS"]:
            hasher.update(memoryview(arr))
        else:
            hasher.update(memoryview(np.ascontiguousarray(arr)))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=arr.shape,
            computed_at=datetime.now().isoformat(),
            data_type="numpy",
            method="full",
        )

    def _hash_sparse(self, matrix: sparse.spmatrix) -> _DatasetFingerprint:
        """
        Optimized sparse hashing. Hashes underlying data arrays directly.
        """
        if not (sparse.isspmatrix_csr(matrix) or sparse.isspmatrix_csc(matrix)):
            matrix = matrix.tocsr()
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher, {"shape": matrix.shape, "format": matrix.format, "nnz": matrix.nnz}
        )
        hasher.update(memoryview(matrix.data))
        hasher.update(memoryview(matrix.indices))
        hasher.update(memoryview(matrix.indptr))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=matrix.shape,
            computed_at=datetime.now().isoformat(),
            data_type=f"sparse_{matrix.format}",
            method="sparse",
        )

    def _determine_method(self, rows: int, requested: str) -> str:
        if requested != "auto":
            return requested
        if rows < 5000000:
            return "full"
        return "sampled"

    def _hash_schema(self, hasher, schema: Dict[str, Any]):
        """Compact schema hashing."""
        hasher.update(
            json.dumps(schema, sort_keys=True, separators=(",", ":")).encode()
        )

    def _get_sample_indices(self, total_rows: int, sample_size: int) -> list:
        """Calculate indices for sampling without generating full range lists."""
        chunk = sample_size // 3
        indices = list(range(min(chunk, total_rows)))
        mid_start = max(0, total_rows // 2 - chunk // 2)
        mid_end = min(mid_start + chunk, total_rows)
        indices.extend(range(mid_start, mid_end))
        last_start = max(0, total_rows - chunk)
        indices.extend(range(last_start, total_rows))
        return sorted(list(set(indices)))


def wape_score(y_true, y_pred):
    """
    Calculates Weighted Absolute Percentage Error (WAPE).

    WAPE = sum(|Error|) / sum(|Groundtruth|)
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    eps = np.finfo(np.float64).eps
    sum_abs_error = np.sum(np.abs(y_true - y_pred))
    sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
    return sum_abs_error / sum_abs_truth


def forecast_accuracy(y_true, y_pred):
    """
    Calculates Forecast Accuracy.

    FA = 1 - (sum(|Error|) / sum(|Groundtruth|))
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    eps = np.finfo(np.float64).eps
    sum_abs_error = np.sum(np.abs(y_true - y_pred))
    sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
    return 1 - sum_abs_error / sum_abs_truth


def _normalize_hpo_df(df):
    df = df.copy()
    param_cols = [c for c in df.columns if c.startswith("params_")]
    df[param_cols] = df[param_cols].astype("string[pyarrow]")
    return df


def _validate_numerical_data(data):
    """
    Validates if the input data (NumPy array, Pandas DataFrame/Series,
    Polars DataFrame/Series, or SciPy sparse matrix) contains only
    numerical (integer, float) or boolean values.

    Args:
        data: The input data structure to check.

    Raises:
        TypeError: If the input data contains non-numerical and non-boolean types.
        ValueError: If the input data is of an unsupported type.
    """
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"Sparse matrix contains unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
            )
        return
    elif isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"NumPy array contains unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
            )
        return
    elif isinstance(data, pd.Series):
        if data.dtype.kind not in ("i", "u", "f", "b"):
            raise TypeError(
                f"Pandas Series contains non-numerical/boolean data. Offending dtype: {data.dtype}"
            )
        return
    elif isinstance(data, pd.DataFrame):
        d_types = data.dtypes.apply(lambda x: x.kind)
        non_numerical_mask = ~d_types.isin(["i", "u", "f", "b"])
        if non_numerical_mask.any():
            raise TypeError(
                f"Pandas DataFrame contains non-numerical/boolean data. Offending column(s) and types: {data.dtypes[non_numerical_mask].to_dict()}"
            )
        return
    elif isinstance(data, (pl.DataFrame, pl.Series)):
        pl_numerical_types = [
            pl.Int8,
            pl.Int16,
            pl.Int32,
            pl.Int64,
            pl.UInt8,
            pl.UInt16,
            pl.UInt32,
            pl.UInt64,
            pl.Float32,
            pl.Float64,
            pl.Boolean,
        ]
        if isinstance(data, pl.DataFrame):
            for col, dtype in data.schema.items():
                if dtype not in pl_numerical_types:
                    raise TypeError(
                        f"Polars DataFrame column '{col}' has unsupported data type: {dtype}. Only numerical or boolean types are allowed."
                    )
        elif isinstance(data, pl.Series):
            if data.dtype not in pl_numerical_types:
                raise TypeError(
                    f"Polars Series has unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
                )
        return
    else:
        raise ValueError(
            f"Unsupported data type provided: {type(data)}. Function supports NumPy, Pandas, Polars, and SciPy sparse matrices."
        )


def _smart_split(
    n_samples,
    X,
    y,
    *,
    random_state=42,
    shuffle=True,
    stratify=None,
    fixed_test_split=None,
    verbose=True,
):
    """
    Parameters
    ----------
    n_samples : int
        Number of samples in the dataset (len(X) or len(y))
    X : array-like
        Features
    y : array-like
        Target
    random_state : int
    shuffle : bool
    stratify : array-like or None
        For stratified splitting (recommended for classification)

    Returns
    -------
    X_train, X_test, y_train, y_test, val_size_in_train
        The train/test split plus the validation ratio expressed as a fraction
        of the training data (used later when HPO needs a validation split).
    """
    if fixed_test_split:
        test_ratio = fixed_test_split
        val_ratio = fixed_test_split
    elif n_samples <= 1000:
        test_ratio = 0.2
        val_ratio = 0.1
    elif n_samples < 10000:
        test_ratio = 0.15
        val_ratio = 0.15
    elif n_samples < 100000:
        test_ratio = 0.1
        val_ratio = 0.1
    elif n_samples < 1000000:
        test_ratio = 0.05
        val_ratio = 0.05
    else:
        test_ratio = 0.01
        val_ratio = 0.01
    (X_train, X_test, y_train, y_test) = train_test_split(
        X,
        y,
        test_size=test_ratio,
        random_state=random_state,
        shuffle=shuffle,
        stratify=stratify,
    )
    val_size_in_train = val_ratio / (1 - test_ratio)
    verbose and logger.info(
        f"Split → Train: {1 - test_ratio:.2%} | Test: {test_ratio:.2%} (no validation set)"
    )
    return (X_train, X_test, y_train, y_test, val_size_in_train)


def _ensure_feature_names(X, feature_names=None):
    if isinstance(X, pd.DataFrame):
        return list(X.columns)
    if isinstance(X, np.ndarray):
        if feature_names is None:
            feature_names = [f"feature_{i}" for i in range(X.shape[1])]
        return feature_names
    raise TypeError("X must be a pandas DataFrame or numpy ndarray")


def _perform_cross_validation(
    model, X, y, cv_folds, shuffle, random_state, n_jobs, verbose
) -> pd.DataFrame:
    """Perform cross-validation on the regression model."""
    verbose and logger.info(f"Performing {cv_folds}-fold cross-validation...")
    cv = KFold(n_splits=cv_folds, shuffle=shuffle, random_state=random_state)
    scoring = {
        "MAE": "neg_mean_absolute_error",
        "MSE": "neg_mean_squared_error",
        "RMSE": "neg_root_mean_squared_error",
        "MAPE": "neg_mean_absolute_percentage_error",
        "R2": "r2",
        "WAPE": make_scorer(wape_score, greater_is_better=False),
        "Forecast_Accuracy": make_scorer(forecast_accuracy, greater_is_better=True),
    }
    cv_results = cross_validate(
        model, X, y, cv=cv, scoring=scoring, return_train_score=False, n_jobs=n_jobs
    )

    def get_score_stats(metric_key, invert_sign=False):
        key = f"test_{metric_key}"
        if key in cv_results:
            scores = cv_results[key]
            if invert_sign:
                scores = -scores
            return (scores.mean(), scores.std())
        return (0.0, 0.0)

    (mae_mean, mae_std) = get_score_stats("MAE", invert_sign=True)
    (mse_mean, mse_std) = get_score_stats("MSE", invert_sign=True)
    (rmse_mean, rmse_std) = get_score_stats("RMSE", invert_sign=True)
    (mape_mean, mape_std) = get_score_stats("MAPE", invert_sign=True)
    (wape_mean, wape_std) = get_score_stats("WAPE", invert_sign=True)
    (r2_mean, r2_std) = get_score_stats("R2", invert_sign=False)
    (fa_mean, fa_std) = get_score_stats("Forecast_Accuracy", invert_sign=False)
    verbose and logger.info(f"CV MAE          : {mae_mean:.4f} (+/- {mae_std:.4f})")
    verbose and logger.info(f"CV MSE          : {mse_mean:.4f} (+/- {mse_std:.4f})")
    verbose and logger.info(f"CV RMSE         : {rmse_mean:.4f} (+/- {rmse_std:.4f})")
    verbose and logger.info(f"CV MAPE         : {mape_mean:.4f} (+/- {mape_std:.4f})")
    verbose and logger.info(f"CV WAPE         : {wape_mean:.4f} (+/- {wape_std:.4f})")
    verbose and logger.info(f"CV R2 Score     : {r2_mean:.4f} (+/- {r2_std:.4f})")
    verbose and logger.info(f"CV Forecast Acc : {fa_mean:.4f} (+/- {fa_std:.4f})")
    CV_metrics = pd.DataFrame(
        {
            "Metric": [
                "Mean Absolute Error (MAE)",
                "Mean Squared Error (MSE)",
                "Root Mean Squared Error (RMSE)",
                "Mean Absolute Percentage Error (MAPE)",
                "Weighted Absolute Percentage Error (WAPE)",
                "R2 Score",
                "Forecast Accuracy",
            ],
            "Mean": [
                mae_mean,
                mse_mean,
                rmse_mean,
                mape_mean,
                wape_mean,
                r2_mean,
                fa_mean,
            ],
            "Std": [mae_std, mse_std, rmse_std, mape_std, wape_std, r2_std, fa_std],
        }
    )
    return CV_metrics


def _compute_score(model, X, y, metric):
    """
    Computes the score for the model on the given data based on the selected metric.
    Assumes 'metric' is passed as the lowercase short code (e.g., "mae", "r2", "fa").
    """
    y_pred = model.predict(X)
    if metric == "mae":
        score = mean_absolute_error(y, y_pred)
    elif metric == "mse":
        score = mean_squared_error(y, y_pred)
    elif metric == "rmse":
        score = root_mean_squared_error(y, y_pred)
    elif metric == "mape":
        score = mean_absolute_percentage_error(y, y_pred)
    elif metric == "r2":
        score = r2_score(y, y_pred)
    elif metric == "wape" or metric == "fa":
        y_true_np = np.array(y, dtype=float).flatten()
        y_pred_np = np.array(y_pred, dtype=float).flatten()
        eps = np.finfo(np.float64).eps
        sum_abs_error = np.sum(np.abs(y_true_np - y_pred_np))
        sum_abs_truth = np.maximum(np.sum(np.abs(y_true_np)), eps)
        wape_val = sum_abs_error / sum_abs_truth
        if metric == "fa":
            score = 1.0 - wape_val
        else:
            score = wape_val
    else:
        raise ValueError(f"Unknown regression metric: {metric}")
    return score


def _get_cv_scoring_object(metric: str) -> Any:
    """
    Returns a scoring object (string or callable) suitable for cross_validate or GridSearchCV.
    Used during HPO for Regression.
    """
    if metric == "mae":
        return "neg_mean_absolute_error"
    elif metric == "mse":
        return "neg_mean_squared_error"
    elif metric == "rmse":
        return "neg_root_mean_squared_error"
    elif metric == "r2":
        return "r2"
    elif metric == "mape":
        return "neg_mean_absolute_percentage_error"
    elif metric == "wape":
        return make_scorer(wape_score, greater_is_better=False)
    elif metric == "fa":
        return make_scorer(forecast_accuracy, greater_is_better=True)
    else:
        return "neg_root_mean_squared_error"


def _hyperparameters_optimization(
    X,
    y,
    constant_hyperparameters,
    optimization_metric,
    val_ratio,
    shuffle_split,
    use_cross_val,
    cv_folds,
    n_trials=50,
    strategy="maximize",
    sampler="Tree-structured Parzen",
    seed=None,
    n_jobs=-1,
    verbose=False,
):
    direction = "maximize" if strategy.lower() == "maximize" else "minimize"
    sampler_map = {
        "Tree-structured Parzen": TPESampler(seed=seed),
        "Gaussian Process": GPSampler(seed=seed),
        "CMA-ES": CmaEsSampler(seed=seed),
        "Random Search": RandomSampler(seed=seed),
        "Random Sobol Search": QMCSampler(seed=seed),
    }
    if sampler in sampler_map:
        chosen_sampler = sampler_map[sampler]
    else:
        logger.warning(f"Sampler '{sampler}' not recognized → falling back to TPE")
        chosen_sampler = TPESampler(seed=seed)
    chosen_pruner = HyperbandPruner()
    if use_cross_val:
        cv = KFold(n_splits=cv_folds, shuffle=shuffle_split, random_state=seed)
        cv_score_obj = _get_cv_scoring_object(optimization_metric)
    else:
        (X_train, X_val, y_train, y_val) = train_test_split(
            X, y, test_size=val_ratio, random_state=seed, shuffle=shuffle_split
        )

    def logging_callback(study: Study, trial: FrozenTrial):
        """Callback function to log trial progress"""
        verbose and logger.info(
            f"Trial {trial.number} finished with value: {trial.value} and parameters: {trial.params}"
        )
        try:
            verbose and logger.info(f"Best value so far: {study.best_value}")
            verbose and logger.info(f"Best parameters so far: {study.best_params}")
        except ValueError:
            verbose and logger.info(f"No successful trials completed yet")
        verbose and logger.info(f"" + "-" * 50)

    def objective(trial):
        try:
            criterion = trial.suggest_categorical(
                "criterion", ["squared_error", "friedman_mse", "absolute_error"]
            )
            n_estimators = trial.suggest_int("n_estimators", 100, 2000, log=True)
            max_depth = trial.suggest_int("max_depth", 10, 120, log=True)
            min_samples_split = trial.suggest_int("min_samples_split", 2, 32)
            min_samples_leaf = trial.suggest_int("min_samples_leaf", 1, 32)
            max_features = trial.suggest_categorical(
                "max_features", ["sqrt", "log2", None, 0.5, 0.8]
            )
            model = RandomForestRegressor(
                criterion=criterion,
                n_estimators=n_estimators,
                min_samples_leaf=min_samples_leaf,
                max_features=max_features,
                max_depth=max_depth,
                min_samples_split=min_samples_split,
                random_state=seed,
                n_jobs=n_jobs,
            )
            if use_cross_val:
                scores = cross_validate(
                    model, X, y, cv=cv, n_jobs=n_jobs, scoring=cv_score_obj
                )
                return scores["test_score"].mean()
            else:
                model.fit(X_train, y_train)
                score = _compute_score(model, X_val, y_val, optimization_metric)
                return score
        except Exception as e:
            verbose and logger.error(
                f"Trial {trial.number} failed with error: {str(e)}"
            )
            raise

    study = create_study(
        direction=direction, sampler=chosen_sampler, pruner=chosen_pruner
    )
    study.optimize(
        objective,
        n_trials=n_trials,
        catch=(Exception,),
        n_jobs=n_jobs,
        callbacks=[logging_callback],
    )
    verbose and logger.info(f"Optimization completed!")
    verbose and logger.info(
        f"   Best Criterion             : {study.best_params['criterion']}"
    )
    verbose and logger.info(
        f"   Best Number of Trees       : {study.best_params['n_estimators']}"
    )
    verbose and logger.info(
        f"   Best Max Depth             : {study.best_params['max_depth']}"
    )
    verbose and logger.info(
        f"   Best Feature Sampling      : {study.best_params['max_features']}"
    )
    verbose and logger.info(
        f"   Best Min Samples per Leaf  : {study.best_params['min_samples_leaf']}"
    )
    verbose and logger.info(
        f"   Best Min Samples Split     : {study.best_params['min_samples_split']}"
    )
    verbose and logger.info(
        f"   Best {optimization_metric:<22}: {study.best_value:.4f}"
    )
    verbose and logger.info(f"   Sampler used               : {sampler}")
    verbose and logger.info(f"   Direction                  : {direction}")
    if use_cross_val:
        verbose and logger.info(f"   Cross-validation           : {cv_folds}-fold")
    else:
        verbose and logger.info(
            f"   Validation                 : single train/val split"
        )
    trials = study.trials_dataframe()
    # Track the running best according to the optimization direction
    trials["best_value"] = (
        trials["value"].cummax() if direction == "maximize" else trials["value"].cummin()
    )
    cols = list(trials.columns)
    value_idx = cols.index("value")
    cols = [c for c in cols if c != "best_value"]
    new_order = cols[: value_idx + 1] + ["best_value"] + cols[value_idx + 1 :]
    trials = trials[new_order]
    return (study.best_params, trials)


def _combine_test_data(X_test, y_true, y_pred, features_names=None):
    """
    Combine X_test, y_true, y_pred into a single DataFrame.

    Parameters:
    -----------
    X_test : pandas/polars DataFrame, numpy array, or scipy sparse matrix
        Test features
    y_true : pandas/polars Series, numpy array, or list
        True labels
    y_pred : pandas/polars Series, numpy array, or list
        Predicted labels

    Returns:
    --------
    pandas.DataFrame
        Combined DataFrame with features, y_true, and y_pred
    """
    if sparse.issparse(X_test):
        X_df = pd.DataFrame(X_test.toarray())
    elif isinstance(X_test, np.ndarray):
        X_df = pd.DataFrame(X_test)
    elif hasattr(X_test, "to_pandas"):
        X_df = X_test.to_pandas()
    elif isinstance(X_test, pd.DataFrame):
        X_df = X_test.copy()
    else:
        raise TypeError(f"Unsupported type for X_test: {type(X_test)}")
    if X_df.columns.tolist() == list(range(len(X_df.columns))):
        X_df.columns = (
            [f"feature_{i}" for i in range(len(X_df.columns))]
            if features_names is None
            else features_names
        )
    if isinstance(y_true, list):
        y_true_series = pd.Series(y_true, name="y_true")
    elif isinstance(y_true, np.ndarray):
        y_true_series = pd.Series(y_true, name="y_true")
    elif hasattr(y_true, "to_pandas"):
        y_true_series = y_true.to_pandas()
        y_true_series.name = "y_true"
    elif isinstance(y_true, pd.Series):
        y_true_series = y_true.copy()
        y_true_series.name = "y_true"
    else:
        raise TypeError(f"Unsupported type for y_true: {type(y_true)}")
    if isinstance(y_pred, list):
        y_pred_series = pd.Series(y_pred, name="y_pred")
    elif isinstance(y_pred, np.ndarray):
        y_pred_series = pd.Series(y_pred, name="y_pred")
    elif hasattr(y_pred, "to_pandas"):
        y_pred_series = y_pred.to_pandas()
        y_pred_series.name = "y_pred"
    elif isinstance(y_pred, pd.Series):
        y_pred_series = y_pred.copy()
        y_pred_series.name = "y_pred"
    else:
        raise TypeError(f"Unsupported type for y_pred: {type(y_pred)}")
    X_df = X_df.reset_index(drop=True)
    y_true_series = y_true_series.reset_index(drop=True)
    y_pred_series = y_pred_series.reset_index(drop=True)
    result_df = pd.concat([X_df, y_true_series, y_pred_series], axis=1)
    return result_df


def _get_feature_importance(model, feature_names=None, sort=True, top_n=None):
    """
    Extract feature importance from a Random Forest model.

    Parameters:
    -----------
    model : RandomForestClassifier or RandomForestRegressor
        Fitted scikit-learn Random Forest model
    feature_names : list or array-like, optional
        Names of features. If None, uses generic names like 'feature_0', 'feature_1', etc.
    sort : bool, default=True
        Whether to sort features by importance (descending)
    top_n : int, optional
        If specified, returns only the top N most important features

    Returns:
    --------
    pd.DataFrame
        DataFrame with columns: 'feature', 'importance'
        Importance values represent the mean decrease in impurity (Gini importance)
    """
    importances = model.feature_importances_
    if feature_names is None:
        feature_names = [f"feature_{i}" for i in range(len(importances))]
    importance_df = pd.DataFrame({"feature": feature_names, "importance": importances})
    if sort:
        importance_df = importance_df.sort_values("importance", ascending=False)
    importance_df = importance_df.reset_index(drop=True)
    if top_n is not None:
        importance_df = importance_df.head(top_n)
    return importance_df


def _smart_shap_background(
    X: Union[np.ndarray, pd.DataFrame],
    model_type: str = "tree",
    seed: int = 42,
    verbose: bool = True,
) -> Union[np.ndarray, pd.DataFrame, object]:
    """
    Intelligently prepares a background dataset for SHAP based on model type.

    Strategies:
    - Tree: Higher sample cap (1000), uses Random Sampling (preserves data structure).
    - Other: Lower sample cap (100), uses K-Means (maximizes info density).
    """
    (n_rows, n_features) = X.shape
    if model_type == "tree":
        max_samples = 1000
        use_kmeans = False
    else:
        max_samples = 100
        use_kmeans = True
    if n_rows <= max_samples:
        verbose and logger.info(
            f"✓ Dataset small ({n_rows} <= {max_samples}). Using full data."
        )
        return X
    verbose and logger.info(
        f"⚡ Large dataset detected ({n_rows} rows). Optimization Strategy: {('K-Means' if use_kmeans else 'Random Sampling')}"
    )
    if use_kmeans:
        try:
            verbose and logger.info(
                f"   Summarizing to {max_samples} weighted centroids..."
            )
            return shap.kmeans(X, max_samples)
        except Exception as e:
            logger.warning(
                f"   K-Means failed ({str(e)}). Falling back to random sampling."
            )
            return shap.sample(X, max_samples, random_state=seed)
    else:
        verbose and logger.info(f"   Sampling {max_samples} random rows...")
        return shap.sample(X, max_samples, random_state=seed)


def _class_index_df(model):
    columns = {"index": pd.Series(dtype="int64"), "class": pd.Series(dtype="object")}
    if model is None:
        return pd.DataFrame(columns)
    classes = getattr(model, "classes_", None)
    if classes is None:
        return pd.DataFrame(columns)
    return pd.DataFrame({"index": range(len(classes)), "class": classes})


def train_reg_random_forest(
    X: DataFrame, y: Union[DataSeries, NDArray, List], options=None
) -> Tuple[
    Any, Any, Union[DataFrame, Dict], DataFrame, DataFrame, DataFrame, DataFrame, Dict
]:
    options = options or {}
    criterion = options.get("criterion", "squared_error").lower().replace(" ", "_")
    n_estimators = options.get("n_estimators", 100)
    feature_strategy = options.get("feature_strategy", "Square root")
    max_depth_input = options.get("max_depth", 0)
    min_samples_leaf = options.get("min_samples_leaf", 1)
    if feature_strategy == "Automatic 30%":
        max_features = 0.3
    elif feature_strategy == "Automatic 60%":
        max_features = 0.5
    elif feature_strategy == "Square root":
        max_features = "sqrt"
    elif feature_strategy == "Logarithm":
        max_features = "log2"
    elif feature_strategy == "None":
        max_features = None
    else:
        max_features = "sqrt"
    max_depth = None if max_depth_input == 0 else max_depth_input
    auto_split = options.get("auto_split", True)
    test_val_size = options.get("test_val_size", 15) / 100
    shuffle_split = options.get("shuffle_split", True)
    retrain_on_full = options.get("retrain_on_full", False)
    custom_average_strategy = options.get("custom_average_strategy", "auto")
    use_cross_validation = options.get("use_cross_validation", False)
    cv_folds = options.get("cv_folds", 5)
    use_hpo = options.get("use_hyperparameter_optimization", False)
    optimization_metric = options.get(
        "optimization_metric", "Root Mean Squared Error (RMSE)"
    )
    optimization_metric = METRICS_DICT[optimization_metric]
    optimization_method = options.get("optimization_method", "Tree-structured Parzen")
    optimization_iterations = options.get("optimization_iterations", 50)
    return_shap_explainer = options.get("return_shap_explainer", False)
    use_shap_sampler = options.get("use_shap_sampler", False)
    shap_feature_perturbation = options.get(
        "shap_feature_perturbation", "Interventional"
    )
    metrics_as = options.get("metrics_as", "Dataframe")
    n_jobs_str = options.get("n_jobs", "1")
    random_state = options.get("random_state", 42)
    activate_caching = options.get("activate_caching", False)
    verbose = options.get("verbose", True)
    n_jobs_int = -1 if n_jobs_str == "All" else int(n_jobs_str)
    skip_computation = False
    Model = None
    Metrics = pd.DataFrame()
    CV_Metrics = pd.DataFrame()
    Features_Importance = pd.DataFrame()
    SHAP = None
    HPO_Trials = pd.DataFrame()
    HPO_Best = None
    fa = None
    wape = None
    mae = None
    mse = None
    rmse = None
    r2 = None
    mape = None
    (n_samples, _) = X.shape
    shap_feature_names = _ensure_feature_names(X)
    if activate_caching:
        verbose and logger.info(f"Caching is activate")
        data_hasher = _UniversalDatasetHasher(n_samples, verbose=verbose)
        X_hash = data_hasher.hash_data(X).hash
        y_hash = data_hasher.hash_data(y).hash
        all_hash_base_text = f"HASH BASE TEXTPandas Version {pd.__version__}POLARS Version {pl.__version__}Numpy Version {np.__version__}Scikit Learn Version {sklearn.__version__}Scipy Version {scipy.__version__}{('SHAP Version ' + shap.__version__ if return_shap_explainer else 'NO SHAP Version')}{X_hash}{y_hash}{n_estimators}{feature_strategy}{max_depth_input}{min_samples_leaf}{criterion}{('Use HPO' if use_hpo else 'No HPO')}{(optimization_metric if use_hpo else 'No HPO Metric')}{(optimization_method if use_hpo else 'No HPO Method')}{(optimization_iterations if use_hpo else 'No HPO Iter')}{(cv_folds if use_cross_validation else 'No CV')}{('Auto Split' if auto_split else test_val_size)}{shuffle_split}{return_shap_explainer}{shap_feature_perturbation}{use_shap_sampler}{random_state}{custom_average_strategy}"
        all_hash = hashlib.sha256(all_hash_base_text.encode("utf-8")).hexdigest()
        verbose and logger.info(f"Hash was computed: {all_hash}")
        temp_folder = Path(tempfile.gettempdir())
        cache_folder = temp_folder / "coded-flows-cache"
        cache_folder.mkdir(parents=True, exist_ok=True)
        model_path = cache_folder / f"{all_hash}.model"
        metrics_dict_path = cache_folder / f"metrics_{all_hash}.json"
        metrics_df_path = cache_folder / f"metrics_{all_hash}.parquet"
        cv_metrics_path = cache_folder / f"cv_metrics_{all_hash}.parquet"
        hpo_trials_path = cache_folder / f"hpo_trials_{all_hash}.parquet"
        hpo_best_params_path = cache_folder / f"hpo_best_params_{all_hash}.json"
        features_importance_path = (
            cache_folder / f"features_importance_{all_hash}.parquet"
        )
        prediction_set_path = cache_folder / f"prediction_set_{all_hash}.parquet"
        shap_path = cache_folder / f"{all_hash}.shap"
        skip_computation = model_path.is_file()
    if not skip_computation:
        try:
            _validate_numerical_data(X)
        except Exception as e:
            verbose and logger.error(
                f"Only numerical or boolean types are allowed for 'X' input!"
            )
            raise
        features_names = X.columns if hasattr(X, "columns") else None
        min_samples_split = 2 * min_samples_leaf
        fixed_test_split = None if auto_split else test_val_size
        (X_train, X_test, y_train, y_test, val_ratio) = _smart_split(
            n_samples,
            X,
            y,
            random_state=random_state,
            shuffle=shuffle_split,
            fixed_test_split=fixed_test_split,
            verbose=verbose,
        )
        if use_hpo:
            verbose and logger.info(f"Performing Hyperparameters Optimization")
            constant_hyperparameters = {}
            (HPO_Best, HPO_Trials) = _hyperparameters_optimization(
                X_train,
                y_train,
                constant_hyperparameters,
                optimization_metric,
                val_ratio,
                shuffle_split,
                use_cross_validation,
                cv_folds,
                optimization_iterations,
                METRICS_OPT[optimization_metric],
                optimization_method,
                random_state,
                n_jobs_int,
                verbose=verbose,
            )
            HPO_Trials = _normalize_hpo_df(HPO_Trials)
            criterion = HPO_Best["criterion"]
            n_estimators = HPO_Best["n_estimators"]
            max_depth = HPO_Best["max_depth"]
            min_samples_split = HPO_Best["min_samples_split"]
            min_samples_leaf = HPO_Best["min_samples_leaf"]
            max_features = HPO_Best["max_features"]
        Model = RandomForestRegressor(
            criterion=criterion,
            n_estimators=n_estimators,
            min_samples_leaf=min_samples_leaf,
            max_features=max_features,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            random_state=random_state,
            n_jobs=n_jobs_int,
        )
        if use_cross_validation and (not use_hpo):
            verbose and logger.info(
                f"Using Cross-Validation to measure performance metrics"
            )
            CV_Metrics = _perform_cross_validation(
                Model,
                X_train,
                y_train,
                cv_folds,
                shuffle_split,
                random_state,
                n_jobs_int,
                verbose,
            )
        Model.fit(X_train, y_train)
        y_pred = Model.predict(X_test)
        fa = forecast_accuracy(y_test, y_pred)
        wape = wape_score(y_test, y_pred)
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        rmse = root_mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        mape = mean_absolute_percentage_error(y_test, y_pred)
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame(
                {
                    "Metric": [
                        "Forecast Accuracy",
                        "Weighted Absolute Percentage Error",
                        "Mean Absolute Error",
                        "Mean Squared Error",
                        "Root Mean Squared Error",
                        "R2 Score",
                        "Mean Absolute Percentage Error",
                    ],
                    "Value": [fa, wape, mae, mse, rmse, r2, mape],
                }
            )
        else:
            Metrics = {
                "forecast_accuracy": fa,
                "weighted_absolute_percentage_error ": wape,
                "mean_absolute_error": mae,
                "mean_squared_error": mse,
                "root_mean_squared_error": rmse,
                "r2_score": r2,
                "mean_absolute_percentage_error": mape,
            }
        verbose and logger.info(f"Forecast Accuracy                  : {fa:.2%}")
        verbose and logger.info(f"Weighted Absolute Percentage Error : {wape:.2%}")
        verbose and logger.info(f"Mean Absolute Error                : {mae:.4f}")
        verbose and logger.info(f"Mean Squared Error                 : {mse:.4f}")
        verbose and logger.info(f"Root Mean Squared Error            : {rmse:.4f}")
        verbose and logger.info(f"R2 Score                           : {r2:.4f}")
        verbose and logger.info(f"Mean Absolute Percentage Error     : {mape:.2%}")
        Prediction_Set = _combine_test_data(X_test, y_test, y_pred, features_names)
        verbose and logger.info(f"Prediction Set created")
        if retrain_on_full:
            verbose and logger.info(
                "Retraining model on full dataset for production deployment"
            )
            Model.fit(X, y)
            verbose and logger.info(
                "Model successfully retrained on full dataset. Reported metrics remain from original held-out test set."
            )
        Features_Importance = _get_feature_importance(Model, features_names)
        verbose and logger.info(f"Features Importance computed")
        if return_shap_explainer:
            if shap_feature_perturbation == "Interventional":
                SHAP = shap.TreeExplainer(
                    Model,
                    (
                        _smart_shap_background(
                            X if retrain_on_full else X_train,
                            model_type="tree",
                            seed=random_state,
                            verbose=verbose,
                        )
                        if use_shap_sampler
                        else X if retrain_on_full else X_train
                    ),
                    feature_names=shap_feature_names,
                )
            else:
                SHAP = shap.TreeExplainer(
                    Model,
                    feature_names=shap_feature_names,
                    feature_perturbation="tree_path_dependent",
                )
            verbose and logger.info(f"SHAP explainer generated")
        if activate_caching:
            verbose and logger.info(f"Caching output elements")
            joblib.dump(Model, model_path)
            if isinstance(Metrics, dict):
                with metrics_dict_path.open("w", encoding="utf-8") as f:
                    json.dump(Metrics, f, ensure_ascii=False, indent=4)
            else:
                Metrics.to_parquet(metrics_df_path)
            if use_cross_validation and (not use_hpo):
                CV_Metrics.to_parquet(cv_metrics_path)
            if use_hpo:
                HPO_Trials.to_parquet(hpo_trials_path)
                with hpo_best_params_path.open("w", encoding="utf-8") as f:
                    json.dump(HPO_Best, f, ensure_ascii=False, indent=4)
            Features_Importance.to_parquet(features_importance_path)
            Prediction_Set.to_parquet(prediction_set_path)
            if return_shap_explainer:
                with shap_path.open("wb") as f:
                    joblib.dump(SHAP, f)
            verbose and logger.info(f"Caching done")
    else:
        verbose and logger.info(f"Skipping computations and loading cached elements")
        Model = joblib.load(model_path)
        verbose and logger.info(f"Model loaded")
        if metrics_dict_path.is_file():
            with metrics_dict_path.open("r", encoding="utf-8") as f:
                Metrics = json.load(f)
        else:
            Metrics = pd.read_parquet(metrics_df_path)
        verbose and logger.info(f"Metrics loaded")
        if use_cross_validation and (not use_hpo):
            CV_Metrics = pd.read_parquet(cv_metrics_path)
            verbose and logger.info(f"Cross Validation metrics loaded")
        if use_hpo:
            HPO_Trials = pd.read_parquet(hpo_trials_path)
            with hpo_best_params_path.open("r", encoding="utf-8") as f:
                HPO_Best = json.load(f)
            verbose and logger.info(
                f"Hyperparameters Optimization trials and best params loaded"
            )
        Features_Importance = pd.read_parquet(features_importance_path)
        verbose and logger.info(f"Features Importance loaded")
        Prediction_Set = pd.read_parquet(prediction_set_path)
        verbose and logger.info(f"Prediction Set loaded")
        if return_shap_explainer:
            with shap_path.open("rb") as f:
                SHAP = joblib.load(f)
            verbose and logger.info(f"SHAP Explainer loaded")
    return (
        Model,
        SHAP,
        Metrics,
        CV_Metrics,
        Features_Importance,
        Prediction_Set,
        HPO_Trials,
        HPO_Best,
    )
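
A minimal end-to-end sketch of calling the brick function above on synthetic data (illustrative; assumes the code above is importable in the current session):

# Minimal usage sketch with synthetic data (illustrative)
import pandas as pd
from sklearn.datasets import make_regression

X_arr, y_arr = make_regression(n_samples=1000, n_features=6, noise=15.0, random_state=0)
X = pd.DataFrame(X_arr, columns=[f"feature_{i}" for i in range(X_arr.shape[1])])

(Model, SHAP, Metrics, CV_Metrics, Features_Importance,
 Prediction_Set, HPO_Trials, HPO_Best) = train_reg_random_forest(
    X, y_arr, options={"n_estimators": 200, "metrics_as": "Dataframe", "verbose": True}
)
print(Metrics)
print(Features_Importance.head())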

Brick Info

version v0.1.4
python 3.11, 3.12, 3.13
requirements
  • shap>=0.47.0
  • scikit-learn
  • pandas
  • numpy
  • torch
  • numba>=0.56.0
  • cmaes
  • optuna
  • scipy
  • polars
  • xxhash