Read Data File

Read data from files supporting multiple standard formats (parquet, json, csv, excel).

Read Data File

Processing

Reads structured data from a specified file path. The function supports reading data in multiple standard formats including Parquet, CSV, JSON, and Excel. If the file format is set to 'auto', it attempts to detect the format based on the file extension. The resulting dataset can be returned as a Pandas DataFrame, Polars DataFrame, or PyArrow Table, selectable via the Output Format option.

Inputs

file path
The full path to the data file that needs to be read.
file format (optional)
Specifies the expected format of the input file (e.g., 'csv', 'parquet', 'excel', 'json'). If omitted, it defaults to 'auto' detection based on file extension.
sheet name (optional)
Applicable only for Excel files. Specifies the name of the sheet to read within the workbook.
excel range (optional)
Applicable only for Excel files. Defines a specific cell range (e.g., A1:B10) from which to load data.
stop at empty (optional)
Applicable only for Excel files. If enabled, reading stops upon encountering the first empty row outside a specified range.
all varchar (optional)
Applicable only for Excel files. Forces all columns to be read as text (VARCHAR) to manage mixed data types effectively.
csv delimiter (optional)
Applicable only for CSV files. Specifies the character used to delimit fields.
csv header (optional)
Applicable only for CSV files. Boolean indicating whether the file includes a header row.
json lines (optional)
Applicable only for JSON files. If true, the file is interpreted using the JSON Lines (newline-delimited JSON) format.

Inputs Types

Input Types
file path Str, Path
file format Str
sheet name Str
excel range Str
stop at empty Bool
all varchar Bool
csv delimiter Str
csv header Bool
json lines Bool

You can check the list of supported types here: Available Type Hints.

Outputs

data
The loaded dataset, structured according to the selected Output Format option (Pandas DataFrame, Polars DataFrame, or PyArrow Table).

Outputs Types

Output Types
data DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Read Data File brick contains some changeable options:

File Format
Specifies the type of file being read. Options are 'auto' (default), 'parquet', 'csv', 'json', and 'excel'.
Excel Sheet Name
For Excel files, the name of the sheet to load. Defaults to the first sheet if empty.
Excel Range (e.g., A1:B10)
For Excel files, specifies a cell range to limit the data loaded.
Stop at Empty Row
For Excel files, controls whether the reader should stop upon encountering an empty row (default: True).
All Excel Columns as Text
If enabled, forces all columns in the Excel file to be read as text (VARCHAR), useful for consistency and handling mixed data types (default: False).
CSV Delimiter
The character used to separate values in CSV files (default: ,).
CSV Has Header
Specifies whether the CSV file includes a header row (default: True).
JSON Lines Format
If enabled, treats the input file as newline-delimited JSON (JSONL) (default: False).
Output Format
Selects the desired output structure for the loaded data: 'pandas' (default), 'polars', or 'arrow' (PyArrow Table).
Verbose
Enables detailed logging messages during execution (default: True).
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, Path, Str, Bool, DataFrame, ArrowTable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _detect_file_format(file_path):
    """
    Detect file format based on file extension.
    """
    path_obj = Path(file_path)
    suffix = path_obj.suffix.lower()
    format_map = {
        ".parquet": "parquet",
        ".pq": "parquet",
        ".csv": "csv",
        ".txt": "csv",
        ".json": "json",
        ".jsonl": "json",
        ".xlsx": "excel",
        ".xls": "excel",
    }
    return format_map.get(suffix, None)


def read_file(
    file_path: Union[Str, Path],
    file_format: Str = None,
    sheet_name: Str = None,
    excel_range: Str = None,
    stop_at_empty: Bool = None,
    all_varchar: Bool = None,
    csv_delimiter: Str = None,
    csv_header: Bool = None,
    json_lines: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Read File"
    options = options or {}
    verbose = options.get("verbose", True)
    file_format = _coalesce(file_format, options.get("file_format", "auto"))
    sheet_name = _coalesce(sheet_name, options.get("sheet_name", ""))
    excel_range = _coalesce(excel_range, options.get("excel_range", ""))
    stop_at_empty = _coalesce(stop_at_empty, options.get("stop_at_empty", True))
    all_varchar = _coalesce(all_varchar, options.get("all_varchar", False))
    csv_delimiter = _coalesce(csv_delimiter, options.get("csv_delimiter", ","))
    csv_header = _coalesce(csv_header, options.get("csv_header", True))
    json_lines = _coalesce(json_lines, options.get("json_lines", False))
    output_format = options.get("output_format", "pandas")
    data = None
    conn = None
    try:
        path_obj = Path(file_path)
        verbose and logger.info(
            f"[{brick_display_name}] Reading file from path: '{path_obj}'."
        )
        if not path_obj.exists():
            verbose and logger.error(
                f"[{brick_display_name}] File does not exist: '{path_obj}'."
            )
            raise FileNotFoundError(f"File does not exist: '{path_obj}'.")
        if not path_obj.is_file():
            verbose and logger.error(
                f"[{brick_display_name}] Path is not a file: '{path_obj}'."
            )
            raise ValueError(f"Path is not a file: '{path_obj}'.")
        detected_format = file_format
        if file_format == "auto":
            detected_format = _detect_file_format(path_obj)
            if detected_format is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Could not auto-detect file format for: '{path_obj}'."
                )
                raise ValueError(
                    f"Could not auto-detect file format. Please specify format explicitly."
                )
            verbose and logger.info(
                f"[{brick_display_name}] Auto-detected file format: '{detected_format}'."
            )
        else:
            verbose and logger.info(
                f"[{brick_display_name}] Using specified file format: '{detected_format}'."
            )
        valid_formats = ["parquet", "csv", "json", "excel"]
        if detected_format not in valid_formats:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported file format: '{detected_format}'."
            )
            raise ValueError(
                f"Unsupported file format: '{detected_format}'. Supported formats: {valid_formats}."
            )
        conn = duckdb.connect(":memory:")
        if detected_format == "parquet":
            verbose and logger.info(f"[{brick_display_name}] Reading Parquet file.")
            query = f"SELECT * FROM read_parquet('{str(path_obj)}')"
        elif detected_format == "csv":
            verbose and logger.info(
                f"[{brick_display_name}] Reading CSV file with delimiter '{csv_delimiter}' and header={csv_header}."
            )
            header_opt = "true" if csv_header else "false"
            query = f"SELECT * FROM read_csv('{str(path_obj)}', delim='{csv_delimiter}', header={header_opt}, auto_detect=true)"
        elif detected_format == "json":
            if json_lines:
                verbose and logger.info(
                    f"[{brick_display_name}] Reading JSON Lines file."
                )
                query = f"SELECT * FROM read_json_auto('{str(path_obj)}', format='newline_delimited')"
            else:
                verbose and logger.info(f"[{brick_display_name}] Reading JSON file.")
                query = f"SELECT * FROM read_json_auto('{str(path_obj)}')"
        elif detected_format == "excel":
            excel_params = []
            if sheet_name:
                excel_params.append(f"sheet='{sheet_name}'")
                verbose and logger.info(
                    f"[{brick_display_name}] Reading Excel file with sheet: '{sheet_name}'."
                )
            else:
                verbose and logger.info(
                    f"[{brick_display_name}] Reading Excel file (first sheet)."
                )
            if excel_range:
                excel_params.append(f"range='{excel_range}'")
                verbose and logger.info(
                    f"[{brick_display_name}] Using Excel range: '{excel_range}'."
                )
            if excel_range or not stop_at_empty:
                stop_at_empty_str = "true" if stop_at_empty else "false"
                excel_params.append(f"stop_at_empty={stop_at_empty_str}")
                verbose and logger.info(
                    f"[{brick_display_name}] Stop at empty row: {stop_at_empty}."
                )
            if all_varchar:
                excel_params.append("all_varchar=true")
                verbose and logger.info(
                    f"[{brick_display_name}] Reading all columns as text (VARCHAR) to handle mixed types."
                )
            if excel_params:
                params_str = ", " + ", ".join(excel_params)
                query = f"SELECT * FROM read_xlsx('{str(path_obj)}'{params_str})"
            else:
                query = f"SELECT * FROM read_xlsx('{str(path_obj)}')"
        verbose and logger.info(f"[{brick_display_name}] Executing query to load data.")
        if output_format == "pandas":
            data = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted data to pandas DataFrame with {len(data)} rows and {len(data.columns)} columns."
            )
        elif output_format == "polars":
            data = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted data to Polars DataFrame with {len(data)} rows and {len(data.columns)} columns."
            )
        elif output_format == "arrow":
            data = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted data to Arrow Table with {data.num_rows} rows and {data.num_columns} columns."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: '{output_format}'."
            )
            raise ValueError(f"Unsupported output format: '{output_format}'.")
        verbose and logger.info(
            f"[{brick_display_name}] File read successfully from '{path_obj}'."
        )
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error reading file: {str(e)}")
        raise
    finally:
        if conn is not None:
            conn.close()
    return data

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow