Date Difference

Compute the difference between a date column and another time reference.

Date Difference

Processing

Compute the time difference between a date/timestamp column (time since column) and a specified reference point. The reference point (until mode) can be the current time, a value from another column, or a fixed date. The resulting difference, measured in the configured time unit (year, month, day, etc.), is appended as a new column to the input data structure.

Inputs

data
The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) containing the date column.
time since column (optional)
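The name of the date/timestamp column from which the time difference calculation begins. The input itself may be left unset, but a column name must then be supplied through the Time Since Column option; otherwise an error is raised.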
until mode (optional)
Defines the reference point for the end date ('now', 'fixed_date', or 'date_column'). Defaults to 'now'.
until date column (optional)
The name of the column containing the end date. Required if until mode is set to 'date_column'.
until fixed date (optional)
The fixed date string (must be in ISO format like YYYY-MM-DD or YYYY-MM-DD HH:MM:SS) to use as the end reference point. Required if until mode is set to 'fixed_date'.
output time unit (optional)
The unit in which the resulting time difference should be calculated: 'year', 'month', 'week', 'day', 'hour', 'minute', or 'second'. Defaults to 'day'.
output column (optional)
The name for the new column containing the computed date difference. If left empty, a descriptive name is generated.
reverse output (optional)
If True, the calculated difference is multiplied by -1, effectively swapping the calculation direction.

Input Types

Input Types
data DataFrame, ArrowTable
time since column Str
until mode Str
until date column Str
until fixed date Str
output time unit Str
output column Str
reverse output Bool

You can check the list of supported types here: Available Type Hints.

Outputs

result
The input data with a new column containing the computed date differences, returned in the format selected by the Output Format option (which may differ from the input format).

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Date Difference brick contains some changeable options:

Time Since Column
The date/timestamp column used as the starting point for calculating the difference.
Until
Defines the temporal reference point for calculating the difference. Choices are 'now' (current time), 'fixed_date' (a specific date string), or 'date_column' (another column in the dataset).
Until Date Column
The column containing the end date, required if Until is set to 'date_column'.
Until Fixed Date
The fixed date string (in ISO format) used as the end date, required if Until is set to 'fixed_date'.
Output Time Unit
The unit in which the resulting time difference is expressed. Choices: year, month, week, day, hour, minute, second.
Output Column
The name for the newly created column containing the computed difference.
Reverse Output
If enabled, the output difference is multiplied by -1, reversing the mathematical direction of the time subtraction.
Output Format
Specifies the type of the returned data structure (pandas, polars, or arrow), independently of the input type. Defaults to pandas.
Verbose
If enabled, logs informational messages about the computation process.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from datetime import datetime
from coded_flows.types import Union, DataFrame, ArrowTable, Str, Bool

# Configure the root logger at INFO level when this module is imported, and
# create the module-level logger used by every message emitted below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _is_date_type(duckdb_type):
    """
    Check if a DuckDB type is a date/timestamp type.
    Covers all possible date/time types in DuckDB.
    """
    type_lower = duckdb_type.lower()
    date_time_keywords = [
        "date",
        "timestamp",
        "timestamptz",
        "timestamp with time zone",
        "timestamp without time zone",
        "time",
        "timetz",
        "time with time zone",
        "time without time zone",
        "datetime",
        "interval",
    ]
    return any((keyword in type_lower for keyword in date_time_keywords))


def _generate_unique_column_name(base_name, existing_columns):
    """
    Generate a unique column name that doesn't conflict with existing columns.
    """
    if base_name not in existing_columns:
        return base_name
    counter = 1
    while f"{base_name}_{counter}" in existing_columns:
        counter += 1
    return f"{base_name}_{counter}"


def compute_date_difference(
    data: Union[DataFrame, ArrowTable],
    time_since_column: Str = None,
    until_mode: Str = None,
    until_date_column: Str = None,
    until_fixed_date: Str = None,
    output_time_unit: Str = None,
    output_column: Str = None,
    reverse_output: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Add a column holding the difference between a date column and a reference.

    The difference is computed in DuckDB as
    ``date_diff(unit, <time_since_column>, <reference>)`` with both sides cast
    to TIMESTAMP (the 'now' reference uses CURRENT_TIMESTAMP as-is).

    Every explicit argument that is None falls back to the entry of the same
    name in ``options``, and then to the default listed below.

    Args:
        data: pandas DataFrame, Polars DataFrame or Arrow Table to read from.
        time_since_column: Name of the start date column. Required (either as
            an argument or through ``options``).
        until_mode: 'now', 'date_column' or 'fixed_date'. Default 'now'.
        until_date_column: End date column; required for 'date_column'.
        until_fixed_date: ISO date string; required for 'fixed_date'.
        output_time_unit: 'year', 'month', 'week', 'day', 'hour', 'minute' or
            'second'. Default 'day'.
        output_column: Name of the result column. When empty, a descriptive
            name that does not clash with existing columns is generated. When
            it names an existing column, that column is replaced in place.
        reverse_output: When truthy, the difference is multiplied by -1.
        options: Optional dict. Besides the fallbacks above it supplies
            'verbose' (default True) and 'output_format' ('pandas', 'polars'
            or 'arrow'; default 'pandas').

    Returns:
        The query result in the requested output format.

    Raises:
        ValueError: On a missing/unknown column, an invalid until_mode,
            fixed date, time unit or output format, or unsupported input data.
        Exception: Any error raised by DuckDB is logged and re-raised.
    """
    brick_display_name = "Date Difference"
    options = options or {}
    verbose = options.get("verbose", True)
    time_since_column = _coalesce(
        time_since_column, options.get("time_since_column", "")
    )
    until_mode = _coalesce(until_mode, options.get("until_mode", "now"))
    until_date_column = _coalesce(
        until_date_column, options.get("until_date_column", "")
    )
    until_fixed_date = _coalesce(until_fixed_date, options.get("until_fixed_date", ""))
    output_time_unit = _coalesce(
        output_time_unit, options.get("output_time_unit", "day")
    )
    output_column = _coalesce(output_column, options.get("output_column", ""))
    reverse_output = _coalesce(reverse_output, options.get("reverse_output", False))
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not time_since_column:
        verbose and logger.error(
            f"[{brick_display_name}] Time since column must be specified."
        )
        raise ValueError("Time since column must be specified.")
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting date difference computation from column: '{time_since_column}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        # DESCRIBE rows start with (column name, column type); keep both so the
        # columns can be validated and their types inspected.
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        if time_since_column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Time since column '{time_since_column}' not found in data."
            )
            raise ValueError(
                f"Time since column '{time_since_column}' not found in data."
            )
        if not _is_date_type(all_columns[time_since_column]):
            # Not fatal: the column is cast to TIMESTAMP in the query anyway.
            verbose and logger.warning(
                f"[{brick_display_name}] Time since column '{time_since_column}' (type: {all_columns[time_since_column]}) may not be a date/timestamp type. Attempting to cast to TIMESTAMP."
            )
        sanitized_time_since = _sanitize_identifier(time_since_column)
        until_expr = None
        if until_mode == "now":
            until_expr = "CURRENT_TIMESTAMP"
            verbose and logger.info(
                f"[{brick_display_name}] Computing difference until current time."
            )
        elif until_mode == "date_column":
            if not until_date_column:
                verbose and logger.error(
                    f"[{brick_display_name}] Until date column must be specified when until_mode is 'date_column'."
                )
                raise ValueError(
                    "Until date column must be specified when until_mode is 'date_column'."
                )
            if until_date_column not in all_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Until date column '{until_date_column}' not found in data."
                )
                raise ValueError(
                    f"Until date column '{until_date_column}' not found in data."
                )
            if not _is_date_type(all_columns[until_date_column]):
                verbose and logger.warning(
                    f"[{brick_display_name}] Until date column '{until_date_column}' (type: {all_columns[until_date_column]}) may not be a date/timestamp type. Attempting to cast to TIMESTAMP."
                )
            sanitized_until_col = _sanitize_identifier(until_date_column)
            until_expr = f'"{sanitized_until_col}"::TIMESTAMP'
            verbose and logger.info(
                f"[{brick_display_name}] Computing difference until column: '{until_date_column}'."
            )
        elif until_mode == "fixed_date":
            if not until_fixed_date:
                verbose and logger.error(
                    f"[{brick_display_name}] Fixed date must be specified when until_mode is 'fixed_date'."
                )
                raise ValueError(
                    "Fixed date must be specified when until_mode is 'fixed_date'."
                )
            # The string is interpolated into the SQL text below, so it must
            # first parse as an ISO date; a trailing "Z" is tolerated for the
            # validation only.
            try:
                datetime.fromisoformat(until_fixed_date.replace("Z", "+00:00"))
            except ValueError as exc:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid fixed date format: '{until_fixed_date}'. Use ISO format (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)."
                )
                raise ValueError(
                    f"Invalid fixed date format: '{until_fixed_date}'. Use ISO format."
                ) from exc
            until_expr = f"TIMESTAMP '{until_fixed_date}'"
            verbose and logger.info(
                f"[{brick_display_name}] Computing difference until fixed date: '{until_fixed_date}'."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid until_mode: '{until_mode}'."
            )
            raise ValueError(f"Invalid until_mode: '{until_mode}'.")
        time_since_expr = f'"{sanitized_time_since}"::TIMESTAMP'
        # Whitelist of accepted units; also guards the value interpolated into
        # the date_diff() call.
        unit_map = {
            "year": "year",
            "month": "month",
            "week": "week",
            "day": "day",
            "hour": "hour",
            "minute": "minute",
            "second": "second",
        }
        if output_time_unit not in unit_map:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid output_time_unit: '{output_time_unit}'."
            )
            raise ValueError(f"Invalid output_time_unit: '{output_time_unit}'.")
        duckdb_unit = unit_map[output_time_unit]
        difference_expr = f"date_diff('{duckdb_unit}', {time_since_expr}, {until_expr})"
        if reverse_output:
            difference_expr = f"(-1) * ({difference_expr})"
            verbose and logger.info(
                f"[{brick_display_name}] Output will be reversed (multiplied by -1)."
            )
        verbose and logger.info(
            f"[{brick_display_name}] Computing difference in {output_time_unit}s."
        )
        if not output_column:
            if until_mode == "now":
                default_name = f"{time_since_column}_to_now_{output_time_unit}s"
            elif until_mode == "date_column":
                default_name = (
                    f"{time_since_column}_to_{until_date_column}_{output_time_unit}s"
                )
            else:
                default_name = f"{time_since_column}_difference_{output_time_unit}s"
            output_column = _generate_unique_column_name(default_name, all_columns)
            verbose and logger.info(
                f"[{brick_display_name}] No output column specified. Using generated name: '{output_column}'."
            )
        elif output_column in all_columns:
            verbose and logger.warning(
                f"[{brick_display_name}] Output column '{output_column}' already exists and will be overwritten."
            )
        sanitized_output_col = _sanitize_identifier(output_column)
        computed_part = f'{difference_expr} AS "{sanitized_output_col}"'
        select_parts = []
        replaced_existing = False
        for col in all_columns:
            if col == output_column:
                # Really overwrite: emit the computed expression in place of
                # the existing column instead of selecting both, which would
                # produce two result columns with the same name.
                select_parts.append(computed_part)
                replaced_existing = True
            else:
                select_parts.append(f'"{_sanitize_identifier(col)}"')
        if not replaced_existing:
            select_parts.append(computed_part)
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        verbose and logger.info(
            f"[{brick_display_name}] Executing query to compute date difference."
        )
        if output_format == "pandas":
            result = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Date difference computation completed successfully. Result written to column: '{output_column}'."
        )
    except Exception as e:
        # Log once more with the overall context, then let the caller see the
        # original exception unchanged.
        verbose and logger.error(
            f"[{brick_display_name}] Error during date difference computation: {str(e)}"
        )
        raise
    finally:
        # Always release the in-memory DuckDB connection.
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow