Clu. HDBSCAN
Performs Hierarchical Density-Based Spatial Clustering (HDBSCAN) and computes intrinsic metrics.
Processing
This brick groups your data points into clusters based on density using the HDBSCAN (Hierarchical Density-Based Spatial Clustering of Applications with Noise) algorithm. Unlike other methods (like K-Means), you do not need to specify the number of clusters in advance.
The brick analyzes the "density" of your data to find natural groupings. It identifies areas where data points are concentrated and separates them from areas where points are sparse. It also automatically detects outliers—data points that don't fit well into any cluster—and labels them as noise.
It calculates three intrinsic metrics (Silhouette Score, Calinski-Harabasz, and Davies-Bouldin) to help you evaluate how well the clusters are separated and defined.
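For intuition, here is a minimal sketch of the same idea using scikit-learn's HDBSCAN class directly (toy data invented for illustration; the brick wires all of this up for you):

```python
# Illustrative only: two dense blobs plus a few scattered outliers.
import numpy as np
from sklearn.cluster import HDBSCAN

rng = np.random.default_rng(42)
blob_a = rng.normal(loc=0.0, scale=0.3, size=(50, 2))
blob_b = rng.normal(loc=5.0, scale=0.3, size=(50, 2))
outliers = rng.uniform(low=-2.0, high=7.0, size=(5, 2))
X = np.vstack([blob_a, blob_b, outliers])

# No number of clusters is specified; density alone drives the grouping.
labels = HDBSCAN(min_cluster_size=5).fit_predict(X)
print(set(labels))  # e.g. {0, 1, -1}: two clusters, with -1 marking noise
```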
Inputs
- data
- The dataset you want to cluster. This must contain only numerical values (integers, floats, or booleans). If your data contains text or dates, you must remove or encode them before using this brick.
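If your data does contain non-numerical columns, a hypothetical preparation step in pandas might look like this (column names are made up for illustration):

```python
import pandas as pd

df = pd.DataFrame({
    "height": [1.70, 1.82, 1.65],
    "weight": [68.0, 85.5, 59.0],
    "city": ["Paris", "Lyon", "Paris"],  # text: not accepted by the brick
})

# Option 1: keep only numerical and boolean columns.
numeric_only = df.select_dtypes(include=["number", "bool"])

# Option 2: one-hot encode the text column instead of dropping it.
encoded = pd.get_dummies(df, columns=["city"])
```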
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame |
You can check the list of supported types here: Available Type Hints.
Outputs
- Clustered data
- The original input data with an additional column (default name: cluster_id) indicating the group assignment for each row. A cluster ID of -1 indicates "Noise": the algorithm determined that this point does not belong to any specific cluster.
- Metrics
- A summary of the clustering quality scores. Depending on your settings, this is returned as either a small table (DataFrame) or a list of key-value pairs (Dictionary).
The Metrics output contains the following specific data fields (keys or rows):
- Silhouette Score: Measures how similar an object is to its own cluster compared to other clusters. Ranges from -1 to 1 (higher is better).
- Calinski-Harabasz: The ratio of dispersion between and within clusters. Higher scores indicate better-defined clusters.
- Davies-Bouldin: The average similarity measure of each cluster with its most similar cluster. Lower scores indicate better separation.
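As an illustration, a hypothetical downstream step might split noise from clustered rows and read the scores. The values below are invented, and it assumes the default column name and the Dictionary metrics format:

```python
import pandas as pd

# Invented outputs standing in for the brick's results.
clustered = pd.DataFrame({
    "x": [0.1, 0.2, 5.0, 5.1, 9.9],
    "cluster_id": [0, 0, 1, 1, -1],  # -1 marks a noise point
})
metrics = {"Silhouette Score": 0.72, "Calinski-Harabasz": 150.3, "Davies-Bouldin": 0.41}

noise = clustered[clustered["cluster_id"] == -1]
clusters = clustered[clustered["cluster_id"] != -1]

print(f"Silhouette: {metrics['Silhouette Score']:.3f} (closer to 1 is better)")
print(f"Davies-Bouldin: {metrics['Davies-Bouldin']:.3f} (lower is better)")
```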
Outputs Types
| Output | Types |
|---|---|
| Clustered data | DataFrame |
| Metrics | DataFrame, Dict |
You can check the list of supported types here: Available Type Hints.
Options
The Clu. HDBSCAN brick provides the following configurable options:
- Min. Cluster Size
- The smallest number of data points allowed to form a valid cluster. Increasing this value prevents the creation of many tiny "micro-clusters," forcing the algorithm to find larger, more significant groups.
- Min. Samples
- Controls how conservative the clustering is. It determines how many neighbors a point needs to be considered a "core" part of a cluster. The larger the value, the more points will be declared as noise/outliers.
- Cluster Selection Epsilon
- A distance threshold that allows clusters to merge. If set above 0, clusters that are physically closer than this distance will be combined into a single group.
- Distance Metric
- The method used to calculate the distance between data points.
- Euclidean: Standard straight-line distance. Good for most general geometric data.
- Manhattan: Grid-like distance (sum of absolute differences). Good for high-dimensional data.
- Chebyshev: The greatest difference along any single dimension.
- Cosine: Measures the angle between vectors rather than magnitude.
- Selection Method
- The strategy used to extract the final flat clusters from the cluster hierarchy.
- EOM (Excess of Mass): The standard method. It tends to produce clusters of varying densities and is usually the best choice.
- Leaf: Produces the most granular (fine-detailed) clustering possible. It tends to find many small, homogeneous clusters.
- Search Algorithm
- The internal algorithm used to find nearest neighbors.
- Auto: Automatically selects the best method based on the data structure.
- Brute: Compares every point to every other point (slow for large data).
- KD-Tree: Efficient for lower-dimensional data.
- Ball-Tree: Efficient for higher-dimensional data.
- Allow Single Cluster
- If enabled, allows the algorithm to determine that the entire dataset forms just one single cluster. By default, the algorithm prefers to find at least two clusters or to classify points as noise.
- Cluster Column Name
- The name of the new column that will be added to your data containing the cluster labels (e.g., cluster_id).
- Metrics Output Format
- Determines how the quality scores are returned in the Metrics output: as a small table (DataFrame) or as key-value pairs (Dictionary).
- Verbose
- If enabled, detailed logs about the clustering process (number of clusters found, noise count, calculation times) will be printed to the logs.
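Taken together, the options correspond to the keys read by the brick's source code (shown below). A sketch of a full configuration:

```python
# Example configuration mirroring the option keys read by the code below.
options = {
    "min_cluster_size": 15,           # ignore groups smaller than 15 points
    "min_samples": 10,                # be more conservative about density
    "cluster_selection_epsilon": 0.0, # no forced merging of nearby clusters
    "metric": "euclidean",
    "cluster_selection_method": "eom",
    "algorithm": "auto",
    "allow_single_cluster": False,
    "cluster_column": "cluster_id",
    "metrics_as": "Dataframe",        # any other value yields a Dict
    "verbose": True,
}
```

Note that any key left out falls back to the default shown in the code.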
```python
import logging
import pandas as pd
import polars as pl
import numpy as np
from scipy import sparse
from sklearn.cluster import HDBSCAN
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)
from coded_flows.types import DataFrame, Tuple, Union, Dict, Any
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Clu. HDBSCAN", level=logging.INFO)


def _validate_numerical_data(data):
    """
    Validates if the input data contains only numerical (integer, float) or boolean values.
    """
    # Sparse matrices: a single dtype covers all values.
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"Sparse matrix contains unsupported data type: {data.dtype}."
            )
        return
    # NumPy arrays: same single-dtype check.
    if isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"NumPy array contains unsupported data type: {data.dtype}."
            )
        return
    # Pandas: check the Series dtype, or every column of a DataFrame.
    if isinstance(data, (pd.DataFrame, pd.Series)):
        if isinstance(data, pd.Series):
            if not (
                pd.api.types.is_numeric_dtype(data) or pd.api.types.is_bool_dtype(data)
            ):
                raise TypeError(f"Pandas Series '{data.name}' is not numerical.")
        else:
            numeric_cols = data.select_dtypes(include=["number", "bool"])
            if numeric_cols.shape[1] != data.shape[1]:
                invalid_cols = list(set(data.columns) - set(numeric_cols.columns))
                raise TypeError(
                    f"Pandas DataFrame contains non-numerical columns: {invalid_cols}."
                )
        return
    # Polars: same logic with the Polars dtype API.
    if isinstance(data, (pl.DataFrame, pl.Series)):
        if isinstance(data, pl.Series):
            if not (data.dtype.is_numeric() or data.dtype == pl.Boolean):
                raise TypeError(f"Polars Series '{data.name}' is not numerical.")
        else:
            for col_name, dtype in zip(data.columns, data.dtypes):
                if not (dtype.is_numeric() or dtype == pl.Boolean):
                    raise TypeError(
                        f"Polars DataFrame contains non-numerical column: '{col_name}'."
                    )
        return
    raise ValueError(f"Unsupported data type: {type(data)}")


def clu_hdbscan(
    data: DataFrame, options=None
) -> Tuple[DataFrame, Union[DataFrame, Dict]]:
    options = options or {}
    verbose = options.get("verbose", True)
    min_cluster_size = options.get("min_cluster_size", 5)
    min_samples = options.get("min_samples", 5)
    cluster_selection_epsilon = options.get("cluster_selection_epsilon", 0.0)
    metric = options.get("metric", "euclidean").lower()
    cluster_selection_method = options.get("cluster_selection_method", "eom").lower()
    algorithm = options.get("algorithm", "auto").lower().replace("-", "_")
    allow_single_cluster = options.get("allow_single_cluster", False)
    cluster_col_name = options.get("cluster_column", "cluster_id")
    metrics_as = options.get("metrics_as", "Dataframe")
    # Above this sample count, the Silhouette Score is computed on a sample.
    AUTOMATED_THRESHOLD = 10000
    Clustered_data = None
    Metrics = None
    try:
        verbose and logger.info(
            f"Initializing Clu. HDBSCAN (min_cluster={min_cluster_size}, min_samples={min_samples})."
        )
        is_pandas = isinstance(data, pd.DataFrame)
        is_polars = isinstance(data, pl.DataFrame)
        if not (is_pandas or is_polars):
            raise ValueError("Input data must be a pandas or polars DataFrame.")
        verbose and logger.info("Validating numerical requirements...")
        _validate_numerical_data(data)
        n_samples = data.shape[0]
        verbose and logger.info(f"Processing {n_samples} samples.")
        Model = HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=min_samples,
            cluster_selection_epsilon=cluster_selection_epsilon,
            metric=metric,
            cluster_selection_method=cluster_selection_method,
            algorithm=algorithm,
            allow_single_cluster=allow_single_cluster,
            copy=True,
            n_jobs=-1,
        )
        verbose and logger.info("Fitting model and predicting labels...")
        labels = Model.fit_predict(data)
        # The label -1 is reserved for noise points.
        n_clusters_found = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = list(labels).count(-1)
        verbose and logger.info(
            f"Found {n_clusters_found} clusters. Noise points: {n_noise}."
        )
        unique_labels = set(labels)
        if len(unique_labels) < 2:
            # Intrinsic metrics need at least two distinct labels.
            verbose and logger.warning(
                "Not enough clusters (or only noise) found to calculate metrics. Returning NaNs."
            )
            (s_score, ch_score, db_score) = (np.nan, np.nan, np.nan)
        else:
            verbose and logger.info("Computing Intrinsic metrics.")
            if n_samples > AUTOMATED_THRESHOLD:
                # Silhouette is quadratic in the sample count; sample large data.
                s_score = silhouette_score(
                    data, labels, sample_size=min(n_samples, 20000), random_state=42
                )
                verbose and logger.info(f"Silhouette Score : {s_score:.4f} (Sampled)")
            else:
                s_score = silhouette_score(data, labels)
                verbose and logger.info(f"Silhouette Score : {s_score:.4f}")
            ch_score = calinski_harabasz_score(data, labels)
            verbose and logger.info(f"Calinski-Harabasz : {ch_score:.4f}")
            db_score = davies_bouldin_score(data, labels)
            verbose and logger.info(f"Davies-Bouldin : {db_score:.4f}")
        metric_names = ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"]
        metric_values = [s_score, ch_score, db_score]
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame({"Metric": metric_names, "Value": metric_values})
        else:
            Metrics = dict(zip(metric_names, metric_values))
        # Attach the labels without mutating the caller's DataFrame.
        if is_pandas:
            Clustered_data = data.copy()
            Clustered_data[cluster_col_name] = labels
            verbose and logger.info("Results assigned in-place to Pandas DataFrame.")
        elif is_polars:
            Clustered_data = data.with_columns(
                pl.Series(name=cluster_col_name, values=labels)
            )
            verbose and logger.info("Results attached to Polars DataFrame.")
    except Exception as e:
        verbose and logger.error(f"Clu. HDBSCAN operation failed: {e}")
        raise
    return (Clustered_data, Metrics)
```
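For reference, a hypothetical standalone call might look like this (toy data; assumes the module above and its dependencies, including coded_flows and polars, are installed):

```python
# Example call with two well-separated blobs of points.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(
    np.vstack([rng.normal(0, 0.3, (40, 2)), rng.normal(4, 0.3, (40, 2))]),
    columns=["x", "y"],
)

clustered, metrics = clu_hdbscan(df, options={"min_cluster_size": 10, "verbose": False})
print(clustered["cluster_id"].value_counts())  # rows per cluster (and noise)
print(metrics)                                 # DataFrame of the three scores
```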
Brick Info
- shap>=0.47.0
- scikit-learn
- pandas
- numpy
- scipy
- numba>=0.56.0
- polars