Source code for scripts.extraction.load_dictionary

"""Extract study data dictionaries into structured JSONL mappings.

Reads dictionary files from ``data/raw/{STUDY_NAME}/data_dictionary/``
and writes structured JSONL under
``output/{STUDY_NAME}/trio_bundle/dictionary/``.

Supports ``.xlsx`` and ``.csv`` inputs. Detects multiple
logical tables inside Excel sheets, enriches records with provenance
metadata, and exports deterministic JSONL files.

Three-stage pipeline: Discovery → Parsing → Export.

Tables after the "ignore below" marker are saved to an ``extras/``
subdirectory (still as ``.jsonl``).
"""

from __future__ import annotations

__all__ = [
    "discover_dictionary_files",
    "load_study_dictionary",
    "process_csv_file",
    "process_excel_file",
]

import sys
from pathlib import Path
from typing import Any

import pandas as pd
from tqdm import tqdm

import config
from scripts.extraction.io import (
    atomic_write_dataframe_jsonl,
    discover_files,
)
from scripts.extraction.io.file_discovery import SUPPORTED_TABULAR_EXTENSIONS
from scripts.security.secure_env import assert_not_raw
from scripts.utils import logging_system as log

# Constants for table processing
IGNORE_BELOW_MARKER = "ignore below"
EXTRA_TABLES_DIR = "extras"
UNNAMED_COLUMN_PREFIX = "Unnamed"
METADATA_SHEET_KEY = "__sheet__"
METADATA_TABLE_KEY = "__table__"
METADATA_SOURCE_FILE_KEY = "__source_file__"

NAMED_TEMP_PREFIX = config.TEMP_PREFIX_DICT

# Supported dictionary file extensions (deterministic ordering)
SUPPORTED_EXTENSIONS: tuple[str, ...] = SUPPORTED_TABULAR_EXTENSIONS


def _deduplicate_columns(columns: Any) -> list[str]:
    """Make column names unique by appending numeric suffixes to duplicates.

    This function handles duplicate column names (common in Excel with merged cells
    or unnamed columns) by appending _1, _2, etc. to subsequent occurrences. It also
    replaces NaN/None column names with the "Unnamed" prefix.

    Args:
        columns: Iterable of column names (can include None/NaN values).

    Returns:
        List of unique column names with numeric suffixes for duplicates.

    Example:
        >>> cols = ['Name', 'Name', 'Age', None, 'Name']
        >>> _deduplicate_columns(cols)
        ['Name', 'Name_1', 'Age', 'Unnamed', 'Name_2']

    Notes:
        - First occurrence keeps original name
        - Subsequent duplicates get _1, _2, ... suffixes
        - None/NaN values become "Unnamed" (or "Unnamed_1", etc.)
    """
    new_cols: list[str] = []
    counts: dict[str, int] = {}
    for col in columns:
        col_str = str(col) if pd.notna(col) else UNNAMED_COLUMN_PREFIX
        if col_str in counts:
            counts[col_str] += 1
            new_cols.append(f"{col_str}_{counts[col_str]}")
        else:
            new_cols.append(col_str)
            counts[col_str] = 0
    return new_cols


def _split_sheet_into_tables(df: pd.DataFrame) -> list[pd.DataFrame] | None:
    """Split DataFrame into multiple tables based on empty row/column boundaries.

    This implements a two-stage boundary detection algorithm:
    1. Find horizontal strips: contiguous row groups separated by fully empty rows
    2. Within each strip, find vertical segments separated by fully empty columns

    Each segment represents a separate table that can be independently processed.
    This handles Excel sheets with multiple tables laid out side-by-side or stacked.

    Args:
        df: Input DataFrame from Excel sheet (may contain multiple tables).

    Returns:
        List of DataFrames, each representing a detected table.
        Empty list ``[]`` if the sheet is genuinely empty.
        ``None`` if a parse error occurred (distinct from an empty sheet).
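
    Example:
        An illustrative sketch: column 2 is fully empty, so the single
        horizontal strip splits into two side-by-side tables.

        >>> frame = pd.DataFrame([[1, 2, None, 5], [3, 4, None, 6]])
        >>> tables = _split_sheet_into_tables(frame)
        >>> len(tables)
        2
        >>> (tables[0].shape, tables[1].shape)
        ((2, 2), (2, 1))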
    """
    try:
        if df.empty:
            log.debug("Received empty DataFrame, returning empty table list")
            return []

        log.debug(f"Analyzing DataFrame with shape {df.shape} for table boundaries")

        empty_rows = df.index[df.isnull().all(axis=1)].tolist()
        row_boundaries: list[int] = [-1, *empty_rows, df.shape[0]]
        horizontal_strips = [
            df.iloc[row_boundaries[i] + 1 : row_boundaries[i + 1]]
            for i in range(len(row_boundaries) - 1)
            if row_boundaries[i] + 1 < row_boundaries[i + 1]
        ]

        log.debug(f"Found {len(horizontal_strips)} horizontal strip(s)")

        all_tables: list[pd.DataFrame] = []
        for strip in horizontal_strips:
            empty_col_indices = [
                i for i, col in enumerate(strip.columns) if strip[col].isnull().all()
            ]
            col_boundaries = [-1, *empty_col_indices, len(strip.columns)]
            for j in range(len(col_boundaries) - 1):
                start_col, end_col = col_boundaries[j] + 1, col_boundaries[j + 1]
                if start_col < end_col:
                    table_df = strip.iloc[:, start_col:end_col].copy()
                    table_df.dropna(how="all", inplace=True)
                    if not table_df.empty:
                        all_tables.append(table_df)

        log.debug(f"Detected {len(all_tables)} table(s) from DataFrame")
        return all_tables

    except (KeyError, IndexError) as e:
        log.error(f"DataFrame structure error during table splitting: {e}")
        log.debug("DataFrame info:", exc_info=True)
        return None  # signals parse error, distinct from empty sheet
    except Exception as e:
        log.error(f"Unexpected error splitting DataFrame into tables: {type(e).__name__}: {e}")
        log.debug("Full error details:", exc_info=True)
        return None  # signals parse error


def _process_and_save_tables(
    all_tables: list[pd.DataFrame],
    sheet_name: str,
    output_dir: Path | str,
    source_file: str = "",
) -> bool:
    """Process detected tables, add metadata, and save as JSONL.

    Tables before the "ignore below" marker go to the sheet directory.
    Tables after the marker go to ``extras/`` — all as ``.jsonl``.
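
    Illustrative output layout for a sheet named ``Codes`` whose second of two
    tables contains the marker (file names follow the suffix logic in the
    function body)::

        Codes/
            Codes_table_1.jsonl
            extras/
                extras_table_2.jsonl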
    """
    output_dir = Path(output_dir)
    folder_name = "".join(c for c in sheet_name if c.isalnum() or c in "._- ").strip()
    sheet_dir = output_dir / folder_name

    sheet_dir.mkdir(parents=True, exist_ok=True)

    log.debug(f"Processing {len(all_tables)} tables from sheet '{sheet_name}'")
    ignore_mode = False
    all_ok = True

    for i, table_df in enumerate(all_tables):
        table_df.reset_index(drop=True, inplace=True)

        if len(table_df) == 0:
            log.warning(f"Table {i + 1} from sheet '{sheet_name}' is empty after reset. Skipping.")
            continue

        if not ignore_mode:
            for idx, col in enumerate(table_df.iloc[0]):
                if IGNORE_BELOW_MARKER in str(col).lower().strip():
                    log.info(
                        f"'{IGNORE_BELOW_MARKER}' found in table {i + 1}. "
                        f"Subsequent → '{EXTRA_TABLES_DIR}'."
                    )
                    ignore_mode = True
                    table_df = table_df.drop(table_df.columns[idx], axis=1)
                    break

        table_df.dropna(how="all", axis=1, inplace=True)
        table_df.dropna(how="all", inplace=True)
        if table_df.empty:
            log.debug(f"Table {i + 1} from sheet '{sheet_name}' is empty after cleanup. Skipping.")
            continue

        # Promote first row to column headers
        try:
            table_df.columns = _deduplicate_columns(table_df.iloc[0])
            table_df = table_df.iloc[1:].reset_index(drop=True)
        except IndexError as e:
            log.error(f"Cannot process table {i + 1} from sheet '{sheet_name}': {e}")
            all_ok = False
            continue

        if table_df.empty:
            log.debug(
                f"Table {i + 1} from sheet '{sheet_name}' is empty after header promotion. "
                "Skipping."
            )
            continue

        # Determine output path
        table_suffix = f"_table_{i + 1}" if len(all_tables) > 1 else "_table"

        if ignore_mode:
            extras_dir = sheet_dir / EXTRA_TABLES_DIR
            extras_dir.mkdir(parents=True, exist_ok=True)
            table_name = f"{EXTRA_TABLES_DIR}{table_suffix}"
            metadata_name = f"{folder_name}_{EXTRA_TABLES_DIR}{table_suffix}"
            output_path = extras_dir / f"{table_name}.jsonl"
        else:
            table_name = metadata_name = f"{folder_name}{table_suffix}"
            output_path = sheet_dir / f"{table_name}.jsonl"

        # Single-user CLI: TOCTOU between check and write is accepted.
        # atomic_write_dataframe_jsonl guarantees no partial files on concurrent race.
        if output_path.exists() and output_path.stat().st_size > 0:
            log.warning("File exists. Skipping: %s", output_path)
            continue

        try:
            table_df[METADATA_SHEET_KEY] = sheet_name
            table_df[METADATA_TABLE_KEY] = metadata_name
            if source_file:
                table_df[METADATA_SOURCE_FILE_KEY] = source_file
            atomic_write_dataframe_jsonl(output_path, table_df, prefix=NAMED_TEMP_PREFIX)
            log.info(f"Saved {len(table_df)} rows → '{output_path}'")
        except OSError as e:
            log.error(f"Failed to write table to '{output_path}': {e}")
            all_ok = False
            continue
        except Exception as e:
            log.error(f"Error saving table '{table_name}': {type(e).__name__}: {e}")
            all_ok = False
            continue

    return all_ok


def process_excel_file(
    excel_path: Path | str, output_dir: Path | str, preserve_na: bool = True
) -> bool:
    """Extract all tables from an Excel file and save as JSONL files."""
    excel_path = Path(excel_path)
    output_dir = Path(output_dir)

    log.info(f"Processing: '{excel_path}'")
    log.info(f"Output → '{output_dir}'")
    output_dir.mkdir(parents=True, exist_ok=True)

    try:
        if excel_path.suffix.lower() != ".xlsx":
            raise ValueError(f"Unsupported dictionary Excel format: {excel_path.suffix}")

        with pd.ExcelFile(excel_path, engine="openpyxl") as xls:
            log.debug(f"Excel file loaded. Found {len(xls.sheet_names)} sheets: {xls.sheet_names}")

            success = True
            for sheet_name in tqdm(
                xls.sheet_names,
                desc="Processing sheets",
                unit="sheet",
                file=sys.stdout,
                dynamic_ncols=True,
                leave=True,
                bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
            ):
                try:
                    log.debug("--- Sheet: '%s' ---", sheet_name)
                    if preserve_na:
                        sheet_df = xls.parse(
                            sheet_name=sheet_name,
                            header=None,
                            keep_default_na=False,
                            na_values=[""],
                        )
                    else:
                        sheet_df = xls.parse(sheet_name=sheet_name, header=None)

                    all_tables = _split_sheet_into_tables(sheet_df)
                    if all_tables is None:
                        log.warning("Table-split error in sheet '%s' — skipping", sheet_name)
                        success = False
                    elif not all_tables:
                        log.info("No tables found in '%s'", sheet_name)
                    else:
                        log.info("Found %d table(s) in '%s'", len(all_tables), sheet_name)
                        result = _process_and_save_tables(
                            all_tables,
                            str(sheet_name),
                            output_dir,
                            source_file=excel_path.name,
                        )
                        if not result:
                            success = False
                except Exception as e:
                    log.error("Error on sheet '%s': %s", sheet_name, e, exc_info=True)
                    success = False

    except Exception as e:
        log.error(f"Error reading Excel file '{excel_path}': {e}")
        log.debug("Full error details:", exc_info=True)
        return False

    if success:
        log.success("Excel processing complete!")
    else:
        log.warning("Excel processing completed with some errors")
    return success
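
# Usage sketch for a single workbook (illustrative; the paths below are
# hypothetical):
#
#     process_excel_file(
#         excel_path="data/raw/STUDY/data_dictionary/dictionary.xlsx",
#         output_dir="tmp/STUDY/dictionary",
#         preserve_na=True,
#     )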


def discover_dictionary_files(dictionary_dir: Path | str) -> list[str]:
    """Discover all supported dictionary files in the given directory.

    Delegates to :func:`scripts.extraction.io.discover_files` and converts
    the returned ``Path`` objects to strings for backward compatibility.
    """
    dictionary_dir = Path(dictionary_dir)
    files = discover_files(
        dictionary_dir,
        extensions=SUPPORTED_EXTENSIONS,
        label="Data dictionary",
        not_found_label="dictionary",
    )
    found = [str(f) for f in files]
    log.info(f"Discovered {len(found)} dictionary file(s) in '{dictionary_dir}':")
    for f in found:
        log.info(f" • {Path(f).name}")
    return found


def process_csv_file(
    csv_path: Path | str, output_dir: Path | str, preserve_na: bool = True
) -> bool:
    """Parse a CSV dictionary file and save as JSONL with provenance metadata."""
    csv_path = Path(csv_path)
    output_dir = Path(output_dir)
    stem = csv_path.stem

    log.info(f"Processing CSV: '{csv_path}'")
    log.info(f"Output → '{output_dir}'")
    output_dir.mkdir(parents=True, exist_ok=True)

    try:
        if preserve_na:
            df = pd.read_csv(csv_path, keep_default_na=False, na_values=[""])
        else:
            df = pd.read_csv(csv_path)
    except Exception as e:
        log.error(f"Error reading CSV file '{csv_path}': {e}")
        log.debug("Full error details:", exc_info=True)
        return False

    if df.empty:
        log.warning(f"CSV file '{csv_path}' is empty (no data rows).")
        return True

    df.columns = _deduplicate_columns(df.columns)
    df.dropna(how="all", inplace=True)
    if df.empty:
        log.warning(f"CSV file '{csv_path}' contained only empty rows after cleanup.")
        return True

    sheet_label = stem
    table_label = f"{stem}_table"
    df[METADATA_SHEET_KEY] = sheet_label
    df[METADATA_TABLE_KEY] = table_label
    df[METADATA_SOURCE_FILE_KEY] = csv_path.name

    folder_name = "".join(c for c in stem if c.isalnum() or c in "._- ").strip()
    sheet_dir = output_dir / folder_name
    sheet_dir.mkdir(parents=True, exist_ok=True)
    output_path = sheet_dir / f"{table_label}.jsonl"

    # Single-user CLI: TOCTOU between check and write is accepted.
    # atomic_write_dataframe_jsonl guarantees no partial files on concurrent race.
    if output_path.exists() and output_path.stat().st_size > 0:
        log.warning("File exists. Skipping: %s", output_path)
        return True

    try:
        atomic_write_dataframe_jsonl(output_path, df, prefix=NAMED_TEMP_PREFIX)
        log.info(f"Saved {len(df)} rows → '{output_path}'")
    except Exception as e:
        log.error(f"Failed to write CSV output to '{output_path}': {e}")
        return False

    log.success(f"CSV processing complete for '{csv_path.name}'")
    return True
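
# Shape of one exported record (an illustrative sketch for a hypothetical
# ``codes.csv`` with columns ``Variable`` and ``Label``; the dunder keys are
# the provenance metadata added above):
#
#     {"Variable": "age", "Label": "Age at enrollment",
#      "__sheet__": "codes", "__table__": "codes_table",
#      "__source_file__": "codes.csv"}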


def load_study_dictionary(
    dictionary_dir: Path | str | None = None,
    json_output_dir: Path | str | None = None,
    preserve_na: bool = True,
) -> bool:
    """Load and process all study data dictionary files to JSONL format.

    When ``json_output_dir`` is not supplied the dictionary JSONL files are
    written to ``config.STAGING_DICTIONARY_DIR`` (``tmp/{STUDY}/dictionary/``);
    a subsequent publish step promotes them into ``trio_bundle/dictionary/``.
    """
    if json_output_dir:
        assert_not_raw(str(json_output_dir))
    if dictionary_dir is None:
        dictionary_dir = config.DATA_DICTIONARY_DIR

    output_dir: Path = (
        Path(json_output_dir) if json_output_dir else config.STAGING_DICTIONARY_DIR
    )

    log.info(f"Dictionary source: '{dictionary_dir}'")
    log.info(f"Output target: '{output_dir}' (staging)")

    files = discover_dictionary_files(dictionary_dir)

    all_ok = True
    for fpath in files:
        ext = Path(fpath).suffix.lower()
        if ext == ".xlsx":
            ok = process_excel_file(
                excel_path=fpath, output_dir=output_dir, preserve_na=preserve_na
            )
        elif ext == ".csv":
            ok = process_csv_file(csv_path=fpath, output_dir=output_dir, preserve_na=preserve_na)
        else:
            log.warning(f"Unsupported dictionary format (skipped): {fpath}")
            continue
        if not ok:
            all_ok = False

    if all_ok:
        log.success("All dictionary files processed successfully.")
    else:
        log.warning("Dictionary processing completed with some errors.")
    return all_ok
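
# Orchestration sketch (illustrative; the explicit output path is hypothetical
# and must not point at raw data, which is enforced by assert_not_raw):
#
#     load_study_dictionary()  # writes to config.STAGING_DICTIONARY_DIR
#     load_study_dictionary(json_output_dir="tmp/STUDY/dictionary")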
if __name__ == "__main__": log.setup_logging( module_name="scripts.extraction.load_dictionary", log_level="INFO", ) try: success = load_study_dictionary(preserve_na=True) except (FileNotFoundError, ValueError) as e: log.error(str(e)) sys.exit(1) if success: log.success(f"Processing complete for data dictionaries from {config.DATA_DICTIONARY_DIR}") else: log.error(f"Processing failed for data dictionaries from {config.DATA_DICTIONARY_DIR}") sys.exit(1)