Date Difference
Compute the difference between a date column and another time reference.
Processing
Compute the time difference between a date/timestamp column (time since column) and a specified reference point. The reference point (until mode) can be the current time, a value from another column, or a fixed date. The resulting difference, measured in the configured time unit (year, month, day, etc.), is appended as a new column to the input data structure.
Inputs
- data
- The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) containing the date column.
- time since column (optional)
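- The name of the date/timestamp column from which the time difference calculation begins. It must be provided either here or through the Time Since Column option; otherwise the brick raises an error.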
- until mode (optional)
- Defines the reference point for the end date ('now', 'fixed_date', or 'date_column'). Defaults to 'now'.
- until date column (optional)
- The name of the column containing the end date. Required if until mode is set to 'date_column'.
- until fixed date (optional)
- The fixed date string (must be in ISO format like YYYY-MM-DD or YYYY-MM-DD HH:MM:SS) to use as the end reference point. Required if until mode is set to 'fixed_date'.
- output time unit (optional)
- The unit in which the resulting time difference is calculated: 'year', 'month', 'week', 'day', 'hour', 'minute', or 'second'. Defaults to 'day'.
- output column (optional)
- The name for the new column containing the computed date difference. If left empty, a descriptive name is generated.
- reverse output (optional)
- If True, the calculated difference is multiplied by -1, effectively swapping the calculation direction.
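The fixed date has to parse as an ISO date or datetime before it is used. A minimal sketch of that check, mirroring the `datetime.fromisoformat` call in the brick code, could look like this; `is_valid_fixed_date` is a hypothetical helper name introduced only for illustration.

```python
from datetime import datetime


def is_valid_fixed_date(value: str) -> bool:
    """Hypothetical helper: True if `value` parses as an ISO date/datetime."""
    try:
        # A trailing "Z" is rewritten to "+00:00" so older Python versions accept it.
        datetime.fromisoformat(value.replace("Z", "+00:00"))
        return True
    except ValueError:
        return False


print(is_valid_fixed_date("2024-03-15"))            # True
print(is_valid_fixed_date("2024-03-15 08:30:00"))   # True
print(is_valid_fixed_date("15/03/2024"))            # False
```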
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| time since column | Str |
| until mode | Str |
| until date column | Str |
| until fixed date | Str |
| output time unit | Str |
| output column | Str |
| reverse output | Bool |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The input data structure updated with the new column containing the computed date differences.
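When no output column name is given, the brick code derives one from the source column, the until mode, and the time unit, then appends a numeric suffix if that name is already taken. The sketch below restates that naming rule as a standalone function; `default_output_name` is a hypothetical name and the column names are made up for the example.

```python
def default_output_name(since, until_mode, unit, until_column="", existing=()):
    """Hypothetical helper restating the naming rule used in the brick code."""
    if until_mode == "now":
        base = f"{since}_to_now_{unit}s"
    elif until_mode == "date_column":
        base = f"{since}_to_{until_column}_{unit}s"
    else:  # 'fixed_date'
        base = f"{since}_difference_{unit}s"

    # Append _1, _2, ... until the name no longer clashes with an existing column.
    if base not in existing:
        return base
    counter = 1
    while f"{base}_{counter}" in existing:
        counter += 1
    return f"{base}_{counter}"


print(default_output_name("signup", "now", "day"))
# signup_to_now_days
print(default_output_name("start", "date_column", "hour", until_column="end"))
# start_to_end_hours
print(default_output_name("signup", "now", "day", existing={"signup_to_now_days"}))
# signup_to_now_days_1
```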
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Date Difference brick exposes the following options:
- Time Since Column
- The date/timestamp column used as the starting point for calculating the difference.
- Until
- Defines the temporal reference point for calculating the difference. Choices are 'now' (current time), 'fixed_date' (a specific date string), or 'date_column' (another column in the dataset).
- Until Date Column
- The column containing the end date, required if Until is set to 'date_column'.
- Until Fixed Date
- The fixed date string (in ISO format) used as the end date, required if Until is set to 'fixed_date'.
- Output Time Unit
- The unit in which the resulting time difference is expressed. Choices: year, month, week, day, hour, minute, second.
- Output Column
- The name for the newly created column containing the computed difference.
- Reverse Output
- If enabled, the output difference is multiplied by -1, reversing the mathematical direction of the time subtraction.
- Output Format
- The type of the returned data structure: pandas, polars, or arrow. In the brick code this defaults to pandas, whatever the input type.
- Verbose
- If enabled, logs informational messages about the computation process.
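Most of the options above mirror an input of the same name. In the brick code, a value passed as an input takes precedence, the option is used when the input is `None`, and a built-in default applies when neither is set. The following sketch shows that resolution order in isolation; the option values are illustrative.

```python
def coalesce(*values):
    """Return the first value that is not None (None if all are None)."""
    return next((v for v in values if v is not None), None)


# Illustrative options dictionary, as the brick would receive it.
options = {"until_mode": "fixed_date", "output_time_unit": "month"}

# Input provided: it wins over the option.
until_mode = coalesce("date_column", options.get("until_mode", "now"))

# Input missing (None): the option is used.
unit = coalesce(None, options.get("output_time_unit", "day"))

# Neither input nor option set: the default applies.
reverse = coalesce(None, options.get("reverse_output", False))

print(until_mode, unit, reverse)  # date_column month False
```

Only `None` falls through: an explicit `False` or an empty string passed as an input still overrides the option.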
```python
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from datetime import datetime
from coded_flows.types import Union, DataFrame, ArrowTable, Str, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _is_date_type(duckdb_type):
    """
    Check if a DuckDB type is a date/timestamp type.
    Covers all possible date/time types in DuckDB.
    """
    type_lower = duckdb_type.lower()
    date_time_keywords = [
        "date",
        "timestamp",
        "timestamptz",
        "timestamp with time zone",
        "timestamp without time zone",
        "time",
        "timetz",
        "time with time zone",
        "time without time zone",
        "datetime",
        "interval",
    ]
    return any((keyword in type_lower for keyword in date_time_keywords))


def _generate_unique_column_name(base_name, existing_columns):
    """
    Generate a unique column name that doesn't conflict with existing columns.
    """
    if base_name not in existing_columns:
        return base_name
    counter = 1
    while f"{base_name}_{counter}" in existing_columns:
        counter += 1
    return f"{base_name}_{counter}"


def compute_date_difference(
    data: Union[DataFrame, ArrowTable],
    time_since_column: Str = None,
    until_mode: Str = None,
    until_date_column: Str = None,
    until_fixed_date: Str = None,
    output_time_unit: Str = None,
    output_column: Str = None,
    reverse_output: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Date Difference"
    options = options or {}
    verbose = options.get("verbose", True)
    # Explicit inputs take precedence over options, which fall back to defaults.
    time_since_column = _coalesce(
        time_since_column, options.get("time_since_column", "")
    )
    until_mode = _coalesce(until_mode, options.get("until_mode", "now"))
    until_date_column = _coalesce(
        until_date_column, options.get("until_date_column", "")
    )
    until_fixed_date = _coalesce(until_fixed_date, options.get("until_fixed_date", ""))
    output_time_unit = _coalesce(
        output_time_unit, options.get("output_time_unit", "day")
    )
    output_column = _coalesce(output_column, options.get("output_column", ""))
    reverse_output = _coalesce(reverse_output, options.get("reverse_output", False))
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not time_since_column:
        verbose and logger.error(
            f"[{brick_display_name}] Time since column must be specified."
        )
        raise ValueError("Time since column must be specified.")
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting date difference computation from column: '{time_since_column}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        # Register the input with an in-memory DuckDB connection and read its schema.
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        if time_since_column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Time since column '{time_since_column}' not found in data."
            )
            raise ValueError(
                f"Time since column '{time_since_column}' not found in data."
            )
        if not _is_date_type(all_columns[time_since_column]):
            verbose and logger.warning(
                f"[{brick_display_name}] Time since column '{time_since_column}' (type: {all_columns[time_since_column]}) may not be a date/timestamp type. Attempting to cast to TIMESTAMP."
            )
        sanitized_time_since = _sanitize_identifier(time_since_column)
        # Build the SQL expression for the end of the interval.
        until_expr = None
        if until_mode == "now":
            until_expr = "CURRENT_TIMESTAMP"
            verbose and logger.info(
                f"[{brick_display_name}] Computing difference until current time."
            )
        elif until_mode == "date_column":
            if not until_date_column:
                verbose and logger.error(
                    f"[{brick_display_name}] Until date column must be specified when until_mode is 'date_column'."
                )
                raise ValueError(
                    "Until date column must be specified when until_mode is 'date_column'."
                )
            if until_date_column not in all_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Until date column '{until_date_column}' not found in data."
                )
                raise ValueError(
                    f"Until date column '{until_date_column}' not found in data."
                )
            if not _is_date_type(all_columns[until_date_column]):
                verbose and logger.warning(
                    f"[{brick_display_name}] Until date column '{until_date_column}' (type: {all_columns[until_date_column]}) may not be a date/timestamp type. Attempting to cast to TIMESTAMP."
                )
            sanitized_until_col = _sanitize_identifier(until_date_column)
            until_expr = f'"{sanitized_until_col}"::TIMESTAMP'
            verbose and logger.info(
                f"[{brick_display_name}] Computing difference until column: '{until_date_column}'."
            )
        elif until_mode == "fixed_date":
            if not until_fixed_date:
                verbose and logger.error(
                    f"[{brick_display_name}] Fixed date must be specified when until_mode is 'fixed_date'."
                )
                raise ValueError(
                    "Fixed date must be specified when until_mode is 'fixed_date'."
                )
            try:
                # Validate the string as ISO before embedding it in the query.
                datetime.fromisoformat(until_fixed_date.replace("Z", "+00:00"))
                until_expr = f"TIMESTAMP '{until_fixed_date}'"
                verbose and logger.info(
                    f"[{brick_display_name}] Computing difference until fixed date: '{until_fixed_date}'."
                )
            except ValueError:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid fixed date format: '{until_fixed_date}'. Use ISO format (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)."
                )
                raise ValueError(
                    f"Invalid fixed date format: '{until_fixed_date}'. Use ISO format."
                )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid until_mode: '{until_mode}'."
            )
            raise ValueError(f"Invalid until_mode: '{until_mode}'.")
        time_since_expr = f'"{sanitized_time_since}"::TIMESTAMP'
        unit_map = {
            "year": "year",
            "month": "month",
            "week": "week",
            "day": "day",
            "hour": "hour",
            "minute": "minute",
            "second": "second",
        }
        if output_time_unit not in unit_map:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid output_time_unit: '{output_time_unit}'."
            )
            raise ValueError(f"Invalid output_time_unit: '{output_time_unit}'.")
        duckdb_unit = unit_map[output_time_unit]
        difference_expr = f"date_diff('{duckdb_unit}', {time_since_expr}, {until_expr})"
        if reverse_output:
            difference_expr = f"(-1) * ({difference_expr})"
            verbose and logger.info(
                f"[{brick_display_name}] Output will be reversed (multiplied by -1)."
            )
        verbose and logger.info(
            f"[{brick_display_name}] Computing difference in {output_time_unit}s."
        )
        if not output_column:
            if until_mode == "now":
                default_name = f"{time_since_column}_to_now_{output_time_unit}s"
            elif until_mode == "date_column":
                default_name = (
                    f"{time_since_column}_to_{until_date_column}_{output_time_unit}s"
                )
            else:
                default_name = f"{time_since_column}_difference_{output_time_unit}s"
            output_column = _generate_unique_column_name(default_name, all_columns)
            verbose and logger.info(
                f"[{brick_display_name}] No output column specified. Using generated name: '{output_column}'."
            )
        elif output_column in all_columns:
            verbose and logger.warning(
                f"[{brick_display_name}] Output column '{output_column}' already exists and will be overwritten."
            )
        select_parts = []
        for col in all_columns.keys():
            # Skip an existing column that shares the output name, so the new
            # column replaces it instead of producing a duplicate name.
            if col == output_column:
                continue
            sanitized_col = _sanitize_identifier(col)
            select_parts.append(f'"{sanitized_col}"')
        sanitized_output_col = _sanitize_identifier(output_column)
        select_parts.append(f'{difference_expr} AS "{sanitized_output_col}"')
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        verbose and logger.info(
            f"[{brick_display_name}] Executing query to compute date difference."
        )
        if output_format == "pandas":
            result = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Date difference computation completed successfully. Result written to column: '{output_column}'."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during date difference computation: {str(e)}"
        )
        raise
    finally:
        # Always release the DuckDB connection, even on failure.
        if conn is not None:
            conn.close()
    return result
```
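Column names are wrapped in double quotes, with any embedded double quote doubled, before they reach the SQL query. The sketch below isolates that step and shows the query text it would produce; `quote_identifier` and `build_select` are hypothetical names used only for this illustration, and the column names are invented.

```python
def quote_identifier(name):
    """Wrap a column name in double quotes, doubling any embedded quote."""
    return '"' + name.replace('"', '""') + '"'


def build_select(columns, since_column, until_expr, unit, output_column):
    """Hypothetical helper: assemble a SELECT like the one the brick code runs."""
    diff = f"date_diff('{unit}', {quote_identifier(since_column)}::TIMESTAMP, {until_expr})"
    parts = [quote_identifier(c) for c in columns]
    parts.append(f"{diff} AS {quote_identifier(output_column)}")
    return f"SELECT {', '.join(parts)} FROM input_table"


query = build_select(
    columns=["id", 'signup "date"'],
    since_column='signup "date"',
    until_expr="CURRENT_TIMESTAMP",
    unit="day",
    output_column="age_days",
)
print(query)
```

Quoting every identifier lets the query handle column names containing spaces, quotes, or reserved words without changing the data.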
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow