Fill All Values
Replace all values in columns with a fixed value.
Category: Processing
This brick allows users to replace all existing values within specified columns of an input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) with a single, fixed value. Column selection can be achieved by providing an explicit list of column names, using a regular expression pattern, or by targeting all columns if neither is provided.
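As a minimal sketch of a direct call (this assumes you invoke the fill_column function from the source listing below yourself; the data and option values here are illustrative):

```python
import pandas as pd

# Toy data: replace every value in the "status" column with "unknown".
df = pd.DataFrame({"status": ["ok", "error"], "score": [1.5, 2.0]})

result = fill_column(
    data=df,
    columns=["status"],
    fill_value="unknown",
    options={"verbose": False},
)
# result is a pandas DataFrame (the default Output Format) in which
# "status" is "unknown" in every row and "score" is untouched.
```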
Inputs
- data
- The input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) to be processed.
- columns (optional)
- A list of specific column names to apply the fill operation to. If left empty, the operation applies to all columns unless a regex pattern or the Regex Pattern option is provided.
- regex pattern (optional)
- A regular expression used to dynamically select column names for the fill operation.
- fill value (optional)
- The value to be inserted into the selected columns, replacing all existing data.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns | List |
| regex pattern | Str |
| fill value | Str, Bool, Number, Date, Datetime |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset with the selected columns filled with the specified value, returned in the format defined by the Output Format option.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Fill All Values brick exposes the following configurable options:
- Columns to Fill
- A list of columns that will have all their values replaced by the Fill Value. This list is ignored if a Regex Pattern is provided.
- Regex Pattern
- If provided, this pattern will be used to select all matching column names for the fill operation. If both Columns to Fill and Regex Pattern are empty, all columns are targeted.
- Fill Value
- The static value that will be inserted into the target columns. This value can be a string, number, boolean, or date/datetime literal.
- Auto Cast Value
- If enabled (default), the system attempts to cast the Fill Value to match the native data type of the target column (e.g., converting a string representation of a number into an integer type). If disabled, the value is treated as a string, potentially causing type conflicts if inserted into strict numeric columns.
- Output Format
- Specifies the data format for the returned result: pandas DataFrame (default), polars DataFrame, or arrow Table.
- Safe Mode
- If enabled, the brick will safely skip columns that are specified but do not exist in the input data, or columns where type casting of the Fill Value fails, instead of raising an error and halting the flow.
- Verbose
- If enabled, detailed logs about the process, column matching, and casting attempts will be printed to the console.
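For reference, these option names correspond to the keys read from the options dict in the source below; a sketch with illustrative values:

```python
# Illustrative options dict; keys mirror those read in the source below.
options = {
    "columns": ["price", "qty"],  # Columns to Fill
    "regex_pattern": "",          # Regex Pattern (empty: use the column list)
    "fill_value": "0",            # Fill Value
    "auto_cast": True,            # Auto Cast Value
    "output_format": "pandas",    # "pandas" | "polars" | "arrow"
    "safe_mode": False,           # Safe Mode
    "verbose": True,              # Verbose
}
```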
```python
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import (
Union,
List,
DataFrame,
ArrowTable,
Str,
Number,
Date,
Datetime,
Bool,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _get_sql_cast_type(duckdb_type):
"""
Map DuckDB types to appropriate SQL cast types.
"""
type_lower = duckdb_type.lower()
if any(
(t in type_lower for t in ["tinyint", "smallint", "integer", "bigint", "int"])
):
return "BIGINT"
if any(
(t in type_lower for t in ["float", "double", "real", "decimal", "numeric"])
):
return "DOUBLE"
if "bool" in type_lower:
return "BOOLEAN"
if "date" in type_lower and "time" not in type_lower:
return "DATE"
if "timestamp" in type_lower:
return "TIMESTAMP"
if "time" in type_lower:
return "TIME"
return "VARCHAR"
def _cast_value_for_sql(value, sql_type, column_name):
"""
Prepare a value for SQL insertion, handling casting appropriately.
Returns a SQL expression string.
NOTE: This function assumes 'value' is already a STRING.
"""
if value == "":
if sql_type in ["BIGINT", "DOUBLE", "BOOLEAN"]:
return "NULL"
return "''"
if value.upper() == "NULL":
return "NULL"
if sql_type == "BOOLEAN":
if value.lower() in ["true", "1", "yes", "t", "y"]:
return "TRUE"
elif value.lower() in ["false", "0", "no", "f", "n"]:
return "FALSE"
else:
raise ValueError(
f"Cannot cast '{value}' to BOOLEAN for column '{column_name}'"
)
if sql_type in ["BIGINT", "DOUBLE"]:
try:
float(value)
return value
except ValueError:
raise ValueError(
f"Cannot cast '{value}' to {sql_type} for column '{column_name}'"
)
if sql_type in ["DATE", "TIMESTAMP", "TIME"]:
escaped_value = value.replace("'", "''")
return f"CAST('{escaped_value}' AS {sql_type})"
escaped_value = value.replace("'", "''")
return f"'{escaped_value}'"
def fill_column(
data: Union[DataFrame, ArrowTable],
columns: List = None,
regex_pattern: Str = None,
fill_value: Union[Str, Bool, Number, Date, Datetime] = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Fill All Values"
options = options or {}
verbose = options.get("verbose", True)
columns = _coalesce(columns, options.get("columns", []))
regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    fill_all_columns = not columns  # avoids len() on non-sized values before validation below
fill_value = _coalesce(fill_value, options.get("fill_value", ""))
fill_value_str = str(fill_value)
auto_cast = options.get("auto_cast", True)
output_format = options.get("output_format", "pandas")
safe_mode = options.get("safe_mode", False)
result = None
conn = None
if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
verbose and logger.error(
f"[{brick_display_name}] Invalid columns format! Expected a list."
)
raise ValueError("Columns must be provided as a list!")
try:
fill_mode = None
if regex_pattern:
fill_mode = "regex_pattern"
elif fill_all_columns:
fill_mode = "all_columns"
else:
fill_mode = "column_list"
verbose and logger.info(
f"[{brick_display_name}] Detected mode: '{fill_mode}'. Starting fill all operation with value: '{fill_value_str}'."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
columns_to_fill = []
if fill_mode == "column_list":
if not safe_mode:
missing_columns = [col for col in columns if col not in all_columns]
if missing_columns:
verbose and logger.error(
f"[{brick_display_name}] Columns not found in data: {missing_columns}"
)
raise ValueError(f"Columns not found in data: {missing_columns}")
columns_to_fill = [col for col in columns if col in all_columns]
skipped = len(columns) - len(columns_to_fill)
if safe_mode and skipped > 0:
skipped_cols = [col for col in columns if col not in all_columns]
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
)
verbose and logger.info(
f"[{brick_display_name}] Filling {len(columns_to_fill)} column(s): {columns_to_fill}."
)
elif fill_mode == "regex_pattern":
try:
pattern = re.compile(regex_pattern)
columns_to_fill = [
col for col in all_columns.keys() if pattern.search(col)
]
if not columns_to_fill:
verbose and logger.warning(
f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
)
verbose and logger.info(
f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_fill)} columns: {columns_to_fill}."
)
            except re.error as e:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid regex pattern '{regex_pattern}': {e}"
                )
                raise ValueError(f"Invalid regex pattern: {e}")
elif fill_mode == "all_columns":
columns_to_fill = list(all_columns.keys())
verbose and logger.info(
f"[{brick_display_name}] Filling all {len(columns_to_fill)} columns."
)
select_parts = []
filled_count = 0
for col in all_columns.keys():
sanitized_col = _sanitize_identifier(col)
if col in columns_to_fill:
col_type = all_columns[col]
try:
if auto_cast:
sql_type = _get_sql_cast_type(col_type)
cast_value = _cast_value_for_sql(fill_value_str, sql_type, col)
verbose and logger.info(
f"[{brick_display_name}] Column '{col}' (type: {col_type}): replacing all values with {cast_value} (cast to {sql_type})."
)
select_parts.append(f'{cast_value} AS "{sanitized_col}"')
else:
escaped_value = fill_value_str.replace("'", "''")
select_parts.append(
f''''{escaped_value}' AS "{sanitized_col}"'''
)
verbose and logger.info(
f"[{brick_display_name}] Column '{col}': replacing all values with '{fill_value_str}' (no auto-cast)."
)
filled_count += 1
except ValueError as e:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping column '{col}' due to cast error."
)
select_parts.append(f'"{sanitized_col}"')
else:
verbose and logger.error(
f"[{brick_display_name}] Error casting value for column '{col}'."
)
raise
else:
select_parts.append(f'"{sanitized_col}"')
if filled_count == 0:
verbose and logger.warning(
f"[{brick_display_name}] No columns were filled. Returning data unchanged."
)
result = data
else:
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Executing query to fill all values."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Fill all operation completed successfully. Filled {filled_count} column(s)."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during fill all operation: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
    return result
```
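A sketch of the regex and safe-mode paths (column names and values are invented for illustration):

```python
import pandas as pd

df = pd.DataFrame(
    {"amount_usd": [9.99, 5.0], "amount_eur": [8.5, 4.2], "label": ["a", "b"]}
)

# Fill every column whose name starts with "amount_" with 0 and return a
# Polars DataFrame; Safe Mode skips columns where the cast would fail.
result = fill_column(
    data=df,
    regex_pattern=r"^amount_",
    fill_value="0",
    options={"output_format": "polars", "safe_mode": True, "verbose": False},
)
```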
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow