DuckDB Query

Execute a custom DuckDB SQL query on input data (pandas DataFrame, Polars DataFrame, or Arrow Table) and return the result in the specified format.

Processing

Execute a custom DuckDB SQL query on the input data (a pandas DataFrame, Polars DataFrame, or Arrow Table) and return the result. The input is registered in a fresh in-memory DuckDB connection under the name set by the Table Name option, so the query can read it like any SQL table. The connection is closed once the query finishes, so the registration does not persist between runs. The Output Format option selects whether the result is returned as a pandas DataFrame, a Polars DataFrame, or an Arrow Table.

Inputs

data
The input data (pandas DataFrame, Polars DataFrame, or Arrow Table) on which the SQL query is executed.
sql query
The SQL query string to run against the registered input table. Refer to the table by the name set in the Table Name option (input_table by default).

Input Types

Input        Types
data         DataFrame, ArrowTable
sql query    Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The result set produced by the SQL query, returned in the format selected by the Output Format option.

Output Types

Output       Types
result       DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The DuckDB Query brick exposes the following configurable options:

Table Name
Name under which the input data is registered in the DuckDB connection. Use this name in the FROM clause of the SQL query. Leading and trailing whitespace is stripped. (Default: input_table)
Output Format
Type of the returned result: pandas (pandas DataFrame), polars (Polars DataFrame), or arrow (Arrow Table). Any other value raises a ValueError. (Default: pandas)
Verbose
Enables or disables logging during execution, including informational messages, warnings, and error messages. Errors are still raised when logging is off. (Default: True)

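
The options are read from a plain dictionary whose keys, per the brick's code, are table_name, output_format, and verbose. The resolve_options helper below is hypothetical (it is not part of the brick): it applies the documented defaults and the same validation rules, but runs the checks up front rather than partway through execution.

```python
# Hypothetical helper, not part of the brick: it mirrors the defaults and
# checks that execute_duckdb_query applies to its options dictionary.
SUPPORTED_FORMATS = ("pandas", "polars", "arrow")


def resolve_options(options=None):
    """Return the effective options, applying defaults and validating values."""
    options = options or {}
    table_name = options.get("table_name", "input_table")
    output_format = options.get("output_format", "pandas")
    verbose = options.get("verbose", True)

    # Same rule as the brick: a non-empty string, with surrounding whitespace stripped.
    if not isinstance(table_name, str) or not table_name.strip():
        raise ValueError("Table name must be a non-empty string!")
    # Exact, case-sensitive comparison, as in the brick's if/elif chain.
    if output_format not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported output format: {output_format}")

    return {
        "table_name": table_name.strip(),
        "output_format": output_format,
        "verbose": verbose,
    }


print(resolve_options())
print(resolve_options({"table_name": "  sales ", "output_format": "arrow", "verbose": False}))
```

Because the format values are compared exactly, spellings such as "Pandas" or "ARROW" are rejected.
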
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, Str, DataFrame, ArrowTable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _sanitize_query(query):
    """
    Validate and sanitize SQL query to ensure it's not empty.
    """
    if not query:
        return None
    return query.strip()


def execute_duckdb_query(
    data: Union[DataFrame, ArrowTable], sql_query: Str, options=None
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "DuckDB Query"
    options = options or {}
    verbose = options.get("verbose", True)
    output_format = options.get("output_format", "pandas")
    table_name = options.get("table_name", "input_table")
    result = None
    conn = None
    try:
        if not isinstance(sql_query, str):
            verbose and logger.error(
                f"[{brick_display_name}] Invalid sql_query format! Expected a string."
            )
            raise ValueError("SQL query must be provided as a string!")
        sanitized_query = _sanitize_query(sql_query)
        if not sanitized_query:
            verbose and logger.error(
                f"[{brick_display_name}] SQL query cannot be empty!"
            )
            raise ValueError("SQL query cannot be empty!")
        if not isinstance(table_name, str) or not table_name.strip():
            verbose and logger.error(
                f"[{brick_display_name}] Invalid table_name! Must be a non-empty string."
            )
            raise ValueError("Table name must be a non-empty string!")
        sanitized_table_name = table_name.strip()
        verbose and logger.info(
            f"[{brick_display_name}] Starting DuckDB query execution."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register(sanitized_table_name, data)
        verbose and logger.info(
            f"[{brick_display_name}] Registered input data as '{sanitized_table_name}'."
        )
        try:
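            # Quote the name as an SQL identifier so spaces or reserved words
            # in a custom table name do not break the DESCRIBE statement.
            quoted_name = '"' + sanitized_table_name.replace('"', '""') + '"'
            column_info = conn.execute(f"DESCRIBE {quoted_name}").fetchall()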
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
        except Exception as e:
            verbose and logger.warning(
                f"[{brick_display_name}] Could not retrieve column information: {str(e)}"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Executing query: {sanitized_query[:100]}{('...' if len(sanitized_query) > 100 else '')}"
        )
        if output_format == "pandas":
            result = conn.execute(sanitized_query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(sanitized_query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(sanitized_query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Query execution completed successfully. Result has {len(result)} row(s)."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during query execution: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
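
The column-inspection step builds a DESCRIBE statement from the table name. If a custom name contains spaces, a reserved word, or a double quote, it has to be quoted as an SQL identifier before being placed in such a statement. A minimal sketch of standard double-quote identifier quoting follows; quote_identifier is a hypothetical helper, not a function provided by the brick or by DuckDB.

```python
def quote_identifier(name: str) -> str:
    """Wrap a name in double quotes, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


print(quote_identifier("input_table"))
print(quote_identifier("my table"))
print(quote_identifier('odd"name'))
```

The SQL query supplied as input would need the same quoted form for such a name, for example FROM "my table".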

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow