Extract Date Elements
Extract various elements of a date column to other columns.
Processing
This function extracts specific temporal components (like year, month, day, hour, etc.) from a designated date or timestamp column within a tabular dataset. It leverages DuckDB for efficient SQL-based operations. The function supports timezone handling, allowing the user to specify a fixed timezone or use values from a dedicated timezone column. The resulting output data contains all original columns plus the newly calculated columns for the selected date elements. The naming convention for the new columns can be customized using prefix modes.
Inputs
- data
- The input tabular data, provided as a Pandas DataFrame, Polars DataFrame, or PyArrow Table.
- date column (optional)
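- The name of the column containing the date or timestamp values to be processed. It is optional as a direct input, but a date column must be supplied either here or through the Date Column option; otherwise the function raises a ValueError.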
- timezone column (optional)
- The name of an optional column containing specific IANA timezone strings per row. If provided, this overrides the fixed timezone setting.
- fixed timezone (optional)
- A specific IANA timezone string to apply uniformly to the date column if no timezone column is specified.
- date elements (optional)
- A list specifying which date components (e.g., 'year', 'day_of_week') should be extracted.
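To make the difference between a fixed timezone and a timezone column more concrete, here is a minimal plain-Python sketch using the standard `zoneinfo` module. It is not the brick's implementation (which performs the conversion in DuckDB); the helper `local_parts` and the sample instant are hypothetical, and the example assumes the IANA timezone database is available on the system.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# One UTC instant, viewed in different IANA timezones (sample value).
instant = datetime(2024, 3, 15, 23, 30, tzinfo=timezone.utc)


def local_parts(ts, tz_name):
    """Return (day, hour) of ts as seen in the given IANA timezone."""
    local = ts.astimezone(ZoneInfo(tz_name))
    return local.day, local.hour


# Fixed timezone: the same zone is applied to every row.
fixed = local_parts(instant, "Europe/Paris")

# Timezone column: each row carries its own zone.
rows = [(instant, "America/New_York"), (instant, "Asia/Tokyo")]
per_row = [local_parts(ts, tz) for ts, tz in rows]

print(fixed)    # (16, 0)
print(per_row)  # [(15, 19), (16, 8)]
```

The same instant yields a different day and hour depending on the zone, which is why the timezone must be settled before any element is extracted.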
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| date column | Str |
| timezone column | Str |
| fixed timezone | Str |
| date elements | List |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting data structure containing all original columns and the newly generated columns corresponding to the extracted date elements.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Extract Date Elements brick exposes the following options:
- Date Column
- The column in the input data containing the date or timestamp values to be extracted. (Required input).
- Timezone Column
- An optional column containing IANA timezone strings used to localize the date before extraction. If this option is set, it overrides the Fixed Timezone.
- Fixed Timezone
- A specific IANA timezone string (from the provided list) applied to all date values if the Timezone Column is empty. Defaults to UTC.
- Date Elements to Extract
- A selection of temporal components to generate as new columns.
- Column Name Prefix Mode
- Defines how the names of the newly created columns are prefixed. Choices include: `none` (just the element name), `date_column` (uses the source column name), or `custom`.
- Custom Prefix
- A user-defined string used as the prefix if the Prefix Mode is set to 'custom'.
- Prefix Separator
- The separator character placed between the chosen prefix and the element name (default: `_`).
- Output Format
- Specifies the type of the returned object: `pandas` DataFrame, `polars` DataFrame, or `arrow` Table.
- Verbose
- Enables detailed logging during execution, providing information about the process steps and validation checks.
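The effect of the prefix options on output column names can be sketched as follows. The function `build_column_name` is a simplified, hypothetical stand-in for the brick's naming logic, and `order_date` and `dt` are made-up sample values.

```python
def build_column_name(
    element, prefix_mode="none", date_column="", custom_prefix="", separator="_"
):
    """Return the output column name for one date element."""
    if prefix_mode == "date_column":
        return f"{date_column}{separator}{element}"
    if prefix_mode == "custom" and custom_prefix:
        return f"{custom_prefix}{separator}{element}"
    # "none", an empty custom prefix, or an unknown mode: no prefix at all.
    return element


names = {
    "none": build_column_name("year"),
    "date_column": build_column_name("year", "date_column", date_column="order_date"),
    "custom": build_column_name("year", "custom", custom_prefix="dt", separator="-"),
}

print(names)
# {'none': 'year', 'date_column': 'order_date_year', 'custom': 'dt-year'}
```

A prefix is mainly useful when the input already has a column named like one of the elements, or when elements are extracted from several date columns in sequence.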
The output columns generated by the selected Date Elements include:
- year: The year component (Integer).
- month: The month component (1-12, Integer).
- day: The day of the month component (1-31, Integer).
- day_of_week: The ISO day of the week (1=Monday to 7=Sunday, Integer).
- hour: The hour component (0-23, Integer).
- minute: The minute component (0-59, Integer).
- second: The second component (0-59, Integer).
- millisecond: The millisecond component (0-999, Integer).
- week_of_year: The ISO 8601 week number of the year (1-53, Integer).
- timestamp: The total Unix epoch time (seconds since 1970-01-01 00:00:00 UTC, Big Integer).
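For readers who want to sanity-check the ranges listed above, the following plain-Python sketch computes the same elements for one sample UTC timestamp with the standard `datetime` module. It is only an illustration of the intended semantics, not the brick's DuckDB code, and it assumes ISO 8601 numbering for both the day of week and the week of year.

```python
from datetime import datetime, timezone

# Sample value: Friday, 15 March 2024, 13:45:30.250 UTC.
ts = datetime(2024, 3, 15, 13, 45, 30, 250000, tzinfo=timezone.utc)

elements = {
    "year": ts.year,
    "month": ts.month,
    "day": ts.day,
    "day_of_week": ts.isoweekday(),         # 1=Monday ... 7=Sunday
    "hour": ts.hour,
    "minute": ts.minute,
    "second": ts.second,
    "millisecond": ts.microsecond // 1000,  # 0-999
    "week_of_year": ts.isocalendar()[1],    # ISO 8601 week number
    "timestamp": int(ts.timestamp()),       # seconds since the Unix epoch
}

for name, value in elements.items():
    print(f"{name}: {value}")
```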
import logging

import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _is_date_type(duckdb_type):
    """
    Check if a DuckDB type is a date/timestamp type.
    Covers all possible date/time types in DuckDB.
    """
    type_lower = duckdb_type.lower()
    date_time_keywords = [
        "date",
        "timestamp",
        "timestamptz",
        "timestamp with time zone",
        "timestamp without time zone",
        "time",
        "timetz",
        "time with time zone",
        "time without time zone",
        "datetime",
        "interval",
    ]
    return any((keyword in type_lower for keyword in date_time_keywords))


def _build_column_name(
    element_name, prefix_mode, date_column, custom_prefix, separator
):
    """
    Build the output column name based on prefix mode.

    Args:
        element_name: The date element name (e.g., "year", "month")
        prefix_mode: One of "none", "date_column", or "custom"
        date_column: The name of the date column
        custom_prefix: Custom prefix string
        separator: Separator between prefix and element name

    Returns:
        The constructed column name
    """
    if prefix_mode == "none":
        return element_name
    elif prefix_mode == "date_column":
        return f"{date_column}{separator}{element_name}"
    elif prefix_mode == "custom":
        if custom_prefix:
            return f"{custom_prefix}{separator}{element_name}"
        else:
            return element_name
    else:
        return element_name


def extract_date_elements(
    data: Union[DataFrame, ArrowTable],
    date_column: Str = None,
    timezone_column: Str = None,
    fixed_timezone: Str = None,
    date_elements: List = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Extract Date Elements"
    options = options or {}
    verbose = options.get("verbose", True)
    # Explicit arguments take precedence over values supplied through options.
    date_column = _coalesce(date_column, options.get("date_column", ""))
    timezone_column = _coalesce(timezone_column, options.get("timezone_column", ""))
    fixed_timezone = _coalesce(fixed_timezone, options.get("fixed_timezone", "UTC"))
    date_elements = _coalesce(date_elements, options.get("date_elements", []))
    prefix_mode = options.get("prefix_mode", "none")
    custom_prefix = options.get("custom_prefix", "")
    prefix_separator = options.get("prefix_separator", "_")
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not date_column:
        verbose and logger.error(
            f"[{brick_display_name}] Date column must be specified."
        )
        raise ValueError("Date column must be specified.")
    if not isinstance(date_elements, list):
        verbose and logger.error(
            f"[{brick_display_name}] Date elements must be provided as a list."
        )
        raise ValueError("Date elements must be provided as a list.")
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting date element extraction from column: '{date_column}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        # Register the input so DuckDB can query it without copying it first.
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        if date_column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Date column '{date_column}' not found in data."
            )
            raise ValueError(f"Date column '{date_column}' not found in data.")
        if not _is_date_type(all_columns[date_column]):
            verbose and logger.warning(
                f"[{brick_display_name}] Date column '{date_column}' (type: {all_columns[date_column]}) may not be a date/timestamp type. Attempting to cast to TIMESTAMP."
            )
        use_timezone_column = False
        if timezone_column:
            if timezone_column not in all_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Timezone column '{timezone_column}' not found in data."
                )
                raise ValueError(
                    f"Timezone column '{timezone_column}' not found in data."
                )
            use_timezone_column = True
            verbose and logger.info(
                f"[{brick_display_name}] Using timezone from column: '{timezone_column}'."
            )
        else:
            verbose and logger.info(
                f"[{brick_display_name}] Using fixed timezone: '{fixed_timezone}'."
            )
        if prefix_mode == "date_column":
            verbose and logger.info(
                f"[{brick_display_name}] Using date column name '{date_column}' as prefix with separator '{prefix_separator}'."
            )
        elif prefix_mode == "custom" and custom_prefix:
            verbose and logger.info(
                f"[{brick_display_name}] Using custom prefix '{custom_prefix}' with separator '{prefix_separator}'."
            )
        else:
            verbose and logger.info(
                f"[{brick_display_name}] No prefix will be added to output column names."
            )
        # Build the SQL expression that yields the (localized) timestamp.
        sanitized_date_col = _sanitize_identifier(date_column)
        if use_timezone_column:
            sanitized_tz_col = _sanitize_identifier(timezone_column)
            date_expr = (
                f'timezone("{sanitized_tz_col}", "{sanitized_date_col}"::TIMESTAMPTZ)'
            )
        elif fixed_timezone == "UTC":
            date_expr = f'"{sanitized_date_col}"::TIMESTAMP'
        else:
            # Escape single quotes so the timezone is a valid SQL string literal.
            safe_timezone = fixed_timezone.replace("'", "''")
            date_expr = (
                f"""timezone('{safe_timezone}', "{sanitized_date_col}"::TIMESTAMPTZ)"""
            )
        select_parts = []
        extraction_count = 0
        for col in all_columns.keys():
            sanitized_col = _sanitize_identifier(col)
            select_parts.append(f'"{sanitized_col}"')
        extraction_config = {
            "year": f"EXTRACT(YEAR FROM {date_expr})",
            "month": f"EXTRACT(MONTH FROM {date_expr})",
            "day": f"EXTRACT(DAY FROM {date_expr})",
            "day_of_week": f"EXTRACT(ISODOW FROM {date_expr})",
            "hour": f"EXTRACT(HOUR FROM {date_expr})",
            "minute": f"EXTRACT(MINUTE FROM {date_expr})",
            "second": f"EXTRACT(SECOND FROM {date_expr})::INTEGER",
            # DuckDB's MILLISECOND part includes the seconds (sub-minute
            # milliseconds), so take it modulo 1000 to stay within 0-999.
            "millisecond": f"(EXTRACT(MILLISECOND FROM {date_expr}) % 1000)::INTEGER",
            "week_of_year": f"EXTRACT(WEEK FROM {date_expr})",
            "timestamp": f"EPOCH({date_expr})::BIGINT",
        }
        for element_type in date_elements:
            if not isinstance(element_type, str):
                verbose and logger.warning(
                    f"[{brick_display_name}] Invalid date element: {element_type}. Expected string. Skipping."
                )
                continue
            if element_type not in extraction_config:
                verbose and logger.warning(
                    f"[{brick_display_name}] Unknown date element type: '{element_type}'. Skipping."
                )
                continue
            sql_expr = extraction_config[element_type]
            output_col = _build_column_name(
                element_type, prefix_mode, date_column, custom_prefix, prefix_separator
            )
            sanitized_output_col = _sanitize_identifier(output_col)
            if output_col in all_columns:
                verbose and logger.warning(
                    f"[{brick_display_name}] Output column '{output_col}' already exists and will be overwritten."
                )
                # Drop the original column from the SELECT list so the extracted
                # column really replaces it instead of duplicating its name.
                original_ref = f'"{sanitized_output_col}"'
                if original_ref in select_parts:
                    select_parts.remove(original_ref)
            select_parts.append(f'{sql_expr} AS "{sanitized_output_col}"')
            extraction_count += 1
            verbose and logger.info(
                f"[{brick_display_name}] Extracting {element_type} to column: '{output_col}'."
            )
        if extraction_count == 0:
            verbose and logger.warning(
                f"[{brick_display_name}] No date elements specified for extraction. Returning data unchanged."
            )
            result = data
        else:
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to extract date elements."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Date element extraction completed successfully. Extracted {extraction_count} element(s)."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during date element extraction: {str(e)}"
        )
        raise
    finally:
        # Always release the DuckDB connection, even when an error occurred.
        if conn is not None:
            conn.close()
    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow