Concatenate Columns

Concatenate values across columns using a delimiter string.

Concatenate Columns

Processing

This brick combines the values of multiple specified input columns into a single string column, using a defined delimiter. The resulting data structure contains all original columns plus the newly created column. Users can control how null values are handled during concatenation and specify the desired output format.

Inputs

data
The input DataFrame or Arrow Table containing the columns to be processed.
columns (optional)
The list of column names to be included in the concatenation. If not provided, the value is taken from the brick options.
delimiter (optional)
The string used to separate the concatenated values. If not provided, the value is taken from the brick options (defaults to '').
output column (optional)
The name to assign to the newly created concatenated column. If not provided, the value is taken from the brick options (defaults to 'concatenated').
skip null (optional)
Determines if null values in the input columns should be skipped during concatenation. If not provided, the value is taken from the brick options (defaults to True).

Inputs Types

Input Types
data DataFrame, ArrowTable
columns List
delimiter Str
output column Str
skip null Bool

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure (DataFrame or Arrow Table) including all original columns and the new concatenated column, matching the format specified in the options.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Concatenate Columns brick contains some changeable options:

Columns to Concatenate
Specifies which columns from the input data will be combined into the new output column.
Delimiter
The string used to separate the values of the combined columns. Defaults to an empty string.
Output Column Name
The name for the newly created column containing the concatenated strings. Defaults to concatenated.
Skip Null Values
If enabled, null or missing values within the selected columns are ignored during concatenation (using SQL CONCAT_WS). If disabled, the presence of any null value in the input columns for a row will result in a null value for the output column (using SQL CONCAT). Defaults to True.
Output Format
Specifies the required format for the output data structure. Choices include pandas, polars, or arrow. Defaults to pandas.
Safe Mode
If enabled, columns specified for concatenation that do not exist in the input data will be gracefully skipped, allowing the brick to process the remaining valid columns without raising an error. Defaults to False.
Verbose
Controls whether detailed information and process logs are printed during execution. Defaults to True.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def concatenate_columns(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    delimiter: Str = None,
    output_column: Str = None,
    skip_null: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Concatenate Columns"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = _coalesce(columns, options.get("columns", []))
    delimiter = _coalesce(delimiter, options.get("delimiter", ""))
    output_column = _coalesce(
        output_column, options.get("output_column", "concatenated")
    )
    skip_null = _coalesce(skip_null, options.get("skip_null", True))
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    if not columns:
        verbose and logger.error(
            f"[{brick_display_name}] No columns specified for concatenation."
        )
        raise ValueError("At least one column must be specified for concatenation")
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    if not output_column:
        verbose and logger.error(
            f"[{brick_display_name}] Output column name cannot be empty."
        )
        raise ValueError("Output column name cannot be empty")
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting concatenation of {len(columns)} column(s) with delimiter '{delimiter}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        if not safe_mode:
            missing_columns = [col for col in columns if col not in all_columns]
            if missing_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                )
                raise ValueError(f"Columns not found in data: {missing_columns}")
        valid_columns = [col for col in columns if col in all_columns]
        if not valid_columns:
            verbose and logger.error(
                f"[{brick_display_name}] No valid columns found for concatenation."
            )
            raise ValueError("No valid columns found for concatenation")
        skipped = len(columns) - len(valid_columns)
        if safe_mode and skipped > 0:
            skipped_cols = [col for col in columns if col not in all_columns]
            verbose and logger.warning(
                f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Concatenating {len(valid_columns)} column(s): {valid_columns}."
        )
        if output_column in all_columns:
            verbose and logger.warning(
                f"[{brick_display_name}] Output column '{output_column}' already exists and will be overwritten."
            )
        sanitized_output = _sanitize_identifier(output_column)
        escaped_delimiter = delimiter.replace("'", "''")
        if skip_null:
            cast_columns = []
            for col in valid_columns:
                sanitized_col = _sanitize_identifier(col)
                cast_columns.append(f'CAST("{sanitized_col}" AS VARCHAR)')
            concat_expr = f"CONCAT_WS('{escaped_delimiter}', {', '.join(cast_columns)})"
            verbose and logger.info(
                f"[{brick_display_name}] Using CONCAT_WS to skip NULL values."
            )
        else:
            cast_columns = []
            for i, col in enumerate(valid_columns):
                sanitized_col = _sanitize_identifier(col)
                cast_columns.append(f'CAST("{sanitized_col}" AS VARCHAR)')
            if len(cast_columns) == 1:
                concat_expr = cast_columns[0]
            else:
                concat_parts = []
                for i, cast_col in enumerate(cast_columns):
                    if i > 0:
                        concat_parts.append(f"'{escaped_delimiter}'")
                    concat_parts.append(cast_col)
                concat_expr = "CONCAT(" + ", ".join(concat_parts) + ")"
            verbose and logger.info(
                f"[{brick_display_name}] Using CONCAT to preserve NULL values."
            )
        select_parts = []
        for col in all_columns.keys():
            if col != output_column:
                sanitized_col = _sanitize_identifier(col)
                select_parts.append(f'"{sanitized_col}"')
        select_parts.append(f'{concat_expr} AS "{sanitized_output}"')
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        verbose and logger.info(
            f"[{brick_display_name}] Executing query to concatenate columns."
        )
        if output_format == "pandas":
            result = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Concatenation completed successfully. Output column: '{output_column}'."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during concatenation: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow