Clu. K-Means

Performs K-Means clustering. Computes Intrinsic metrics (Silhouette, Calinski-Harabasz, Davies-Bouldin) to evaluate the quality of the resulting clusters.

Processing

This brick automatically groups your data points into distinct clusters based on their similarities. It uses the K-Means algorithm to partition your dataset, assigning each row to a specific group.

It is "smart" about performance: if your dataset contains fewer than 10,000 rows, it uses standard K-Means. If it contains 10,000 or more rows, it automatically switches to MiniBatchKMeans, which is faster and more memory-efficient for large datasets. Additionally, it calculates specific scores (metrics) to help you evaluate how well the data has been separated.

Inputs

data
The dataset you want to cluster. You must ensure that the dataset (or the specific columns selected before this brick) contains only numbers (integers/floats) or booleans. Text or date columns will cause an error and should be removed or encoded before this step.
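
For example, a text column can be one-hot encoded with pandas before it reaches this brick. A minimal sketch (the income and city columns are invented for illustration):

import pandas as pd

df = pd.DataFrame({
    "income": [42_000, 58_500, 31_200],
    "city": ["Paris", "Lyon", "Paris"],  # text column: would cause an error here
})

# One-hot encode the text column so that every remaining column is numeric/boolean.
df = pd.get_dummies(df, columns=["city"])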

Input Types

  • data: DataFrame

You can check the list of supported types here: Available Type Hints.

Outputs

Model
The trained K-Means model object (Scikit-Learn). This contains the internal logic and centroids found during processing, which can be used by downstream technical bricks for further analysis or prediction.
Clustered data
The result of the operation. This is your original dataset with an added column (default name cluster_id) indicating which group each row belongs to (e.g., 0, 1, 2).
Metrics
A collection of scores that evaluate the quality of the clustering. Since the algorithm doesn't know the "true" labels, it uses intrinsic metrics (mathematical calculations based on how compact and separated the clusters are).

The Metrics output contains the following specific data fields (keys or column values):

  • Silhouette Score: Ranges from -1 to 1. A high value indicates that the object is well matched to its own cluster and poorly matched to neighboring clusters.
  • Calinski-Harabasz: Also known as the Variance Ratio Criterion. Higher scores generally indicate a model with better-defined clusters.
  • Davies-Bouldin: The average similarity measure of each cluster with its most similar cluster. Unlike the others, lower values are better (closer to 0).
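
All three scores come from scikit-learn and can be reproduced on any fitted clustering. A minimal sketch on synthetic data:

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)

rng = np.random.default_rng(42)
X = rng.normal(size=(300, 4))

labels = KMeans(n_clusters=3, random_state=42, n_init="auto").fit_predict(X)

print(f"Silhouette Score  : {silhouette_score(X, labels):.4f}")         # -1 to 1, higher is better
print(f"Calinski-Harabasz : {calinski_harabasz_score(X, labels):.4f}")  # higher is better
print(f"Davies-Bouldin    : {davies_bouldin_score(X, labels):.4f}")     # lower is better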

Output Types

  • Model: Any
  • Clustered data: DataFrame
  • Metrics: DataFrame, Dict

You can check the list of supported types here: Available Type Hints.

Options

The Clu. K-Means brick exposes the following configurable options:

Number of Clusters (K)
Defines how many groups you want to split your data into. Example: If set to 3, the algorithm will try to find 3 distinct groups in your data.
Output Cluster Column Name
The name of the new header that will be added to your data. This column contains the group number for each row. Default: cluster_id
Metrics Output Format
Determines the data structure of the Metrics output: a DataFrame (two columns, Metric and Value) or a Dict (metric name mapped to value). Default: DataFrame
Random Seed
An integer used to initialize the random number generator. Why change this? K-Means starts with random centers. Setting a specific number (like 42) ensures that if you run the workflow again with the exact same data, you get the exact same results.
Verbose
Toggles whether detailed logs are written to the system console during processing.

Source Code

import logging
import pandas as pd
import polars as pl
import numpy as np
from scipy import sparse
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)
from coded_flows.types import DataFrame, Tuple, Any, Union, Dict
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Clu. K-Means", level=logging.INFO)


def _validate_numerical_data(data):
    """
    Validates if the input data contains only numerical or boolean values.
    """
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError("Sparse matrix contains unsupported data type.")
        return
    if isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError("NumPy array contains unsupported data type.")
        return
    if isinstance(data, (pd.DataFrame, pd.Series)):
        if isinstance(data, pd.Series):
            if not (
                pd.api.types.is_numeric_dtype(data) or pd.api.types.is_bool_dtype(data)
            ):
                raise TypeError(
                    f"Pandas Series '{data.name}' is not numerical or boolean."
                )
        else:
            numeric_cols = data.select_dtypes(include=["number", "bool"])
            if numeric_cols.shape[1] != data.shape[1]:
                invalid_cols = list(set(data.columns) - set(numeric_cols.columns))
                raise TypeError(
                    f"Pandas DataFrame contains non-numerical columns: {invalid_cols}."
                )
        return
    if isinstance(data, (pl.DataFrame, pl.Series)):
        if isinstance(data, pl.Series):
            if not (data.dtype.is_numeric() or data.dtype == pl.Boolean):
                raise TypeError(
                    f"Polars Series '{data.name}' is not numerical or boolean."
                )
        else:
            for col_name, dtype in zip(data.columns, data.dtypes):
                if not (dtype.is_numeric() or dtype == pl.Boolean):
                    raise TypeError(
                        f"Polars DataFrame contains non-numerical column: '{col_name}'."
                    )
        return
    raise ValueError(f"Unsupported data type for validation: {type(data)}")


def clu_kmeans(
    data: DataFrame, options=None
) -> Tuple[Any, DataFrame, Union[DataFrame, Dict]]:
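    """
    Cluster `data` with K-Means (or MiniBatchKMeans for datasets of 10,000
    rows or more) and return the fitted model, the input data with an
    appended cluster column, and intrinsic clustering metrics.
    """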
    options = options or {}
    verbose = options.get("verbose", True)
    n_clusters = options.get("n_clusters", 2)
    cluster_col_name = options.get("cluster_column", "cluster_id")
    metrics_as = options.get("metrics_as", "Dataframe")
    random_state = options.get("random_state", 42)
    AUTOMATED_THRESHOLD = 10000
    Clustered_data = None
    Model = None
    Metrics = None
    try:
        verbose and logger.info(f"Starting Smart K-Means with K={n_clusters}.")
        is_pandas = isinstance(data, pd.DataFrame)
        is_polars = isinstance(data, pl.DataFrame)
        if not (is_pandas or is_polars):
            raise ValueError("Input data must be a pandas or polars DataFrame.")
        verbose and logger.info("Validating input data types...")
        _validate_numerical_data(data)
        n_samples = data.shape[0]
        verbose and logger.info(f"Dataset contains {n_samples} samples.")
        if n_samples >= AUTOMATED_THRESHOLD:
            verbose and logger.info(
                f"Sample size >= {AUTOMATED_THRESHOLD}. Using MiniBatchKMeans."
            )
            Model = MiniBatchKMeans(
                n_clusters=n_clusters,
                random_state=random_state,
                batch_size=256,
                n_init="auto",
            )
        else:
            verbose and logger.info(
                f"Sample size < {AUTOMATED_THRESHOLD}. Using standard KMeans."
            )
            Model = KMeans(
                n_clusters=n_clusters, random_state=random_state, n_init="auto"
            )
        verbose and logger.info("Fitting model...")
        cluster_labels = Model.fit_predict(data)
        metric_names = []
        metric_values = []
        verbose and logger.info("Computing Intrinsic metrics.")
        if n_samples >= AUTOMATED_THRESHOLD:
            s_score = silhouette_score(
                data,
                cluster_labels,
                sample_size=min(n_samples, 20000),
                random_state=random_state,
            )
            verbose and logger.info(f"Silhouette Score  : {s_score:.4f} (Sampled)")
        else:
            s_score = silhouette_score(data, cluster_labels)
            verbose and logger.info(f"Silhouette Score  : {s_score:.4f}")
        ch_score = calinski_harabasz_score(data, cluster_labels)
        verbose and logger.info(f"Calinski-Harabasz : {ch_score:.4f}")
        db_score = davies_bouldin_score(data, cluster_labels)
        verbose and logger.info(f"Davies-Bouldin    : {db_score:.4f}")
        metric_names = ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"]
        metric_values = [s_score, ch_score, db_score]
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame({"Metric": metric_names, "Value": metric_values})
        else:
            Metrics = dict(zip(metric_names, metric_values))
        if is_pandas:
            Clustered_data = data.copy()
            Clustered_data[cluster_col_name] = cluster_labels
            verbose and logger.info("Appended clusters to Pandas DataFrame.")
        elif is_polars:
            Clustered_data = data.with_columns(
                pl.Series(name=cluster_col_name, values=cluster_labels)
            )
            verbose and logger.info("Appended clusters to Polars DataFrame.")
    except Exception as e:
        verbose and logger.error(f"Error during K-Means process: {e}")
        raise
    return (Model, Clustered_data, Metrics)
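
Assuming clu_kmeans is in scope, a minimal usage sketch (the sample data and option values are invented for illustration):

sample = pd.DataFrame({
    "x": [1.0, 1.2, 8.9, 9.1, 0.8, 9.3],
    "y": [0.9, 1.1, 9.2, 8.8, 1.0, 9.0],
})

model, clustered, metrics = clu_kmeans(
    sample,
    options={
        "n_clusters": 2,
        "cluster_column": "cluster_id",
        "metrics_as": "Dataframe",
        "random_state": 42,
        "verbose": False,
    },
)

print(clustered)  # original columns plus a cluster_id column with values 0/1
print(metrics)    # one row per metric: Silhouette, Calinski-Harabasz, Davies-Bouldin

# The fitted model can assign new points to the learned clusters.
print(model.predict(pd.DataFrame({"x": [0.9], "y": [1.0]})))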

Brick Info

version v0.1.4
python 3.11, 3.12, 3.13
requirements
  • shap>=0.47.0
  • scikit-learn
  • pandas
  • numpy
  • scipy
  • numba>=0.56.0
  • polars