Dates from Strings

Parse strings containing dates to datetime columns.

Processing

Parses string columns in the input data structure and converts them into datetime/timestamp columns. The date format can be defined in three ways: automatic detection, a preset chosen from a list of common formats, or a custom strptime format string. Timezones can also be applied, either from a single fixed timezone or from per-row values read from a separate timezone column.
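
The three format modes can be pictured with a small standard-library sketch. The names `PRESETS`, `resolve_format`, and `try_parse` are hypothetical and exist only for this illustration; the brick itself delegates parsing to DuckDB, and the 'auto' branch below merely accepts ISO 8601 strings as a stand-in for automatic detection.

```python
from datetime import datetime
from typing import Optional

# A small subset of presets, for illustration only (hypothetical mapping).
PRESETS = {
    "YYYY-MM-DD": "%Y-%m-%d",
    "DD/MM/YYYY": "%d/%m/%Y",
    "ISO 8601": "%Y-%m-%dT%H:%M:%S",
}


def resolve_format(mode: str, preset: str = "", custom: str = "") -> Optional[str]:
    """Return a strptime format string, or None when detection is automatic."""
    if mode == "auto":
        return None
    if mode == "preset":
        if preset not in PRESETS:
            raise ValueError(f"Unknown preset: {preset!r}")
        return PRESETS[preset]
    if mode == "custom":
        if not custom:
            raise ValueError("A custom format is required in 'custom' mode")
        return custom
    raise ValueError(f"Unknown mode: {mode!r}")


def try_parse(value: Optional[str], fmt: Optional[str]) -> Optional[datetime]:
    """Parse one value; blank or unparseable input yields None instead of raising."""
    if value is None or not value.strip():
        return None
    try:
        if fmt is None:
            # Stand-in for automatic detection: accept ISO 8601 strings only.
            return datetime.fromisoformat(value.strip())
        return datetime.strptime(value, fmt)
    except ValueError:
        return None


fmt = resolve_format("preset", preset="DD/MM/YYYY")
print(try_parse("31/12/2024", fmt))
print(try_parse("", fmt))
```

Returning None for blank or unparseable values, rather than raising, mirrors the tolerant behavior described for the brick, where such values end up as nulls in the output column.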

Inputs

data
The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) containing columns to be parsed.
columns (optional)
A list of specific column names targeted for date parsing. If empty and no regex is provided, all columns will be processed.
regex pattern (optional)
A regular expression pattern used to match column names that should be parsed. If provided, this selection overrides the explicit columns list.
date format mode (optional)
Defines how the date format is determined ('auto' for automatic detection, 'preset' for predefined formats, or 'custom' for a user-defined format).
date format preset (optional)
Selects a standard date format if date format mode is set to 'preset'.
date format custom (optional)
A custom strftime/strptime format string if date format mode is set to 'custom'.
prefix column (optional)
A prefix string used for naming new output columns. If provided, new columns are created instead of overwriting the originals.
timezone mode (optional)
Defines how timezone information is applied ('none' for no conversion, 'fixed' for a specified fixed timezone, or 'column' to read timezone from a dedicated column).
timezone column (optional)
The name of the column containing timezone definitions (e.g., 'America/New_York') if timezone mode is set to 'column'.
fixed timezone (optional)
A fixed timezone string (e.g., 'America/Los_Angeles') if timezone mode is set to 'fixed'.
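
The precedence between the columns list and the regex pattern can be sketched as follows. `select_columns` is a hypothetical helper written for this illustration, assuming the rules stated above: a regex wins over the explicit list, and an empty list with no regex selects every column.

```python
import re
from typing import List, Optional


def select_columns(
    all_columns: List[str],
    columns: Optional[List[str]] = None,
    regex_pattern: str = "",
) -> List[str]:
    """Pick the columns to parse: regex first, then explicit list, else all."""
    if regex_pattern:
        pattern = re.compile(regex_pattern)
        # re.search matches anywhere in the name; anchor with ^ and $ if needed.
        return [name for name in all_columns if pattern.search(name)]
    if columns:
        missing = [name for name in columns if name not in all_columns]
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")
        return list(columns)
    return list(all_columns)


available = ["order_id", "created_at", "updated_at", "notes"]
print(select_columns(available, regex_pattern=r"_at$"))
print(select_columns(available, columns=["notes"], regex_pattern=r"_at$"))
print(select_columns(available, columns=["created_at"]))
print(select_columns(available))
```

Because an empty selection means "all columns", it is usually safer to name the date columns or supply a regex than to rely on the default.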

Inputs Types

Input               Types
data                DataFrame, ArrowTable
columns             List
regex pattern       Str
date format mode    Str
date format preset  Str
date format custom  Str
prefix column       Str
timezone mode       Str
timezone column     Str
fixed timezone      Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure containing the newly created or overwritten timestamp columns.
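
Whether a column is overwritten or newly created depends on the prefix. The sketch below shows one way the output names could be planned; `plan_output_columns` is a hypothetical helper, and the `_1`, `_2` suffix rule for name collisions is taken from the brick's source.

```python
from typing import Dict, List


def plan_output_columns(
    all_columns: List[str], parsed: List[str], prefix: str = ""
) -> Dict[str, str]:
    """Map each parsed source column to the column that will hold the result."""
    if not prefix:
        # In-place: the timestamp replaces the original string column.
        return {name: name for name in parsed}
    taken = set(all_columns)
    plan = {}
    for name in parsed:
        candidate = f"{prefix}{name}"
        suffix = 1
        # Append _1, _2, ... until the name no longer collides.
        while candidate in taken:
            candidate = f"{prefix}{name}_{suffix}"
            suffix += 1
        taken.add(candidate)
        plan[name] = candidate
    return plan


print(plan_output_columns(["date", "amount"], ["date"]))
print(plan_output_columns(["date", "amount"], ["date"], prefix="parsed_"))
print(plan_output_columns(["date", "parsed_date"], ["date"], prefix="parsed_"))
```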

Outputs Types

Output  Types
result  DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Dates from Strings brick exposes the following options:

Columns to Parse
A selection of columns in the dataset to which the date parsing operation will be applied.
Regex Pattern
An optional regular expression pattern used to match columns for parsing. If provided, this overrides the explicit selection in Columns to Parse.
Date Format Mode
Controls how the system determines the date format string ('auto' for DuckDB's automatic casting, 'preset' for common templates, or 'custom' for manual input).
Date Format Preset
If the mode is set to 'preset', select one of the common date and time format templates provided (e.g., YYYY-MM-DD, ISO 8601).
Custom Date Format
If the mode is set to 'custom', specify the explicit strptime-style format string used to parse the dates (e.g., %Y-%m-%d).
Prefix Column
If specified, parsed dates will be written to new columns using this prefix, preserving the original string columns. If left empty, parsing occurs in-place.
Timezone Mode
Specifies how timezone information should be handled: 'none' (no conversion), 'fixed' (apply a selected fixed timezone), or 'column' (read timezone from a specified column).
Timezone Column
The name of the column that contains the timezone strings if Timezone Mode is set to 'column'.
Fixed Timezone
The specific IANA timezone string to use if Timezone Mode is set to 'fixed'. Defaults to UTC; in the code, a fixed timezone of UTC leaves the parsed timestamps without any timezone conversion.
Output Format
Specifies the desired format of the output data structure: pandas DataFrame, polars DataFrame, or arrow Table.
Verbose
Enables detailed logging of the process steps, warnings, and final status.
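
To make the timezone modes more concrete, here is a minimal standard-library sketch. It assumes the parsed value is a wall-clock time in the named zone, which is how DuckDB's `timezone(zone, timestamp)` treats a timestamp without a zone. The `localize` function and its parameters are hypothetical, it does not reproduce every detail of the brick, and `zoneinfo` needs the system timezone database (or the `tzdata` package) to be available.

```python
from datetime import datetime, timezone
from typing import Optional
from zoneinfo import ZoneInfo


def localize(
    naive: Optional[datetime],
    timezone_mode: str = "none",
    fixed_timezone: str = "UTC",
    row_timezone: Optional[str] = None,
) -> Optional[datetime]:
    """Attach a timezone to a naive timestamp; None passes through unchanged."""
    if naive is None or timezone_mode == "none":
        return naive
    if timezone_mode == "fixed":
        zone_name = fixed_timezone
    elif timezone_mode == "column":
        # In 'column' mode each row supplies its own IANA zone name.
        zone_name = row_timezone
    else:
        raise ValueError(f"Unknown timezone mode: {timezone_mode!r}")
    if not zone_name:
        return None
    # The naive value is read as wall-clock time in the given zone.
    return naive.replace(tzinfo=ZoneInfo(zone_name))


noon = datetime(2024, 1, 15, 12, 0)
new_york = localize(noon, "fixed", fixed_timezone="America/New_York")
paris = localize(noon, "column", row_timezone="Europe/Paris")
print(new_york.astimezone(timezone.utc))
print(paris.astimezone(timezone.utc))
```

The same wall-clock value maps to different instants depending on the zone, which is why 'column' mode is useful when rows come from different regions.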
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _generate_unique_column_name(base_name, existing_columns):
    """
    Generate a unique column name that doesn't conflict with existing columns.
    """
    if base_name not in existing_columns:
        return base_name
    counter = 1
    while f"{base_name}_{counter}" in existing_columns:
        counter += 1
    return f"{base_name}_{counter}"


def _convert_preset_to_format(preset):
    """
    Convert human-readable preset to strptime format string.
    """
    preset_mapping = {
        "YYYY-MM-DD": "%Y-%m-%d",
        "YYYY/MM/DD": "%Y/%m/%d",
        "DD-MM-YYYY": "%d-%m-%Y",
        "DD/MM/YYYY": "%d/%m/%Y",
        "MM-DD-YYYY": "%m-%d-%Y",
        "MM/DD/YYYY": "%m/%d/%Y",
        "YYYY-MM-DD HH:MM:SS": "%Y-%m-%d %H:%M:%S",
        "YYYY/MM/DD HH:MM:SS": "%Y/%m/%d %H:%M:%S",
        "DD-MM-YYYY HH:MM:SS": "%d-%m-%Y %H:%M:%S",
        "DD/MM/YYYY HH:MM:SS": "%d/%m/%Y %H:%M:%S",
        "MM-DD-YYYY HH:MM:SS": "%m-%d-%Y %H:%M:%S",
        "MM/DD/YYYY HH:MM:SS": "%m/%d/%Y %H:%M:%S",
        "YYYY-MM-DD HH:MM": "%Y-%m-%d %H:%M",
        "DD/MM/YYYY HH:MM": "%d/%m/%Y %H:%M",
        "MM/DD/YYYY HH:MM": "%m/%d/%Y %H:%M",
        "YYYYMMDD": "%Y%m%d",
        "DD Mon YYYY": "%d %b %Y",
        "DD Month YYYY": "%d %B %Y",
        "Mon DD, YYYY": "%b %d, %Y",
        "Month DD, YYYY": "%B %d, %Y",
        "ISO 8601": "%Y-%m-%dT%H:%M:%S",
        "ISO 8601 UTC": "%Y-%m-%dT%H:%M:%SZ",
    }
    return preset_mapping.get(preset, None)


def _match_columns_by_regex(pattern, all_columns):
    """
    Match column names using a regex pattern.
    """
    if not pattern:
        return []
    try:
        regex = re.compile(pattern)
        matched = [col for col in all_columns if regex.search(col)]
        return matched
    except re.error as e:
        raise ValueError(f"Invalid regex pattern: {pattern}. Error: {str(e)}")


def parse_dates(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    date_format_mode: Str = None,
    date_format_preset: Str = None,
    date_format_custom: Str = None,
    prefix_column: Str = None,
    timezone_mode: Str = None,
    timezone_column: Str = None,
    fixed_timezone: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Dates from Strings"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    date_format_mode = _coalesce(
        date_format_mode, options.get("date_format_mode", "auto")
    )
    date_format_preset = _coalesce(
        date_format_preset, options.get("date_format_preset", "YYYY-MM-DD")
    )
    date_format_custom = _coalesce(
        date_format_custom, options.get("date_format_custom", "")
    )
    prefix_column = _coalesce(prefix_column, options.get("prefix_column", ""))
    timezone_mode = _coalesce(timezone_mode, options.get("timezone_mode", "none"))
    timezone_column = _coalesce(timezone_column, options.get("timezone_column", ""))
    fixed_timezone = _coalesce(fixed_timezone, options.get("fixed_timezone", "UTC"))
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    try:
        parse_all_columns = len(columns) == 0 and (not regex_pattern)
        parse_mode = None
        if regex_pattern:
            parse_mode = "regex_pattern"
        elif parse_all_columns:
            parse_mode = "all_columns"
        else:
            parse_mode = "column_list"
        verbose and logger.info(
            f"[{brick_display_name}] Detected mode: '{parse_mode}'. Starting date parsing operation."
        )
        date_format = None
        if date_format_mode == "auto":
            date_format = None
            verbose and logger.info(
                f"[{brick_display_name}] Using automatic date format detection."
            )
        elif date_format_mode == "preset":
            date_format = _convert_preset_to_format(date_format_preset)
            if date_format is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid preset format: '{date_format_preset}'."
                )
                raise ValueError(f"Invalid preset format: '{date_format_preset}'.")
            verbose and logger.info(
                f"[{brick_display_name}] Using preset date format: '{date_format_preset}' -> '{date_format}'."
            )
        elif date_format_mode == "custom":
            if not date_format_custom:
                verbose and logger.error(
                    f"[{brick_display_name}] Custom date format must be specified when date_format_mode is 'custom'."
                )
                raise ValueError(
                    "Custom date format must be specified when date_format_mode is 'custom'."
                )
            date_format = date_format_custom
            verbose and logger.info(
                f"[{brick_display_name}] Using custom date format: '{date_format}'."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid date_format_mode: '{date_format_mode}'."
            )
            raise ValueError(f"Invalid date_format_mode: '{date_format_mode}'.")
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        all_column_names = list(all_columns.keys())
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        columns_to_parse = []
        if parse_mode == "regex_pattern":
            columns_to_parse = _match_columns_by_regex(regex_pattern, all_column_names)
            if not columns_to_parse:
                verbose and logger.warning(
                    f"[{brick_display_name}] No columns matched the regex pattern: '{regex_pattern}'. Returning data unchanged."
                )
                result = data
            else:
                verbose and logger.info(
                    f"[{brick_display_name}] Matched {len(columns_to_parse)} columns using regex pattern: {columns_to_parse}."
                )
        elif parse_mode == "all_columns":
            columns_to_parse = all_column_names
            verbose and logger.info(
                f"[{brick_display_name}] Parsing all {len(columns_to_parse)} columns."
            )
        else:
            missing_columns = [col for col in columns if col not in all_columns]
            if missing_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Columns not found in data: {missing_columns}."
                )
                raise ValueError(f"Columns not found in data: {missing_columns}")
            columns_to_parse = columns
            verbose and logger.info(
                f"[{brick_display_name}] Parsing {len(columns_to_parse)} specified columns: {columns_to_parse}."
            )
        if not columns_to_parse:
            verbose and logger.warning(
                f"[{brick_display_name}] No columns to parse. Returning data unchanged."
            )
            result = data
        else:
            if timezone_mode == "column":
                if not timezone_column:
                    verbose and logger.error(
                        f"[{brick_display_name}] Timezone column must be specified when timezone_mode is 'column'."
                    )
                    raise ValueError(
                        "Timezone column must be specified when timezone_mode is 'column'."
                    )
                if timezone_column not in all_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Timezone column '{timezone_column}' not found in data."
                    )
                    raise ValueError(
                        f"Timezone column '{timezone_column}' not found in data."
                    )
                verbose and logger.info(
                    f"[{brick_display_name}] Using timezone from column: '{timezone_column}'."
                )
            elif timezone_mode == "fixed":
                verbose and logger.info(
                    f"[{brick_display_name}] Using fixed timezone: '{fixed_timezone}'."
                )
            else:
                verbose and logger.info(
                    f"[{brick_display_name}] No timezone conversion will be applied."
                )
            in_place = not prefix_column
            if in_place:
                verbose and logger.info(
                    f"[{brick_display_name}] Parsing dates in-place (overwriting original columns)."
                )
            else:
                verbose and logger.info(
                    f"[{brick_display_name}] Creating new columns with prefix: '{prefix_column}'."
                )
            # Escape single quotes so these values are safe inside SQL string literals.
            format_literal = date_format.replace("'", "''") if date_format else None
            timezone_literal = str(fixed_timezone).replace("'", "''")
            select_parts = []
            new_columns = []
            parsed_count = 0
            # Track names already in use so generated output columns never collide,
            # including with columns created earlier in this same run.
            used_names = set(all_column_names)
            for col in all_column_names:
                quoted_col = f'"{_sanitize_identifier(col)}"'
                if col not in columns_to_parse:
                    # Untouched columns are passed through as-is.
                    select_parts.append(quoted_col)
                    continue
                if format_literal:
                    converter = f"TRY_STRPTIME({quoted_col}, '{format_literal}')"
                else:
                    converter = f"TRY_CAST({quoted_col} AS TIMESTAMP)"
                # NULL and blank strings map to NULL instead of being parsed.
                parse_expr = (
                    f"CASE WHEN {quoted_col} IS NULL OR TRIM({quoted_col}) = '' "
                    f"THEN NULL ELSE {converter} END"
                )
                # Pick the timezone argument: a column reference or a string literal.
                tz_arg = None
                if timezone_mode == "column":
                    tz_arg = f'"{_sanitize_identifier(timezone_column)}"'
                elif timezone_mode == "fixed" and fixed_timezone != "UTC":
                    tz_arg = f"'{timezone_literal}'"
                if tz_arg is not None:
                    parse_expr = (
                        f"CASE WHEN ({parse_expr}) IS NULL THEN NULL "
                        f"ELSE timezone({tz_arg}, {parse_expr}) END"
                    )
                if in_place:
                    # Overwrite the original column at its current position.
                    output_col = col
                    select_parts.append(f"{parse_expr} AS {quoted_col}")
                else:
                    # Keep the original column and append the parsed copy at the end.
                    output_col = _generate_unique_column_name(
                        f"{prefix_column}{col}", used_names
                    )
                    used_names.add(output_col)
                    select_parts.append(quoted_col)
                    new_columns.append(
                        f'{parse_expr} AS "{_sanitize_identifier(output_col)}"'
                    )
                parsed_count += 1
                verbose and logger.info(
                    f"[{brick_display_name}] Parsing column '{col}' to '{output_col}' (handling NULL/empty values)."
                )
            select_parts.extend(new_columns)
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to parse dates (skipping NULL/empty values)."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            verbose and logger.info(
                f"[{brick_display_name}] Date parsing completed successfully. Parsed {parsed_count} column(s)."
            )
    except Exception as e:
        # Include the underlying error so the log entry is actionable.
        verbose and logger.error(
            f"[{brick_display_name}] Error during date parsing: {e}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow