Filter Rows by Boolean Value

Filter rows from the dataset that contain a specific boolean value, or clear content from matching cells.

Filter Rows by Boolean Value

Processing

This function filters rows from an input dataset (Pandas, Polars, or Arrow) based on whether specified boolean columns contain a target boolean value (True or False). Alternatively, instead of filtering rows, the function can clear the content of cells that match or do not match the target value.

Inputs

data
The input dataset (DataFrame or Arrow Table) to be filtered or modified.
action (optional)
Defines the operation to perform: keep_matching_rows, remove_matching_rows, clear_matching_cells, or clear_non_matching_cells. Defaults to keeping matching rows.
columns (optional)
A list of specific column names to check for the target boolean value. If empty and no regex is provided, all boolean columns are checked.
regex pattern (optional)
A regular expression pattern used to select columns for checking. This is applied only to columns detected as boolean.
target value (optional)
The specific boolean value ('true' or 'false') that rows/cells must match. Defaults to 'true'.
multi column mode (optional)
Specifies how conditions across multiple columns are combined: 'any' (default) means a row matches if any checked column matches, 'all' means all checked columns must match.

Input Types

Input Types
data DataFrame, ArrowTable
action Str
columns List
regex pattern Str
target value Str
multi column mode Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset after applying the filtering or clearing operation, structured according to the selected output format.

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Filter Rows by Boolean Value brick contains some changeable options:

Action
Defines whether to filter rows (keep_matching_rows or remove_matching_rows) or clear cells (clear_matching_cells or clear_non_matching_cells).
Columns to Check
A list of specific column names to be included in the boolean filtering logic.
Column Regex Pattern
A regex pattern used to select column names to be included in the boolean filtering logic.
Target Value
The boolean value ('true' or 'false') that is targeted during filtering.
Multi-Column Mode
Defines the logic for handling multiple checked columns. 'any' means a row matches if at least one column matches the target boolean; 'all' means all checked columns must match.
Output Format
Specifies the format of the resulting data structure. Choices: pandas, polars, arrow.
Safe Mode
If enabled, missing or non-boolean columns specified in Columns to Check will be ignored instead of raising an error.
Verbose
If enabled, the function prints detailed logging information about the process.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Float, Bool

# Module-level logging setup; basicConfig is a no-op if the host application
# has already configured the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _build_boolean_condition(column_expr, target_bool):
    """Build SQL condition to check if a column value matches the target boolean."""
    bool_expr = f"TRY_CAST({column_expr} AS BOOLEAN)"
    sql_bool = "TRUE" if target_bool else "FALSE"
    condition = f"({bool_expr} IS NOT NULL AND {bool_expr} = {sql_bool})"
    return condition


def filter_rows_by_boolean_value(
    data: Union[DataFrame, ArrowTable],
    action: Str = None,
    columns: List = None,
    regex_pattern: Str = None,
    target_value: Str = None,
    multi_column_mode: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Filter rows from a dataset, or clear cell contents, based on whether
    selected boolean columns match a target boolean value.

    The input is registered with an in-memory DuckDB connection and
    transformed via SQL.

    Parameters
    ----------
    data : pandas.DataFrame | polars.DataFrame | pyarrow.Table
        Input dataset.
    action : str, optional
        One of "keep_matching_rows", "remove_matching_rows",
        "clear_matching_cells", "clear_non_matching_cells".
        Defaults to "keep_matching_rows".
    columns : list[str], optional
        Explicit column names to check. If empty and no regex is given,
        all detected boolean columns are checked.
    regex_pattern : str, optional
        Regex applied to the names of detected boolean columns to select
        which ones to check. Takes precedence over `columns`.
    target_value : str, optional
        Target boolean as a string; only the case-insensitive string
        "true" maps to True, anything else means False. Defaults to "true".
    multi_column_mode : str, optional
        "any" (default): a row matches if at least one checked column
        matches; "all": every checked column must match.
    options : dict, optional
        Extra settings: "verbose", "safe_mode", "output_format"
        ("pandas" | "polars" | "arrow"), plus fallbacks for the explicit
        arguments above (explicit arguments win over `options` entries).

    Returns
    -------
    The transformed dataset in the requested output format, or the input
    object unchanged when no columns end up being checked.

    Raises
    ------
    ValueError
        For an invalid `columns` container, unknown action/mode/output
        format, unsupported input type, invalid regex, or (when safe mode
        is off) requested columns missing from the data.
    """
    brick_display_name = "Filter Rows by Boolean Value"
    options = options or {}
    verbose = options.get("verbose", True)
    action = _coalesce(action, options.get("action", "keep_matching_rows"))
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    target_str = _coalesce(target_value, options.get("target_value", "true"))
    # Only the (case-insensitive) string "true" maps to True.
    target_bool = target_str.lower() == "true"
    multi_column_mode = _coalesce(
        multi_column_mode, options.get("multi_column_mode", "any")
    )
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    # Validate `columns` BEFORE using it: previously len(columns) was
    # evaluated first, so a non-sized value (e.g. an int) raised an opaque
    # TypeError instead of the intended ValueError below.
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    # No explicit columns and no regex -> check every boolean column.
    process_all_columns = len(columns) == 0 and (not regex_pattern)
    valid_actions = [
        "keep_matching_rows",
        "remove_matching_rows",
        "clear_matching_cells",
        "clear_non_matching_cells",
    ]
    if action not in valid_actions:
        verbose and logger.error(f"[{brick_display_name}] Invalid action: {action}.")
        raise ValueError(f"Action must be one of {valid_actions}")
    valid_multi_column_modes = ["all", "any"]
    if multi_column_mode not in valid_multi_column_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid multi-column mode: {multi_column_mode}."
        )
        raise ValueError(f"Multi-column mode must be one of {valid_multi_column_modes}")
    result = None
    conn = None
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting filter with action '{action}' for boolean value '{target_bool}'."
        )
        # Identify the input container; DuckDB can register all three kinds.
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        # DESCRIBE yields (name, type, ...) rows; keep a name -> type map.
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        boolean_types = ["BOOLEAN", "BOOL"]
        boolean_columns = {
            col: dtype
            for (col, dtype) in all_columns.items()
            if any((bool_type in dtype.upper() for bool_type in boolean_types))
        }
        verbose and logger.info(
            f"[{brick_display_name}] Boolean columns detected: {len(boolean_columns)} out of {len(all_columns)}."
        )
        # Resolve which boolean columns to check: regex wins, then the
        # all-columns fallback, then the explicit list.
        columns_to_check = []
        if regex_pattern:
            try:
                pattern = re.compile(regex_pattern)
                columns_to_check = [
                    col for col in boolean_columns.keys() if pattern.search(col)
                ]
                if not columns_to_check:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No boolean columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                    )
                    result = data
                else:
                    verbose and logger.info(
                        f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} boolean columns."
                    )
            except re.error as e:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid regex pattern."
                )
                raise ValueError(f"Invalid regex pattern: {e}")
        elif process_all_columns:
            columns_to_check = list(boolean_columns.keys())
            verbose and logger.info(
                f"[{brick_display_name}] Checking all {len(columns_to_check)} boolean columns."
            )
        else:
            if not safe_mode:
                missing_columns = [col for col in columns if col not in all_columns]
                if missing_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                    raise ValueError(
                        f"Columns not found in data: {missing_columns}"
                    )
            # Requested columns that exist but are not boolean are skipped
            # with a warning rather than raising.
            non_boolean_requested = [
                col
                for col in columns
                if col in all_columns and col not in boolean_columns
            ]
            if non_boolean_requested:
                verbose and logger.warning(
                    f"[{brick_display_name}] Skipping non-boolean columns: {non_boolean_requested}"
                )
            columns_to_check = [col for col in columns if col in boolean_columns]
            if safe_mode:
                skipped = len(columns) - len(columns_to_check)
                if skipped > 0:
                    skipped_cols = [
                        col for col in columns if col not in all_columns
                    ]
                    verbose and logger.warning(
                        f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
                    )
            verbose and logger.info(
                f"[{brick_display_name}] Checking {len(columns_to_check)} boolean column(s)."
            )
        # `result` is already set only when the regex matched nothing.
        if result is None:
            if not columns_to_check:
                verbose and logger.warning(
                    f"[{brick_display_name}] No columns to check. Returning data unchanged."
                )
                result = data
            else:
                # One predicate per checked column, combined per mode.
                column_conditions = []
                for col in columns_to_check:
                    sanitized_col = _sanitize_identifier(col)
                    col_expr = f'"{sanitized_col}"'
                    condition = _build_boolean_condition(col_expr, target_bool)
                    column_conditions.append(condition)
                if multi_column_mode == "all":
                    row_match_condition = " AND ".join(column_conditions)
                else:
                    row_match_condition = " OR ".join(column_conditions)
                if action == "keep_matching_rows":
                    query = f"SELECT * FROM input_table WHERE {row_match_condition}"
                    verbose and logger.info(
                        f"[{brick_display_name}] Keeping rows where values match the boolean."
                    )
                elif action == "remove_matching_rows":
                    query = f"SELECT * FROM input_table WHERE NOT ({row_match_condition})"
                    verbose and logger.info(
                        f"[{brick_display_name}] Removing rows where values match the boolean."
                    )
                elif action in ["clear_matching_cells", "clear_non_matching_cells"]:
                    # Rebuild every column; checked columns become CASE
                    # expressions that NULL-out (non-)matching cells.
                    select_parts = []
                    clear_on_match = action == "clear_matching_cells"
                    for col in all_columns.keys():
                        sanitized_col = _sanitize_identifier(col)
                        if col in columns_to_check:
                            col_expr = f'"{sanitized_col}"'
                            col_condition = _build_boolean_condition(
                                col_expr, target_bool
                            )
                            if clear_on_match:
                                select_parts.append(
                                    f'CASE WHEN {col_condition} THEN NULL ELSE "{sanitized_col}" END AS "{sanitized_col}"'
                                )
                            else:
                                select_parts.append(
                                    f'CASE WHEN {col_condition} THEN "{sanitized_col}" ELSE NULL END AS "{sanitized_col}"'
                                )
                        else:
                            select_parts.append(f'"{sanitized_col}"')
                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Clearing content of {('matching' if clear_on_match else 'non-matching')} cells."
                    )
                # Materialize the query result in the requested format.
                if output_format == "pandas":
                    result = conn.execute(query).df()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to pandas DataFrame."
                    )
                elif output_format == "polars":
                    result = conn.execute(query).pl()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to Polars DataFrame."
                    )
                elif output_format == "arrow":
                    result = conn.execute(query).fetch_arrow_table()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to Arrow Table."
                    )
                else:
                    verbose and logger.error(
                        f"[{brick_display_name}] Unsupported output format: {output_format}"
                    )
                    raise ValueError(f"Unsupported output format: {output_format}")
                verbose and logger.info(
                    f"[{brick_display_name}] Filter completed successfully."
                )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during filtering: {str(e)}"
        )
        raise
    finally:
        # Always release the in-memory DuckDB connection.
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow