Bin Numbers

Group numbers into bins (intervals) using fixed-size or custom intervals.

Bin Numbers

Processing

This function groups numeric values within specified columns into discrete intervals or "bins." It supports two primary modes: fixed_size, where bins have uniform width, or custom_intervals, allowing the user to define specific boundaries. The output dataset includes the new binned column(s) where values are represented by their interval range (e.g., "10:20").

Inputs

data
The input tabular data (DataFrame or Arrow Table) containing the columns to be binned.
columns (optional)
A list of column names containing numeric data that should be converted into bins. If not provided via input, the list must be set using the 'Input Columns' option.

Inputs Types

Input Types
data DataFrame, ArrowTable
columns List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting tabular dataset containing the original data plus the new binned column(s). The specific type (DataFrame or ArrowTable) depends on the selected 'Output Format' option.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Bin Numbers brick contains some changeable options:

Input Columns
Specifies the numeric column(s) in the input data that should be binned. This is an alternative way to provide the columns input.
Binning Mode
Determines the method for creating bins: fixed_size (equal width) or custom_intervals (user-defined ranges). Defaults to fixed_size.
Bin Width
Used only in fixed_size mode. Defines the uniform size of each bin interval (e.g., 10).
Minimum Value
Used in fixed_size mode. Defines the starting point for the bin calculation.
Use Minimum Value
If enabled, values below the defined Minimum Value are grouped into a specific labeled bin (e.g., '< 0').
Maximum Value
Used in fixed_size mode. Defines the maximum value before grouping into the upper bound label.
Use Maximum Value
If enabled, values equal to or above the defined Maximum Value are grouped into a specific labeled bin (e.g., '>= 100').
Custom Intervals
Used only in custom_intervals mode. A list of comma-separated string pairs defining lower,upper bounds (e.g., "0,10"). Bounds can be left empty to represent infinity (e.g., "10," for [10, +inf) or ",10" for [-inf, 10)).
Output Suffix
A suffix (default: _binned) added to the name of the new binned column if Bin In Place is disabled.
Bin In Place
If enabled, the original numeric column is overwritten by the new categorical bin column. If disabled, a new column is created.
Safe Mode
If enabled, errors related to missing columns or non-numeric columns are logged as warnings and skipped, allowing the operation to continue. If disabled, these errors raise an exception.
Output Format
Specifies the desired format for the returned data (pandas, polars, or arrow). Defaults to pandas.
Verbose
Enables detailed logging information during execution.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, List, Str, Float, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _build_fixed_case_statement(
    sanitized_col,
    use_minimum,
    minimum_value,
    use_maximum,
    maximum_value,
    offset,
    bin_width,
):
    """Build CASE statement for fixed-size binning"""
    case_parts = []
    if use_minimum:
        case_parts.append(
            f"""WHEN "{sanitized_col}" < {minimum_value} THEN '< {minimum_value}'"""
        )
    if use_maximum:
        case_parts.append(
            f"""WHEN "{sanitized_col}" >= {maximum_value} THEN '>= {maximum_value}'"""
        )
    if use_minimum and use_maximum:
        case_parts.append(
            f"""\n            WHEN "{sanitized_col}" >= {minimum_value} AND "{sanitized_col}" < {maximum_value} THEN\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n                ':' ||\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n        """
        )
    elif use_minimum:
        case_parts.append(
            f"""\n            WHEN "{sanitized_col}" >= {minimum_value} THEN\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n                ':' ||\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n        """
        )
    elif use_maximum:
        case_parts.append(
            f"""\n            WHEN "{sanitized_col}" < {maximum_value} THEN\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n                ':' ||\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n        """
        )
    else:
        case_parts.append(
            f"""\n            WHEN "{sanitized_col}" IS NOT NULL THEN\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n                ':' ||\n                CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n        """
        )
    case_parts.append("ELSE NULL")
    return "CASE " + " ".join(case_parts) + " END"


def _build_custom_case_statement(sanitized_col, intervals):
    """Build CASE statement for custom intervals"""
    case_parts = []
    for interval in intervals:
        lower = interval["lower"]
        upper = interval["upper"]
        lower_label = interval["lower_label"]
        upper_label = interval["upper_label"]
        if lower is None and upper is None:
            condition = f'"{sanitized_col}" IS NOT NULL'
        elif lower is None:
            condition = f'"{sanitized_col}" < {upper}'
        elif upper is None:
            condition = f'"{sanitized_col}" >= {lower}'
        else:
            condition = f'"{sanitized_col}" >= {lower} AND "{sanitized_col}" < {upper}'
        label = f"{lower_label}:{upper_label}"
        case_parts.append(f"WHEN {condition} THEN '{label}'")
    case_parts.append("ELSE NULL")
    return "CASE " + " ".join(case_parts) + " END"


def bin_numbers(
    data: Union[DataFrame, ArrowTable], columns: List = None, options=None
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Bin Numbers"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = columns or options.get("columns", [])
    binning_mode = options.get("binning_mode", "fixed_size")
    bin_width = options.get("bin_width", 10)
    minimum_value = options.get("minimum_value", 0)
    use_minimum = options.get("use_minimum", False)
    maximum_value = options.get("maximum_value", 100)
    use_maximum = options.get("use_maximum", False)
    custom_intervals = options.get("custom_intervals", ["0,10", "10,20", "20,30"])
    output_suffix = options.get("output_suffix", "_binned")
    in_place = options.get("in_place", False)
    safe_mode = options.get("safe_mode", False)
    output_format = options.get("output_format", "pandas")
    result = None
    if not isinstance(columns, list) and (
        not all((isinstance(c, str) for c in columns))
    ):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    try:
        verbose and logger.info(f"[{brick_display_name}] Starting binning operation.")
        if not columns or len(columns) == 0:
            verbose and logger.error(
                f"[{brick_display_name}] At least one input column must be specified."
            )
            raise ValueError("At least one input column must be specified")
        if not isinstance(columns, list) or not all(
            (isinstance(c, str) for c in columns)
        ):
            verbose and logger.error(
                f"[{brick_display_name}] Input columns must be provided as a list of strings."
            )
            raise ValueError("Input columns must be provided as a list of strings")
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Processing {len(columns)} column(s) for binning."
        )
        numeric_types = [
            "INTEGER",
            "BIGINT",
            "SMALLINT",
            "TINYINT",
            "DOUBLE",
            "FLOAT",
            "REAL",
            "DECIMAL",
            "NUMERIC",
            "HUGEINT",
        ]
        valid_columns = []
        invalid_columns = []
        for col in columns:
            if col not in all_columns:
                if safe_mode:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Column '{col}' not found in data. Skipping."
                    )
                    invalid_columns.append(col)
                    continue
                else:
                    verbose and logger.error(
                        f"[{brick_display_name}] Column '{col}' not found in data."
                    )
                    conn.close()
                    raise ValueError(f"Column '{col}' not found in data")
            column_type = all_columns[col].upper()
            is_numeric = any((num_type in column_type for num_type in numeric_types))
            if not is_numeric:
                error_message = f"Column '{col}' is not numeric (type: {column_type})"
                if safe_mode:
                    verbose and logger.warning(
                        f"[{brick_display_name}] {error_message}. Skipping."
                    )
                    invalid_columns.append(col)
                    continue
                else:
                    verbose and logger.error(f"[{brick_display_name}] {error_message}.")
                    conn.close()
                    raise ValueError(error_message)
            valid_columns.append(col)
            verbose and logger.info(
                f"[{brick_display_name}] Column '{col}' is numeric (type: {column_type})."
            )
        if not valid_columns:
            if safe_mode:
                verbose and logger.warning(
                    f"[{brick_display_name}] No valid numeric columns to bin. Returning original data."
                )
                conn.close()
                result = data
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] No valid numeric columns to bin."
                )
                conn.close()
                raise ValueError("No valid numeric columns to bin")
        else:
            verbose and logger.info(
                f"[{brick_display_name}] {len(valid_columns)} valid column(s) will be binned."
            )
            if binning_mode == "fixed_size":
                verbose and logger.info(
                    f"[{brick_display_name}] Using fixed-size binning with width={bin_width}."
                )
                offset = minimum_value if use_minimum else 0
                verbose and logger.info(f"[{brick_display_name}] Bin offset: {offset}.")
                if use_minimum:
                    verbose and logger.info(
                        f"[{brick_display_name}] Values below {minimum_value} will be labeled as '< {minimum_value}'."
                    )
                if use_maximum:
                    verbose and logger.info(
                        f"[{brick_display_name}] Values above or equal to {maximum_value} will be labeled as '>= {maximum_value}'."
                    )
            elif binning_mode == "custom_intervals":
                verbose and logger.info(
                    f"[{brick_display_name}] Using custom intervals binning."
                )
                try:
                    if not isinstance(custom_intervals, list):
                        verbose and logger.error(
                            f"[{brick_display_name}] Custom intervals must be a list."
                        )
                        conn.close()
                        raise ValueError("Custom intervals must be a list")
                    intervals = []
                    for interval_str in custom_intervals:
                        interval_str = interval_str.strip()
                        if not interval_str:
                            continue
                        parts = [p.strip() for p in interval_str.split(",")]
                        if len(parts) != 2:
                            verbose and logger.error(
                                f"[{brick_display_name}] Invalid interval format: '{interval_str}'. Expected 'lower,upper'."
                            )
                            conn.close()
                            raise ValueError(
                                f"Invalid interval format: '{interval_str}'. Expected 'lower,upper'"
                            )
                        lower = parts[0]
                        upper = parts[1]
                        if lower == "":
                            lower_val = None
                            lower_label = "-inf"
                        else:
                            try:
                                lower_val = float(lower)
                                lower_label = lower
                            except ValueError:
                                verbose and logger.error(
                                    f"[{brick_display_name}] Invalid lower bound: '{lower}'"
                                )
                                conn.close()
                                raise ValueError(f"Invalid lower bound: '{lower}'")
                        if upper == "":
                            upper_val = None
                            upper_label = "+inf"
                        else:
                            try:
                                upper_val = float(upper)
                                upper_label = upper
                            except ValueError:
                                verbose and logger.error(
                                    f"[{brick_display_name}] Invalid upper bound: '{upper}'"
                                )
                                conn.close()
                                raise ValueError(f"Invalid upper bound: '{upper}'")
                        if lower_val is not None and upper_val is not None:
                            if lower_val >= upper_val:
                                verbose and logger.error(
                                    f"[{brick_display_name}] Invalid interval: lower bound ({lower_val}) must be less than upper bound ({upper_val})."
                                )
                                conn.close()
                                raise ValueError(
                                    f"Invalid interval: lower bound ({lower_val}) must be less than upper bound ({upper_val})"
                                )
                        intervals.append(
                            {
                                "lower": lower_val,
                                "upper": upper_val,
                                "lower_label": lower_label,
                                "upper_label": upper_label,
                            }
                        )
                    if not intervals:
                        verbose and logger.error(
                            f"[{brick_display_name}] No valid intervals specified."
                        )
                        conn.close()
                        raise ValueError("No valid intervals specified")
                    verbose and logger.info(
                        f"[{brick_display_name}] Parsed {len(intervals)} custom intervals."
                    )
                except Exception as e:
                    verbose and logger.error(
                        f"[{brick_display_name}] Error parsing custom intervals."
                    )
                    conn.close()
                    raise
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unknown binning mode: {binning_mode}"
                )
                conn.close()
                raise ValueError(f"Unknown binning mode: {binning_mode}")
            select_parts = []
            binned_columns = set()
            for col_name in all_columns.keys():
                sanitized_col = _sanitize_identifier(col_name)
                if col_name in valid_columns:
                    if binning_mode == "fixed_size":
                        case_statement = _build_fixed_case_statement(
                            sanitized_col,
                            use_maximum,
                            minimum_value,
                            use_maximum,
                            maximum_value,
                            offset,
                            bin_width,
                        )
                    else:
                        case_statement = _build_custom_case_statement(
                            sanitized_col, intervals
                        )
                    if in_place:
                        select_parts.append(f'{case_statement} AS "{sanitized_col}"')
                        binned_columns.add(col_name)
                    else:
                        select_parts.append(f'"{sanitized_col}"')
                else:
                    select_parts.append(f'"{sanitized_col}"')
            if not in_place:
                for col_name in valid_columns:
                    sanitized_col = _sanitize_identifier(col_name)
                    output_col_name = f"{col_name}{output_suffix}"
                    sanitized_output_col = _sanitize_identifier(output_col_name)
                    if binning_mode == "fixed_size":
                        case_statement = _build_fixed_case_statement(
                            sanitized_col,
                            use_maximum,
                            minimum_value,
                            use_maximum,
                            maximum_value,
                            offset,
                            bin_width,
                        )
                    else:
                        case_statement = _build_custom_case_statement(
                            sanitized_col, intervals
                        )
                    select_parts.append(f'{case_statement} AS "{sanitized_output_col}"')
                    binned_columns.add(output_col_name)
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(f"[{brick_display_name}] Executing binning query.")
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                conn.close()
                raise ValueError(f"Unsupported output format: {output_format}")
            conn.close()
            result_rows = len(result)
            verbose and logger.info(
                f"[{brick_display_name}] Binning operation completed successfully. Binned {len(valid_columns)} column(s). Returned {result_rows} rows."
            )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during binning operation."
        )
        raise
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow