Force Numerical Range
Creates upper and/or lower bounds in numerical columns by clipping or clearing outliers.
Force Numerical Range
Processing
This function enforces upper and/or lower bounds on selected numerical columns within a dataset. Users can choose between two actions for handling outliers: clip_outliers (replacing values outside the range with the respective min/max bound) or clear_outliers (setting values outside the range to NULL).
Inputs
- data
- The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) containing the numerical columns to modify.
- input columns (optional)
- A list of column names (strings) on which the range enforcement operation will be performed. If not provided via the input port, this must be set using the 'Input Columns' option.
Inputs Types
| Input | Types |
|---|---|
data |
DataFrame, ArrowTable |
input columns |
List |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset, where the specified numerical columns have been adjusted according to the defined range limits and action (clipping or clearing outliers). The format matches the selection in the 'Output Format' option.
Outputs Types
| Output | Types |
|---|---|
result |
DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Force Numerical Range brick contains some changeable options:
- Input Columns
- A list of column names that the range enforcement operation should be applied to.
- Action
- Defines how values outside the specified range are handled. Choices are
clip_outliers(replace outliers with the bound value) orclear_outliers(replace outliers with NULL). - Minimum Value (Lower Bound)
- The specific numerical value to use as the lower limit.
- Use Lower Bound
- Toggle to activate the minimum value check, ensuring no value falls below this limit.
- Maximum Value (Upper Bound)
- The specific numerical value to use as the upper limit.
- Use Upper Bound
- Toggle to activate the maximum value check, ensuring no value exceeds this limit.
- Safe Mode
- If enabled, columns specified in 'Input Columns' that do not exist or are not numerical will be silently skipped, allowing the operation to proceed with valid columns. If disabled, an error is raised upon encountering an invalid column.
- Output Format
- Specifies the desired format of the resulting output data (pandas, polars, or arrow).
- Verbose
- If enabled, detailed logs about the process, validation, and conversion steps will be printed.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, List, Str, Float, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def force_numerical_range(
data: Union[DataFrame, ArrowTable], input_columns: List = None, options=None
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Force Numerical Range"
options = options or {}
verbose = options.get("verbose", True)
input_columns = input_columns or options.get("input_columns", [])
action = options.get("action", "clip_outliers")
minimum_value = options.get("minimum_value", 0)
use_minimum = options.get("use_minimum", False)
maximum_value = options.get("maximum_value", 100)
use_maximum = options.get("use_maximum", False)
safe_mode = options.get("safe_mode", False)
output_format = options.get("output_format", "pandas")
result = None
try:
verbose and logger.info(
f"[{brick_display_name}] Starting force numerical range operation."
)
if not input_columns or len(input_columns) == 0:
verbose and logger.error(
f"[{brick_display_name}] At least one input column must be specified."
)
raise ValueError("At least one input column must be specified")
if not isinstance(input_columns, list) or not all(
(isinstance(c, str) for c in input_columns)
):
verbose and logger.error(
f"[{brick_display_name}] Input columns must be provided as a list of strings."
)
raise ValueError("Input columns must be provided as a list of strings")
if not use_minimum and (not use_maximum):
verbose and logger.error(
f"[{brick_display_name}] At least one bound (lower or upper) must be specified."
)
raise ValueError("At least one bound (lower or upper) must be specified")
if use_minimum and use_maximum and (minimum_value >= maximum_value):
verbose and logger.error(
f"[{brick_display_name}] Lower bound ({minimum_value}) must be less than upper bound ({maximum_value})."
)
raise ValueError(
f"Lower bound ({minimum_value}) must be less than upper bound ({maximum_value})"
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Processing {len(input_columns)} column(s) for range enforcement."
)
numeric_types = [
"INTEGER",
"BIGINT",
"SMALLINT",
"TINYINT",
"DOUBLE",
"FLOAT",
"REAL",
"DECIMAL",
"NUMERIC",
"HUGEINT",
]
valid_columns = []
invalid_columns = []
for col in input_columns:
if col not in all_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Column '{col}' not found in data. Skipping."
)
invalid_columns.append(col)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Column '{col}' not found in data."
)
conn.close()
raise ValueError(f"Column '{col}' not found in data")
column_type = all_columns[col].upper()
is_numeric = any((num_type in column_type for num_type in numeric_types))
if not is_numeric:
error_message = f"Column '{col}' is not numeric (type: {column_type})"
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] {error_message}. Skipping."
)
invalid_columns.append(col)
continue
else:
verbose and logger.error(f"[{brick_display_name}] {error_message}.")
conn.close()
raise ValueError(error_message)
valid_columns.append(col)
verbose and logger.info(
f"[{brick_display_name}] Column '{col}' is numeric (type: {column_type})."
)
if not valid_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] No valid numeric columns to process. Returning original data."
)
conn.close()
result = data
else:
verbose and logger.error(
f"[{brick_display_name}] No valid numeric columns to process."
)
conn.close()
raise ValueError("No valid numeric columns to process")
else:
verbose and logger.info(
f"[{brick_display_name}] {len(valid_columns)} valid column(s) will be processed."
)
if action == "clip_outliers":
verbose and logger.info(
f"[{brick_display_name}] Action: Clip outliers (replace with bounds)."
)
elif action == "clear_outliers":
verbose and logger.info(
f"[{brick_display_name}] Action: Clear outliers (set to NULL)."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unknown action: {action}"
)
conn.close()
raise ValueError(f"Unknown action: {action}")
if use_minimum:
verbose and logger.info(
f"[{brick_display_name}] Lower bound: {minimum_value}"
)
if use_maximum:
verbose and logger.info(
f"[{brick_display_name}] Upper bound: {maximum_value}"
)
select_parts = []
for col_name in all_columns.keys():
sanitized_col = _sanitize_identifier(col_name)
if col_name in valid_columns:
if action == "clip_outliers":
case_parts = []
if use_minimum and use_maximum:
case_parts.append(
f'WHEN "{sanitized_col}" < {minimum_value} THEN {minimum_value}'
)
case_parts.append(
f'WHEN "{sanitized_col}" > {maximum_value} THEN {maximum_value}'
)
case_parts.append(f'ELSE "{sanitized_col}"')
elif use_minimum:
case_parts.append(
f'WHEN "{sanitized_col}" < {minimum_value} THEN {minimum_value}'
)
case_parts.append(f'ELSE "{sanitized_col}"')
elif use_maximum:
case_parts.append(
f'WHEN "{sanitized_col}" > {maximum_value} THEN {maximum_value}'
)
case_parts.append(f'ELSE "{sanitized_col}"')
case_statement = "CASE " + " ".join(case_parts) + " END"
select_parts.append(f'{case_statement} AS "{sanitized_col}"')
elif action == "clear_outliers":
case_parts = []
if use_minimum and use_maximum:
case_parts.append(
f'WHEN "{sanitized_col}" >= {minimum_value} AND "{sanitized_col}" <= {maximum_value} THEN "{sanitized_col}"'
)
case_parts.append("ELSE NULL")
elif use_minimum:
case_parts.append(
f'WHEN "{sanitized_col}" >= {minimum_value} THEN "{sanitized_col}"'
)
case_parts.append("ELSE NULL")
elif use_maximum:
case_parts.append(
f'WHEN "{sanitized_col}" <= {maximum_value} THEN "{sanitized_col}"'
)
case_parts.append("ELSE NULL")
case_statement = "CASE " + " ".join(case_parts) + " END"
select_parts.append(f'{case_statement} AS "{sanitized_col}"')
else:
select_parts.append(f'"{sanitized_col}"')
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Executing range enforcement query."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
conn.close()
raise ValueError(f"Unsupported output format: {output_format}")
conn.close()
result_rows = len(result)
verbose and logger.info(
f"[{brick_display_name}] Range enforcement completed successfully. Processed {len(valid_columns)} column(s). Returned {result_rows} rows."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during range enforcement operation."
)
raise
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow