# Source code for scripts.extraction.io.file_io

"""Canonical atomic file-write helpers for the RePORT AI Portal pipeline.

Every module that persists JSONL, JSON, or plain-text artifacts should use
these helpers instead of rolling its own write-to-temp-then-rename dance.
The strategy is:

1. Write to a ``NamedTemporaryFile`` in the **same directory** as the final
   output (guaranteeing same-filesystem for the rename).
2. On success, ``Path.replace()`` atomically swaps the temp file into place.
3. On failure, the temp file is cleaned up in a ``finally`` block.

This eliminates the risk of half-written files after crashes and avoids the
race condition inherent in using a predictable ``.tmp`` suffix.

Exported helpers
~~~~~~~~~~~~~~~~
- ``atomic_write_jsonl``  — write ``list[dict]`` as JSONL lines.
- ``atomic_write_json``   — write a single ``dict`` as pretty-printed JSON.
- ``atomic_write_dataframe_jsonl`` — write a ``pandas.DataFrame`` via
  ``DataFrame.to_json(orient="records", lines=True)``.
"""

from __future__ import annotations

import json
import os
import tempfile
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import pandas as pd

__all__ = [
    "ATOMIC_WRITE_SUFFIX",
    "FILE_ENCODING",
    "JSONL_EXT",
    "NAMED_TEMP_PREFIX",
    "atomic_write_dataframe_jsonl",
    "atomic_write_json",
    "atomic_write_jsonl",
]

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# NOTE: NamedTemporaryFile randomizes the file stem, so this suffix does NOT
# produce a predictable ".tmp" path (see the race-condition note in the module
# docstring) — it only marks in-flight files for humans and cleanup tooling.
ATOMIC_WRITE_SUFFIX: str = ".tmp"
"""Temporary suffix used during atomic writes before final replace."""

FILE_ENCODING: str = "utf-8"
"""Default text encoding for all file operations."""

# Exported for callers that build output paths; not referenced by the helpers
# in this module itself.
JSONL_EXT: str = ".jsonl"
"""Canonical JSONL file extension."""

NAMED_TEMP_PREFIX: str = "report_ai_portal_"
"""Default prefix for NamedTemporaryFile instances."""


# ---------------------------------------------------------------------------
# Atomic write: JSONL records
# ---------------------------------------------------------------------------


def atomic_write_jsonl(
    output_path: Path | str,
    records: Iterable[dict[str, Any]],
    *,
    ensure_ascii: bool = False,
    sort_keys: bool = False,
    default: Any = None,
    prefix: str = NAMED_TEMP_PREFIX,
) -> None:
    """Write an iterable of dicts as JSONL atomically.

    Records are serialized into a ``NamedTemporaryFile`` created in the
    destination directory (guaranteeing same-filesystem), fsynced, and then
    atomically swapped into place via ``Path.replace``.  On any failure the
    temporary file is removed.

    Args:
        output_path: Final destination path.
        records: Iterable of JSON-serializable dicts, one per line.
        ensure_ascii: Passed to ``json.dumps``.
        sort_keys: Passed to ``json.dumps``.
        default: Fallback serializer passed to ``json.dumps``.
        prefix: Prefix for the temporary file name.
    """
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding=FILE_ENCODING,
            dir=out.parent,
            prefix=prefix,
            suffix=ATOMIC_WRITE_SUFFIX,
            delete=False,
        ) as fh:
            tmp_path = Path(fh.name)
            for record in records:
                fh.write(
                    json.dumps(
                        record,
                        ensure_ascii=ensure_ascii,
                        sort_keys=sort_keys,
                        default=default,
                    )
                    + "\n"
                )
            fh.flush()
            os.fsync(fh.fileno())  # durability: flush kernel buffers before rename
        # Temp file is closed here; replace() is atomic on the same filesystem.
        tmp_path.replace(out)
    finally:
        # After a successful replace() the temp path no longer exists, and
        # missing_ok=True makes unlink a no-op in that case — no need for a
        # separate (TOCTOU-prone) exists() probe first.
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)
# ---------------------------------------------------------------------------
# Atomic write: single JSON document
# ---------------------------------------------------------------------------
def atomic_write_json(
    output_path: Path | str,
    payload: Any,
    *,
    ensure_ascii: bool = False,
    indent: int = 2,
    prefix: str = NAMED_TEMP_PREFIX,
) -> None:
    """Atomically persist *payload* as a pretty-printed JSON document.

    The value is serialized into a temporary file created next to the target
    (so the final rename stays on one filesystem), flushed and fsynced, then
    swapped into place with ``Path.replace``.  A trailing newline follows the
    document.  On failure the temporary file is cleaned up.

    Args:
        output_path: Final destination path.
        payload: JSON-serializable value (dict, list, or scalar).
        ensure_ascii: Passed to ``json.dump``.
        indent: Indentation level for pretty-printing.
        prefix: Prefix for the temporary file name.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    scratch: Path | None = None
    try:
        handle = tempfile.NamedTemporaryFile(
            mode="w",
            encoding=FILE_ENCODING,
            dir=destination.parent,
            prefix=prefix,
            suffix=ATOMIC_WRITE_SUFFIX,
            delete=False,
        )
        with handle as fh:
            scratch = Path(fh.name)
            json.dump(payload, fh, ensure_ascii=ensure_ascii, indent=indent)
            fh.write("\n")
            fh.flush()
            # Push the bytes through the kernel buffers before the rename below.
            os.fsync(fh.fileno())
        scratch.replace(destination)
    finally:
        if scratch is not None and scratch.exists():
            scratch.unlink(missing_ok=True)
# ---------------------------------------------------------------------------
# Atomic write: pandas DataFrame → JSONL
# ---------------------------------------------------------------------------
def atomic_write_dataframe_jsonl(
    output_path: Path | str,
    df: pd.DataFrame,
    *,
    prefix: str = NAMED_TEMP_PREFIX,
) -> None:
    """Atomically serialize *df* to JSONL (one record per line).

    Serialization is delegated to
    ``DataFrame.to_json(orient="records", lines=True, force_ascii=False)``.
    ``pandas`` is only referenced in the type annotation (imported under
    ``TYPE_CHECKING``), so modules that never touch DataFrames avoid its
    import cost.

    Args:
        output_path: Final destination path.
        df: DataFrame to serialize.
        prefix: Prefix for the temporary file name.
    """
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    staged: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding=FILE_ENCODING,
            dir=target.parent,
            prefix=prefix,
            suffix=ATOMIC_WRITE_SUFFIX,
            delete=False,
        ) as fh:
            staged = Path(fh.name)
            df.to_json(fh, orient="records", lines=True, force_ascii=False)
            fh.flush()
            os.fsync(fh.fileno())  # durability: flush kernel buffers before rename
        staged.replace(target)
    finally:
        if staged is not None and staged.exists():
            staged.unlink(missing_ok=True)