Move Columns

Move one or more columns to a specific position in a DataFrame or Arrow Table.


Processing

Move one or more columns to a specific position within the input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table).

Inputs

data
The input DataFrame or Arrow Table whose columns need to be reordered.
columns to move (optional)
A list of column names that should be moved to the new position. This is usually set via options.
position type (optional)
Defines where the columns should be moved (e.g., 'beginning', 'end', 'before', 'after'). This is usually set via options.
reference column (optional)
The column used as a reference point when position type is 'before' or 'after'. This is usually set via options.

Input Types

Input               Types
data                DataFrame, ArrowTable
columns to move     List
position type       Str
reference column    Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure (DataFrame or Arrow Table) with columns reordered according to the specified configuration, converted to the chosen Output Format.

Output Types

Output              Types
result              DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Move Columns brick exposes the following configurable options:

Columns to Move
A list of column names to be moved to the target position.
Position Type
Specifies where the columns should be moved: "beginning", "end", "before" (relative to a reference column), or "after" (relative to a reference column). See the illustration following this list.
Reference Column
The name of the existing column used as a reference point when Position Type is set to "before" or "after".
Output Format
Defines the desired format of the output data structure (pandas, polars, or arrow). Defaults to pandas.
Safe Mode
If enabled, columns listed in Columns to Move that do not exist in the input data are skipped instead of raising an error.
Verbose
If enabled, the brick emits detailed execution logs. Enabled by default.
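
For illustration, assume an input table with the hypothetical columns a, b, c, d, e and Columns to Move set to ["e", "b"] (moved columns keep the order given in the list). Each Position Type then yields:

    beginning                     ->  e, b, a, c, d
    end                           ->  a, c, d, e, b
    before, Reference Column "d"  ->  a, c, e, b, d
    after, Reference Column "a"   ->  a, e, b, c, d

The brick's implementation is listed below.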
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    # Return the first non-None value (explicit arguments take precedence over option defaults).
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def move_columns(
    data: Union[DataFrame, ArrowTable],
    columns_to_move: List = None,
    position_type: Str = None,
    reference_column: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
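    """
    Move the requested columns of a pandas/Polars DataFrame or Arrow Table to the
    beginning, the end, or before/after a reference column. Explicit arguments take
    precedence over the matching keys in `options`.
    """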
    brick_display_name = "Move Columns"
    options = options or {}
    verbose = options.get("verbose", True)
    columns_to_move = _coalesce(columns_to_move, options.get("columns_to_move", []))
    position_type = _coalesce(position_type, options.get("position_type", "beginning"))
    reference_column = _coalesce(reference_column, options.get("reference_column", ""))
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    no_reorder = False
    if not columns_to_move:
        verbose and logger.warning(
            f"[{brick_display_name}] No columns specified for reordering. Returning data unchanged."
        )
        result = data
        no_reorder = True
    if not no_reorder:
        if not isinstance(columns_to_move, list) or not all(
            isinstance(c, str) for c in columns_to_move
        ):
            verbose and logger.error(
                f"[{brick_display_name}] Invalid columns format! Expected a list of column names."
            )
            raise ValueError("Columns to move must be provided as a list of strings!")
        valid_positions = ["beginning", "end", "before", "after"]
        if position_type not in valid_positions:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid position type: {position_type}. Must be one of {valid_positions}."
            )
            raise ValueError(f"Position type must be one of {valid_positions}")
        if position_type in ["before", "after"] and (not reference_column):
            verbose and logger.error(
                f"[{brick_display_name}] Reference column is required when position type is '{position_type}'."
            )
            raise ValueError(
                f"Reference column is required for position type '{position_type}'"
            )
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting column reorder operation: moving {len(columns_to_move)} column(s) to {position_type}{(f' {reference_column}' if reference_column else '')}."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, pa.Table):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
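            # Register the in-memory DataFrame/Arrow Table with DuckDB as a view named "input_table".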
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = [col[0] for col in column_info]
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
            if not safe_mode:
                missing_columns = [
                    col for col in columns_to_move if col not in all_columns
                ]
                if missing_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                    conn.close()
                    raise ValueError(f"Columns not found in data: {missing_columns}")
            if position_type in ["before", "after"]:
                if reference_column not in all_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Reference column '{reference_column}' not found in data."
                    )
                    conn.close()
                    raise ValueError(
                        f"Reference column '{reference_column}' not found in data"
                    )
                if reference_column in columns_to_move:
                    verbose and logger.error(
                        f"[{brick_display_name}] Reference column '{reference_column}' cannot be in the list of columns to move."
                    )
                    conn.close()
                    raise ValueError(
                        f"Reference column '{reference_column}' cannot be in the list of columns to move"
                    )
            columns_to_move_set = set(columns_to_move)
            valid_columns_to_move = [
                col for col in columns_to_move if col in all_columns
            ]
            remaining_columns = [
                col for col in all_columns if col not in columns_to_move_set
            ]
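            # valid_columns_to_move keeps only the columns that actually exist (relevant in
            # safe mode), in the order given by the user; remaining_columns preserves the
            # original order of all other columns.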
            new_column_order = []
            if position_type == "beginning":
                new_column_order = valid_columns_to_move + remaining_columns
                verbose and logger.info(
                    f"[{brick_display_name}] Moving columns to beginning."
                )
            elif position_type == "end":
                new_column_order = remaining_columns + valid_columns_to_move
                verbose and logger.info(
                    f"[{brick_display_name}] Moving columns to end."
                )
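            # For "before"/"after", walk the remaining columns in their original order and
            # splice the moved block in immediately before or after the reference column.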
            elif position_type == "before":
                for col in remaining_columns:
                    if col == reference_column:
                        new_column_order.extend(valid_columns_to_move)
                    new_column_order.append(col)
                verbose and logger.info(
                    f"[{brick_display_name}] Moving columns before '{reference_column}'."
                )
            elif position_type == "after":
                for col in remaining_columns:
                    new_column_order.append(col)
                    if col == reference_column:
                        new_column_order.extend(valid_columns_to_move)
                verbose and logger.info(
                    f"[{brick_display_name}] Moving columns after '{reference_column}'."
                )
            select_parts = []
            for col in new_column_order:
                sanitized_col = _sanitize_identifier(col)
                select_parts.append(f'"{sanitized_col}"')
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to reorder columns."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            conn.close()
            skipped_count = len(columns_to_move) - len(valid_columns_to_move)
            if safe_mode and skipped_count > 0:
                skipped_columns = [
                    col for col in columns_to_move if col not in all_columns
                ]
                verbose and logger.warning(
                    f"[{brick_display_name}] Safe mode: Skipped {skipped_count} non-existent columns: {skipped_columns}"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Column reorder operation completed successfully. Moved {len(valid_columns_to_move)} column(s)."
            )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during column reorder operation: {str(e)}"
            )
            raise
    return result
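
A minimal usage sketch (the sample DataFrame, column names, and option values are illustrative):

import pandas as pd

df = pd.DataFrame(
    {"id": [1, 2], "name": ["a", "b"], "score": [0.5, 0.9], "city": ["x", "y"]}
)

# Move "score" and "city" so they sit directly after "id".
reordered = move_columns(
    df,
    options={
        "columns_to_move": ["score", "city"],
        "position_type": "after",
        "reference_column": "id",
        "output_format": "pandas",
        "verbose": False,
    },
)
print(list(reordered.columns))  # ['id', 'score', 'city', 'name']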

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow