Flag Rows by Values

Flag rows from the dataset that contain specific textual values by creating a flag column.

Flag Rows by Values

Processing

This function flags rows in a dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) by creating a new column that indicates whether specific textual values are present in a set of designated columns. Users can specify columns explicitly, use a regex pattern to select columns, define the matching logic (complete value, substring, or regex), and control text normalization (case sensitivity and accent handling). The multi column mode determines if a match must occur in 'all' or 'any' of the checked columns.

Inputs

data
The input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) to be processed.
columns (optional)
A list of column names in which to search for the match values. If provided, overrides the default behavior of checking all textual columns, unless a regex pattern is provided.
regex pattern (optional)
A regular expression used to dynamically select column names to check. If provided, this setting overrides the explicit columns list.
match values (optional)
The specific list of textual values (strings) to search for within the target columns.
match mode (optional)
Specifies the type of matching logic to apply. Choices are: complete_value, substring, or regular_expression.
normalization mode (optional)
Specifies how text should be normalized before comparison. Choices are: exact, ignore_case (case-insensitive), or normalize_accents (removes accents and ignores case).
multi column mode (optional)
Specifies the logic for combining matches across multiple columns: any (OR condition) or all (AND condition).
flag column (optional)
The name of the new column created to hold the flag result.

Inputs Types

Input Types
data DataFrame, ArrowTable
columns List
regex pattern Str
match values List
match mode Str
normalization mode Str
multi column mode Str
flag column Str

You can check the list of supported types here: Available Type Hints.

Outputs

data
The modified dataset, including the new flag column.

Outputs Types

Output Types
data DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Flag Rows by Values brick contains some changeable options:

Columns to Check
Specifies specific columns where the values should be matched. If provided alongside regex, regex takes precedence.
Column Regex Pattern
A regular expression used to select textual columns to check.
Values to Match
The list of strings or values that the function will search for in the selected columns.
Match Mode
Defines the matching logic: complete_value (full match), substring (partial match using LIKE or regex), or regular_expression (using full regex pattern).
Normalization Mode
Defines how case and accents are handled: exact, ignore_case, or normalize_accents (removes accents and converts to lower case).
Multi-Column Mode
Defines how conditions across multiple columns are combined: any (if a match occurs in any checked column) or all (if a match occurs in every checked column).
Flag Column
The name assigned to the new column created for the flag (default: 'flag').
Output Format
Specifies the desired format for the output dataset (pandas, polars, or arrow).
Safe Mode
If enabled, it handles conflicts gracefully (e.g., renames the flag column if it already exists) and skips non-existent or non-textual columns specified in the input list.
Verbose
If enabled, provides detailed logging during the execution process.

The data output contains a new column (named by the Flag Column option, default 'flag'):

  • {Flag Column Name}: Contains the string '1' if the row matches the defined criteria across the selected columns, otherwise NULL.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _escape_sql_string(s):
    """Escape single quotes for SQL string literals."""
    return s.replace("'", "''")


def _escape_regex_for_sql(pattern):
    """Escape special characters in regex pattern for SQL."""
    return pattern.replace("'", "''")


def _normalize_text_sql(text_expr, normalization_mode):
    """Generate SQL expression to normalize text based on normalization mode."""
    if normalization_mode == "ignore_case":
        return f"LOWER({text_expr})"
    elif normalization_mode == "normalize_accents":
        normalized = f"LOWER({text_expr})"
        accent_map = {
            "á": "a",
            "à": "a",
            "â": "a",
            "ä": "a",
            "ã": "a",
            "å": "a",
            "é": "e",
            "è": "e",
            "ê": "e",
            "ë": "e",
            "í": "i",
            "ì": "i",
            "î": "i",
            "ï": "i",
            "ó": "o",
            "ò": "o",
            "ô": "o",
            "ö": "o",
            "õ": "o",
            "ú": "u",
            "ù": "u",
            "û": "u",
            "ü": "u",
            "ñ": "n",
            "ç": "c",
            "Á": "a",
            "À": "a",
            "Â": "a",
            "Ä": "a",
            "Ã": "a",
            "Å": "a",
            "É": "e",
            "È": "e",
            "Ê": "e",
            "Ë": "e",
            "Í": "i",
            "Ì": "i",
            "Î": "i",
            "Ï": "i",
            "Ó": "o",
            "Ò": "o",
            "Ô": "o",
            "Ö": "o",
            "Õ": "o",
            "Ú": "u",
            "Ù": "u",
            "Û": "u",
            "Ü": "u",
            "Ñ": "n",
            "Ç": "c",
        }
        for accented, plain in accent_map.items():
            normalized = f"REPLACE({normalized}, '{accented}', '{plain}')"
        return normalized
    else:
        return text_expr


def _build_match_condition(column_expr, match_values, match_mode, normalization_mode):
    """Build SQL condition to check if a column matches any of the values."""
    conditions = []
    for value in match_values:
        escaped_value = _escape_sql_string(value)
        if match_mode == "complete_value":
            if normalization_mode == "normalize_accents":
                normalized_col = _normalize_text_sql(column_expr, normalization_mode)
                normalized_val = _normalize_text_sql(
                    f"'{escaped_value}'", normalization_mode
                )
                conditions.append(f"{normalized_col} = {normalized_val}")
            elif normalization_mode == "ignore_case":
                conditions.append(f"LOWER({column_expr}) = LOWER('{escaped_value}')")
            else:
                conditions.append(f"{column_expr} = '{escaped_value}'")
        elif match_mode == "substring":
            if normalization_mode == "ignore_case":
                escaped_regex = _escape_regex_for_sql(re.escape(value))
                conditions.append(
                    f"regexp_matches({column_expr}, '{escaped_regex}', 'i')"
                )
            elif normalization_mode == "normalize_accents":
                normalized_col = _normalize_text_sql(column_expr, normalization_mode)
                normalized_val = _normalize_text_sql(
                    f"'{escaped_value}'", normalization_mode
                )
                conditions.append(
                    f"{normalized_col} LIKE '%' || {normalized_val} || '%'"
                )
            else:
                conditions.append(f"{column_expr} LIKE '%{escaped_value}%'")
        elif match_mode == "regular_expression":
            escaped_regex = _escape_regex_for_sql(value)
            if normalization_mode == "ignore_case":
                conditions.append(
                    f"regexp_matches({column_expr}, '{escaped_regex}', 'i')"
                )
            elif normalization_mode == "normalize_accents":
                normalized_col = _normalize_text_sql(column_expr, normalization_mode)
                conditions.append(
                    f"regexp_matches({normalized_col}, '{escaped_regex}')"
                )
            else:
                conditions.append(f"regexp_matches({column_expr}, '{escaped_regex}')")
    if conditions:
        return f"({' OR '.join(conditions)})"
    return "FALSE"


def flag_rows_by_values(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    match_values: List = None,
    match_mode: Str = None,
    normalization_mode: Str = None,
    multi_column_mode: Str = None,
    flag_column: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Flag Rows by Values"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    match_values = _coalesce(match_values, options.get("match_values", []))
    match_mode = _coalesce(match_mode, options.get("match_mode", "complete_value"))
    normalization_mode = _coalesce(
        normalization_mode, options.get("normalization_mode", "exact")
    )
    multi_column_mode = _coalesce(
        multi_column_mode, options.get("multi_column_mode", "any")
    )
    flag_column = _coalesce(flag_column, options.get("flag_column", "flag"))
    process_all_columns = len(columns) == 0 and (not regex_pattern)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    if not isinstance(match_values, list):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid match_values format! Expected a list."
        )
        raise ValueError("Match values must be provided as a list!")
    if not match_values:
        verbose and logger.warning(
            f"[{brick_display_name}] No match values specified. Returning data unchanged."
        )
        result = data
    valid_match_modes = ["complete_value", "substring", "regular_expression"]
    if match_mode not in valid_match_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid match mode: {match_mode}."
        )
        raise ValueError(f"Match mode must be one of {valid_match_modes}")
    valid_normalization_modes = ["exact", "ignore_case", "normalize_accents"]
    if normalization_mode not in valid_normalization_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid normalization mode: {normalization_mode}."
        )
        raise ValueError(
            f"Normalization mode must be one of {valid_normalization_modes}"
        )
    valid_multi_column_modes = ["all", "any"]
    if multi_column_mode not in valid_multi_column_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid multi-column mode: {multi_column_mode}."
        )
        raise ValueError(f"Multi-column mode must be one of {valid_multi_column_modes}")
    if normalization_mode == "normalize_accents" and match_mode != "complete_value":
        verbose and logger.warning(
            f"[{brick_display_name}] Accent normalization is only fully supported for complete value matching. Using it anyway."
        )
    if result is None:
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting flagging with match mode '{match_mode}', and {len(match_values)} value(s)."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, (pa.Table, pa.lib.Table)):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
            original_flag_column = flag_column
            i = 1
            while flag_column in all_columns:
                if not safe_mode:
                    verbose and logger.error(
                        f"[{brick_display_name}] Flag column '{flag_column}' already exists in data."
                    )
                    raise ValueError(
                        f"Flag column '{flag_column}' already exists in data."
                    )
                else:
                    flag_column = f"{original_flag_column}_{i}"
                    i += 1
            if i > 1:
                verbose and logger.warning(
                    f"[{brick_display_name}] Renamed flag column to '{flag_column}' to avoid conflict."
                )
            textual_types = ["VARCHAR", "TEXT", "STRING", "CHAR", "NVARCHAR"]
            textual_columns = {
                col: dtype
                for (col, dtype) in all_columns.items()
                if any((text_type in dtype.upper() for text_type in textual_types))
            }
            verbose and logger.info(
                f"[{brick_display_name}] Textual columns detected: {len(textual_columns)} out of {len(all_columns)}."
            )
            columns_to_check = []
            if regex_pattern:
                try:
                    pattern = re.compile(regex_pattern)
                    columns_to_check = [
                        col for col in textual_columns.keys() if pattern.search(col)
                    ]
                    if not columns_to_check:
                        verbose and logger.warning(
                            f"[{brick_display_name}] No textual columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                        )
                        result = data
                    else:
                        verbose and logger.info(
                            f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} textual columns."
                        )
                except re.error as e:
                    verbose and logger.error(
                        f"[{brick_display_name}] Invalid regex pattern."
                    )
                    raise ValueError(f"Invalid regex pattern: {e}")
            elif process_all_columns:
                columns_to_check = list(textual_columns.keys())
                verbose and logger.info(
                    f"[{brick_display_name}] Checking all {len(columns_to_check)} textual columns."
                )
            else:
                if not safe_mode:
                    missing_columns = [col for col in columns if col not in all_columns]
                    if missing_columns:
                        verbose and logger.error(
                            f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                        )
                        raise ValueError(
                            f"Columns not found in data: {missing_columns}"
                        )
                non_textual_requested = [
                    col
                    for col in columns
                    if col in all_columns and col not in textual_columns
                ]
                if non_textual_requested:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Skipping non-textual columns: {non_textual_requested}"
                    )
                columns_to_check = [col for col in columns if col in textual_columns]
                if safe_mode:
                    skipped = len(columns) - len(columns_to_check)
                    if skipped > 0:
                        skipped_cols = [
                            col for col in columns if col not in all_columns
                        ]
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
                        )
                verbose and logger.info(
                    f"[{brick_display_name}] Checking {len(columns_to_check)} textual column(s)."
                )
            if result is None:
                if not columns_to_check:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No columns to check. Returning data unchanged."
                    )
                    result = data
                else:
                    column_conditions = []
                    for col in columns_to_check:
                        sanitized_col = _sanitize_identifier(col)
                        col_expr = f'CAST("{sanitized_col}" AS VARCHAR)'
                        condition = _build_match_condition(
                            col_expr, match_values, match_mode, normalization_mode
                        )
                        column_conditions.append(condition)
                    if multi_column_mode == "all":
                        row_match_condition = " AND ".join(column_conditions)
                    else:
                        row_match_condition = " OR ".join(column_conditions)
                    select_parts = [
                        f'"{_sanitize_identifier(col)}"' for col in all_columns.keys()
                    ]
                    select_parts.append(
                        f'''CASE WHEN {row_match_condition} THEN '1' ELSE NULL END AS "{_sanitize_identifier(flag_column)}"'''
                    )
                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Adding flag column '{flag_column}' where match condition is TRUE."
                    )
                    if output_format == "pandas":
                        result = conn.execute(query).df()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to pandas DataFrame."
                        )
                    elif output_format == "polars":
                        result = conn.execute(query).pl()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Polars DataFrame."
                        )
                    elif output_format == "arrow":
                        result = conn.execute(query).fetch_arrow_table()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Arrow Table."
                        )
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Unsupported output format: {output_format}"
                        )
                        raise ValueError(f"Unsupported output format: {output_format}")
                    verbose and logger.info(
                        f"[{brick_display_name}] Flagging completed successfully."
                    )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during flagging: {str(e)}"
            )
            raise
        finally:
            if conn is not None:
                conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow