Source code for scripts.utils.log_hygiene

"""PHI-redacting log filter for the RePORT AI Portal pipeline.

Before the PHI scrub runs (Step 1.6), the pipeline processes raw subject
data — raw SUBJIDs, raw dates, raw narrative strings. If any of that
content is logged at INFO / DEBUG during extraction or orchestration, it
lands in ``.logs/*.log`` and becomes a PHI side-channel the scrub does
not touch.

This module installs a ``logging.Filter`` that redacts likely-PHI
substrings from every log record before the handler emits. Specifically:

* **Subject IDs** — any literal substring matching the configured
  ``subject_id_fields`` regex catalogue is replaced with a stable HMAC
  tag ``<SUBJ_{HMAC[:8]}>``. Same-subject redaction is deterministic
  across a run (the HMAC key is loaded once at filter install time).
* **Common PHI regex classes** — Aadhaar, PAN, Indian phone, email,
  SSN, ISO/M-D-Y dates, Indian PIN-code patterns are replaced with a
  category tag like ``<AADHAAR>`` or ``<EMAIL>``.

Design constraints:

* **No raw values in filter memory** — the filter stores only compiled
  regex + the PHI HMAC key; never a raw value.
* **Fast path for clean messages (design goal)** — the filter is meant to
  short-circuit when a message contains none of the pre-compiled triggers,
  so the common case pays one substring search per record. The filter
  currently applies every pattern to every record.
* **Fail-closed per record** — on any exception during redaction, the filter
  replaces the message with a fixed redaction-failure notice. Logs remain
  useful for operations without passing raw PHI through.

IRB-grade benchmark anchors:
    * ICMR 2017 §11.5 audit + confidentiality
    * HIPAA §164.312(b) audit controls
    * NIST SP 800-188 §6.4 on side-channel closure
"""

from __future__ import annotations

import hashlib
import hmac
import logging
import re

from scripts.security.phi_patterns import BLOCKING_PATTERNS, WARN_PATTERNS

# Public API of this module; names not listed here are not exported by
# ``from ... import *``.
__all__ = [
    "PHIRedactingFilter",
    "attach_to_logger",
    "install_phi_redactor",
]

# The log redactor shares its regex catalog with the agent-boundary PHI gate
# so the two surfaces can never drift. We re-use both BLOCKING_PATTERNS and
# WARN_PATTERNS verbatim — logs lean toward over-redaction (legibility cost
# vs. PHI-leak cost), so low-confidence heuristics like DATE_MDY and generic
# PERSON_NAME are redacted in logs even though the agent gate only blocks
# on the high-confidence BLOCKING tier. A diverging per-module list is
# actively dangerous — new PHI classes added to phi_patterns would silently
# not be redacted in logs.
API_KEY_PATTERNS: list[tuple[str, re.Pattern[str]]] = [
    # Anthropic keys: ``sk-ant-api03-…`` ~108 chars total. Require the
    # ``api`` segment + a long body so ``sk-ant-foo`` shorthand in docs
    # is not redacted.
    ("ANTHROPIC_KEY", re.compile(r"sk-ant-[A-Za-z]+\d*-[A-Za-z0-9_\-]{20,}")),
    # OpenAI keys: ``sk-…`` ≥40 chars body, optionally with ``proj-`` prefix.
    # The body class includes ``_`` and ``-`` (as the other provider patterns
    # here do): a purely alphanumeric class stops at the first such character,
    # which either prevents the match or redacts only the head of the key and
    # leaves the tail in the log. The 40-char minimum still keeps short
    # references such as ``sk-flag`` from matching.
    ("OPENAI_KEY", re.compile(r"sk-(?:proj-)?[A-Za-z0-9_\-]{40,}")),
    # NVIDIA NGC keys.
    ("NVIDIA_KEY", re.compile(r"nvapi-[A-Za-z0-9_\-]{30,}")),
    # Google API keys (Gemini, GCP). Always start with ``AIza`` + 35 chars.
    ("GOOGLE_KEY", re.compile(r"AIza[A-Za-z0-9_\-]{35}")),
]
"""LLM provider API-key patterns. After PR #3 the keystore keeps keys out
of ``os.environ`` entirely, so keys never reach the logger via env-var
dump. But defense in depth: if a key ever lands in a log message —
through a stack trace, a tool call, or a copy-paste — these patterns
scrub it before the message is written to ``.logs/``.

Each pattern requires the full provider-specific length so short
references like ``sk-flag`` or doc literals do NOT false-positive.
"""


# Flatten the catalogs in priority order: provider API keys first, then the
# blocking PHI tier, then the warn tier.
_GENERIC_PATTERNS: list[tuple[str, re.Pattern[str]]] = [
    entry
    for catalog in (API_KEY_PATTERNS, BLOCKING_PATTERNS, WARN_PATTERNS)
    for entry in catalog
]
"""Redaction catalog — :data:`API_KEY_PATTERNS` first (so a key embedded
inside a longer string is caught before any PHI heuristic might claim
part of it), then :data:`phi_patterns.BLOCKING_PATTERNS` + WARN_PATTERNS.

Applied IN ORDER to every log message by :class:`PHIRedactingFilter`. Each
match is replaced with ``<CATEGORY>`` (e.g. ``<EMAIL>``, ``<ANTHROPIC_KEY>``).
Intentionally conservative — false positives here cost legibility only;
false negatives cost IRB compliance OR API-key disclosure.
"""


class PHIRedactingFilter(logging.Filter):
    """Log filter that redacts PHI substrings before a record is emitted.

    Two redaction passes are applied, in order, to every piece of text the
    record carries (the interpolated message, the formatted exception
    traceback, and ``stack_info``):

    1. **Subject-ID pass** — each caller-supplied ``subject_id_patterns``
       regex is matched against the text and every match is replaced with
       ``<SUBJ_{HMAC-SHA256[:8]}>``. The tag is deterministic for a given
       key and raw value.
    2. **Generic pass** — each ``(label, pattern)`` pair in
       ``generic_patterns`` (default: :data:`_GENERIC_PATTERNS`) replaces
       its matches with ``<label>``.

    The filter never drops a record: :meth:`filter` always returns ``True``.
    If redaction raises, the message (and any exception / stack text) is
    replaced with a fixed failure notice instead of being passed through.

    Note:
        A filter attached to a ``logging.Logger`` is consulted only for
        records logged directly on that logger; records created by
        descendant loggers are not passed through it. To cover named
        loggers, attach the same instance to the handlers that emit the
        records (or to each logger).

    Args:
        hmac_key: Key used to derive the subject-ID tags.
        subject_id_patterns: Compiled regexes whose matches are subject IDs.
            ``None`` or an empty list disables the subject-ID pass.
        generic_patterns: ``(label, pattern)`` pairs for the generic pass.
            ``None`` or an empty list selects :data:`_GENERIC_PATTERNS`.
    """

    # Text substituted for any field that could not be redacted (fail-closed).
    _FAILURE_NOTICE = "[PHI LOG REDACTION FAILURE - message suppressed]"

    # Used only to render ``exc_info`` into text so it can be redacted.
    _exc_formatter = logging.Formatter()

    def __init__(
        self,
        *,
        hmac_key: bytes,
        subject_id_patterns: list[re.Pattern[str]] | None = None,
        generic_patterns: list[tuple[str, re.Pattern[str]]] | None = None,
    ) -> None:
        super().__init__()
        self._hmac_key = hmac_key
        self._subject_id_patterns = subject_id_patterns or []
        self._generic_patterns = generic_patterns or _GENERIC_PATTERNS

    def _redact_subject_match(self, match: re.Match[str]) -> str:
        """Return the ``<SUBJ_xxxxxxxx>`` tag for one subject-ID match."""
        raw = match.group(0)
        tag = hmac.new(self._hmac_key, raw.encode("utf-8"), hashlib.sha256).hexdigest()[:8]
        return f"<SUBJ_{tag}>"

    def _redact_text(self, text: str) -> str:
        """Apply the subject-ID pass and then the generic pass to *text*."""
        # Subject-ID pass first (most specific, per-instance HMAC).
        for pattern in self._subject_id_patterns:
            text = pattern.sub(self._redact_subject_match, text)
        # Generic PHI pass.
        for label, pattern in self._generic_patterns:
            text = pattern.sub(f"<{label}>", text)
        return text

    def _redact_traceback_fields(self, record: logging.LogRecord) -> None:
        """Redact the exception and stack text carried by *record*.

        ``logging.Formatter.format`` appends ``record.exc_text`` when it is
        already set and only renders ``exc_info`` itself when it is empty, so
        pre-populating ``exc_text`` with the redacted traceback keeps the raw
        exception message out of the output of standard formatters.
        ``exc_info`` itself is left in place for handlers that inspect it.
        """
        if record.exc_info or record.exc_text:
            raw_exc = record.exc_text or self._exc_formatter.formatException(record.exc_info)
            record.exc_text = self._redact_text(raw_exc)
        if record.stack_info:
            record.stack_info = self._redact_text(record.stack_info)

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact *record* in place and always let it through.

        Args:
            record: The log record about to be handled.

        Returns:
            Always ``True``; the record is rewritten, never dropped.
        """
        try:
            # The msg may contain format-string placeholders that are not
            # interpolated until the handler formats the record. Redact the
            # fully-interpolated form and stash it back as a plain string.
            # If interpolation fails (bad args), fall back to the raw msg.
            try:
                rendered = record.getMessage()
            except (TypeError, ValueError):
                rendered = str(record.msg)
            redacted = self._redact_text(rendered)
            # Replace msg + clear args so downstream formatters see the
            # already-interpolated string.
            record.msg = redacted
            record.args = None
            # Exception and stack text are appended to the output by the
            # formatter, so they need the same treatment as the message.
            self._redact_traceback_fields(record)
        except Exception:
            # Fail closed: nothing unredacted may reach the handler.
            record.msg = self._FAILURE_NOTICE
            record.args = None
            if record.exc_info or record.exc_text:
                record.exc_text = self._FAILURE_NOTICE
            record.stack_info = None
        return True
def install_phi_redactor(
    *,
    hmac_key: bytes,
    subject_id_patterns: list[re.Pattern[str]] | None = None,
) -> PHIRedactingFilter:
    """Attach :class:`PHIRedactingFilter` to the root logger and return it.

    Idempotent: if the root logger already has a ``PHIRedactingFilter``
    installed, that instance is reused and returned and no duplicate is
    added; the ``hmac_key`` / ``subject_id_patterns`` arguments are ignored
    in that case.

    The filter is also attached to every handler currently installed on the
    root logger. A filter on a logger is consulted only for records logged
    directly on that logger, whereas handler filters see every record the
    handler emits, including records propagated from named loggers. Handlers
    added to the root logger after this call are not covered until the
    function is called again.

    Callers must supply an ``hmac_key`` — typically the same 32-byte key
    used by :mod:`scripts.security.phi_scrub` so log redaction and on-disk
    pseudonyms are joinable by operators with key access.

    Args:
        hmac_key: Key passed to :class:`PHIRedactingFilter` for subject tags.
        subject_id_patterns: Compiled subject-ID regexes passed to the filter.

    Returns:
        The installed filter (the pre-existing one, if there was one).
    """
    root = logging.getLogger()
    flt = next(
        (existing for existing in root.filters if isinstance(existing, PHIRedactingFilter)),
        None,
    )
    if flt is None:
        flt = PHIRedactingFilter(
            hmac_key=hmac_key,
            subject_id_patterns=subject_id_patterns,
        )
        root.addFilter(flt)
    # ``addFilter`` ignores a filter that is already present, so repeating
    # this on every call keeps the function idempotent while picking up
    # handlers that were added since the previous call.
    for handler in root.handlers:
        handler.addFilter(flt)
    return flt
def attach_to_logger(logger: logging.Logger, filter_instance: PHIRedactingFilter) -> None:
    """Register *filter_instance* on one specific *logger* (belt-and-braces).

    A filter attached to a logger applies only to records logged on that
    logger; child loggers do not inherit it. For defence-in-depth, callers
    can therefore attach the same filter to each leg logger explicitly.

    Args:
        logger: The logger that should run the filter.
        filter_instance: The redacting filter to register.
    """
    logger.addFilter(filter_instance)
def _redact(text: str, filter_instance: PHIRedactingFilter) -> str:
    """Run *filter_instance*'s redaction over *text* and return the result.

    Test + utility hook only. The single leading underscore marks it as
    module-private — production code should go through
    :func:`install_phi_redactor`. It lets unit tests assert redaction output
    without installing a real logging pipeline.
    """
    redacted = filter_instance._redact_text(text)
    return redacted