Clu. DBSCAN
Performs Density-Based Spatial Clustering of Applications with Noise (DBSCAN) and computes intrinsic metrics (Silhouette, etc.).
Processing
This brick performs Density-Based Spatial Clustering of Applications with Noise (DBSCAN) to group data points that are packed closely together. Unlike methods that force data into roughly spherical groups (like K-Means), DBSCAN can find clusters of arbitrary shape and automatically identifies "noise" points (outliers) that don't belong to any group.
If clusters are found, it also calculates intrinsic metrics (like the Silhouette Score) to help you judge how well-separated and distinct the resulting groups are.
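To see why shape matters, here is a minimal standalone sketch (illustrative only, not the brick's own code) that runs scikit-learn's DBSCAN on the classic two-moons dataset, a shape K-Means cannot separate:

```python
import numpy as np
from sklearn.datasets import make_moons
from sklearn.cluster import DBSCAN

# Two interleaved crescents: non-spherical clusters that K-Means cannot separate.
X, _ = make_moons(n_samples=500, noise=0.05, random_state=42)

labels = DBSCAN(eps=0.2, min_samples=5).fit_predict(X)

# DBSCAN marks outliers with the label -1; they are not counted as a cluster.
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
print(f"Clusters found: {n_clusters}")            # typically 2 (one per crescent)
print(f"Noise points:   {np.sum(labels == -1)}")  # points too isolated to join a cluster
```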
Inputs
- data
- The dataset containing the numerical values you want to cluster. This algorithm relies on mathematical distance, so every column in this dataset must be numerical (numbers or booleans). Remove or transform text columns before connecting the dataset here, as shown below.
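If your source table mixes types, a quick way to satisfy this requirement is to keep only numeric/boolean columns, or to one-hot encode the categorical ones first. A minimal pandas sketch, using a hypothetical `df` with mixed columns:

```python
import pandas as pd

# Hypothetical mixed-type table.
df = pd.DataFrame({
    "age": [25, 32, 47],
    "income": [40_000.0, 55_000.0, 72_000.0],
    "city": ["Paris", "Lyon", "Nice"],  # text column: not accepted by this brick
})

# Option 1: drop non-numerical columns.
numeric_only = df.select_dtypes(include=["number", "bool"])

# Option 2: one-hot encode text columns into boolean flags instead of dropping them.
encoded = pd.get_dummies(df, columns=["city"])
```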
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame |
You can check the list of supported types here: Available Type Hints.
Outputs
- Clustered data
- The original dataset with an additional column appended containing the cluster labels. Points assigned to the same cluster will share the same integer ID.
Important: A value of -1 in the cluster column represents "Noise" (outliers that did not fit into any density group). See the snippet after this list for filtering these points out.
- Metrics
- A collection of scores that evaluate the quality of the clustering. This can be returned as a DataFrame or a Dictionary, depending on the Metrics Output Format option.
The Metrics output contains the following specific data fields (keys or rows):
- Silhouette Score: A value between -1 and 1. High values indicate points are well-matched to their own cluster and poorly matched to neighboring clusters.
- Calinski-Harabasz: Also known as the Variance Ratio Criterion. Higher scores indicate better-defined clusters.
- Davies-Bouldin: The average similarity measure of each cluster with its most similar cluster. Lower values indicate better clustering.
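Downstream, both outputs are straightforward to consume. A minimal sketch, assuming the brick returned pandas DataFrames named `clustered` and `metrics`, with the default cluster column name:

```python
# clustered, metrics = outputs of the Clu. DBSCAN brick (pandas variants assumed)

# Keep only points that were assigned to a real cluster (drop noise).
clean = clustered[clustered["cluster_id"] != -1]

# Cluster sizes, largest first.
print(clean["cluster_id"].value_counts())

# Look up a single score from the Metrics table.
silhouette = metrics.loc[metrics["Metric"] == "Silhouette Score", "Value"].iloc[0]
print(f"Silhouette: {silhouette:.3f}")
```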
Outputs Types
| Output | Types |
|---|---|
| Clustered data | DataFrame |
| Metrics | DataFrame, Dict |
You can check the list of supported types here: Available Type Hints.
Options
The Clu. DBSCAN brick contains some changeable options:
- Neighborhood Distance (Eps.)
- Controls how close two points must be to be considered "neighbors." A common way to choose this value is sketched after this list.
- Low value (e.g., 0.1): Points must be very close. This may result in many small clusters and a lot of unassigned noise.
- High value (e.g., 10.0): Points can be further apart. This may merge distinct groups into one giant cluster.
- Min. Points per Cluster
- The minimum number of neighbors a point needs to start a new cluster (or be considered a "core" part of one).
- Higher values: Reduces noise but makes it harder to form clusters (needs denser data).
- Lower values: Makes it easier to form clusters but might group noise points together.
- Distance Calculation Metric
- The formula used to calculate the distance between points.
- Euclidean: Standard straight-line distance (like a ruler). A good default for most numerical data.
- Manhattan: Grid-based distance (like walking city blocks). Often more robust for high-dimensional data.
- Cosine: Measures the angle between points rather than their magnitude. Useful for direction-based data such as text embeddings.
- Chebyshev: The greatest difference along any single dimension.
- Search Algorithm
- The computational method used to find neighbors.
- Auto: Automatically selects the best method based on your data size and structure. (Recommended).
- Ball-Tree: Efficient for high-dimensional data.
- KD-Tree: Efficient for low-dimensional data.
- Brute: Compares every point to every other point. Slowest on large datasets, but works with any distance metric.
- Tree Leaf Size
- A tuning parameter for Ball-Tree or KD-Tree. Smaller values may slow down building the tree but speed up querying. This rarely needs changing from the default (30).
- Cluster Column Name
- The name of the new column that will be added to your data containing the Cluster IDs. Defaults to cluster_id.
- Metrics Output Format
- Determines the structure of the Metrics output: a DataFrame or a Dictionary.
- Verbose
- If enabled, detailed logs about the clustering process (number of clusters found, noise count, calculation times) will be printed to the logs.
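Choosing the Neighborhood Distance (Eps.) is usually the hardest part of tuning DBSCAN. A widely used heuristic is the k-distance plot: sort every point's distance to its k-th nearest neighbor (with k equal to Min. Points per Cluster) and read off the "elbow" where the curve bends sharply; that distance is a reasonable eps. A minimal sketch of this heuristic (not part of the brick), assuming scikit-learn and a numerical array `X`:

```python
import numpy as np
from sklearn.neighbors import NearestNeighbors

def k_distance_curve(X, min_samples=5):
    """Sorted distance of every point to its min_samples-th nearest neighbor."""
    nn = NearestNeighbors(n_neighbors=min_samples).fit(X)
    distances, _ = nn.kneighbors(X)  # column 0 is the point itself (distance 0.0)
    return np.sort(distances[:, -1])

# Inspect the curve and pick the elbow as a candidate eps, e.g.:
# curve = k_distance_curve(X, min_samples=5)
# import matplotlib.pyplot as plt; plt.plot(curve); plt.ylabel("k-distance"); plt.show()
```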
Brick Code
```python
import logging

import pandas as pd
import polars as pl
import numpy as np
from scipy import sparse
from sklearn.cluster import DBSCAN
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)
from coded_flows.types import DataFrame, Tuple, Union, Dict, Any
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Clu. DBSCAN", level=logging.INFO)


def _validate_numerical_data(data):
    """
    Validates if the input data contains only numerical (integer, float) or boolean values.
    """
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"Sparse matrix contains unsupported data type: {data.dtype}."
            )
        return

    if isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"NumPy array contains unsupported data type: {data.dtype}."
            )
        return

    if isinstance(data, (pd.DataFrame, pd.Series)):
        if isinstance(data, pd.Series):
            if not (
                pd.api.types.is_numeric_dtype(data) or pd.api.types.is_bool_dtype(data)
            ):
                raise TypeError(f"Pandas Series '{data.name}' is not numerical.")
        else:
            numeric_cols = data.select_dtypes(include=["number", "bool"])
            if numeric_cols.shape[1] != data.shape[1]:
                invalid_cols = list(set(data.columns) - set(numeric_cols.columns))
                raise TypeError(
                    f"Pandas DataFrame contains non-numerical columns: {invalid_cols}."
                )
        return

    if isinstance(data, (pl.DataFrame, pl.Series)):
        if isinstance(data, pl.Series):
            if not (data.dtype.is_numeric() or data.dtype == pl.Boolean):
                raise TypeError(f"Polars Series '{data.name}' is not numerical.")
        else:
            for col_name, dtype in zip(data.columns, data.dtypes):
                if not (dtype.is_numeric() or dtype == pl.Boolean):
                    raise TypeError(
                        f"Polars DataFrame contains non-numerical column: '{col_name}'."
                    )
        return

    raise ValueError(f"Unsupported data type: {type(data)}")


def clu_dbscan(
    data: DataFrame, options=None
) -> Tuple[DataFrame, Union[DataFrame, Dict]]:
    """
    Run DBSCAN on a pandas or polars DataFrame and compute intrinsic
    clustering metrics (Silhouette, Calinski-Harabasz, Davies-Bouldin).
    """
    options = options or {}

    verbose = options.get("verbose", True)
    eps = options.get("eps", 0.5)
    min_samples = options.get("min_samples", 5)
    metric = options.get("metric", "euclidean").lower()
    algorithm = options.get("algorithm", "auto").lower().replace("-", "_")
    leaf_size = options.get("leaf_size", 30)
    cluster_col_name = options.get("cluster_column", "cluster_id")
    metrics_as = options.get("metrics_as", "Dataframe")

    # Above this row count, the silhouette is computed on a sample (it is O(n^2)).
    AUTOMATED_THRESHOLD = 10000

    Clustered_data = None
    Metrics = None

    try:
        verbose and logger.info(
            f"Initializing Clu. DBSCAN (eps={eps}, min_pts={min_samples})."
        )

        is_pandas = isinstance(data, pd.DataFrame)
        is_polars = isinstance(data, pl.DataFrame)
        if not (is_pandas or is_polars):
            raise ValueError("Input data must be a pandas or polars DataFrame.")

        verbose and logger.info("Validating numerical requirements...")
        _validate_numerical_data(data)

        n_samples = data.shape[0]
        verbose and logger.info(f"Processing {n_samples} samples.")

        Model = DBSCAN(
            eps=eps,
            min_samples=min_samples,
            metric=metric,
            algorithm=algorithm,
            leaf_size=leaf_size,
            n_jobs=-1,
        )

        verbose and logger.info("Fitting model...")
        labels = Model.fit_predict(data)

        # DBSCAN labels noise as -1; it is not counted as a cluster.
        n_clusters_found = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = list(labels).count(-1)
        verbose and logger.info(
            f"Found {n_clusters_found} clusters. Noise points: {n_noise}."
        )

        unique_labels = set(labels)
        if len(unique_labels) < 2:
            verbose and logger.warning(
                "Not enough clusters (or only noise) found to calculate metrics. Returning NaNs."
            )
            (s_score, ch_score, db_score) = (np.nan, np.nan, np.nan)
        else:
            verbose and logger.info("Computing Intrinsic metrics.")
            if n_samples > AUTOMATED_THRESHOLD:
                s_score = silhouette_score(
                    data, labels, sample_size=min(n_samples, 20000), random_state=42
                )
                verbose and logger.info(f"Silhouette Score : {s_score:.4f} (Sampled)")
            else:
                s_score = silhouette_score(data, labels)
                verbose and logger.info(f"Silhouette Score : {s_score:.4f}")
            ch_score = calinski_harabasz_score(data, labels)
            verbose and logger.info(f"Calinski-Harabasz : {ch_score:.4f}")
            db_score = davies_bouldin_score(data, labels)
            verbose and logger.info(f"Davies-Bouldin : {db_score:.4f}")

        metric_names = ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"]
        metric_values = [s_score, ch_score, db_score]

        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame({"Metric": metric_names, "Value": metric_values})
        else:
            Metrics = dict(zip(metric_names, metric_values))

        # Attach the labels without mutating the caller's DataFrame.
        if is_pandas:
            Clustered_data = data.copy()
            Clustered_data[cluster_col_name] = labels
            verbose and logger.info(
                "Results attached to a copy of Pandas DataFrame (original preserved)."
            )
        elif is_polars:
            Clustered_data = data.with_columns(
                pl.Series(name=cluster_col_name, values=labels)
            )
            verbose and logger.info("Results attached to Polars DataFrame.")
    except Exception as e:
        verbose and logger.error(f"Clu. DBSCAN operation failed: {e}")
        raise

    return (Clustered_data, Metrics)
```
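For reference, here is a minimal sketch of calling the brick function directly, assuming the coded_flows package and the dependencies below are installed. The input values and option settings are illustrative:

```python
import pandas as pd

# Illustrative input: two dense blobs plus one far-away outlier.
df = pd.DataFrame({
    "x": [1.0, 1.1, 0.9, 8.0, 8.2, 7.9, 50.0],
    "y": [1.0, 0.9, 1.2, 8.1, 8.0, 7.8, 50.0],
})

clustered, metrics = clu_dbscan(
    df,
    options={
        "eps": 1.0,             # Neighborhood Distance (Eps.)
        "min_samples": 2,       # Min. Points per Cluster
        "metric": "euclidean",  # Distance Calculation Metric
        "cluster_column": "cluster_id",
        "metrics_as": "Dataframe",
        "verbose": False,
    },
)

print(clustered)  # the outlier at (50, 50) should get cluster_id == -1
print(metrics)
```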
Brick Info
- shap>=0.47.0
- scikit-learn
- pandas
- numpy
- scipy
- numba>=0.56.0
- polars