Rename Columns

Rename columns in a DataFrame or Arrow Table.

Processing

This brick renames specified columns in an input Pandas DataFrame, Polars DataFrame, or PyArrow Table. The user defines a mapping of old column names to new column names, and the result is converted to the format selected in the options. If 'Safe Mode' is disabled, the brick raises an error when a column listed in the mapping does not exist in the input data.
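
A minimal sketch of calling the brick function directly in Python (the full implementation appears in the code listing further down); the DataFrame and column names are made up for illustration:

import pandas as pd

df = pd.DataFrame({"old_a": [1, 2], "old_b": [3, 4]})

# Rename both columns and keep the default pandas output format.
renamed = rename_columns(
    df,
    column_mapping={"old_a": "a", "old_b": "b"},
    options={"output_format": "pandas"},
)
# renamed.columns -> Index(['a', 'b'], dtype='object')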

Inputs

data
The DataFrame or Arrow Table whose columns need to be renamed.
column mapping (optional)
A dictionary mapping old column names (key) to new column names (value). If provided via the input port, it overrides the value set in the Options pane.
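
The mapping can be supplied either as a plain dictionary of strings or as a list of key/value entries (presumably the shape produced by the key-value editor in the Options pane); both forms pass the validation helpers in the code below, with illustrative column names:

# Plain dictionary: old name -> new name
mapping_as_dict = {"customer_id": "id", "customer_name": "name"}

# Equivalent list of {"key": ..., "value": ...} entries
mapping_as_pairs = [
    {"key": "customer_id", "value": "id"},
    {"key": "customer_name", "value": "name"},
]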

Input Types

Input            Types
data             DataFrame, ArrowTable
column mapping   Dict

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) with columns renamed according to the mapping and converted to the specified output format.

Output Types

Output    Types
result    DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Rename Columns brick exposes the following configurable options:

Column Rename Mapping
Defines the mapping of old column names (key) to new column names (value). This is typically entered as key-value pairs in the configuration pane.
Output Format
Specifies the desired format of the output data structure. Choices are 'pandas' (DataFrame), 'polars' (Polars DataFrame), or 'arrow' (Arrow Table). Defaults to pandas.
Safe Mode
If enabled, the brick will ignore columns specified in the mapping that do not exist in the input data, allowing the flow to continue without error. If disabled, missing columns will raise an error. Defaults to False.
Verbose
Enables detailed logging during execution, providing information about detected formats, the number of columns renamed, and the execution status. Defaults to True.
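
When the brick function in the code below is invoked directly, these options are read from a plain dictionary via options.get(...); the key names come from that code, and the values shown here are only illustrative:

options = {
    "column_mapping": {"customer_id": "id"},  # Column Rename Mapping
    "output_format": "arrow",                 # "pandas", "polars", or "arrow"
    "safe_mode": True,                        # skip mappings whose source column is missing
    "verbose": False,                         # disable the per-step logging
}
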
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, Dict, DataFrame, ArrowTable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    # Return the first value that is not None (None if every value is None).
    return next((v for v in values if v is not None), None)


def _is_list_of_key_value_dicts(obj):
    # Accept a mapping supplied as a list of {"key": ..., "value": ...} dicts.
    if not isinstance(obj, list):
        return False
    for item in obj:
        if not isinstance(item, dict):
            return False
        if set(item.keys()) != {"key", "value"}:
            return False
    return True


def _is_textual_dict(obj):
    # Accept a mapping supplied as a plain dict of strings to strings.
    if not isinstance(obj, dict):
        return False
    return all((isinstance(k, str) and isinstance(v, str) for (k, v) in obj.items()))


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def rename_columns(
    data: Union[DataFrame, ArrowTable], column_mapping: Dict = None, options=None
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Rename Columns"
    options = options or {}
    verbose = options.get("verbose", True)
    column_mapping = _coalesce(column_mapping, options.get("column_mapping", []))
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    no_rename = False
    if not column_mapping:
        verbose and logger.warning(
            f"[{brick_display_name}] No column mapping provided. Returning data unchanged."
        )
        result = data
        no_rename = True
    if not no_rename:
        if not (
            _is_list_of_key_value_dicts(column_mapping)
            or _is_textual_dict(column_mapping)
        ):
            verbose and logger.error(
                f"[{brick_display_name}] Invalid column mapping format!"
            )
            raise ValueError("Invalid column mapping format!")
        rename_dict = (
            {item["key"]: item["value"] for item in column_mapping}
            if isinstance(column_mapping, list)
            else column_mapping
        )
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting column rename operation with {len(rename_dict)} mappings."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, (pa.Table, pa.lib.Table)):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
            conn.register("input_table", data)
            verbose and logger.info(
                f"[{brick_display_name}] Building SQL query for column renaming."
            )
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = [col[0] for col in column_info]
            if not safe_mode:
                missing_columns = [
                    col for col in rename_dict.keys() if col not in all_columns
                ]
                if missing_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                    conn.close()
                    raise ValueError(f"Columns not found in data: {missing_columns}")
            select_parts = []
            renamed_count = 0
            skipped_count = 0
            for col in all_columns:
                if col in rename_dict:
                    new_name = rename_dict[col]
                    sanitized_old = _sanitize_identifier(col)
                    sanitized_new = _sanitize_identifier(new_name)
                    select_parts.append(f'"{sanitized_old}" AS "{sanitized_new}"')
                    verbose and logger.info(
                        f"[{brick_display_name}] Renaming column: {col} -> {new_name}."
                    )
                    renamed_count += 1
                else:
                    sanitized_col = _sanitize_identifier(col)
                    select_parts.append(f'"{sanitized_col}"')
            if safe_mode:
                requested_columns = set(rename_dict.keys())
                existing_columns = set(all_columns)
                skipped_columns = requested_columns - existing_columns
                skipped_count = len(skipped_columns)
                if skipped_columns:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Safe mode: Skipped {skipped_count} non-existent columns: {list(skipped_columns)}"
                    )
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to rename columns."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                conn.close()
                raise ValueError(f"Unsupported output format: {output_format}")
            conn.close()
            verbose and logger.info(
                f"[{brick_display_name}] Column rename operation completed successfully. Renamed {renamed_count} columns."
            )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during column rename operation: {e}"
            )
            raise
    return result
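
A short usage sketch showing safe mode with a column that is absent from the data; the names and values are illustrative:

import pandas as pd

df = pd.DataFrame({"price": [9.99, 5.00], "qty": [1, 3]})

# "discount" does not exist in df; with safe_mode enabled it is skipped with a
# warning instead of raising, and the result is returned as an Arrow Table.
table = rename_columns(
    df,
    column_mapping={"price": "unit_price", "discount": "rebate"},
    options={"safe_mode": True, "output_format": "arrow"},
)
# table.column_names -> ['unit_price', 'qty']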

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow