Remove Empty Rows

Remove rows where specified cells are empty/null.

Remove Empty Rows

Processing

This brick removes rows from the input data structure where specified cells are considered empty (NULL or missing). It supports filtering based on a list of columns, a regular expression pattern, or all columns, and allows defining the removal logic using any or all match conditions.

Inputs

data
The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) to be processed.
columns (optional)
A list of specific column names to check for empty values. If this list is empty, and no regex pattern is provided, all columns will be checked.
regex pattern (optional)
A regular expression string used to dynamically select the columns that must be checked. If provided, this overrides the columns list. This input port is synchronized with the "Regex Pattern" option.
match condition (optional)
Specifies the condition for removal: 'any' (remove if any checked column is empty) or 'all' (remove if all checked columns are empty). This input port is synchronized with the "Match Condition" option.

Input Types

Input Types
data DataFrame, ArrowTable
columns List
regex pattern Str
match condition Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure (DataFrame or Arrow Table) after rows containing empty values have been removed based on the specified criteria.

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Remove Empty Rows brick provides the following configurable options:

Columns to Check
Specifies which columns must be checked for null/empty values. If this list is empty, all columns are checked unless a Regex Pattern is specified.
Regex Pattern
A regular expression used to dynamically select the columns to check. If provided, this pattern takes precedence over the explicit list of columns.
Match Condition
Determines the filtering logic. If set to any, a row is removed if any checked column is null. If set to all, a row is removed only if all checked columns are null. Defaults to any.
Output Format
Defines the desired output structure of the filtered data: pandas (DataFrame), polars (DataFrame), or arrow (Arrow Table). Defaults to pandas.
Safe Mode
If enabled (True), the brick will silently skip any column specified in "Columns to Check" that does not exist in the input data, instead of raising an error. Defaults to False.
Verbose
Enables detailed logging and informational messages during execution. Defaults to True.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def remove_empty_rows(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    match_condition: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Remove rows from ``data`` where the selected columns are NULL.

    Column selection precedence: a non-empty ``regex_pattern`` wins over the
    explicit ``columns`` list; an empty list (and no regex) means all columns.
    With ``match_condition='any'`` a row is removed if ANY checked column is
    NULL; with ``'all'`` a row is removed only if ALL checked columns are NULL.

    Parameters
    ----------
    data : pandas.DataFrame | polars.DataFrame | pyarrow.Table
        Input data structure; queried in place through DuckDB.
    columns : list[str], optional
        Explicit column names to check. Falls back to ``options['columns']``.
    regex_pattern : str, optional
        Regex used to select columns to check; overrides ``columns``.
    match_condition : str, optional
        Either ``'any'`` (default) or ``'all'``.
    options : dict, optional
        May also supply ``'output_format'`` (``'pandas'`` | ``'polars'`` |
        ``'arrow'``, default ``'pandas'``), ``'safe_mode'`` (skip missing
        columns instead of raising, default False) and ``'verbose'``
        (default True).

    Returns
    -------
    DataFrame or Arrow Table in the requested output format. NOTE: when no
    columns end up selected (empty regex match, or every listed column
    skipped in safe mode) the input object is returned unchanged, in its
    ORIGINAL format rather than ``output_format``.

    Raises
    ------
    ValueError
        For an invalid ``columns`` / ``match_condition`` / regex /
        ``output_format``, an unsupported input type, or (when safe mode is
        off) columns missing from the data.
    """
    brick_display_name = "Remove Empty Rows"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    match_condition = _coalesce(match_condition, options.get("match_condition", "any"))
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    # Validate inputs BEFORE deriving anything from them. The previous
    # version computed len(columns) first, which raised a bare TypeError
    # (instead of the intended ValueError) when options carried columns=None.
    if not isinstance(columns, list) or not all(isinstance(c, str) for c in columns):
        if verbose:
            logger.error(
                f"[{brick_display_name}] Invalid columns format! Expected a list."
            )
        raise ValueError("Columns must be provided as a list!")
    if match_condition not in ["any", "all"]:
        if verbose:
            logger.error(
                f"[{brick_display_name}] Invalid match condition: {match_condition}. Must be 'any' or 'all'."
            )
        raise ValueError("Match condition must be 'any' or 'all'")
    check_all_columns = len(columns) == 0
    try:
        # Selection mode: regex takes precedence over the explicit list.
        if regex_pattern:
            check_mode = "regex_pattern"
        elif check_all_columns:
            check_mode = "all_columns"
        else:
            check_mode = "column_list"
        if verbose:
            logger.info(
                f"[{brick_display_name}] Detected mode: '{check_mode}'. Removing rows where {match_condition} specified column(s) are empty."
            )
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        else:
            data_type = None
        if data_type is None:
            if verbose:
                logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        if verbose:
            logger.info(f"[{brick_display_name}] Detected input format: {data_type}.")
        # DuckDB can query all three supported structures once registered.
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        row_count_before = conn.execute(
            "SELECT COUNT(*) FROM input_table"
        ).fetchone()[0]
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        if verbose:
            logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}. Total rows: {row_count_before}."
            )
        columns_to_check = []
        if check_mode == "column_list":
            # Partition the requested columns once instead of re-scanning
            # for missing/skipped/present in three separate comprehensions.
            missing_columns = [col for col in columns if col not in all_columns]
            if missing_columns and not safe_mode:
                if verbose:
                    logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                raise ValueError(f"Columns not found in data: {missing_columns}")
            columns_to_check = [col for col in columns if col in all_columns]
            if safe_mode and missing_columns:
                if verbose:
                    logger.warning(
                        f"[{brick_display_name}] Safe mode: Skipped {len(missing_columns)} non-existent columns: {missing_columns}"
                    )
            if verbose:
                logger.info(
                    f"[{brick_display_name}] Checking {len(columns_to_check)} column(s): {columns_to_check}."
                )
        elif check_mode == "regex_pattern":
            try:
                pattern = re.compile(regex_pattern)
                columns_to_check = [
                    col for col in all_columns.keys() if pattern.search(col)
                ]
                if not columns_to_check:
                    if verbose:
                        logger.warning(
                            f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                        )
                    # Nothing to filter: pass the input through untouched.
                    result = data
                else:
                    if verbose:
                        logger.info(
                            f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} columns: {columns_to_check}."
                        )
            except re.error as e:
                if verbose:
                    logger.error(f"[{brick_display_name}] Invalid regex pattern.")
                raise ValueError(f"Invalid regex pattern: {e}")
        elif check_mode == "all_columns":
            columns_to_check = list(all_columns.keys())
            if verbose:
                logger.info(
                    f"[{brick_display_name}] Checking all {len(columns_to_check)} columns."
                )
        if result is None:
            if not columns_to_check:
                if verbose:
                    logger.warning(
                        f"[{brick_display_name}] No columns to check. Returning data unchanged."
                    )
                result = data
            else:
                # Keep a row when its checked columns are non-NULL:
                #   'any' removal  -> keep only rows with ALL non-NULL (AND)
                #   'all' removal  -> keep rows with ANY non-NULL (OR)
                where_conditions = [
                    f'"{_sanitize_identifier(col)}" IS NOT NULL'
                    for col in columns_to_check
                ]
                if match_condition == "all":
                    where_clause = " OR ".join(where_conditions)
                else:
                    where_clause = " AND ".join(where_conditions)
                query = f"SELECT * FROM input_table WHERE {where_clause}"
                if verbose:
                    logger.info(
                        f"[{brick_display_name}] Executing query to remove empty rows."
                    )
                if output_format == "pandas":
                    result = conn.execute(query).df()
                    if verbose:
                        logger.info(
                            f"[{brick_display_name}] Converted result to pandas DataFrame."
                        )
                elif output_format == "polars":
                    result = conn.execute(query).pl()
                    if verbose:
                        logger.info(
                            f"[{brick_display_name}] Converted result to Polars DataFrame."
                        )
                elif output_format == "arrow":
                    result = conn.execute(query).fetch_arrow_table()
                    if verbose:
                        logger.info(
                            f"[{brick_display_name}] Converted result to Arrow Table."
                        )
                else:
                    if verbose:
                        logger.error(
                            f"[{brick_display_name}] Unsupported output format: {output_format}"
                        )
                    raise ValueError(f"Unsupported output format: {output_format}")
                row_count_after = conn.execute(
                    f"SELECT COUNT(*) FROM ({query}) AS filtered"
                ).fetchone()[0]
                rows_removed = row_count_before - row_count_after
                if verbose:
                    logger.info(
                        f"[{brick_display_name}] Remove operation completed successfully. Removed {rows_removed} row(s). Remaining rows: {row_count_after}."
                    )
    except Exception as e:
        if verbose:
            logger.error(
                f"[{brick_display_name}] Error during remove operation: {str(e)}"
            )
        raise
    finally:
        # Always release the in-memory DuckDB connection, even on failure.
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow