Filter Rows by Date

Filter rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part.

Processing

Filter rows from the dataset using a date filter applied to a designated date column. The filter criteria can be defined as a fixed date range (start/end date), a relative date range (e.g., last 7 days), or a match on extracted date parts (e.g., month number, year). The function supports several actions: keeping matching rows, removing matching rows, or clearing the contents of the date column cells based on the match result.
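
For illustration, here is a minimal sketch of a fixed-range filter. It assumes the filter_rows_by_date function from the brick code further down this page is available in scope, and uses a small hypothetical DataFrame:

import pandas as pd

orders = pd.DataFrame({
    "order_id": [1, 2, 3],
    "order_date": ["2024-01-05", "2024-02-10", "2024-03-20"],
})

# Keep only rows whose order_date falls within February-March 2024.
filtered = filter_rows_by_date(
    orders,
    action="keep_matching_rows",
    date_column="order_date",
    filter_type="date_range",
    start_date="2024-02-01",
    end_date="2024-03-31",
)
# filtered is a pandas DataFrame containing order_id 2 and 3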

Inputs

data
The input dataset (DataFrame or Arrow Table) to be filtered.
action (optional)
Defines how to handle rows/cells that match the date filter: keep matching rows (keep_matching_rows), remove matching rows (remove_matching_rows), clear matching cells in the date column (clear_matching_cells), or clear non-matching cells in the date column (clear_non_matching_cells).
date column (optional)
The name of the column containing date values against which the filter will be applied.
filter type (optional)
Specifies the method used to define the filter: a fixed date range (date_range), a relative range (relative_range), or a date part match (date_part).
start date (optional)
The starting boundary for a fixed date range filter (format YYYY-MM-DD).
end date (optional)
The ending boundary for a fixed date range filter (format YYYY-MM-DD).
relative type (optional)
Defines the direction and context of the relative range (e.g., last_n, next_n, current).
relative value (optional)
The integer value N used in a relative range filter (e.g., 7 for the last 7 days).
relative unit (optional)
The unit of time for the relative range (e.g., days, weeks, years).
date part type (optional)
The specific date part to extract for matching (e.g., month, year, day_of_week).
date part values (optional)
A list of values that the extracted date part must match.
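
As a sketch of the relative-range and date-part inputs, assuming filter_rows_by_date from the brick code below is in scope and using a hypothetical events DataFrame:

import pandas as pd

events = pd.DataFrame({"event_date": ["2024-06-15", "2024-11-02", "2025-01-09"]})

# Keep rows whose event_date falls within the last 7 days.
recent = filter_rows_by_date(
    events,
    action="keep_matching_rows",
    date_column="event_date",
    filter_type="relative_range",
    relative_type="last_n",
    relative_value=7,
    relative_unit="days",
)

# Clear event_date cells whose month is not June, July, or August.
summer_only = filter_rows_by_date(
    events,
    action="clear_non_matching_cells",
    date_column="event_date",
    filter_type="date_part",
    date_part_type="month",
    date_part_values=[6, 7, 8],
)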

Inputs Types

Input Types
data DataFrame, ArrowTable
action Str
date column Str
filter type Str
start date Str
end date Str
relative type Str
relative value Int
relative unit Str
date part type Str
date part values List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The dataset after applying the specified date filter and action. The format matches the configuration specified in the output_format option.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Filter Rows by Date brick provides the following configurable options:

Action
Defines how to handle rows/cells that match the filter criteria. Choices include keeping matching rows (keep_matching_rows), removing matching rows (remove_matching_rows), clearing matching cells in the date column (clear_matching_cells), or clearing non-matching cells in the date column (clear_non_matching_cells).
Date Column
The name of the column containing the date values to filter against.
Filter On
Specifies the method used for defining the filter: date_range (fixed start/end), relative_range (e.g., last N days), or date_part (matching month, year, etc.).
Start Date (YYYY-MM-DD)
The fixed starting date boundary for the date_range filter.
End Date (YYYY-MM-DD)
The fixed ending date boundary for the date_range filter.
Relative Type
Defines the context for the relative_range filter, such as last_n, next_n, current period, or until_now.
Relative Value (N)
The numerical value N used when defining last_n or next_n relative ranges.
Relative Unit
The unit of time corresponding to the Relative Value (e.g., days, weeks, months, years).
Date Part Type
The specific component of the date to extract for the date_part filter (e.g., year, month, day_of_week).
Date Part Values
A list of numerical values (as strings) that the extracted Date Part must match.
Output Format
Specifies the desired format for the resulting dataset: pandas DataFrame, polars DataFrame, or arrow Table.
Verbose
If enabled, detailed logs about the filtering process, query construction, and conversion steps will be displayed.
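
These options map one-to-one to keys of the options dictionary consumed by the brick code; explicit inputs, when provided, take precedence over the corresponding option keys. A minimal sketch, assuming filter_rows_by_date is in scope and using hypothetical data:

import pandas as pd

signups = pd.DataFrame({"signup_date": ["2020-03-14", "2024-05-01", "2999-01-01"]})

options = {
    "action": "remove_matching_rows",
    "date_column": "signup_date",
    "filter_type": "relative_range",
    "relative_type": "until_now",
    "output_format": "polars",
    "verbose": False,
}

# Removes rows dated on or before today and returns a Polars DataFrame.
result = filter_rows_by_date(signups, options=options)

The complete brick source code is listed below: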
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from datetime import datetime, date
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first value that is not None, or None if all values are None."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _build_date_range_condition(column_expr, start_date, end_date):
    """Build SQL condition for fixed date range filter."""
    conditions = []
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    conditions.append(f"{date_expr} IS NOT NULL")
    if start_date:
        conditions.append(f"{date_expr} >= DATE '{start_date}'")
    if end_date:
        conditions.append(f"{date_expr} <= DATE '{end_date}'")
    return f"({' AND '.join(conditions)})"


def _build_relative_range_condition(
    column_expr, relative_type, relative_value, relative_unit
):
    """Build SQL condition for relative date range filter."""
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    current_date = "CURRENT_DATE"
    if relative_unit == "days":
        interval = f"INTERVAL '{relative_value}' DAY"
    elif relative_unit == "weeks":
        interval = f"INTERVAL '{relative_value}' WEEK"
    elif relative_unit == "months":
        interval = f"INTERVAL '{relative_value}' MONTH"
    elif relative_unit == "quarters":
        interval = f"INTERVAL '{relative_value * 3}' MONTH"
    elif relative_unit == "years":
        interval = f"INTERVAL '{relative_value}' YEAR"
    else:
        interval = f"INTERVAL '{relative_value}' DAY"
    if relative_type == "last_n":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} - {interval} AND {date_expr} <= {current_date})"
    elif relative_type == "next_n":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} AND {date_expr} <= {current_date} + {interval})"
    elif relative_type == "current":
        if relative_unit == "days":
            condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
        elif relative_unit == "weeks":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('week', {date_expr}) = date_trunc('week', {current_date}))"
        elif relative_unit == "months":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('month', {date_expr}) = date_trunc('month', {current_date}))"
        elif relative_unit == "quarters":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('quarter', {date_expr}) = date_trunc('quarter', {current_date}))"
        elif relative_unit == "years":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('year', {date_expr}) = date_trunc('year', {current_date}))"
        else:
            condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
    elif relative_type == "until_now":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} <= {current_date})"
    else:
        condition = f"{date_expr} IS NOT NULL"
    return condition


def _build_date_part_condition(column_expr, date_part_type, date_part_values):
    """Build SQL condition for date part filter."""
    if not date_part_values:
        return "FALSE"
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    if date_part_type == "year":
        part_expr = f"EXTRACT(YEAR FROM {date_expr})"
    elif date_part_type == "quarter":
        part_expr = f"EXTRACT(QUARTER FROM {date_expr})"
    elif date_part_type == "month":
        part_expr = f"EXTRACT(MONTH FROM {date_expr})"
    elif date_part_type == "day":
        part_expr = f"EXTRACT(DAY FROM {date_expr})"
    elif date_part_type == "day_of_week":
        part_expr = f"EXTRACT(DOW FROM {date_expr})"
    elif date_part_type == "day_of_year":
        part_expr = f"EXTRACT(DOY FROM {date_expr})"
    elif date_part_type == "week":
        part_expr = f"EXTRACT(WEEK FROM {date_expr})"
    else:
        part_expr = f"EXTRACT(YEAR FROM {date_expr})"
    try:
        int_values = [int(v) for v in date_part_values]
        values_list = ", ".join((str(v) for v in int_values))
        condition = f"({date_expr} IS NOT NULL AND {part_expr} IN ({values_list}))"
    except (ValueError, TypeError):
        condition = "FALSE"
    return condition


def filter_rows_by_date(
    data: Union[DataFrame, ArrowTable],
    action: Str = None,
    date_column: Str = None,
    filter_type: Str = None,
    start_date: Str = None,
    end_date: Str = None,
    relative_type: Str = None,
    relative_value: Int = None,
    relative_unit: Str = None,
    date_part_type: Str = None,
    date_part_values: List = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """Filter rows or clear date-column cells in the data using a DuckDB query built from the configured date criteria."""
    brick_display_name = "Filter Rows by Date"
    options = options or {}
    verbose = options.get("verbose", True)
    action = _coalesce(action, options.get("action", "keep_matching_rows"))
    date_column = _coalesce(date_column, options.get("date_column", ""))
    filter_type = _coalesce(filter_type, options.get("filter_type", "date_range"))
    start_date = _coalesce(start_date, options.get("start_date", ""))
    end_date = _coalesce(end_date, options.get("end_date", ""))
    relative_type = _coalesce(relative_type, options.get("relative_type", "last_n"))
    relative_value = _coalesce(relative_value, options.get("relative_value", 7))
    relative_unit = _coalesce(relative_unit, options.get("relative_unit", "days"))
    date_part_type = _coalesce(date_part_type, options.get("date_part_type", "month"))
    date_part_values = _coalesce(date_part_values, options.get("date_part_values", []))
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not date_column:
        verbose and logger.error(
            f"[{brick_display_name}] Date column must be specified!"
        )
        raise ValueError("Date column must be specified!")
    valid_actions = [
        "keep_matching_rows",
        "remove_matching_rows",
        "clear_matching_cells",
        "clear_non_matching_cells",
    ]
    if action not in valid_actions:
        verbose and logger.error(f"[{brick_display_name}] Invalid action: {action}.")
        raise ValueError(f"Action must be one of {valid_actions}")
    valid_filter_types = ["date_range", "relative_range", "date_part"]
    if filter_type not in valid_filter_types:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid filter type: {filter_type}."
        )
        raise ValueError(f"Filter type must be one of {valid_filter_types}")
    if filter_type == "date_range":
        if not start_date and (not end_date):
            verbose and logger.warning(
                f"[{brick_display_name}] No date range specified. Returning data unchanged."
            )
            result = data
    if filter_type == "date_part":
        if not isinstance(date_part_values, list):
            verbose and logger.error(
                f"[{brick_display_name}] Invalid date_part_values format! Expected a list."
            )
            raise ValueError("Date part values must be provided as a list!")
        if not date_part_values:
            verbose and logger.warning(
                f"[{brick_display_name}] No date part values specified. Returning data unchanged."
            )
            result = data
    if result is None:
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting filter with action '{action}' and filter type '{filter_type}'."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, (pa.Table, pa.lib.Table)):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
            if date_column not in all_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Date column '{date_column}' not found in data."
                )
                raise ValueError(f"Date column '{date_column}' not found in data.")
            verbose and logger.info(
                f"[{brick_display_name}] Using date column: '{date_column}' (type: {all_columns[date_column]})."
            )
            sanitized_col = _sanitize_identifier(date_column)
            col_expr = f'"{sanitized_col}"'
            if filter_type == "date_range":
                date_condition = _build_date_range_condition(
                    col_expr, start_date, end_date
                )
                verbose and logger.info(
                    f"[{brick_display_name}] Date range filter: [{start_date or 'unbounded'}, {end_date or 'unbounded'}]."
                )
            elif filter_type == "relative_range":
                date_condition = _build_relative_range_condition(
                    col_expr, relative_type, relative_value, relative_unit
                )
                verbose and logger.info(
                    f"[{brick_display_name}] Relative range filter: {relative_type} {relative_value} {relative_unit}."
                )
            elif filter_type == "date_part":
                date_condition = _build_date_part_condition(
                    col_expr, date_part_type, date_part_values
                )
                verbose and logger.info(
                    f"[{brick_display_name}] Date part filter: {date_part_type} IN {date_part_values}."
                )
            if action == "keep_matching_rows":
                query = f"SELECT * FROM input_table WHERE {date_condition}"
                verbose and logger.info(
                    f"[{brick_display_name}] Keeping rows where date condition matches."
                )
            elif action == "remove_matching_rows":
                query = f"SELECT * FROM input_table WHERE NOT ({date_condition})"
                verbose and logger.info(
                    f"[{brick_display_name}] Removing rows where date condition matches."
                )
            elif action in ["clear_matching_cells", "clear_non_matching_cells"]:
                select_parts = []
                clear_on_match = action == "clear_matching_cells"
                for col in all_columns.keys():
                    sanitized_col_iter = _sanitize_identifier(col)
                    if col == date_column:
                        if clear_on_match:
                            select_parts.append(
                                f'CASE WHEN {date_condition} THEN NULL ELSE "{sanitized_col_iter}" END AS "{sanitized_col_iter}"'
                            )
                        else:
                            select_parts.append(
                                f'CASE WHEN {date_condition} THEN "{sanitized_col_iter}" ELSE NULL END AS "{sanitized_col_iter}"'
                            )
                    else:
                        select_parts.append(f'"{sanitized_col_iter}"')
                select_clause = ", ".join(select_parts)
                query = f"SELECT {select_clause} FROM input_table"
                verbose and logger.info(
                    f"[{brick_display_name}] Clearing content of {('matching' if clear_on_match else 'non-matching')} cells."
                )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            verbose and logger.info(
                f"[{brick_display_name}] Filter completed successfully."
            )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during filtering: {str(e)}"
            )
            raise
        finally:
            if conn is not None:
                conn.close()
    return result
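
Finally, a usage sketch exercising the brick end to end with hypothetical data, the clear_matching_cells action, and Arrow output. Values that cannot be cast to a date never match the filter condition, so they are left untouched:

import pandas as pd
import pyarrow as pa

visits = pd.DataFrame({
    "id": [1, 2, 3],
    "visit_date": ["2023-12-31", "2024-07-04", "not a date"],
})

# Null out visit_date cells that fall inside 2024 and return an Arrow Table.
result = filter_rows_by_date(
    visits,
    action="clear_matching_cells",
    date_column="visit_date",
    filter_type="date_range",
    start_date="2024-01-01",
    end_date="2024-12-31",
    options={"output_format": "arrow", "verbose": False},
)

print(isinstance(result, pa.Table))               # True
print(result.column("visit_date").to_pylist())    # expected: ['2023-12-31', None, 'not a date']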

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow