Filter Rows by Numeric Range

Filter rows from a dataset whose numeric values fall within a defined range, or clear the content of matching cells.

Processing

This function filters rows or modifies cells within an input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) based on whether numeric values in specified columns fall within a defined minimum and maximum range. The exact operation (keeping rows, removing rows, or clearing cell content) is configurable via options. Processing is handled internally using DuckDB for robust and high-performance querying.

Inputs

data
The input dataset (DataFrame or Arrow Table) to be processed.
action (optional)
Determines how rows or cells are handled when they match the numeric range criteria: keep_matching_rows (default), remove_matching_rows, clear_matching_cells, or clear_non_matching_cells.
columns (optional)
A list of specific numeric columns to check against the range criteria. If empty and no regex is provided, all numeric columns are checked.
regex pattern (optional)
A regex pattern used to select columns to check. If provided, it overrides the explicit columns list.
min value (optional)
The inclusive lower bound of the numeric range.
max value (optional)
The inclusive upper bound of the numeric range.
multi column mode (optional)
Specifies the logical requirement when checking multiple columns (e.g., must all match the range, or any must match).

Input Types

Input Types
data DataFrame, ArrowTable
action Str
columns List
regex pattern Str
min value Float
max value Float
multi column mode Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset after applying the filter or cell clearance operations. The format depends on the Output Format option.

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Filter Rows by Numeric Range brick provides the following configurable options:

Action
Defines the filtering behavior. Choices include keep_matching_rows (default), remove_matching_rows, clear_matching_cells, or clear_non_matching_cells.
Columns to Check
A list of column names that should be checked for values within the defined range.
Column Regex Pattern
A regular expression used to select columns for checking. If provided, it overrides explicit column selection.
Minimum Value (inclusive)
The lowest numeric value considered to be within the range.
Maximum Value (inclusive)
The highest numeric value considered to be within the range.
Multi-Column Mode
Controls how multiple checked columns contribute to the row filtering decision: any (default) requires at least one checked column to match the range, while all requires every checked column to match.
Output Format
Specifies the desired format of the returned data (pandas, polars, or arrow).
Safe Mode
If enabled, the brick will ignore errors caused by requesting columns that do not exist in the dataset, allowing processing to continue.
Verbose
If enabled, logging information and details about the filtering process will be displayed.
Code

import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Float, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _build_range_condition(column_expr, min_value, max_value):
    """Build SQL condition to check if a column value is within the numeric range."""
    numeric_expr = f"TRY_CAST({column_expr} AS DOUBLE)"
    condition = f"({numeric_expr} IS NOT NULL AND {numeric_expr} >= {min_value} AND {numeric_expr} <= {max_value})"
    return condition


def filter_rows_by_numeric_range(
    data: Union[DataFrame, ArrowTable],
    action: Str = None,
    columns: List = None,
    regex_pattern: Str = None,
    min_value: Float = None,
    max_value: Float = None,
    multi_column_mode: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Filter Rows by Numeric Range"
    options = options or {}
    verbose = options.get("verbose", True)
    action = _coalesce(action, options.get("action", "keep_matching_rows"))
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    min_value = _coalesce(min_value, options.get("min_value", 0))
    max_value = _coalesce(max_value, options.get("max_value", 100))
    multi_column_mode = _coalesce(
        multi_column_mode, options.get("multi_column_mode", "any")
    )
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    # Validate the columns argument before calling len() on it, so a
    # non-list value raises the intended ValueError, not a TypeError.
    if not isinstance(columns, list) or not all(isinstance(c, str) for c in columns):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list of strings."
        )
        raise ValueError("Columns must be provided as a list of strings!")
    process_all_columns = len(columns) == 0 and (not regex_pattern)
    if min_value > max_value:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid range: min_value ({min_value}) is greater than max_value ({max_value})."
        )
        raise ValueError("min_value must be less than or equal to max_value!")
    valid_actions = [
        "keep_matching_rows",
        "remove_matching_rows",
        "clear_matching_cells",
        "clear_non_matching_cells",
    ]
    if action not in valid_actions:
        verbose and logger.error(f"[{brick_display_name}] Invalid action: {action}.")
        raise ValueError(f"Action must be one of {valid_actions}")
    valid_multi_column_modes = ["all", "any"]
    if multi_column_mode not in valid_multi_column_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid multi-column mode: {multi_column_mode}."
        )
        raise ValueError(f"Multi-column mode must be one of {valid_multi_column_modes}")
    if result is None:
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting filter with action '{action}' for range [{min_value}, {max_value}]."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, pa.Table):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
            numeric_types = [
                "TINYINT",
                "SMALLINT",
                "INTEGER",
                "BIGINT",
                "HUGEINT",
                "UTINYINT",
                "USMALLINT",
                "UINTEGER",
                "UBIGINT",
                "FLOAT",
                "DOUBLE",
                "DECIMAL",
                "NUMERIC",
                "INT",
                "INT2",
                "INT4",
                "INT8",
                "REAL",
            ]
            numeric_columns = {
                col: dtype
                for (col, dtype) in all_columns.items()
                if any((num_type in dtype.upper() for num_type in numeric_types))
            }
            verbose and logger.info(
                f"[{brick_display_name}] Numeric columns detected: {len(numeric_columns)} out of {len(all_columns)}."
            )
            columns_to_check = []
            if regex_pattern:
                try:
                    pattern = re.compile(regex_pattern)
                    columns_to_check = [
                        col for col in numeric_columns.keys() if pattern.search(col)
                    ]
                    if not columns_to_check:
                        verbose and logger.warning(
                            f"[{brick_display_name}] No numeric columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                        )
                        result = data
                    else:
                        verbose and logger.info(
                            f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} numeric columns."
                        )
                except re.error as e:
                    verbose and logger.error(
                        f"[{brick_display_name}] Invalid regex pattern: {e}"
                    )
                    raise ValueError(f"Invalid regex pattern: {e}")
            elif process_all_columns:
                columns_to_check = list(numeric_columns.keys())
                verbose and logger.info(
                    f"[{brick_display_name}] Checking all {len(columns_to_check)} numeric columns."
                )
            else:
                if not safe_mode:
                    missing_columns = [col for col in columns if col not in all_columns]
                    if missing_columns:
                        verbose and logger.error(
                            f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                        )
                        raise ValueError(
                            f"Columns not found in data: {missing_columns}"
                        )
                non_numeric_requested = [
                    col
                    for col in columns
                    if col in all_columns and col not in numeric_columns
                ]
                if non_numeric_requested:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Skipping non-numeric columns: {non_numeric_requested}"
                    )
                columns_to_check = [col for col in columns if col in numeric_columns]
                if safe_mode:
                    skipped = len(columns) - len(columns_to_check)
                    if skipped > 0:
                        skipped_cols = [
                            col for col in columns if col not in all_columns
                        ]
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
                        )
                verbose and logger.info(
                    f"[{brick_display_name}] Checking {len(columns_to_check)} numeric column(s)."
                )
            if result is None:
                if not columns_to_check:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No columns to check. Returning data unchanged."
                    )
                    result = data
                else:
                    column_conditions = []
                    for col in columns_to_check:
                        sanitized_col = _sanitize_identifier(col)
                        col_expr = f'"{sanitized_col}"'
                        condition = _build_range_condition(
                            col_expr, min_value, max_value
                        )
                        column_conditions.append(condition)
                    if multi_column_mode == "all":
                        row_match_condition = " AND ".join(column_conditions)
                    else:
                        row_match_condition = " OR ".join(column_conditions)
                    if action == "keep_matching_rows":
                        query = f"SELECT * FROM input_table WHERE {row_match_condition}"
                        verbose and logger.info(
                            f"[{brick_display_name}] Keeping rows where values are in range."
                        )
                    elif action == "remove_matching_rows":
                        query = f"SELECT * FROM input_table WHERE NOT ({row_match_condition})"
                        verbose and logger.info(
                            f"[{brick_display_name}] Removing rows where values are in range."
                        )
                    elif action in ["clear_matching_cells", "clear_non_matching_cells"]:
                        select_parts = []
                        clear_on_match = action == "clear_matching_cells"
                        for col in all_columns.keys():
                            sanitized_col = _sanitize_identifier(col)
                            if col in columns_to_check:
                                col_expr = f'"{sanitized_col}"'
                                col_condition = _build_range_condition(
                                    col_expr, min_value, max_value
                                )
                                if clear_on_match:
                                    select_parts.append(
                                        f'CASE WHEN {col_condition} THEN NULL ELSE "{sanitized_col}" END AS "{sanitized_col}"'
                                    )
                                else:
                                    select_parts.append(
                                        f'CASE WHEN {col_condition} THEN "{sanitized_col}" ELSE NULL END AS "{sanitized_col}"'
                                    )
                            else:
                                select_parts.append(f'"{sanitized_col}"')
                        select_clause = ", ".join(select_parts)
                        query = f"SELECT {select_clause} FROM input_table"
                        verbose and logger.info(
                            f"[{brick_display_name}] Clearing content of {('matching' if clear_on_match else 'non-matching')} cells."
                        )
                    if output_format == "pandas":
                        result = conn.execute(query).df()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to pandas DataFrame."
                        )
                    elif output_format == "polars":
                        result = conn.execute(query).pl()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Polars DataFrame."
                        )
                    elif output_format == "arrow":
                        result = conn.execute(query).fetch_arrow_table()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Arrow Table."
                        )
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Unsupported output format: {output_format}"
                        )
                        raise ValueError(f"Unsupported output format: {output_format}")
                    verbose and logger.info(
                        f"[{brick_display_name}] Filter completed successfully."
                    )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during filtering: {str(e)}"
            )
            raise
        finally:
            if conn is not None:
                conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow