DuckDB Query
Execute a custom DuckDB SQL query on input data (pandas DataFrame, Polars DataFrame, or Arrow Table) and return the results in the specified format.
Processing
Execute a custom DuckDB SQL query on the input data (a pandas DataFrame, Polars DataFrame, or Arrow Table) and return the results. The input data is temporarily registered in an in-memory DuckDB connection under the name set by the Table Name option, so the query can reference it like any SQL table. The output format (pandas DataFrame, Polars DataFrame, or Arrow Table) is set by the Output Format option.
Inputs
- data
  - The input data structure (pandas DataFrame, Polars DataFrame, or Arrow Table) on which the SQL query is executed.
- sql query
  - The SQL query string to run against the registered input table.
Input Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| sql query | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
  - The data set produced by the SQL query, returned in the format selected by the Output Format option.
Output Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The DuckDB Query brick exposes the following options:
- Table Name
  - The name under which the input data is registered in the DuckDB connection. Use this name in the `FROM` clause of the SQL query. (Default: `input_table`)
- Output Format
  - The data type of the returned result: `pandas` (pandas DataFrame), `polars` (Polars DataFrame), or `arrow` (Arrow Table). (Default: `pandas`)
- Verbose
  - Enables or disables logging of progress messages, warnings, and errors during execution. Errors are still raised when logging is disabled. (Default: True)
```python
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, Str, DataFrame, ArrowTable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _sanitize_query(query):
    """
    Strip surrounding whitespace from the SQL query; return None if it is empty.
    """
    if not query:
        return None
    return query.strip()


def execute_duckdb_query(
    data: Union[DataFrame, ArrowTable], sql_query: Str, options=None
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "DuckDB Query"
    options = options or {}
    verbose = options.get("verbose", True)
    output_format = options.get("output_format", "pandas")
    table_name = options.get("table_name", "input_table")
    result = None
    conn = None
    try:
        # Validate the query and the table name before touching DuckDB.
        if not isinstance(sql_query, str):
            verbose and logger.error(
                f"[{brick_display_name}] Invalid sql_query format! Expected a string."
            )
            raise ValueError("SQL query must be provided as a string!")
        sanitized_query = _sanitize_query(sql_query)
        if not sanitized_query:
            verbose and logger.error(
                f"[{brick_display_name}] SQL query cannot be empty!"
            )
            raise ValueError("SQL query cannot be empty!")
        if not isinstance(table_name, str) or not table_name.strip():
            verbose and logger.error(
                f"[{brick_display_name}] Invalid table_name! Must be a non-empty string."
            )
            raise ValueError("Table name must be a non-empty string!")
        sanitized_table_name = table_name.strip()
        verbose and logger.info(
            f"[{brick_display_name}] Starting DuckDB query execution."
        )

        # Detect the input type; only these three are accepted.
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )

        # Register the input as a virtual table in a fresh in-memory database.
        conn = duckdb.connect(":memory:")
        conn.register(sanitized_table_name, data)
        verbose and logger.info(
            f"[{brick_display_name}] Registered input data as '{sanitized_table_name}'."
        )

        # Column metadata is only logged; a failure here is not fatal.
        try:
            column_info = conn.execute(f"DESCRIBE {sanitized_table_name}").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
        except Exception as e:
            verbose and logger.warning(
                f"[{brick_display_name}] Could not retrieve column information: {str(e)}"
            )

        verbose and logger.info(
            f"[{brick_display_name}] Executing query: {sanitized_query[:100]}{('...' if len(sanitized_query) > 100 else '')}"
        )
        # Run the query and fetch the result in the requested format.
        if output_format == "pandas":
            result = conn.execute(sanitized_query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(sanitized_query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(sanitized_query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Query execution completed successfully. Result has {len(result)} row(s)."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during query execution: {str(e)}"
        )
        raise
    finally:
        # Always release the in-memory connection.
        if conn is not None:
            conn.close()
    return result
```
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow