Impute Missing Values
Impute missing values in numerical columns with mean, median, or mode.
Impute Missing Values
Processing
This brick identify and impute missing values (NULLs) in numerical columns of a dataset. It calculates a central tendency statistic (mean, median, or mode) for the specified columns and uses this value to replace all existing missing entries. Columns can be targeted explicitly via a list, matched using a regex pattern, or all available numeric columns can be processed automatically.
Inputs
- data
- The input DataFrame (Pandas, Polars) or Arrow Table containing missing values.
- columns (optional)
- A list of specific column names to target for imputation. If left empty and no regex is provided, all numeric columns are targeted.
- regex pattern (optional)
- A regular expression used to select columns for imputation. If provided, this overrides the column list input.
- impute method (optional)
- The statistical method used for calculating the imputation value ('mean', 'median', or 'mode').
Inputs Types
| Input | Types |
|---|---|
data |
DataFrame, ArrowTable |
columns |
List |
regex pattern |
Str |
impute method |
Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset with missing values imputed in the targeted columns, converted to the specified output format.
Outputs Types
| Output | Types |
|---|---|
result |
DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Impute Missing Values brick contains some changeable options:
- Columns to Impute
- Used to explicitly list the columns that should have their missing values filled. This list is ignored if a Regex Pattern is provided.
- Regex Pattern
- A regular expression used to select columns for imputation. If provided, this setting takes precedence over the explicit column list.
- Imputation Method
- Specifies the central tendency statistic used to calculate the replacement value (mean, median, or mode). Defaults to 'mean'.
- Output Format
- Specifies the desired format for the returned dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table). Defaults to 'pandas'.
- Safe Mode
- If enabled, the function suppresses errors related to non-existent columns in the input data or attempts to impute non-numeric columns, skipping those columns instead of failing the process.
- Verbose
- If enabled, detailed logging messages are displayed during the execution, providing information about detected columns, imputation values, and status.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _is_numeric_type(duckdb_type):
"""
Check if a DuckDB type is numeric.
"""
type_lower = duckdb_type.lower()
return any(
(
t in type_lower
for t in [
"tinyint",
"smallint",
"integer",
"bigint",
"int",
"float",
"double",
"real",
"decimal",
"numeric",
]
)
)
def impute_missing_values(
data: Union[DataFrame, ArrowTable],
columns: List = None,
regex_pattern: Str = None,
impute_method: Str = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Impute Missing Values"
options = options or {}
verbose = options.get("verbose", True)
columns = _coalesce(columns, options.get("columns", []))
regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
fill_all_columns = len(columns) == 0
impute_method = _coalesce(impute_method, options.get("impute_method", "mean"))
output_format = options.get("output_format", "pandas")
safe_mode = options.get("safe_mode", False)
result = None
conn = None
if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
verbose and logger.error(
f"[{brick_display_name}] Invalid columns format! Expected a list."
)
raise ValueError("Columns must be provided as a list!")
if impute_method not in ["mean", "median", "mode"]:
verbose and logger.error(
f"[{brick_display_name}] Invalid imputation method: '{impute_method}'. Must be 'mean', 'median', or 'mode'."
)
raise ValueError(
f"Invalid imputation method: '{impute_method}'. Must be 'mean', 'median', or 'mode'."
)
try:
impute_mode = None
if regex_pattern:
impute_mode = "regex_pattern"
elif fill_all_columns:
impute_mode = "all_columns"
else:
impute_mode = "column_list"
verbose and logger.info(
f"[{brick_display_name}] Detected mode: '{impute_mode}'. Starting imputation with method: '{impute_method}'."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
columns_to_impute = []
if impute_mode == "column_list":
if not safe_mode:
missing_columns = [col for col in columns if col not in all_columns]
if missing_columns:
verbose and logger.error(
f"[{brick_display_name}] Columns not found in data: {missing_columns}"
)
raise ValueError(f"Columns not found in data: {missing_columns}")
columns_to_impute = [col for col in columns if col in all_columns]
skipped = len(columns) - len(columns_to_impute)
if safe_mode and skipped > 0:
skipped_cols = [col for col in columns if col not in all_columns]
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
)
verbose and logger.info(
f"[{brick_display_name}] Imputing {len(columns_to_impute)} column(s): {columns_to_impute}."
)
elif impute_mode == "regex_pattern":
try:
pattern = re.compile(regex_pattern)
columns_to_impute = [
col for col in all_columns.keys() if pattern.search(col)
]
if not columns_to_impute:
verbose and logger.warning(
f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
)
verbose and logger.info(
f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_impute)} columns: {columns_to_impute}."
)
except re.error as e:
verbose and logger.error(
f"[{brick_display_name}] Invalid regex pattern."
)
raise ValueError(f"Invalid regex pattern!")
elif impute_mode == "all_columns":
columns_to_impute = [
col for col in all_columns.keys() if _is_numeric_type(all_columns[col])
]
verbose and logger.info(
f"[{brick_display_name}] Imputing all numeric columns: {len(columns_to_impute)} columns."
)
imputation_values = {}
for col in columns_to_impute:
col_type = all_columns[col]
if not _is_numeric_type(col_type):
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping non-numeric column '{col}' (type: {col_type})."
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Column '{col}' is not numeric (type: {col_type})."
)
raise ValueError(
f"Column '{col}' is not numeric (type: {col_type}). Cannot impute missing values."
)
sanitized_col = _sanitize_identifier(col)
try:
if impute_method == "mean":
query = f'SELECT AVG("{sanitized_col}") as impute_value FROM input_table'
impute_value = conn.execute(query).fetchone()[0]
elif impute_method == "median":
query = f'SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "{sanitized_col}") as impute_value FROM input_table WHERE "{sanitized_col}" IS NOT NULL'
impute_value = conn.execute(query).fetchone()[0]
elif impute_method == "mode":
query = f'\n SELECT "{sanitized_col}" as impute_value \n FROM input_table \n WHERE "{sanitized_col}" IS NOT NULL \n GROUP BY "{sanitized_col}" \n ORDER BY COUNT(*) DESC \n LIMIT 1\n '
result_row = conn.execute(query).fetchone()
impute_value = result_row[0] if result_row else None
if impute_value is None:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Column '{col}' has no non-null values. Skipping imputation."
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Column '{col}' has no non-null values to calculate {impute_method}."
)
raise ValueError(
f"Column '{col}' has no non-null values to calculate {impute_method}."
)
imputation_values[col] = impute_value
verbose and logger.info(
f"[{brick_display_name}] Column '{col}': calculated {impute_method} = {impute_value}."
)
except Exception as e:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Error calculating {impute_method} for column '{col}': {str(e)}"
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Error calculating {impute_method} for column '{col}'."
)
raise
if len(imputation_values) == 0:
verbose and logger.warning(
f"[{brick_display_name}] No columns were imputed. Returning data unchanged."
)
result = data
else:
select_parts = []
imputed_count = 0
for col in all_columns.keys():
sanitized_col = _sanitize_identifier(col)
if col in imputation_values:
impute_value = imputation_values[col]
select_parts.append(
f'COALESCE("{sanitized_col}", {impute_value}) AS "{sanitized_col}"'
)
imputed_count += 1
verbose and logger.info(
f"[{brick_display_name}] Column '{col}': imputing NULL values with {impute_value}."
)
else:
select_parts.append(f'"{sanitized_col}"')
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Executing query to impute missing values."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Imputation completed successfully. Imputed {imputed_count} column(s) using {impute_method}."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during imputation operation: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow