"""Zone-enforcement helpers for the RePORT AI Portal runtime.
Defines the path-assertion helpers that keep raw datasets, staging, and
clean published output from bleeding into one another. The four-tier
architecture (RED / AMBER / GREEN / GREEN-PROTECT) in the developer-
guide PHI-architecture page is implemented in code as the zone guards
here.
"""
from __future__ import annotations
import os
from collections.abc import Sequence
from pathlib import Path
__all__ = [
"ZoneViolationError",
"assert_clean_zone",
"assert_not_raw",
"assert_output_not_in_data",
"assert_output_zone",
"assert_trio_bundle_zone",
"assert_write_zone",
"validate_paths",
]
# ---------------------------------------------------------------------------
# Markers — resolved at import to avoid repeated config reads
# ---------------------------------------------------------------------------
# The clean zone is study-scoped: output/{STUDY}/trio_bundle/.
# CLEAN_MARKER points to the study-scoped trio_bundle dir for the active study.
def _resolve_markers() -> tuple[str, str, str, str, str, str]:
"""Resolve zone marker paths from config, with fallback for isolated testing."""
try:
import config as _cfg
return (
os.path.realpath(_cfg.RAW_DATA_DIR),
os.path.realpath(_cfg.DATA_DIR),
os.path.realpath(_cfg.TRIO_BUNDLE_DIR),
os.path.realpath(_cfg.OUTPUT_DIR),
os.path.realpath(_cfg.TMP_DIR),
os.path.realpath(_cfg.TRIO_BUNDLE_DIR),
)
except ImportError:
project = str(Path(__file__).resolve().parents[2])
return (
os.path.join(project, "data", "raw"),
os.path.join(project, "data"),
os.path.join(project, "output"), # conservative fallback
os.path.join(project, "output"),
os.path.join(project, "tmp"),
os.path.join(project, "output"), # conservative fallback
)
(
_RAW_MARKER,
_DATA_MARKER,
_CLEAN_MARKER,
_OUTPUT_MARKER,
_TMP_MARKER,
_TRIO_BUNDLE_MARKER,
) = _resolve_markers()
def _is_within(path: str | Path, base: str | Path) -> bool:
"""Return True when *path* is the same as or contained within *base*."""
resolved_path = _resolve(path)
resolved_base = _resolve(base)
try:
return os.path.commonpath([resolved_path, resolved_base]) == resolved_base
except ValueError:
return False
[docs]
class ZoneViolationError(PermissionError):
"""Raised when code attempts to access a forbidden data zone."""
def _resolve(p: str | Path) -> str:
return os.path.realpath(str(p))
[docs]
def assert_not_raw(path: str | Path) -> None:
"""Hard-fail if *path* resides under data/raw/.
Raises:
ZoneViolationError: path is inside the raw vault.
"""
resolved = _resolve(path)
if _is_within(resolved, _RAW_MARKER):
raise ZoneViolationError(
f"Access to raw data zone is forbidden at this pipeline stage: {path}"
)
[docs]
def assert_clean_zone(path: str | Path) -> None:
"""Hard-fail if *path* does NOT reside under output/{STUDY}/clean/.
Raises:
ZoneViolationError: path is outside the clean zone.
"""
resolved = _resolve(path)
if not _is_within(resolved, _CLEAN_MARKER):
raise ZoneViolationError(f"Only clean-zone paths are allowed here. Got: {path}")
[docs]
def assert_output_not_in_data(path: str | Path) -> None:
"""Hard-fail if *path* is under data/ — processed output must go to output/.
The data/ directory is reserved exclusively for raw study data (data/raw/).
All processed artifacts (clean JSONL, indexes, session data, etc.)
must be written under output/.
Raises:
ZoneViolationError: path is inside the data directory.
"""
resolved = _resolve(path)
if _is_within(resolved, _DATA_MARKER):
raise ZoneViolationError(
f"Writing processed output into data/ is forbidden. "
f"All output must go under output/. Got: {path}"
)
[docs]
def assert_output_zone(path: str | Path) -> None:
"""Hard-fail if *path* is not under output/, or is in raw.
Used for chunking inputs that may span multiple output sub-trees
(clean JSONL, data dictionary mappings, etc.) but must never touch
raw data.
Raises:
ZoneViolationError: path is outside output/ or in a forbidden sub-zone.
"""
resolved = _resolve(path)
if not _is_within(resolved, _OUTPUT_MARKER):
raise ZoneViolationError(f"Only paths under output/ are allowed here. Got: {path}")
if _is_within(resolved, _RAW_MARKER):
raise ZoneViolationError(
f"Access to raw data zone is forbidden at this pipeline stage: {path}"
)
[docs]
def assert_trio_bundle_zone(path: str | Path) -> None:
"""Hard-fail if *path* is not under ``output/{STUDY}/trio_bundle/``.
Pipeline-side directory-level early-reject used at agent tool call sites
that glob study data from the trio bundle (variables.json,
datasets/*.jsonl, pdfs/*.json). It is narrower than
:func:`assert_output_zone` (which also accepts ``audit/``, ``agent/``,
etc.) but broader than the agent-runtime zone: the LLM agent's actual
read surface is ``trio_bundle/`` plus ``agent/``, enforced per path by
:func:`scripts.ai_assistant.file_access.validate_agent_read`. This
helper remains as a directory-level pre-flight before glob iteration.
Raises:
ZoneViolationError: path is outside ``output/{STUDY}/trio_bundle/``.
"""
resolved = _resolve(path)
if not _is_within(resolved, _TRIO_BUNDLE_MARKER):
raise ZoneViolationError(
f"Only paths under output/{{STUDY}}/trio_bundle/ are allowed here. Got: {path}"
)
if _is_within(resolved, _RAW_MARKER):
raise ZoneViolationError(
f"Access to raw data zone is forbidden at this pipeline stage: {path}"
)
[docs]
def assert_write_zone(path: str | Path) -> None:
"""Hard-fail if *path* is not under output/ or tmp/, or is in raw.
Accepts paths under either the durable output zone (``output/``) or the
transient staging zone (``tmp/``). Both are safe write destinations for
extraction legs. Raw data is always rejected.
Use this in place of :func:`assert_output_zone` for call sites that write
to the staging workspace (``tmp/{STUDY}/``) before atomic publish to
``output/{STUDY}/trio_bundle/``. Audit files that must land in durable
storage should continue to use :func:`assert_output_zone`.
Raises:
ZoneViolationError: path is outside both output/ and tmp/, or is in
the raw data zone.
"""
resolved = _resolve(path)
if not (_is_within(resolved, _OUTPUT_MARKER) or _is_within(resolved, _TMP_MARKER)):
raise ZoneViolationError(f"Only paths under output/ or tmp/ are allowed here. Got: {path}")
if _is_within(resolved, _RAW_MARKER):
raise ZoneViolationError(
f"Access to raw data zone is forbidden at this pipeline stage: {path}"
)
[docs]
def validate_paths(
paths: Sequence[str | Path],
*,
deny_raw: bool = True,
require_clean: bool = False,
deny_data_output: bool = False,
) -> None:
"""Batch-validate a sequence of paths against zone policies.
Args:
paths: file or directory paths to check.
deny_raw: reject any path under data/raw/.
require_clean: require every path to be under output/{STUDY}/clean/.
deny_data_output: reject any path under data/ (prevents writing
processed artifacts into the raw data directory).
Note:
``assert_output_zone`` is always called regardless of flag values —
every path must reside under ``output/``.
Raises:
ZoneViolationError: on first violation found.
"""
if isinstance(paths, str | Path):
raise TypeError("paths must be a sequence of path values, not a single path")
for p in paths:
if deny_raw:
assert_not_raw(p)
if require_clean:
assert_clean_zone(p)
if deny_data_output:
assert_output_not_in_data(p)
assert_output_zone(p)