"""Query-time PHI gate for the RePORT AI Portal agent boundary.
Uses the shared :mod:`scripts.security.phi_patterns` catalog and the
:mod:`scripts.security.phi_allowlist` clinical-phrase allowlist. The
allowlist suppresses obvious-false-positive warnings on clinical
verbatim like "Treatment Completed" that would otherwise match the
generic name-like heuristic.
Presidio NER is intentionally not wired in — comparative benchmarks
showed precision around 22.7 % on mixed data where the rule catalog +
clinical allowlist reach materially higher precision on the calibrated
Indo-VAP field shapes.
The gate is the **defence-in-depth** layer at the trio-bundle → agent
boundary: every ``@tool`` function in :mod:`scripts.ai_assistant.agent_tools`
runs its return text through :func:`phi_gate_check` before the string
reaches the LLM, so even if the offline scrub missed a token the live
query cannot surface it.
IRB-grade benchmark anchors:
* Pillar 2.4 — every tool return passes through a PHI gate
* Pillar 1.5 — narrative-content leak detection
* Pillar 5.3 — breach-alert emission on blocked responses
"""
from __future__ import annotations
import logging
import re
from collections.abc import Sequence
from dataclasses import dataclass
from scripts.security import phi_allowlist
from scripts.security.phi_patterns import BLOCKING_PATTERNS, WARN_PATTERNS
logger = logging.getLogger(__name__)
__all__ = [
"PHIGateConfigError",
"PHIGateResult",
"phi_gate_check",
]
[docs]
class PHIGateConfigError(ValueError):
"""Raised when the PHI gate is invoked with malformed input."""
[docs]
@dataclass(frozen=True, slots=True)
class PHIGateResult:
"""Outcome of a PHI-gate scan.
``blocked`` is ``True`` when any blocking pattern matched.
``findings`` is a sorted, unique tuple of category tags recorded
across the scan (both blocking and warn-only). Safe to show the
operator — the tags are category names like ``AADHAAR`` /
``EMAIL``, never raw values.
"""
blocked: bool
findings: tuple[str, ...]
def __bool__(self) -> bool:
# Truthy = SAFE to proceed. Mirrors the archive semantics so
# `if phi_gate_check(text): return text` reads intuitively.
return not self.blocked
def _normalize_texts(texts: str | Sequence[str]) -> list[str]:
if isinstance(texts, str):
return [texts]
if not isinstance(texts, Sequence):
raise PHIGateConfigError("texts must be a string or sequence of strings")
out: list[str] = []
for idx, item in enumerate(texts):
if not isinstance(item, str):
raise PHIGateConfigError(f"texts[{idx}] must be a string, got {type(item)}")
out.append(item)
return out
def _scan_regex(
text: str,
blocking: list[tuple[str, re.Pattern[str]]],
warn: list[tuple[str, re.Pattern[str]]],
) -> tuple[list[str], list[str]]:
"""Return ``(blocking_hits, warn_hits)`` labels for this *text*.
Warn-tier per-match tuning: the generic two-capital-word name
heuristic (``PERSON_NAME_GENERIC``) fires on benign bigrams like
"Treatment Completed", "Cohort A", "Violin Plot" that appear
throughout clinical narratives. We keep the warn only when *at
least one* individual match both (a) fails the clinical-phrase
allowlist and (b) looks like a real name under the seeded first/
last-name lexicon. This is still advisory-only — blocking tier is
unaffected.
"""
blocking_hits: list[str] = []
warn_hits: list[str] = []
for label, pat in blocking:
if pat.search(text):
blocking_hits.append(label)
for label, pat in warn:
if label != "PERSON_NAME_GENERIC":
if pat.search(text):
warn_hits.append(label)
continue
for match in pat.finditer(text):
span = match.group(0)
if phi_allowlist.is_clinical_phrase(span):
continue
if phi_allowlist.looks_like_real_name(span):
warn_hits.append(label)
break
return blocking_hits, warn_hits
def _is_clinical_allowlist_hit(text: str) -> bool:
"""Return True when *text* is fully covered by the clinical allowlist.
Short-circuits the warn tier: clinical phrases like "Bacteriologic
relapse" or "patient expired" are not PHI. Blocking tier still fires
— the allowlist does NOT override Aadhaar / PAN / email matches.
"""
return phi_allowlist.is_clinical_phrase(text) or phi_allowlist.is_clinical_free_text(text)
[docs]
def phi_gate_check(
texts: str | Sequence[str],
) -> PHIGateResult:
"""Scan *texts* for PHI. Returns ``blocked=True`` only on high-confidence PHI.
Low-confidence heuristics (bare NUMERIC_ID, DATE_MDY, generic
PERSON_NAME) are recorded in ``findings`` for audit but do not
trigger blocking — they over-fire on legitimate clinical phrases
and would block benign agent responses.
Clinical-phrase allowlist (:mod:`phi_allowlist`) is consulted on
the warn tier only. Blocking tier always wins.
"""
texts_list = _normalize_texts(texts)
all_blocking: list[str] = []
all_findings: list[str] = []
for t in texts_list:
blocking, warnings_hit = _scan_regex(t, BLOCKING_PATTERNS, WARN_PATTERNS)
all_blocking.extend(blocking)
all_findings.extend(blocking)
if warnings_hit and not _is_clinical_allowlist_hit(t):
all_findings.extend(warnings_hit)
unique = tuple(sorted(set(all_findings)))
is_blocked = bool(set(all_blocking))
if unique:
# Best-effort telemetry — redaction filter should already scrub any
# raw values that ride along via args.
logger.warning(
"phi_gate: %s — findings=%s", "BLOCK" if is_blocked else "WARN", list(unique)
)
return PHIGateResult(blocked=is_blocked, findings=unique)