Flag Rows by Numeric Range
Flag rows from the dataset that contain numbers within a numerical range by creating a new column with '1' for matching rows.
Processing
Flag rows from the dataset that contain numbers within a numerical range by creating a new column with '1' for matching rows. The numeric range check is inclusive (min_value <= value <= max_value). If a row meets the criteria across the selected columns, the new flag column is set to '1'; otherwise it is set to NULL. When neither a column list nor a regex pattern is supplied, all numeric columns are checked.
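As a minimal illustration of the inclusive check described above, the following pure-Python sketch applies the same rule to a single value. The helper name `flag_value` is hypothetical and not part of the brick, which evaluates the condition in SQL rather than in Python.

```python
def flag_value(value, min_value=0, max_value=100):
    """Return '1' if value lies in [min_value, max_value], else None.

    Hypothetical helper for illustration only; not part of the brick.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        # Values that cannot be read as numbers never match.
        return None
    return "1" if min_value <= number <= max_value else None


print(flag_value(0))      # lower bound is included
print(flag_value(100))    # upper bound is included
print(flag_value(100.5))  # outside the range
print(flag_value("abc"))  # not a number
```

Both boundaries count as matches, and anything that is missing or non-numeric is left unflagged.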
Inputs
- data
- The input dataset, which must be a Pandas DataFrame, Polars DataFrame, or Arrow Table.
- flag column name (optional)
- The name of the new column that will store the flag results. Defaults to `in_range_flag`.
- columns (optional)
- A list of specific column names to check against the numeric range criteria. If the list is empty and no regex pattern is given, all numeric columns are checked.
- regex pattern (optional)
- A regular expression pattern used to dynamically select numeric columns for checking. If provided, this overrides the `columns` list.
- min value (optional)
- The minimum boundary value (inclusive) for the numeric range check. Defaults to 0.
- max value (optional)
- The maximum boundary value (inclusive) for the numeric range check. Defaults to 100.
- multi column mode (optional)
- Determines the logic for combining results when multiple columns are checked. Must be 'any' (at least one column must match the range) or 'all' (all selected columns must match the range). Defaults to 'any'.
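The difference between the 'any' and 'all' modes can be sketched in plain Python. This is only an illustration under simplifying assumptions: rows are dictionaries, values are already numeric or None, and the helper name `row_matches` is hypothetical rather than part of the brick.

```python
def row_matches(row, columns, min_value=0, max_value=100, mode="any"):
    """Combine per-column inclusive range checks (hypothetical helper)."""
    if mode not in ("any", "all"):
        raise ValueError("mode must be 'any' or 'all'")
    if not columns:
        # With nothing to check, no row is flagged.
        return False
    checks = [
        row[col] is not None and min_value <= row[col] <= max_value
        for col in columns
    ]
    return any(checks) if mode == "any" else all(checks)


row = {"score_a": 42, "score_b": 150}
cols = ["score_a", "score_b"]
any_flag = "1" if row_matches(row, cols, mode="any") else None
all_flag = "1" if row_matches(row, cols, mode="all") else None
print(any_flag, all_flag)  # prints: 1 None
```

Here `score_a` is in range and `score_b` is not, so the row is flagged under 'any' but not under 'all'.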
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| flag column name | Str |
| columns | List |
| regex pattern | Str |
| min value | Float |
| max value | Float |
| multi column mode | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset, containing all original columns augmented with the new flag column. The output format is set by the Output Format option (`pandas` by default) and does not depend on the input type.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Flag Rows by Numeric Range brick contains some changeable options:
- Flag Column Name
- The name of the column where the flag (1 for match, NULL otherwise) will be written. Defaults to `in_range_flag`.
- Columns to Check
- A list of specific column names to apply the numeric range check to. This is ignored if a Regex Pattern is provided. If the list is empty and no Regex Pattern is provided, all numeric columns are checked.
- Column Regex Pattern
- A regular expression used to dynamically select numeric columns to check. If provided, only columns matching this pattern are analyzed.
- Minimum Value (inclusive)
- The lowest numeric value (inclusive) considered to be within the range.
- Maximum Value (inclusive)
- The highest numeric value (inclusive) considered to be within the range.
- Multi-Column Mode
- Defines the logic used when checking multiple columns. Choose 'any' (default) if a row should be flagged if at least one column is in range, or 'all' if all selected columns must be in range.
- Output Format
- Specifies the desired format for the returned dataset (`pandas`, `polars`, or `arrow`). Defaults to `pandas`.
- Safe Mode
- When enabled, the brick ignores requested columns that do not exist in the input data, preventing execution failure.
- Verbose
- If enabled, detailed logs and information about the execution process will be printed.
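For the Column Regex Pattern option, it helps to know that the brick's source tests each column name with `re.search`, so a pattern matches anywhere in the name unless it is anchored. The sketch below illustrates that behavior with the standard library only; the helper `select_columns` and the sample column names are hypothetical.

```python
import re


def select_columns(column_names, regex_pattern):
    """Keep the names in which the pattern is found anywhere (hypothetical helper)."""
    pattern = re.compile(regex_pattern)
    return [name for name in column_names if pattern.search(name)]


names = ["score_math", "score_art", "total_score", "age"]
unanchored = select_columns(names, "score")
anchored = select_columns(names, "^score_")
print(unanchored)  # ['score_math', 'score_art', 'total_score']
print(anchored)    # ['score_math', 'score_art']
```

Anchoring with `^` or `$` is the simplest way to avoid picking up columns that merely contain the search text.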
import logging
import re

import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Float

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first value that is not None, or None if there is none."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _build_range_condition(column_expr, min_value, max_value):
    """Build SQL condition to check if a column value is within the numeric range."""
    numeric_expr = f"TRY_CAST({column_expr} AS DOUBLE)"
    condition = f"({numeric_expr} IS NOT NULL AND {numeric_expr} >= {min_value} AND {numeric_expr} <= {max_value})"
    return condition


def flag_rows_by_numeric_range(
    data: Union[DataFrame, ArrowTable],
    flag_column_name: Str = None,
    columns: List = None,
    regex_pattern: Str = None,
    min_value: Float = None,
    max_value: Float = None,
    multi_column_mode: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Flag Rows by Numeric Range"
    options = options or {}
    verbose = options.get("verbose", True)
    # Explicit arguments take precedence over values supplied through options.
    flag_column_name = _coalesce(
        flag_column_name, options.get("flag_column_name", "in_range_flag")
    )
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    min_value = _coalesce(min_value, options.get("min_value", 0))
    max_value = _coalesce(max_value, options.get("max_value", 100))
    multi_column_mode = _coalesce(
        multi_column_mode, options.get("multi_column_mode", "any")
    )
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    if not isinstance(columns, list) or not all(isinstance(c, str) for c in columns):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list of strings!")
    # Computed after validation so that a non-list value raises the ValueError above.
    process_all_columns = len(columns) == 0 and (not regex_pattern)
    if not flag_column_name or not isinstance(flag_column_name, str):
        verbose and logger.error(f"[{brick_display_name}] Invalid flag column name!")
        raise ValueError("Flag column name must be a non-empty string!")
    if min_value > max_value:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid range: min_value ({min_value}) is greater than max_value ({max_value})."
        )
        raise ValueError("min_value must be less than or equal to max_value!")
    valid_multi_column_modes = ["all", "any"]
    if multi_column_mode not in valid_multi_column_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid multi-column mode: {multi_column_mode}."
        )
        raise ValueError(f"Multi-column mode must be one of {valid_multi_column_modes}")
    if result is None:
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting flagging for range [{min_value}, {max_value}]."
            )
            # Detect the input container type.
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, pa.Table):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            # Register the data with an in-memory DuckDB connection and read its schema.
            conn = duckdb.connect(":memory:")
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
            if flag_column_name in all_columns:
                verbose and logger.warning(
                    f"[{brick_display_name}] Flag column '{flag_column_name}' already exists and will be overwritten."
                )
            numeric_types = [
                "TINYINT",
                "SMALLINT",
                "INTEGER",
                "BIGINT",
                "HUGEINT",
                "UTINYINT",
                "USMALLINT",
                "UINTEGER",
                "UBIGINT",
                "FLOAT",
                "DOUBLE",
                "DECIMAL",
                "NUMERIC",
                "INT",
                "INT2",
                "INT4",
                "INT8",
                "REAL",
            ]
            # Compare the base type name exactly (e.g. "DECIMAL(18,3)" -> "DECIMAL").
            # Substring matching would treat INTERVAL as numeric because it contains "INT".
            numeric_columns = {
                col: dtype
                for (col, dtype) in all_columns.items()
                if dtype.upper().split("(")[0].strip() in numeric_types
            }
            verbose and logger.info(
                f"[{brick_display_name}] Numeric columns detected: {len(numeric_columns)} out of {len(all_columns)}."
            )
            # Resolve which columns to check: regex first, then all numeric, then the explicit list.
            columns_to_check = []
            if regex_pattern:
                try:
                    pattern = re.compile(regex_pattern)
                except re.error as e:
                    verbose and logger.error(
                        f"[{brick_display_name}] Invalid regex pattern."
                    )
                    raise ValueError(f"Invalid regex pattern: {e}") from e
                columns_to_check = [
                    col for col in numeric_columns.keys() if pattern.search(col)
                ]
                if not columns_to_check:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No numeric columns matched regex pattern '{regex_pattern}'."
                    )
                else:
                    verbose and logger.info(
                        f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} numeric columns."
                    )
            elif process_all_columns:
                columns_to_check = list(numeric_columns.keys())
                verbose and logger.info(
                    f"[{brick_display_name}] Checking all {len(columns_to_check)} numeric columns."
                )
            else:
                if not safe_mode:
                    missing_columns = [col for col in columns if col not in all_columns]
                    if missing_columns:
                        verbose and logger.error(
                            f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                        )
                        raise ValueError(
                            f"Columns not found in data: {missing_columns}"
                        )
                non_numeric_requested = [
                    col
                    for col in columns
                    if col in all_columns and col not in numeric_columns
                ]
                if non_numeric_requested:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Skipping non-numeric columns: {non_numeric_requested}"
                    )
                columns_to_check = [col for col in columns if col in numeric_columns]
                if safe_mode:
                    # Count only columns that are missing, not those skipped as non-numeric.
                    skipped_cols = [col for col in columns if col not in all_columns]
                    if skipped_cols:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipped {len(skipped_cols)} non-existent columns: {skipped_cols}"
                        )
                verbose and logger.info(
                    f"[{brick_display_name}] Checking {len(columns_to_check)} numeric column(s)."
                )
            if not columns_to_check:
                verbose and logger.warning(
                    f"[{brick_display_name}] No columns to check. All rows will be flagged as not matching (no flag)."
                )
                flag_condition = "NULL"
            else:
                # Build one range condition per column and combine them with AND / OR.
                column_conditions = []
                for col in columns_to_check:
                    sanitized_col = _sanitize_identifier(col)
                    col_expr = f'"{sanitized_col}"'
                    condition = _build_range_condition(col_expr, min_value, max_value)
                    column_conditions.append(condition)
                if multi_column_mode == "all":
                    row_match_condition = " AND ".join(column_conditions)
                else:
                    row_match_condition = " OR ".join(column_conditions)
                flag_condition = (
                    f"CASE WHEN {row_match_condition} THEN '1' ELSE NULL END"
                )
            # Select every original column except an existing flag column, then append the new flag.
            select_parts = []
            for col in all_columns.keys():
                if col != flag_column_name:
                    sanitized_col = _sanitize_identifier(col)
                    select_parts.append(f'"{sanitized_col}"')
            sanitized_flag_col = _sanitize_identifier(flag_column_name)
            select_parts.append(f'{flag_condition} AS "{sanitized_flag_col}"')
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Creating flag column '{flag_column_name}'."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            verbose and logger.info(
                f"[{brick_display_name}] Flagging completed successfully."
            )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during flagging: {str(e)}"
            )
            raise
        finally:
            # Always release the DuckDB connection, even when an error occurred.
            if conn is not None:
                conn.close()
    return result
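
To make the generated SQL easier to picture, the following standalone sketch rebuilds the flag expression for a sample column list. It condenses the logic of the source above into one hypothetical function, `build_flag_expression`, and the column name `age` and the bounds are made-up sample values.

```python
def build_flag_expression(columns, min_value, max_value, mode="any"):
    """Assemble a CASE expression like the one the source builds (illustrative only)."""
    conditions = []
    for col in columns:
        # Quote the identifier and escape embedded double quotes.
        quoted = '"' + col.replace('"', '""') + '"'
        numeric = f"TRY_CAST({quoted} AS DOUBLE)"
        conditions.append(
            f"({numeric} IS NOT NULL AND {numeric} >= {min_value} "
            f"AND {numeric} <= {max_value})"
        )
    joiner = " AND " if mode == "all" else " OR "
    return f"CASE WHEN {joiner.join(conditions)} THEN '1' ELSE NULL END"


expression = build_flag_expression(["age"], 18, 65)
print(expression)
```

With several columns, the per-column conditions are joined by `OR` in 'any' mode and by `AND` in 'all' mode.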
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow