Dates from Strings
Parse strings containing dates into datetime columns.
Category: Processing
Parses string columns in the input data structure and converts them into datetime/timestamp columns. The date format can be defined in three ways: automatic detection, selection from a list of common presets, or a custom strftime/strptime format string. The brick also handles timezones, converting parsed timestamps using either a fixed timezone or per-row values read from a separate timezone column.
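A minimal usage sketch, assuming the brick function parse_dates (defined in the source below) is called directly on a pandas DataFrame:

```python
import pandas as pd

# Hypothetical input: one string column holding ISO-style dates.
df = pd.DataFrame({"created": ["2024-01-15", "2024-02-20", ""]})

# Automatic format detection; empty strings are parsed to NULL/NaT.
result = parse_dates(df, columns=["created"], date_format_mode="auto")
print(result.dtypes)  # "created" should now be a datetime64 column
```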
Inputs
- data
- The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) containing columns to be parsed.
- columns (optional)
- A list of specific column names targeted for date parsing. If empty and no regex is provided, all columns will be processed.
- regex pattern (optional)
- A regular expression pattern used to match column names that should be parsed. If provided, this selection overrides the explicit columns list.
- date format mode (optional)
- Defines how the date format is determined ('auto' for automatic detection, 'preset' for predefined formats, or 'custom' for a user-defined format).
- date format preset (optional)
- Selects a standard date format if date format mode is set to 'preset'.
- date format custom (optional)
- A custom strftime/strptime format string if date format mode is set to 'custom'.
- prefix column (optional)
- A prefix string used for naming new output columns. If provided, new columns are created instead of overwriting the originals.
- timezone mode (optional)
- Defines how timezone information is applied ('none' for no conversion, 'fixed' for a specified fixed timezone, or 'column' to read the timezone from a dedicated column); see the sketch after this list.
- timezone column (optional)
- The name of the column containing timezone definitions (e.g., 'America/New_York') if timezone mode is set to 'column'.
- fixed timezone (optional)
- A fixed timezone string (e.g., 'America/Los_Angeles') if timezone mode is set to 'fixed'.
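For per-row timezones, pair the date column with a column of IANA zone names (a sketch with hypothetical column names):

```python
import pandas as pd

df = pd.DataFrame({
    "event_time": ["2024-03-01 09:30:00", "2024-03-01 17:00:00"],
    "tz": ["America/New_York", "Europe/Paris"],
})

result = parse_dates(
    df,
    columns=["event_time"],
    date_format_mode="custom",
    date_format_custom="%Y-%m-%d %H:%M:%S",
    timezone_mode="column",
    timezone_column="tz",
)
```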
Input Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns | List |
| regex pattern | Str |
| date format mode | Str |
| date format preset | Str |
| date format custom | Str |
| prefix column | Str |
| timezone mode | Str |
| timezone column | Str |
| fixed timezone | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting data structure containing the newly created or overwritten timestamp columns.
Output Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Dates from Strings brick provides the following configurable options (an example configuration follows the list):
- Columns to Parse
- A selection of columns in the dataset to which the date parsing operation will be applied.
- Regex Pattern
- An optional regular expression pattern used to match columns for parsing. If provided, this overrides the explicit selection in Columns to Parse.
- Date Format Mode
- Controls how the system determines the date format string ('auto' for DuckDB's automatic casting, 'preset' for common templates, or 'custom' for manual input).
- Date Format Preset
- If the mode is set to 'preset', select one of the common date and time format templates provided (e.g., YYYY-MM-DD, ISO 8601).
- Custom Date Format
- If the mode is set to 'custom', specify the explicit strftime format string for date parsing (e.g., %Y-%m-%d).
- Prefix Column
- If specified, parsed dates will be written to new columns using this prefix, preserving the original string columns. If left empty, parsing occurs in-place.
- Timezone Mode
- Specifies how timezone information should be handled: 'none' (no conversion), 'fixed' (apply a selected fixed timezone), or 'column' (read timezone from a specified column).
- Timezone Column
- The name of the column that contains the timezone strings if Timezone Mode is set to 'column'.
- Fixed Timezone
- The specific IANA timezone string to use if Timezone Mode is set to 'fixed'. Defaults to UTC.
- Output Format
- Specifies the desired format of the output data structure: pandas DataFrame, polars DataFrame, or arrow Table.
- Verbose
- Enables detailed logging of the process steps, warnings, and final status.
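An example configuration passed through the brick's options dictionary (a sketch; the keys mirror the snake_case parameter names used in the source below):

```python
options = {
    "columns": ["order_date"],
    "date_format_mode": "preset",
    "date_format_preset": "DD/MM/YYYY",
    "prefix_column": "parsed_",   # write to new columns instead of in-place
    "timezone_mode": "fixed",
    "fixed_timezone": "Europe/London",
    "output_format": "polars",
    "verbose": True,
}

result = parse_dates(df, options=options)
```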
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
    Sanitize a SQL identifier by doubling embedded double quotes so the
    result can be safely wrapped in a double-quoted identifier.
"""
return identifier.replace('"', '""')
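# Example: a column named 'say "hi"' becomes 'say ""hi""', which is safe to
# embed inside a double-quoted SQL identifier.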
def _generate_unique_column_name(base_name, existing_columns):
"""
Generate a unique column name that doesn't conflict with existing columns.
"""
if base_name not in existing_columns:
return base_name
counter = 1
while f"{base_name}_{counter}" in existing_columns:
counter += 1
return f"{base_name}_{counter}"
def _convert_preset_to_format(preset):
"""
Convert human-readable preset to strptime format string.
"""
preset_mapping = {
"YYYY-MM-DD": "%Y-%m-%d",
"YYYY/MM/DD": "%Y/%m/%d",
"DD-MM-YYYY": "%d-%m-%Y",
"DD/MM/YYYY": "%d/%m/%Y",
"MM-DD-YYYY": "%m-%d-%Y",
"MM/DD/YYYY": "%m/%d/%Y",
"YYYY-MM-DD HH:MM:SS": "%Y-%m-%d %H:%M:%S",
"YYYY/MM/DD HH:MM:SS": "%Y/%m/%d %H:%M:%S",
"DD-MM-YYYY HH:MM:SS": "%d-%m-%Y %H:%M:%S",
"DD/MM/YYYY HH:MM:SS": "%d/%m/%Y %H:%M:%S",
"MM-DD-YYYY HH:MM:SS": "%m-%d-%Y %H:%M:%S",
"MM/DD/YYYY HH:MM:SS": "%m/%d/%Y %H:%M:%S",
"YYYY-MM-DD HH:MM": "%Y-%m-%d %H:%M",
"DD/MM/YYYY HH:MM": "%d/%m/%Y %H:%M",
"MM/DD/YYYY HH:MM": "%m/%d/%Y %H:%M",
"YYYYMMDD": "%Y%m%d",
"DD Mon YYYY": "%d %b %Y",
"DD Month YYYY": "%d %B %Y",
"Mon DD, YYYY": "%b %d, %Y",
"Month DD, YYYY": "%B %d, %Y",
"ISO 8601": "%Y-%m-%dT%H:%M:%S",
"ISO 8601 UTC": "%Y-%m-%dT%H:%M:%SZ",
}
return preset_mapping.get(preset, None)
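# Example: _convert_preset_to_format("DD Mon YYYY") returns "%d %b %Y";
# an unrecognized preset returns None, which the caller reports as an error.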
def _match_columns_by_regex(pattern, all_columns):
"""
Match column names using a regex pattern.
"""
if not pattern:
return []
try:
regex = re.compile(pattern)
matched = [col for col in all_columns if regex.search(col)]
return matched
except re.error as e:
raise ValueError(f"Invalid regex pattern: {pattern}. Error: {str(e)}")
def parse_dates(
data: Union[DataFrame, ArrowTable],
columns: List = None,
regex_pattern: Str = None,
date_format_mode: Str = None,
date_format_preset: Str = None,
date_format_custom: Str = None,
prefix_column: Str = None,
timezone_mode: Str = None,
timezone_column: Str = None,
fixed_timezone: Str = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Dates from Strings"
options = options or {}
verbose = options.get("verbose", True)
columns = _coalesce(columns, options.get("columns", []))
regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
date_format_mode = _coalesce(
date_format_mode, options.get("date_format_mode", "auto")
)
date_format_preset = _coalesce(
date_format_preset, options.get("date_format_preset", "YYYY-MM-DD")
)
date_format_custom = _coalesce(
date_format_custom, options.get("date_format_custom", "")
)
prefix_column = _coalesce(prefix_column, options.get("prefix_column", ""))
timezone_mode = _coalesce(timezone_mode, options.get("timezone_mode", "none"))
timezone_column = _coalesce(timezone_column, options.get("timezone_column", ""))
fixed_timezone = _coalesce(fixed_timezone, options.get("fixed_timezone", "UTC"))
output_format = options.get("output_format", "pandas")
result = None
conn = None
if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
verbose and logger.error(
f"[{brick_display_name}] Invalid columns format! Expected a list."
)
raise ValueError("Columns must be provided as a list!")
try:
parse_all_columns = len(columns) == 0 and (not regex_pattern)
parse_mode = None
if regex_pattern:
parse_mode = "regex_pattern"
elif parse_all_columns:
parse_mode = "all_columns"
else:
parse_mode = "column_list"
verbose and logger.info(
f"[{brick_display_name}] Detected mode: '{parse_mode}'. Starting date parsing operation."
)
date_format = None
if date_format_mode == "auto":
date_format = None
verbose and logger.info(
f"[{brick_display_name}] Using automatic date format detection."
)
elif date_format_mode == "preset":
date_format = _convert_preset_to_format(date_format_preset)
if date_format is None:
verbose and logger.error(
f"[{brick_display_name}] Invalid preset format: '{date_format_preset}'."
)
raise ValueError(f"Invalid preset format: '{date_format_preset}'.")
verbose and logger.info(
f"[{brick_display_name}] Using preset date format: '{date_format_preset}' -> '{date_format}'."
)
elif date_format_mode == "custom":
if not date_format_custom:
verbose and logger.error(
f"[{brick_display_name}] Custom date format must be specified when date_format_mode is 'custom'."
)
raise ValueError(
"Custom date format must be specified when date_format_mode is 'custom'."
)
date_format = date_format_custom
verbose and logger.info(
f"[{brick_display_name}] Using custom date format: '{date_format}'."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Invalid date_format_mode: '{date_format_mode}'."
)
raise ValueError(f"Invalid date_format_mode: '{date_format_mode}'.")
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
all_column_names = list(all_columns.keys())
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
columns_to_parse = []
if parse_mode == "regex_pattern":
columns_to_parse = _match_columns_by_regex(regex_pattern, all_column_names)
if not columns_to_parse:
verbose and logger.warning(
f"[{brick_display_name}] No columns matched the regex pattern: '{regex_pattern}'. Returning data unchanged."
)
result = data
else:
verbose and logger.info(
f"[{brick_display_name}] Matched {len(columns_to_parse)} columns using regex pattern: {columns_to_parse}."
)
elif parse_mode == "all_columns":
columns_to_parse = all_column_names
verbose and logger.info(
f"[{brick_display_name}] Parsing all {len(columns_to_parse)} columns."
)
else:
missing_columns = [col for col in columns if col not in all_columns]
if missing_columns:
verbose and logger.error(
f"[{brick_display_name}] Columns not found in data: {missing_columns}."
)
raise ValueError(f"Columns not found in data: {missing_columns}")
columns_to_parse = columns
verbose and logger.info(
f"[{brick_display_name}] Parsing {len(columns_to_parse)} specified columns: {columns_to_parse}."
)
if not columns_to_parse:
verbose and logger.warning(
f"[{brick_display_name}] No columns to parse. Returning data unchanged."
)
result = data
else:
if timezone_mode == "column":
if not timezone_column:
verbose and logger.error(
f"[{brick_display_name}] Timezone column must be specified when timezone_mode is 'column'."
)
raise ValueError(
"Timezone column must be specified when timezone_mode is 'column'."
)
if timezone_column not in all_columns:
verbose and logger.error(
f"[{brick_display_name}] Timezone column '{timezone_column}' not found in data."
)
raise ValueError(
f"Timezone column '{timezone_column}' not found in data."
)
verbose and logger.info(
f"[{brick_display_name}] Using timezone from column: '{timezone_column}'."
)
elif timezone_mode == "fixed":
verbose and logger.info(
f"[{brick_display_name}] Using fixed timezone: '{fixed_timezone}'."
)
else:
verbose and logger.info(
f"[{brick_display_name}] No timezone conversion will be applied."
)
in_place = not prefix_column
if in_place:
verbose and logger.info(
f"[{brick_display_name}] Parsing dates in-place (overwriting original columns)."
)
else:
verbose and logger.info(
f"[{brick_display_name}] Creating new columns with prefix: '{prefix_column}'."
)
            def _build_parse_expr(sanitized_col):
                # Parse the string column; NULL or whitespace-only values become
                # NULL rather than raising, thanks to TRY_STRPTIME / TRY_CAST.
                if date_format:
                    inner = f"TRY_STRPTIME(\"{sanitized_col}\", '{date_format}')"
                else:
                    inner = f'TRY_CAST("{sanitized_col}" AS TIMESTAMP)'
                parse_expr = f"""
                    CASE
                        WHEN "{sanitized_col}" IS NULL OR TRIM("{sanitized_col}") = '' THEN NULL
                        ELSE {inner}
                    END
                """
                # Optionally localize the parsed timestamp; DuckDB's
                # timezone(tz, ts) behaves like ts AT TIME ZONE tz.
                if timezone_mode == "column":
                    sanitized_tz_col = _sanitize_identifier(timezone_column)
                    parse_expr = f"""
                    CASE
                        WHEN ({parse_expr}) IS NULL THEN NULL
                        ELSE timezone("{sanitized_tz_col}", {parse_expr})
                    END
                    """
                elif timezone_mode == "fixed" and fixed_timezone != "UTC":
                    parse_expr = f"""
                    CASE
                        WHEN ({parse_expr}) IS NULL THEN NULL
                        ELSE timezone('{fixed_timezone}', {parse_expr})
                    END
                    """
                return parse_expr

            select_parts = []
            new_columns = []
            parsed_count = 0
            for col in all_column_names:
                sanitized_col = _sanitize_identifier(col)
                if col not in columns_to_parse:
                    # Columns that are not parsed pass through unchanged.
                    select_parts.append(f'"{sanitized_col}"')
                    continue
                parse_expr = _build_parse_expr(sanitized_col)
                if in_place:
                    # Overwrite the original column with the parsed timestamps.
                    output_col = col
                    select_parts.append(f'{parse_expr} AS "{sanitized_col}"')
                else:
                    # Keep the original column and append a prefixed copy,
                    # avoiding collisions with existing column names.
                    select_parts.append(f'"{sanitized_col}"')
                    output_col = _generate_unique_column_name(
                        f"{prefix_column}{col}", all_columns
                    )
                    sanitized_output = _sanitize_identifier(output_col)
                    new_columns.append(f'{parse_expr} AS "{sanitized_output}"')
                parsed_count += 1
                verbose and logger.info(
                    f"[{brick_display_name}] Parsing column '{col}' to '{output_col}' (handling NULL/empty values)."
                )
            # Prefixed copies are appended after all original columns.
            select_parts.extend(new_columns)
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Executing query to parse dates (skipping NULL/empty values)."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Date parsing completed successfully. Parsed {parsed_count} column(s)."
)
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during date parsing: {e}"
        )
        raise
finally:
if conn is not None:
conn.close()
return result
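A fuller worked example combining regex column selection, a preset format, and prefixed output columns (a sketch with hypothetical column names; TRY_STRPTIME turns unparseable or missing values into NULL/NaT rather than failing):

```python
import pandas as pd

df = pd.DataFrame({
    "order_date": ["15/01/2024", "20/02/2024", None],
    "note": ["a", "b", "c"],
})

# Parse every column whose name ends in "_date", keeping the original
# strings and writing parsed timestamps to prefixed copies.
result = parse_dates(
    df,
    regex_pattern=r"_date$",
    date_format_mode="preset",
    date_format_preset="DD/MM/YYYY",
    prefix_column="parsed_",
)
# Expected columns: order_date, note, parsed_order_date (timestamps, NaT for None)
```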
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow
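These dependencies can be installed with pip, for example:

```
pip install pandas "polars[pyarrow]" duckdb pyarrow
```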