Copy Column

Copy the content of a source column into a target column.

Copy Column

Processing

This brick copies the data content from a specified source column into a target column within the input dataset. If the target column already exists, the function's behavior is governed by the Overwrite if Target Exists option.

Inputs

data
The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) containing the columns to be processed.
source column (optional)
The name of the existing column whose contents will be copied. This input is typically set via the brick options.
target column (optional)
The name of the column where the copied data will be placed. This input is typically set via the brick options.

Input Types

Input Types
data DataFrame, ArrowTable
source column Str
target column Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The modified dataset, with the contents of the source column duplicated into the target column, returned in the format specified by the Output Type option.

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Copy Column brick contains some changeable options:

Source Column
The name of the existing column from which data will be copied. Required unless a value is supplied through the source column input, which takes precedence over this option.
Target Column
The name of the column where the copied data will be stored. This column will be created if it does not exist.
Output Type
Determines the desired format for the returned dataset (pandas, polars, or arrow). Defaults to 'pandas'.
Overwrite if Target Exists
If enabled (default), and the target column already exists, its contents will be overwritten with the source data. If disabled and the target column exists, an error will be raised.
Verbose
When enabled (the default), logs detailed progress and error messages about the column duplication process during execution.
import logging
from coded_flows.types import Union, DataFrame, ArrowTable, Str
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first argument that is not ``None``.

    Falsy values such as ``""`` or ``0`` count as present; only ``None`` is
    skipped. When every argument is ``None`` (or no arguments are given) the
    result is ``None`` rather than a leaked ``StopIteration``.
    """
    # The explicit default keeps next() from raising StopIteration, which
    # would otherwise escape to callers as a confusing, non-descriptive error.
    return next((v for v in values if v is not None), None)


def copy_column(
    data: Union[DataFrame, ArrowTable],
    source_column: Str = None,
    target_column: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """Copy the contents of one column into another column of ``data``.

    The copy is performed by registering ``data`` in an in-memory DuckDB
    connection and running a single ``SELECT``. If the target column does
    not exist it is appended; if it exists it is overwritten in place (its
    position in the column order is kept) when ``overwrite`` is enabled.

    Args:
        data: A pandas DataFrame, polars DataFrame or PyArrow Table.
        source_column: Name of the column to copy from. Falls back to
            ``options["source_column"]`` when ``None``.
        target_column: Name of the column to copy into. Falls back to
            ``options["target_column"]`` when ``None``.
        options: Optional dict with the keys ``source_column``,
            ``target_column``, ``output_type`` (``"pandas"``, ``"polars"``
            or ``"arrow"``; default ``"pandas"``), ``overwrite`` (default
            ``True``) and ``verbose`` (default ``True``).

    Returns:
        The dataset with the target column populated, in the format selected
        by ``output_type``.

    Raises:
        ValueError: If a column name is empty, ``output_type`` is not
            supported, the source column is missing, or the target column
            exists while ``overwrite`` is disabled.
        TypeError: If ``data`` is not one of the supported container types.
    """
    brick_display_name = "Copy Column"
    options = options or {}
    # The trailing "" guarantees a non-None fallback even when an option is
    # explicitly set to None, so the emptiness checks below report the issue.
    source_column = _coalesce(source_column, options.get("source_column"), "")
    target_column = _coalesce(target_column, options.get("target_column"), "")
    output_type = options.get("output_type", "pandas")
    overwrite = options.get("overwrite", True)
    verbose = options.get("verbose", True)
    supported_outputs = ("pandas", "polars", "arrow")

    def _log(level, message):
        # Every message of this brick is gated by the "verbose" option.
        if verbose:
            logger.log(level, message)

    def _quote(identifier):
        # Double embedded quotes so any column name is a valid SQL identifier.
        return '"' + identifier.replace('"', '""') + '"'

    result = None
    con = duckdb.connect(":memory:")
    try:
        if not source_column:
            _log(
                logging.ERROR,
                f"[{brick_display_name}] Source column name cannot be empty",
            )
            raise ValueError("Source column name cannot be empty")
        if not target_column:
            _log(
                logging.ERROR,
                f"[{brick_display_name}] Target column name cannot be empty",
            )
            raise ValueError("Target column name cannot be empty")
        # Reject unknown output formats up front instead of silently
        # returning None after the query has already been prepared.
        if output_type not in supported_outputs:
            error_msg = (
                f"Unsupported output type '{output_type}'. "
                f"Supported types: {', '.join(supported_outputs)}"
            )
            _log(logging.ERROR, f"[{brick_display_name}] {error_msg}")
            raise ValueError(error_msg)

        _log(
            logging.INFO,
            f"[{brick_display_name}] Starting column duplication: '{source_column}' -> '{target_column}'.",
        )

        if not isinstance(data, (pd.DataFrame, pl.DataFrame, pa.Table)):
            _log(
                logging.ERROR,
                "Unsupported data type. Supported types: DataFrame, ArrowTable",
            )
            raise TypeError(
                "Unsupported data type. Supported types:  DataFrame, ArrowTable"
            )

        con.register("temp_table", data)
        column_names = [
            row[0] for row in con.execute("DESCRIBE temp_table").fetchall()
        ]
        if source_column not in column_names:
            _log(
                logging.ERROR,
                f"[{brick_display_name}] Source column '{source_column}' not found",
            )
            raise ValueError(f"Source column '{source_column}' not found")

        _log(logging.INFO, f"[{brick_display_name}] Duplicating column using DuckDB.")

        source_sql = _quote(source_column)
        target_sql = _quote(target_column)
        if target_column in column_names:
            if not overwrite:
                error_msg = f"Target column '{target_column}' already exists and overwrite is set to False."
                _log(logging.ERROR, f"[{brick_display_name}] {error_msg}")
                raise ValueError(error_msg)
            _log(
                logging.INFO,
                f"[{brick_display_name}] Target column '{target_column}' exists and will be overwritten in place.",
            )
            # DuckDB syntax is REPLACE (<expression> AS <column to replace>):
            # the source values are the expression, the target is replaced.
            query = f"SELECT * REPLACE ({source_sql} AS {target_sql}) FROM temp_table"
        else:
            query = f"SELECT *, {source_sql} AS {target_sql} FROM temp_table"

        relation = con.execute(query)
        if output_type == "pandas":
            result = relation.df()
        elif output_type == "polars":
            result = relation.pl()
        else:  # "arrow" - the only remaining validated option
            result = relation.fetch_arrow_table()

        _log(
            logging.INFO,
            f"[{brick_display_name}] Column '{source_column}' successfully duplicated to '{target_column}'.",
        )
    except Exception as e:
        _log(
            logging.ERROR,
            f"[{brick_display_name}] Error duplicating column: {str(e)}",
        )
        raise
    finally:
        # Always release the in-memory DuckDB connection.
        con.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow