Sort Data
Sort rows in a DataFrame or Arrow Table by one or more columns.
Sort Data
Processing
This brick sorts the rows of an incoming data structure (Pandas DataFrame, Polars DataFrame, or PyArrow Table) based on one or more specified columns and their corresponding sort directions (Ascending or Descending). Users can also optionally select a subset of columns to be returned and specify the desired output format (Pandas, Polars, or Arrow Table).
Inputs
- data
- The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) that needs to be sorted.
- columns (optional)
- A list of specific column names to select and include in the output data. If this list is empty or not provided, all columns from the input data will be returned.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns | List |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting data structure (DataFrame or Arrow Table) containing the sorted rows, filtered according to the column selection if specified.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Sort Data brick contains some changeable options:
- Sort Columns
- Define the columns used for sorting. This requires specifying the column name (`key`) and the sort direction (`value`), which must be either `ASC` (Ascending) or `DESC` (Descending).
- Columns to Select
- A list of specific column names to be included in the output data structure. This option is used if the `columns` input port is not connected.
- Output Format
- Specifies the desired format of the output data structure. Choices include `pandas`, `polars`, or `arrow`.
- Verbose
- If enabled, detailed operation logs will be displayed during the sorting process.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, List
# Request INFO-level output from the root logger at import time.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for the brick's progress and error messages.
logger = logging.getLogger(__name__)
def _coalesce(*values):
"""Return the first non-None value."""
return next((v for v in values if v is not None))
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def sort_data(
    data: Union[DataFrame, ArrowTable], columns: List = None, options=None
) -> Union[DataFrame, ArrowTable]:
    """Sort the rows of a table by one or more columns using DuckDB.

    Args:
        data: pandas DataFrame, Polars DataFrame, or PyArrow Table to sort.
        columns: Optional list of column names to keep in the output. When
            ``None``, ``options["columns"]`` is used instead; when that is
            empty or missing as well, every input column is returned.
        options: Optional dict with the keys:
            ``sort_columns`` - list of ``{"key": column, "value": direction}``
            items, where direction is ``ASC`` or ``DESC`` (case-insensitive);
            ``columns`` - fallback for the ``columns`` argument;
            ``output_format`` - ``"pandas"`` (default), ``"polars"`` or
            ``"arrow"``;
            ``verbose`` - log progress when true (default ``True``).

    Returns:
        The selected columns of ``data``, ordered as requested, converted to
        the requested output format.

    Raises:
        ValueError: If ``columns`` is not a list of strings, ``data`` has an
            unsupported type, a selected or sort column does not exist, a
            sort direction is not ``ASC``/``DESC``, or the output format is
            not supported.
    """
    brick_display_name = "Sort Data"
    options = options or {}
    verbose = options.get("verbose", True)
    sort_columns = options.get("sort_columns") or []
    # The trailing [] keeps this safe when the option is explicitly None.
    columns = _coalesce(columns, options.get("columns"), [])
    output_format = options.get("output_format", "pandas")
    valid_directions = ("ASC", "DESC")

    def _log(level, message):
        # stacklevel=2 attributes the record to sort_data, not this helper.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}", stacklevel=2)

    if columns and (
        not isinstance(columns, list)
        or not all(isinstance(c, str) for c in columns)
    ):
        _log(logging.ERROR, "Invalid columns format! Expected a list of strings.")
        raise ValueError("Columns must be provided as a list of strings!")

    try:
        _log(logging.INFO, "Starting sorting operation.")

        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        else:
            _log(
                logging.ERROR,
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table.",
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        _log(logging.INFO, f"Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        # try/finally guarantees the connection is released on every exit
        # path, including failures raised by DuckDB itself.
        try:
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = [col[0] for col in column_info]
            total_rows = conn.execute(
                "SELECT COUNT(*) FROM input_table"
            ).fetchone()[0]
            _log(logging.INFO, f"Total rows in input data: {total_rows}.")

            if columns:
                missing_columns = [col for col in columns if col not in all_columns]
                if missing_columns:
                    _log(
                        logging.ERROR,
                        f"Columns not found in data: {missing_columns}",
                    )
                    raise ValueError(f"Columns not found in data: {missing_columns}")
                selected_columns = columns
                _log(logging.INFO, f"Selecting specific columns: {columns}.")
            else:
                selected_columns = all_columns
                _log(
                    logging.INFO,
                    f"Selecting all columns ({len(all_columns)} columns).",
                )

            select_clause = ", ".join(
                f'"{_sanitize_identifier(col)}"' for col in selected_columns
            )
            query = f"SELECT {select_clause} FROM input_table"

            if sort_columns:
                sort_col_names = [item["key"] for item in sort_columns]
                missing_sort_columns = [
                    col for col in sort_col_names if col not in all_columns
                ]
                if missing_sort_columns:
                    _log(
                        logging.ERROR,
                        f"Sort columns not found in data: {missing_sort_columns}",
                    )
                    raise ValueError(
                        f"Sort columns not found in data: {missing_sort_columns}"
                    )

                order_by_parts = []
                sorting_columns_display = []
                for item in sort_columns:
                    col_name = item["key"]
                    raw_direction = item["value"]
                    # The direction is spliced into the SQL text, so only the
                    # two documented keywords are allowed through.
                    direction = str(raw_direction).strip().upper()
                    if direction not in valid_directions:
                        _log(
                            logging.ERROR,
                            f"Invalid sort direction for column '{col_name}': {raw_direction!r}",
                        )
                        raise ValueError(
                            f"Invalid sort direction for column '{col_name}': "
                            f"{raw_direction!r}. Expected 'ASC' or 'DESC'."
                        )
                    order_by_parts.append(
                        f'"{_sanitize_identifier(col_name)}" {direction}'
                    )
                    sorting_columns_display.append(f"{col_name} {direction}")

                _log(
                    logging.INFO,
                    f"Sorting by columns: {', '.join(sorting_columns_display)}.",
                )
                query = f"{query} ORDER BY {', '.join(order_by_parts)}"
            else:
                _log(
                    logging.INFO,
                    "No sorting specified. Returning data in original order.",
                )

            _log(logging.INFO, "Executing query to sort data.")
            if output_format == "pandas":
                result = conn.execute(query).df()
                _log(logging.INFO, "Converted result to pandas DataFrame.")
            elif output_format == "polars":
                result = conn.execute(query).pl()
                _log(logging.INFO, "Converted result to Polars DataFrame.")
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                _log(logging.INFO, "Converted result to Arrow Table.")
            else:
                _log(logging.ERROR, f"Unsupported output format: {output_format}")
                raise ValueError(f"Unsupported output format: {output_format}")
        finally:
            conn.close()

        result_rows = len(result)
        _log(
            logging.INFO,
            f"Sorting operation completed successfully. Returned {result_rows} rows with {len(selected_columns)} columns.",
        )
    except Exception:
        # Log once at the boundary, then let the original exception propagate.
        _log(logging.ERROR, "Error during sorting operation.")
        raise
    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow