Flag Rows by Date
Flag rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part. Creates a new column with '1' for matching rows.
Flag Rows by Date
Processing
Flag rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part. Rows matching the specified criteria are flagged with '1', while non-matching rows or rows with invalid date values are flagged with NULL.
Inputs
data- The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) to which the flag column will be added.
flag column name(optional)- The name of the new column that will contain the date match flag (1 or NULL).
date column(optional)- The name of the column in the dataset containing the date values used for filtering.
filter type(optional)- Defines the type of date filter to apply: fixed
date_range,relative_range, ordate_partmatching. start date(optional)- The starting date (YYYY-MM-DD, inclusive) for the fixed date range filter.
end date(optional)- The ending date (YYYY-MM-DD, inclusive) for the fixed date range filter.
relative type(optional)- Specifies the direction or context for relative filtering (e.g.,
last_n,next_n,current). relative value(optional)- The numerical value (N) used in conjunction with
relative unitfor relative filtering. relative unit(optional)- The time unit (e.g.,
days,months,years) used withrelative valuefor relative filtering. date part type(optional)- The date component (
year,month,day, etc.) to extract and check when using thedate_partfilter. date part values(optional)- A list of specific values that the extracted date part must match.
Inputs Types
| Input | Types |
|---|---|
data |
DataFrame, ArrowTable |
flag column name |
Str |
date column |
Str |
filter type |
Str |
start date |
Str |
end date |
Str |
relative type |
Str |
relative value |
Int |
relative unit |
Str |
date part type |
Str |
date part values |
List |
You can check the list of supported types here: Available Type Hints.
Outputs
result- The output dataset, which is the original data augmented with the new flag column (
flag column name).
Outputs Types
| Output | Types |
|---|---|
result |
DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Flag Rows by Date brick contains some changeable options:
- Flag Column Name
- The name of the new column that will store the flag result. Defaults to
date_match_flag. - Date Column
- The name of the column in the input data containing the date values to filter against.
- Filter On
- Defines the type of filtering to be applied:
date_range(fixed dates),relative_range(relative to today), ordate_part(matching specific components). Defaults todate_range. - Start Date (YYYY-MM-DD)
- The fixed starting date for the date range filter.
- End Date (YYYY-MM-DD)
- The fixed ending date for the date range filter.
- Relative Type
- Specifies the direction or scope for relative filtering:
last_n,next_n,currentperiod, oruntil_now. Defaults tolast_n. - Relative Value (N)
- The numerical quantity (N) defining the extent of the relative range. Defaults to 7.
- Relative Unit
- The unit of time used for relative filtering:
days,weeks,months,quarters, oryears. Defaults todays. - Date Part Type
- The specific date component to match when using the
date_partfilter:year,quarter,month,day,day_of_week,day_of_year, orweek. Defaults tomonth. - Date Part Values
- A list of numerical values that the extracted date part must match (e.g., [1, 2] for January and February).
- Output Format
- Specifies the desired format of the output dataset:
pandas,polars, orarrow. Defaults topandas. - Verbose
- If enabled, detailed execution logs and information will be outputted.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from datetime import datetime, date
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
"""
return identifier.replace('"', '""')
def _build_date_range_condition(column_expr, start_date, end_date):
"""Build SQL condition for fixed date range filter."""
conditions = []
date_expr = f"TRY_CAST({column_expr} AS DATE)"
conditions.append(f"{date_expr} IS NOT NULL")
if start_date:
conditions.append(f"{date_expr} >= DATE '{start_date}'")
if end_date:
conditions.append(f"{date_expr} <= DATE '{end_date}'")
return f"({' AND '.join(conditions)})"
def _build_relative_range_condition(
column_expr, relative_type, relative_value, relative_unit
):
"""Build SQL condition for relative date range filter."""
date_expr = f"TRY_CAST({column_expr} AS DATE)"
current_date = "CURRENT_DATE"
if relative_unit == "days":
interval = f"INTERVAL '{relative_value}' DAY"
elif relative_unit == "weeks":
interval = f"INTERVAL '{relative_value}' WEEK"
elif relative_unit == "months":
interval = f"INTERVAL '{relative_value}' MONTH"
elif relative_unit == "quarters":
interval = f"INTERVAL '{relative_value * 3}' MONTH"
elif relative_unit == "years":
interval = f"INTERVAL '{relative_value}' YEAR"
else:
interval = f"INTERVAL '{relative_value}' DAY"
if relative_type == "last_n":
condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} - {interval} AND {date_expr} <= {current_date})"
elif relative_type == "next_n":
condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} AND {date_expr} <= {current_date} + {interval})"
elif relative_type == "current":
if relative_unit == "days":
condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
elif relative_unit == "weeks":
condition = f"({date_expr} IS NOT NULL AND date_trunc('week', {date_expr}) = date_trunc('week', {current_date}))"
elif relative_unit == "months":
condition = f"({date_expr} IS NOT NULL AND date_trunc('month', {date_expr}) = date_trunc('month', {current_date}))"
elif relative_unit == "quarters":
condition = f"({date_expr} IS NOT NULL AND date_trunc('quarter', {date_expr}) = date_trunc('quarter', {current_date}))"
elif relative_unit == "years":
condition = f"({date_expr} IS NOT NULL AND date_trunc('year', {date_expr}) = date_trunc('year', {current_date}))"
else:
condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
elif relative_type == "until_now":
condition = f"({date_expr} IS NOT NULL AND {date_expr} <= {current_date})"
else:
condition = f"{date_expr} IS NOT NULL"
return condition
def _build_date_part_condition(column_expr, date_part_type, date_part_values):
"""Build SQL condition for date part filter."""
if not date_part_values:
return "FALSE"
date_expr = f"TRY_CAST({column_expr} AS DATE)"
if date_part_type == "year":
part_expr = f"EXTRACT(YEAR FROM {date_expr})"
elif date_part_type == "quarter":
part_expr = f"EXTRACT(QUARTER FROM {date_expr})"
elif date_part_type == "month":
part_expr = f"EXTRACT(MONTH FROM {date_expr})"
elif date_part_type == "day":
part_expr = f"EXTRACT(DAY FROM {date_expr})"
elif date_part_type == "day_of_week":
part_expr = f"EXTRACT(DOW FROM {date_expr})"
elif date_part_type == "day_of_year":
part_expr = f"EXTRACT(DOY FROM {date_expr})"
elif date_part_type == "week":
part_expr = f"EXTRACT(WEEK FROM {date_expr})"
else:
part_expr = f"EXTRACT(YEAR FROM {date_expr})"
try:
int_values = [int(v) for v in date_part_values]
values_list = ", ".join((str(v) for v in int_values))
condition = f"({date_expr} IS NOT NULL AND {part_expr} IN ({values_list}))"
except (ValueError, TypeError):
condition = "FALSE"
return condition
def flag_rows_by_date(
data: Union[DataFrame, ArrowTable],
flag_column_name: Str = None,
date_column: Str = None,
filter_type: Str = None,
start_date: Str = None,
end_date: Str = None,
relative_type: Str = None,
relative_value: Int = None,
relative_unit: Str = None,
date_part_type: Str = None,
date_part_values: List = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Flag Rows by Date"
options = options or {}
verbose = options.get("verbose", True)
flag_column_name = _coalesce(
flag_column_name, options.get("flag_column_name", "date_match_flag")
)
date_column = _coalesce(date_column, options.get("date_column", ""))
filter_type = _coalesce(filter_type, options.get("filter_type", "date_range"))
start_date = _coalesce(start_date, options.get("start_date", ""))
end_date = _coalesce(end_date, options.get("end_date", ""))
relative_type = _coalesce(relative_type, options.get("relative_type", "last_n"))
relative_value = _coalesce(relative_value, options.get("relative_value", 7))
relative_unit = _coalesce(relative_unit, options.get("relative_unit", "days"))
date_part_type = _coalesce(date_part_type, options.get("date_part_type", "month"))
date_part_values = _coalesce(date_part_values, options.get("date_part_values", []))
output_format = options.get("output_format", "pandas")
result = None
conn = None
if not date_column:
verbose and logger.error(
f"[{brick_display_name}] Date column must be specified!"
)
raise ValueError("Date column must be specified!")
if not flag_column_name or not isinstance(flag_column_name, str):
verbose and logger.error(f"[{brick_display_name}] Invalid flag column name!")
raise ValueError("Flag column name must be a non-empty string!")
valid_filter_types = ["date_range", "relative_range", "date_part"]
if filter_type not in valid_filter_types:
verbose and logger.error(
f"[{brick_display_name}] Invalid filter type: {filter_type}."
)
raise ValueError(f"Filter type must be one of {valid_filter_types}")
if filter_type == "date_range" and (not start_date) and (not end_date):
verbose and logger.warning(
f"[{brick_display_name}] No date range specified. All rows will be flagged as not matching."
)
if filter_type == "date_part":
if not isinstance(date_part_values, list):
verbose and logger.error(
f"[{brick_display_name}] Invalid date_part_values format! Expected a list."
)
raise ValueError("Date part values must be provided as a list!")
if not date_part_values:
verbose and logger.warning(
f"[{brick_display_name}] No date part values specified. All rows will be flagged as not matching."
)
try:
verbose and logger.info(
f"[{brick_display_name}] Starting flagging with filter type '{filter_type}'."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
if flag_column_name in all_columns:
verbose and logger.warning(
f"[{brick_display_name}] Flag column '{flag_column_name}' already exists and will be overwritten."
)
if date_column not in all_columns:
verbose and logger.error(
f"[{brick_display_name}] Date column '{date_column}' not found in data."
)
raise ValueError(f"Date column '{date_column}' not found in data.")
verbose and logger.info(
f"[{brick_display_name}] Using date column: '{date_column}' (type: {all_columns[date_column]})."
)
sanitized_col = _sanitize_identifier(date_column)
col_expr = f'"{sanitized_col}"'
if filter_type == "date_range":
if not start_date and (not end_date):
date_condition = "FALSE"
else:
date_condition = _build_date_range_condition(
col_expr, start_date, end_date
)
verbose and logger.info(
f"[{brick_display_name}] Date range filter: [{start_date or 'unbounded'}, {end_date or 'unbounded'}]."
)
elif filter_type == "relative_range":
date_condition = _build_relative_range_condition(
col_expr, relative_type, relative_value, relative_unit
)
verbose and logger.info(
f"[{brick_display_name}] Relative range filter: {relative_type} {relative_value} {relative_unit}."
)
elif filter_type == "date_part":
date_condition = _build_date_part_condition(
col_expr, date_part_type, date_part_values
)
verbose and logger.info(
f"[{brick_display_name}] Date part filter: {date_part_type} IN {date_part_values}."
)
flag_condition = f"CASE WHEN {date_condition} THEN '1' ELSE NULL END"
select_parts = []
for col in all_columns.keys():
if col != flag_column_name:
sanitized_col_iter = _sanitize_identifier(col)
select_parts.append(f'"{sanitized_col_iter}"')
sanitized_flag_col = _sanitize_identifier(flag_column_name)
select_parts.append(f'{flag_condition} AS "{sanitized_flag_col}"')
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Creating flag column '{flag_column_name}'."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Flagging completed successfully."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during flagging: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow