API Reference

This page documents the active public module surface for the current RePORT AI Portal runtime using Sphinx automodule directives.

The options used below are standard Sphinx autodoc directive options:

  • :members: includes documented members.

  • :undoc-members: also includes members without docstrings.

  • :show-inheritance: shows inheritance details where applicable.

Core Modules

Configuration

Central runtime configuration for RePORT AI Portal.

What. All path constants, environment-variable resolution, study detection, LLM provider inference, staging-directory management, and directory creation in one place.

Why. 138 call sites across the pipeline, agent, UI, and test suite use import config — a single canonical location prevents scattered os.getenv and Path(...) literals throughout the codebase.

How. All values are resolved at import time. STUDY_NAME is determined by the $STUDY_NAME env var or a filesystem scan of data/raw/. LLM provider is inferred from model-name prefix unless overridden by $LLM_PROVIDER. Staging directories are NOT created eagerly; call ensure_directories() after startup.
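The provider-inference order described above can be sketched as follows. The function name, prefix table, and fallback value are illustrative assumptions, not the real config.py internals; only the $LLM_PROVIDER override behaviour is taken from this page.

```python
import os

# Hypothetical prefix table -- the real mapping in config.py may differ.
_PREFIX_TO_PROVIDER = {
    "claude": "anthropic",
    "gemini": "google",
    "qwen": "ollama",
}

def infer_provider(model_name: str) -> str:
    """Infer the LLM provider from the model-name prefix.

    An explicit $LLM_PROVIDER env var always wins, mirroring the
    override described above.
    """
    override = os.getenv("LLM_PROVIDER")
    if override:
        return override
    lowered = model_name.lower()
    for prefix, provider in _PREFIX_TO_PROVIDER.items():
        if lowered.startswith(prefix):
            return provider
    return "unknown"
```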

config.detect_study_name(*, strict=None)[source]
Return type:

str

Parameters:

strict (bool | None)

config.ensure_directories()[source]

Create runtime directories. Sensitive dirs (those containing PHI-scrubbed data, agent state, conversations, audit records, or logs) are hardened to mode 0o700 after creation so they are not world-readable under the typical umask of 0o022. Dirs that may legitimately need group access, such as the OUTPUT_DIR parent, are left at the default mode; TMP_DIR is already 0o700 via secure staging.

Return type:

None
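The hardening step described above can be sketched with the standard library; the helper name is hypothetical and stands in for the per-directory logic inside ensure_directories().

```python
import stat
import tempfile
from pathlib import Path

def make_hardened_dir(path: Path) -> None:
    """Create the directory, then tighten to 0o700 so PHI-adjacent
    state is not world-readable under the usual umask 0o022."""
    path.mkdir(parents=True, exist_ok=True)
    path.chmod(0o700)

# Demonstrate on a throwaway directory.
sensitive = Path(tempfile.mkdtemp()) / "agent_state"
make_hardened_dir(sensitive)
print(oct(stat.S_IMODE(sensitive.stat().st_mode)))  # 0o700
```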

config.preferred_or_installed_downgrade(model)[source]

Return the sequence of model names to try, starting at model.

For qwen3 rungs in QWEN3_DOWNGRADE_LADDER, returns the ladder from the given rung downward. For any other model, returns a one-element list — we only auto-step qwen3 because the three rungs are behaviourally compatible (same family, same tool-use format, same thinking convention).

Return type:

list[str]

Parameters:

model (str)
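The ladder semantics can be sketched as follows. The rung names here are illustrative; they are not the actual contents of QWEN3_DOWNGRADE_LADDER.

```python
# Illustrative ladder, largest rung first -- the real
# QWEN3_DOWNGRADE_LADDER in config.py may list different rungs.
QWEN3_DOWNGRADE_LADDER = ["qwen3:32b", "qwen3:14b", "qwen3:8b"]

def downgrade_sequence(model: str) -> list[str]:
    """Return the models to try, starting at `model`.

    qwen3 rungs step down the ladder; any other model yields a
    one-element list, as described above.
    """
    if model in QWEN3_DOWNGRADE_LADDER:
        start = QWEN3_DOWNGRADE_LADDER.index(model)
        return QWEN3_DOWNGRADE_LADDER[start:]
    return [model]
```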

config.production_mode_enabled()[source]

Return True when production controls should fail closed.

Return type:

bool

config.strict_study_detection_enabled()[source]

Return True when missing auto-detected study data should abort import.

Return type:

bool

config.validate_config()[source]
Return type:

None

config.yaml_get(*keys, default=None)[source]

Retrieve a nested key from the loaded YAML config.

Return type:

Any

Example

>>> yaml_get("app", "log_level", default="INFO")
'INFO'
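A minimal re-implementation of the lookup semantics, assuming the loaded YAML is a plain nested dict. The real yaml_get reads the module-level loaded config rather than taking it as an argument.

```python
from typing import Any

def yaml_get_sketch(config: dict, *keys: str, default: Any = None) -> Any:
    """Walk nested dict keys, returning `default` on any miss."""
    node: Any = config
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node

cfg = {"app": {"log_level": "INFO"}}
print(yaml_get_sketch(cfg, "app", "log_level", default="INFO"))  # INFO
```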

Main Pipeline

Clinical data processing pipeline for RePORT AI Portal (single-study mode).

This module provides the main entry point for the RePORT AI Portal clinical study data processing pipeline. It orchestrates a multi-step workflow that transforms raw study data from one fixed study under data/raw/{STUDY_NAME}/ into clean, structured JSONL records suitable for analysis.

The system processes one study only. The user provides the LLM; the system provides the study-specific AI Assistant, agentic tools, grounding, and deterministic warnings.

The pipeline consists of the following stages (executed in order):

  1. Dictionary Loading (Step 0): Parse and validate the data dictionary Excel file to understand field definitions, types, and constraints. Writes to staging dictionary dir.

  2. Dataset Processing (Step 1+3): Extract tabular study datasets via process_datasets(), landing cleaned JSONL into the staging datasets dir along with a list of per-column drop events.

  3. PDF Preparation (Step 1.5): Copy pre-extracted PDF JSON files from --pdf-source or run automatic extraction from annotated PDFs into the staging PDFs dir. If neither is available, the PDF leg is skipped.

  4. PHI Scrub (Step 1.6): 8-action honest-broker catalog applied to the staging datasets dir — keep → birthdate → drop → cap → generalize → suppress_small_cell → date jitter (SANT) → id pseudonymize (HMAC-SHA256). ~200 Indo-VAP-calibrated rules in scripts/security/phi_scrub.yaml. Runs BEFORE cleanup so the dataset audit never records raw PHI. No-op when the YAML is absent.

  5. Dataset Cleanup (Step 1.7): Remove known junk files and merge structural duplicates against the staging datasets dir; emit the unified dataset audit report under output/{STUDY}/audit/.

  6. Cleanup Propagation (Step 1.8): Compute the propagation drop-set from the dataset audit and prune matching rows/keys from staging dictionary + staging PDFs; emit two more leg audits.

  7. Publish (Step 2): Atomically promote each staging leg into trio_bundle/. Prior trio subtrees are replaced per-leg; cross-filesystem rename falls back to copy-and-remove.

  8. Variables Reference (Step 3): Build trio_bundle/variables.json from the newly-published trio artefacts.
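The id-pseudonymize action from Step 1.6 (HMAC-SHA256) can be sketched with the standard library. The key source and the 16-character truncation are assumptions; the real scrubber is configured by scripts/security/phi_scrub.yaml.

```python
import hashlib
import hmac

def pseudonymize_id(raw_id: str, secret_key: bytes) -> str:
    """Derive a stable pseudonym via HMAC-SHA256.

    Same input + key -> same pseudonym, so joins across datasets
    survive the scrub. Truncation length is an assumption.
    """
    digest = hmac.new(secret_key, raw_id.encode("utf-8"), hashlib.sha256)
    return digest.hexdigest()[:16]

key = b"example-secret"  # in practice this would come from a secure keystore
assert pseudonymize_id("SUBJ-1001", key) == pseudonymize_id("SUBJ-1001", key)
```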

On success only, the staging workspace (tmp/{STUDY}/) is deleted; on failure, staging is preserved for operator inspection.
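The per-leg atomic promotion from Step 2, including the cross-filesystem fallback, can be sketched as follows (the helper name is hypothetical):

```python
import shutil
from pathlib import Path

def publish_leg(staging_leg: Path, bundle_leg: Path) -> None:
    """Promote one staging leg into the bundle, replacing any prior
    subtree. Path.replace is atomic on the same filesystem; across
    filesystems it raises OSError, so fall back to copy-and-remove."""
    if bundle_leg.exists():
        shutil.rmtree(bundle_leg)
    try:
        staging_leg.replace(bundle_leg)
    except OSError:
        shutil.copytree(staging_leg, bundle_leg)
        shutil.rmtree(staging_leg)
```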

Architecture:

The pipeline follows a fail-fast philosophy. Each step is wrapped in error handling via run_step(), which logs progress and exits immediately on failure. Configuration is centralized in config.py, and all operations are logged to both console and .logs/ directory.

Security:
  • Zone guards enforce data/output separation at runtime

  • Output artifacts land under output/{STUDY}/ (see trio_bundle/)

Usage:
Run the complete pipeline:

$ python main.py --pipeline

Skip dictionary loading:

$ python main.py --skip-dictionary

Verbose logging:

$ python main.py --pipeline --verbose

Example

>>> # Basic pipeline execution (requires data setup)
>>> # This is a conceptual example - actual execution requires data files
>>> import sys
>>> sys.argv = ['main.py', '--version']
>>> # Would display: RePORT AI Portal <version>

Notes

  • Requires Python 3.11+ for compatibility with dependencies

  • All data paths are configured in config.py

  • Shell completion available if argcomplete is installed

  • See README.md and Sphinx docs for detailed setup instructions

See also

config.py: Central configuration and path management
scripts.extraction.load_dictionary: Data dictionary parsing logic
scripts.extraction.dataset_pipeline.process_datasets: Unified dataset pipeline

main.main()[source]

Orchestrate the complete clinical data processing pipeline.

This is the main entry point for the RePORT AI Portal pipeline. It parses command-line arguments, configures logging, validates the environment, and executes the multi-step workflow to process clinical study data from raw Excel files to clean, structured JSONL records.

The function implements a sequential pipeline with optional step skipping:

Step 0 - Dictionary Loading:

Parses the data dictionary Excel file to extract field definitions, data types, validation rules, and metadata. Outputs structured JSON for downstream validation. (Skip with --skip-dictionary)

Step 1+3 - Dataset Processing:

Unified extract → promote → cleanup via process_datasets(). Extracts raw datasets to secure temp workspace, promotes clean JSONL to trio_bundle/datasets/, and removes temp workspace. (Enable with --process-datasets)

Configuration and Validation:
  • All paths, study names, and settings are loaded from config.py

  • Configuration validation runs before any processing starts

  • Required directories are created automatically if missing

  • Logging is configured based on the --verbose flag (default: simple mode)

Command-Line Interface:

The function accepts numerous CLI arguments for fine-grained control:

Workflow Control:
--skip-dictionary

Skip Step 0

--process-datasets

Extract and promote datasets (Step 1+3)

--pipeline

Run full pipeline: Extract → Promote → Registry → Index

Logging:
-v, --verbose

Enable DEBUG logging with full output

--version

Show version and exit

Returns:

This function orchestrates the pipeline but does not return a value. It exits with code 0 on success or code 1 on failure.

Return type:

None

Raises:
  • SystemExit – Always raised on failure (exit code 1). Reasons include:

    – Configuration validation failure (missing directories)

    – Any step failure (logged via run_step())

    – Uncaught exceptions in argument parsing or setup

  • FileNotFoundError – Caught and converted to SystemExit if required directories are missing (data/raw/<study>/datasets/, etc.)

Example

>>> # Simulate command-line execution (conceptual - requires data setup)
>>> import sys
>>> # Show version
>>> sys.argv = ['main.py', '--version']
>>> # Would display version and exit
>>>
>>> # Run with verbose logging (requires actual data files)
>>> # sys.argv = ['main.py', '--verbose']
>>> # main()  # Would execute full pipeline with DEBUG logging

Notes

  • Default logging: Simple mode (INFO level, minimal console output)

  • Verbose mode (-v): DEBUG level with full context and stack traces

  • All operations are logged to .logs/<LOG_NAME>.log

  • Shell completion available if argcomplete package is installed

See also

run_step: Wrapper for individual pipeline steps with error handling
config.validate_config: Validates directory structure and settings
scripts.extraction.load_dictionary.load_study_dictionary: Step 0 implementation
scripts.extraction.dataset_pipeline.process_datasets: Unified dataset pipeline

main.run_step(step_name, func)[source]

Execute a pipeline step with comprehensive error handling and logging.

This function wraps individual pipeline steps to provide consistent error handling, logging, and exit behavior. It acts as the pipeline’s safety net, ensuring that any failure in a step is caught, logged, and results in a clean exit with a non-zero status code.

The function supports multiple failure modes:

  • Boolean False return values indicate step failure

  • Dict results with an ‘errors’ key indicate partial failure

  • Uncaught exceptions are logged with full stack traces

All steps are logged with clear start/success/failure messages to both console and log files (see config.LOG_NAME for log file location).

Parameters:
  • step_name (str) – Human-readable name of the pipeline step (e.g., “Step 1: Extracting Raw Data to JSONL”). Used in log messages and error reporting.

  • func (Callable[[], Any]) – Zero-argument callable that executes the actual step logic. This should be a lambda or function reference that performs the work and returns a result or raises an exception.

Returns:

The return value from func() if successful. The return type depends on the specific step being executed (e.g., dict with statistics, bool for success/failure, or None).

Return type:

Any

Raises:

SystemExit – Always raised on failure (exit code 1). This terminates the entire pipeline to prevent cascading errors from invalid data. Reasons for exit:

  • func() returns False

  • func() returns a dict with a non-empty ‘errors’ list

  • func() raises any exception

Example

>>> import logging
>>> from scripts.utils import logging_system as log
>>> log.setup_logger(name='test', log_level=logging.INFO, simple_mode=True)
>>> # Successful step
>>> def successful_task():
...     return {'processed': 100, 'errors': []}
>>> result = run_step("Test Task", successful_task)
>>> result['processed']
100
>>> # Failing step (returns False)
>>> def failing_task():
...     return False
>>> try:
...     run_step("Failing Task", failing_task)
... except SystemExit as e:
...     print(f"Exit code: {e.code}")
Exit code: 1

Notes

  • This function uses sys.exit(1) rather than raising exceptions to ensure clean termination visible to shell scripts and CI/CD systems.

  • Stack traces are logged via exc_info=True for debugging.

  • Success messages use log.success() for visual distinction in logs.

See also

main: Orchestrates all pipeline steps using this wrapper
config.LOG_NAME: Configures the log file name

Version

Canonical project version metadata for RePORT AI Portal.

This module exposes the single supported source of truth for the repository version string and its parsed tuple form.

Design rules:

  • __version__ is the only canonical version literal.

  • __version_info__ is derived from __version__ and never maintained by hand.

  • Version validation happens at import time and fails fast on invalid values.

  • The accepted format here is the normal Semantic Versioning core form MAJOR.MINOR.PATCH.

Notes:

  • This module intentionally keeps zero external dependencies.

  • Pre-release and build metadata are not accepted by this repository-level version constant.

  • Major version zero indicates initial development under Semantic Versioning.

Extraction Modules

Dictionary Loading

Extract study data dictionaries into structured JSONL mappings.

Reads dictionary files from data/raw/{STUDY_NAME}/data_dictionary/ and writes structured JSONL under output/{STUDY_NAME}/trio_bundle/dictionary/.

Supports .xlsx and .csv inputs. Detects multiple logical tables inside Excel sheets, enriches records with provenance metadata, and exports deterministic JSONL files.

Three-stage pipeline: Discovery → Parsing → Export.

Tables after the “ignore below” marker are saved to an extras/ subdirectory (still as .jsonl).

scripts.extraction.load_dictionary.discover_dictionary_files(dictionary_dir)[source]

Discover all supported dictionary files in the given directory.

Delegates to scripts.extraction.io.discover_files() and converts the returned Path objects to strings for backward compatibility.

Return type:

list[str]

Parameters:

dictionary_dir (Path | str)

scripts.extraction.load_dictionary.load_study_dictionary(dictionary_dir=None, json_output_dir=None, preserve_na=True)[source]

Load and process all study data dictionary files to JSONL format.

When json_output_dir is not supplied the dictionary JSONL files are written to config.STAGING_DICTIONARY_DIR (tmp/{STUDY}/dictionary/); a subsequent publish step promotes them into trio_bundle/dictionary/.

Return type:

bool

Parameters:
  • dictionary_dir (Path | str | None)

  • json_output_dir (Path | str | None)

  • preserve_na (bool)

scripts.extraction.load_dictionary.process_csv_file(csv_path, output_dir, preserve_na=True)[source]

Parse a CSV dictionary file and save as JSONL with provenance metadata.

Return type:

bool

Parameters:
  • csv_path (Path | str)

  • output_dir (Path | str)

  • preserve_na (bool)

scripts.extraction.load_dictionary.process_excel_file(excel_path, output_dir, preserve_na=True)[source]

Extract all tables from an Excel file and save as JSONL files.

Return type:

bool

Parameters:
  • excel_path (Path | str)

  • output_dir (Path | str)

  • preserve_na (bool)

Dataset Extraction

Canonical dataset pipeline for RePORT AI Portal — staged extraction.

This is the single dataset pipeline module for the active single-study, local-first pipeline. It discovers tabular study files under data/raw/{STUDY_NAME}/datasets/, normalises their rows, and writes the resulting JSONL into the study’s staging workspace (tmp/{STUDY_NAME}/datasets/ by default). A subsequent publish step atomically promotes the staging bundle into output/{STUDY}/trio_bundle/datasets/.

Datasets may contain PHI at extraction time. They remain in AMBER staging until scripts.security.phi_scrub runs at Step 1.6; only scrubbed staging artifacts are later published to the trio bundle.

What this module does:
  1. Discover supported dataset files for the active study

  2. Read .xlsx and .csv files

  3. Normalize rows into JSONL-safe records

  4. Write extraction output into the staging datasets directory

  5. Surface per-column drop events from duplicate-column cleanup so a later audit pass can record them.

Supported formats:
  • .xlsx via openpyxl

  • .csv via pandas.read_csv (single-file load; preserves one output file per input)

Discovery rules:
  • Only files directly under data/raw/{STUDY_NAME}/datasets/ are considered.

  • Hidden files, OS junk, and Excel lock files are ignored.

Notes

  • Row iteration uses itertuples() instead of iterrows() to avoid dtype coercion and reduce overhead.

  • JSONL writes are committed atomically via temporary files and Path.replace().
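The atomic-write pattern from the note above can be sketched with a hypothetical helper: write to a temp file in the same directory, then swap it into place with Path.replace().

```python
import json
from pathlib import Path
from typing import Any

def write_jsonl_atomic(records: list[dict[str, Any]], dest: Path) -> None:
    """Write JSONL via a same-directory temp file, then Path.replace(),
    so readers never observe a half-written file."""
    tmp = dest.with_suffix(dest.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")
    tmp.replace(dest)  # atomic on the same filesystem
```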

class scripts.extraction.dataset_pipeline.ExtractionResult[source]

Bases: TypedDict

Typed extraction result returned by extract_datasets().

dropped_events: list[dict[str, Any]]
errors: list[dict[str, str]]
files_created: int
files_found: int
output_dir: str
processing_time: float
total_records: int
scripts.extraction.dataset_pipeline.clean_record_for_json(record)[source]

Convert pandas record to JSON-serializable types.

Transforms a DataFrame row (as dict) into a JSON-safe format by converting pandas/numpy types to Python native types.

Parameters:

record (dict[str, Any]) – Dictionary from DataFrame row (typically from row.to_dict()).

Returns:

  • pd.NA, np.nan → None

  • np.inf, -np.inf → None

  • np.integer → int

  • np.floating with no fractional part → int (e.g. 1001.0 → 1001)

  • np.floating with fractional part → float

  • float with no fractional part → int (e.g. 1001.0 → 1001)

  • float with fractional part → float

  • pd.Timestamp, datetime at midnight → date-only str (e.g. “2014-06-23”)

  • pd.Timestamp, datetime with time → ISO 8601 str

  • date → ISO 8601 date str

  • str → stripped of leading/trailing whitespace

  • Other types preserved as-is

Return type:

dict[str, Any] – dictionary with all values converted to JSON-serializable Python types

Notes

Whole-number floats are converted to int because Excel frequently stores integer IDs (subject IDs, site codes, visit numbers) as floating-point internally, producing values like 1001.0 that should be emitted as 1001.

String values are stripped to remove leading/trailing whitespace introduced by manual data entry, which would otherwise cause silent mismatches in downstream queries and joins.
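A pure-Python subset of the conversion rules listed above. The real clean_record_for_json also handles pandas/numpy scalars (pd.NA, np.integer, np.floating, pd.Timestamp); this sketch covers only the plain-Python cases.

```python
from datetime import datetime
from typing import Any

def clean_value(value: Any) -> Any:
    """Subset of the clean_record_for_json conversions (pure Python)."""
    if isinstance(value, float):
        if value != value or value in (float("inf"), float("-inf")):
            return None                      # NaN / ±inf -> None
        return int(value) if value.is_integer() else value
    if isinstance(value, datetime):
        if (value.hour, value.minute, value.second) == (0, 0, 0):
            return value.date().isoformat()  # midnight -> date-only str
        return value.isoformat()
    if isinstance(value, str):
        return value.strip()
    return value

def clean_record(record: dict[str, Any]) -> dict[str, Any]:
    return {k: clean_value(v) for k, v in record.items()}
```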

scripts.extraction.dataset_pipeline.discover_dataset_files(datasets_dir)[source]

Return sorted list of supported dataset files in datasets_dir.

Delegates to scripts.extraction.io.discover_files() with dataset-specific extensions and error labelling.

Parameters:

datasets_dir (str | Path) – Path to data/raw/{STUDY}/datasets/.

Returns:

Sorted list of discovered file paths.

Return type:

list[Path]

Raises:
scripts.extraction.dataset_pipeline.extract_datasets(*, datasets_dir=None, output_dir=None, study_name=None)[source]

Discover and extract all dataset files into AMBER staging.

Output lands in output_dir when supplied, otherwise in config.STAGING_DATASETS_DIR (tmp/{STUDY}/datasets/). The bundle is later published from staging to trio_bundle/ by a separate publish step after the Step 1.6 PHI scrub and cleanup propagation.

Returns:

Extraction summary with keys: files_found, files_created, total_records, errors, processing_time, output_dir, and dropped_events (flat list of per-column drop events emitted by clean_duplicate_columns() across every processed sheet).

Return type:

dict

Parameters:
  • datasets_dir (str | Path | None)

  • output_dir (str | Path | None)

  • study_name (str | None)

scripts.extraction.dataset_pipeline.extract_single_dataset(file_path, output_dir, study_name, extraction_ts)[source]

Extract one dataset file to JSONL directly under output_dir.

Provably-duplicate columns are removed (via clean_duplicate_columns) before the JSONL is written. Output lands directly in output_dir (typically the staging directory for datasets — tmp/{STUDY}/datasets/ by default via extract_datasets()).

Parameters:
  • file_path (Path) – Absolute path to the source dataset file.

  • output_dir (Path) – Directory that will receive the JSONL output.

  • study_name (str) – Active study identifier for provenance.

  • extraction_ts (str) – ISO-8601 timestamp string shared across the batch.

Returns:

(success, record_count, error_message, dropped_events). dropped_events is always a list (possibly empty); it aggregates the per-column drop events reported by clean_duplicate_columns() across every sheet processed from this file.

Return type:

tuple[bool, int, str | None, list[dict[str, Any]]]

scripts.extraction.dataset_pipeline.is_dataframe_empty(df)[source]

Check if DataFrame is completely empty (no rows AND no columns).

Differs from pandas’ df.empty: returns True only if BOTH rows and columns are absent. DataFrames with columns but no rows are NOT empty.

Return type:

bool

Parameters:

df (DataFrame)

scripts.extraction.dataset_pipeline.process_datasets(*, debug=False)[source]

Unified entry point: extract raw datasets into the staging workspace.

This is the single function main.py should call for the dataset leg of extraction. Output lands in config.STAGING_DATASETS_DIR and is later published to trio_bundle/ by a separate publish step.

Parameters:

debug (bool) – No-op, retained for CLI compatibility with earlier versions of the pipeline that used it to preserve a temp workspace.

Returns:

  • extraction – ExtractionResult dict from the extraction step (includes dropped_events populated from clean_duplicate_columns()).

  • errors – aggregated list of extraction errors (only present when extraction reported errors).

Return type:

dict

PDF Extraction

Extract annotated study PDFs into structured variable JSON.

Reads annotated clinical research PDFs from data/raw/{STUDY}/annotated_pdfs/ and produces per-form {stem}_variables.json files in output/{STUDY}/trio_bundle/pdfs/.

This module is extraction-only: it sends PDFs to an LLM, parses the returned JSON, and writes one per-form JSON file.

Supports Anthropic Claude and Google Gemini for PDF vision extraction. Provider and API key are resolved from environment variables (LLM_PROVIDER, ANTHROPIC_API_KEY / GOOGLE_API_KEY, LLM_MODEL).

Output JSON schema (per-form):

{ "form_name", "source_pdf", "version", "summary",
  "variables": { "ABBREV": { "description", "values", "depends_on",
                              "condition", "section_context" } },
  "sections": { "NAME": { "context", "variables": [...] } } }
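A minimal instance of this per-form schema, with illustrative field values only:

```python
import json

# Illustrative instance of the per-form output schema shown above.
form = {
    "form_name": "enrollment",
    "source_pdf": "enrollment_v2.pdf",
    "version": "v2",
    "summary": "Baseline enrollment form.",
    "variables": {
        "AGEYRS": {
            "description": "Age in years at enrollment",
            "values": "integer",
            "depends_on": None,
            "condition": None,
            "section_context": "DEMOGRAPHICS",
        }
    },
    "sections": {
        "DEMOGRAPHICS": {
            "context": "Collected at the first visit",
            "variables": ["AGEYRS"],
        }
    },
}
print(json.dumps(form, indent=2)[:60])
```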

Usage:

>>> from scripts.extraction.extract_pdf_data import extract_pdfs_to_jsonl
>>> result = extract_pdfs_to_jsonl()

$ python -m scripts.extraction.extract_pdf_data
scripts.extraction.extract_pdf_data.clean_existing_jsons(json_dir=None, *, dry_run=False)[source]

Run dedup + cross-form dedup + validation on existing JSONs in-place.

Operates on the canonical output directory (default config.PDF_EXTRACTIONS_DIR) without re-running LLM extraction.

Note

The default directory (config.PDF_EXTRACTIONS_DIR = output/{STUDY}/trio_bundle/pdfs/) is the published bundle path, not the staging path. extract_pdfs_to_jsonl() writes freshly extracted files to config.STAGING_PDFS_DIR (tmp/{STUDY}/pdfs/). If you run --clean-only immediately after extraction without passing --output-dir, the default directory will contain no freshly extracted files (they are still in staging). Pass --output-dir <staging_path> explicitly to target the staging directory, or let the pipeline’s publish step promote staging files to the bundle before cleaning.

Parameters:
  • json_dir (Path | None)

  • dry_run (bool)

Return type:

dict[str, Any]

scripts.extraction.extract_pdf_data.extract_pdfs_to_jsonl(pdf_dir=None, output_dir=None, force=False)[source]

Extract all annotated PDFs into structured JSON outputs.

Discovers PDFs and writes per-form structured JSON (_variables.json) files. The actual extraction strategy depends on the _PDF_EXTRACTION_MODE_ENV env var, which the wizard sets per the operator’s choice:

  • llm — run_orchestrator_mode() (text-redacted LLM call paired with the code path; snapshot fallback per-PDF).

  • snapshot — run_snapshot_mode() (publish verified baseline JSONs verbatim; no LLM call).

  • unset — legacy raw-PDF API path (_resolve_pdf_provider()-gated). Preserves existing CLI behaviour.

Note

Despite its name (kept for backward compatibility), this function now writes only JSON.

Parameters:
  • pdf_dir (Path | None) – Directory containing annotated PDFs. Defaults to config.ANNOTATED_PDFS_DIR.

  • output_dir (Path | None) – Output directory. Defaults to config.STAGING_PDFS_DIR (tmp/{STUDY}/pdfs/); a subsequent publish step promotes the bundle to trio_bundle/pdfs/.

  • force (bool) – If True, reprocess all files even if output exists.

Returns:

files_found, files_created, files_skipped, variables_extracted, duplicates_cleaned, errors, processing_time.

Return type:

Dict with keys

scripts.extraction.extract_pdf_data.process_single_pdf(pdf_path, output_dir, client, model, *, provider='anthropic', **kw)[source]

Extract one PDF into structured JSON.

Produces {stem}_variables.json.

Parameters:
  • pdf_path (Path) – Path to the annotated PDF file.

  • output_dir (Path) – Output directory.

  • client (Any) – Initialized LLM client (Anthropic or Google).

  • model (str) – Model identifier.

  • provider (str) – "anthropic" or "google".

  • **kw (Any) – Extra provider kwargs (e.g. types for Google).

Return type:

tuple[bool, int, str | None]

Returns:

(success, variable_count, error_message).

Note

This function accepts a pre-built client and bypasses the two-part PHI safety gate that lives in _resolve_pdf_provider(). Callers must resolve the provider through _resolve_pdf_provider() before invoking this function directly. The only in-tree caller, extract_pdfs_to_jsonl(), always does this; external callers importing process_single_pdf from scripts.extraction must not construct a client themselves, since doing so skips the gate.

scripts.extraction.extract_pdf_data.validate_depends_on(json_files)[source]

Check for broken depends_on references across variable JSONs.

Return type:

list[tuple[str, str, str]]

Parameters:

json_files (list[Path])

PDF Orchestrator

Two-way PDF extraction pipeline (Phase 3.F + 3.G + 3.H).

Closes the PDF PHI controls summarized in docs/sphinx/irb_auditor/conformance.rst: no raw PDF bytes on the orchestrator path, LLM responses are re-scrubbed, and LLM calls are cached idempotently.

The pipeline has exactly two acceptable output paths per PDF — either the LLM tier succeeds (paired with the code-extracted text), or we fall back to a human-verified snapshot. The load-study UI step never fails on a single PDF.

Way 1 — LLM + code (merged):

The code path always runs first (pdfplumber → text + heuristic variable candidate). When a capable LLM is configured (per scripts.utils.llm_capabilities.is_capable_model()), the LLM tier runs paired with the code path:

  • The code-extracted text is redacted in place using the existing PHI catalog (phi_patterns.BLOCKING_PATTERNS) so identifiers in form headers become <LABEL> markers before any byte leaves the host. No raw PDF bytes transit the API.

  • The redacted text is sent to the LLM with the schema prompt. The LLM response is parsed and every string field is re-scrubbed through scripts.ai_assistant.phi_safe.guard_text() to catch echoed identifiers.

  • The LLM response is merged with the code-tier candidate: LLM wins on field-level conflicts (it’s more accurate on complex CRFs); the code candidate fills in vars the LLM missed.
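The in-place redaction step from the first bullet above can be sketched as below. The two patterns are stand-ins for the real phi_patterns.BLOCKING_PATTERNS catalog.

```python
import re

# Hypothetical stand-ins for phi_patterns.BLOCKING_PATTERNS: each entry
# pairs a compiled regex with the <LABEL> marker it becomes.
BLOCKING_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "<SSN>"),
    (re.compile(r"\b\d{2}/\d{2}/\d{4}\b"), "<DATE>"),
]

def redact(text: str) -> str:
    """Replace catalog matches with <LABEL> markers before any byte
    leaves the host."""
    for pattern, label in BLOCKING_PATTERNS:
        text = pattern.sub(label, text)
    return text

print(redact("DOB: 01/02/1990, SSN 123-45-6789"))
# DOB: <DATE>, SSN <SSN>
```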

Way 2 — Snapshot:

When the LLM tier is unavailable for any reason (no capable model configured, no API key, image-only PDF, LLM call error), the pipeline falls back to a human-verified snapshot at data/snapshots/{STUDY}/pdfs/<form>.json (human-reviewed baseline; LLM-invisible). A code-only result is NEVER an acceptable output — heuristic extraction without LLM oversight is too unreliable to publish, so we’d rather use a verified baseline than ship potentially-wrong metadata into trio_bundle/.

Idempotent caching: the LLM tier keys on SHA-256(pdf_bytes) || provider || model || PHI_SCRUB_CONFIG_HASH so a re-run with the same inputs hits the cache and skips the API call. Cache invalidates on any input change.
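A sketch of the cache-key composition. The source names the components but not the join format, so the "|" separator here is an assumption:

```python
import hashlib

def llm_cache_key(pdf_bytes: bytes, provider: str, model: str,
                  scrub_config_hash: str) -> str:
    """Compose the idempotent cache key described above:
    SHA-256(pdf_bytes) || provider || model || PHI_SCRUB_CONFIG_HASH.
    Any change to any component yields a different key."""
    pdf_digest = hashlib.sha256(pdf_bytes).hexdigest()
    composite = "|".join([pdf_digest, provider, model, scrub_config_hash])
    return hashlib.sha256(composite.encode("utf-8")).hexdigest()
```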

Zone discipline (audit finding A3): the pipeline-tier LLM client is constructed fresh in this module and uses the KeyStore for the API key; it does NOT route through the agent’s _build_llm and never sees an environment variable. The HTTP payload contains ONLY the redacted text plus the schema prompt — no file paths, no agent state. The defensive _assert_no_raw_phi_in_payload check fails loud if any pre-redaction string somehow reaches the payload.

class scripts.extraction.pdf_pipeline.ExtractionResult(pdf_name, tier, data, llm_skipped_reason=None, cache_hit=False, code_succeeded=False, llm_succeeded=False, snapshot_used=False, warnings=<factory>)[source]

Bases: object

Outcome of one PDF run through the three-tier pipeline.

tier reports which path produced the surfaced data: "merged" (LLM succeeded, paired with code-extracted text), "llm" (LLM succeeded but code-path text was empty so there was nothing to merge with), "snapshot" (LLM unavailable; fell back to verified baseline), or "empty" (both unavailable and no snapshot — UI will see an empty form).

llm_skipped_reason documents why the LLM tier did not run (capability gate, provider unavailable, etc.) for operator diagnostics; it stays None when the LLM tier did run.

cache_hit is True when the LLM tier was skipped because a valid cached response was found.

Parameters:
cache_hit: bool
code_succeeded: bool
data: dict[str, Any]
llm_skipped_reason: str | None
llm_succeeded: bool
pdf_name: str
snapshot_used: bool
tier: str
warnings: tuple[str, ...]
scripts.extraction.pdf_pipeline.extract_pdf(pdf_path, *, provider=None, model=None, api_key=None, snapshot_dir=None, cache_dir=None)[source]

Run the two-way pipeline for a single PDF.

There are exactly two acceptable output paths:

  1. LLM + code (merged) — when a capable LLM is configured AND the LLM call succeeds, the LLM response is merged with the code-extracted heuristic candidate (LLM wins on field-level conflicts; code fills in vars the LLM missed). The code path contributes both the extracted text used as the LLM input AND a baseline candidate for merge — they are paired.

  2. Snapshot — when the LLM tier is unavailable for any reason (no capable model, no API key, image-only PDF, LLM call error), fall back to a human-verified initial snapshot. Code-only output is never an acceptable result; heuristic-only metadata is too unreliable to publish without LLM oversight, so we’d rather use a verified baseline than ship potentially-wrong extraction.

All keyword args are optional. When provider / model / api_key are all set AND is_capable_model() returns True, the LLM tier runs. Otherwise the LLM tier is skipped with a diagnostic llm_skipped_reason.

snapshot_dir is the directory holding human-verified backup JSONs (typically data/snapshots/{STUDY}/pdfs/). When omitted, the snapshot fallback is unavailable.

cache_dir is the LLM-response cache root (typically tmp/{STUDY}/.pdf_cache/). When omitted, the cache is disabled.

Parameters:
  • pdf_path (Path)

  • provider (str | None)

  • model (str | None)

  • api_key (str | None)

  • snapshot_dir (Path | None)

  • cache_dir (Path | None)

Return type:

ExtractionResult

LLM Capability Gate

LLM capability detection for the PDF-extraction pipeline.

The PDF pipeline runs in three tiers (see docs/sphinx/developer_guide/pdf_pipeline.rst):

  1. Code path — pdfplumber-based, always runs, fast, deterministic.

  2. LLM path — runs ONLY when a “capable” model is configured. Capable means the model can reliably extract structured form metadata from CRF text without hallucinating columns.

  3. Backup snapshot — falls back to a human-verified snapshot baseline when neither path produces valid output.

This module decides tier 2’s eligibility. The default capable set is hardcoded but env-overridable via REPORTALIN_PDF_LLM_CAPABLE_MODELS (comma-separated list of model name prefixes; matches model names by startswith after lowercasing).

Why a hardcoded list + env override (rather than asking the model itself): the LLM can’t reliably self-report its own capabilities, and we don’t want a one-shot completion to incur cost just to find out it shouldn’t have been called. The list is conservative — if your model is excluded but you’ve validated it works, set the env var.
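The prefix-matching rule with env override can be sketched as follows; the default prefix set shown is illustrative, not the real hardcoded list.

```python
import os

_DEFAULT_CAPABLE_PREFIXES = ("claude", "gemini")  # illustrative default set

def capable_prefixes() -> tuple[str, ...]:
    """Env override via REPORTALIN_PDF_LLM_CAPABLE_MODELS
    (comma-separated prefixes), else the hardcoded default."""
    raw = os.getenv("REPORTALIN_PDF_LLM_CAPABLE_MODELS")
    if raw:
        return tuple(p.strip().lower() for p in raw.split(",") if p.strip())
    return _DEFAULT_CAPABLE_PREFIXES

def model_is_capable(model: str) -> bool:
    """Case-insensitive startswith match against the prefix list."""
    lowered = model.lower()
    return any(lowered.startswith(p) for p in capable_prefixes())
```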

scripts.utils.llm_capabilities.is_capable_model(provider, model)[source]

Return True when (provider, model) is on the LLM-extraction allowlist.

Provider-aware: Ollama is excluded by default regardless of the model name, because local Ollama models historically can’t sustain a JSON-schema response on a 30-page CRF. If you’ve validated a specific local model, override via the env var.

Empty / None inputs return False. Comparison is case-insensitive against the configured prefix list (default or env-overridden).

Return type:

bool

Parameters:
  • provider (str | None)

  • model (str | None)
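The prefix-matching rule described above can be sketched as a small pure function. This is an illustrative re-implementation, not the module source: the default prefixes are hypothetical placeholders, and the exact interaction between the env override and the Ollama exclusion is an assumption.

```python
import os

# Sketch of the documented gate: lowercase startswith match against a
# prefix list, env-overridable, Ollama excluded by default.
_DEFAULT_PREFIXES = ("gpt-4", "claude-3")  # hypothetical, NOT the shipped list

def capable(provider, model, env=None):
    env = os.environ if env is None else env
    if not provider or not model:
        return False                       # empty / None inputs -> False
    override = env.get("REPORTALIN_PDF_LLM_CAPABLE_MODELS")
    if provider.lower() == "ollama" and not override:
        return False                       # local models excluded by default
    prefixes = ([p.strip().lower() for p in override.split(",") if p.strip()]
                if override else [p.lower() for p in _DEFAULT_PREFIXES])
    return any(model.lower().startswith(p) for p in prefixes)
```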

Security Modules

The scripts.security package groups every module that participates in PHI handling — the four-tier architecture boundaries, the 8-action offline scrubber, the agent-boundary gates, the shared regex catalog, and the clinical-phrase allowlist. See PHI Architecture for the narrative.

Secure Environment (Zone Guard)

Zone-enforcement helpers for the RePORT AI Portal runtime.

Defines the path-assertion helpers that keep raw datasets, staging, and clean published output from bleeding into one another. The four-tier architecture (RED / AMBER / GREEN / GREEN-PROTECT) in the developer-guide PHI-architecture page is implemented in code as the zone guards here.

exception scripts.security.secure_env.ZoneViolationError[source]

Bases: PermissionError

Raised when code attempts to access a forbidden data zone.

scripts.security.secure_env.assert_clean_zone(path)[source]

Hard-fail if path does NOT reside under output/{STUDY}/clean/.

Raises:

ZoneViolationError – path is outside the clean zone.

Return type:

None

Parameters:

path (str | Path)

scripts.security.secure_env.assert_not_raw(path)[source]

Hard-fail if path resides under data/raw/.

Raises:

ZoneViolationError – path is inside the raw vault.

Return type:

None

Parameters:

path (str | Path)

scripts.security.secure_env.assert_output_not_in_data(path)[source]

Hard-fail if path is under data/ — processed output must go to output/.

The data/ directory is reserved exclusively for raw study data (data/raw/). All processed artifacts (clean JSONL, indexes, session data, etc.) must be written under output/.

Raises:

ZoneViolationError – path is inside the data directory.

Return type:

None

Parameters:

path (str | Path)

scripts.security.secure_env.assert_output_zone(path)[source]

Hard-fail if path is not under output/, or is in raw.

Used for chunking inputs that may span multiple output sub-trees (clean JSONL, data dictionary mappings, etc.) but must never touch raw data.

Raises:

ZoneViolationError – path is outside output/ or in a forbidden sub-zone.

Return type:

None

Parameters:

path (str | Path)

scripts.security.secure_env.assert_trio_bundle_zone(path)[source]

Hard-fail if path is not under output/{STUDY}/trio_bundle/.

Pipeline-side directory-level early-reject used at agent tool call sites that glob study data from the trio bundle (variables.json, datasets/*.jsonl, pdfs/*.json). It is narrower than assert_output_zone() (which also accepts audit/, agent/, etc.) but broader than the agent-runtime zone: the LLM agent’s actual read surface is trio_bundle/ plus agent/, enforced per path by scripts.ai_assistant.file_access.validate_agent_read(). This helper remains as a directory-level pre-flight before glob iteration.

Raises:

ZoneViolationError – path is outside output/{STUDY}/trio_bundle/.

Return type:

None

Parameters:

path (str | Path)

scripts.security.secure_env.assert_write_zone(path)[source]

Hard-fail if path is not under output/ or tmp/, or is in raw.

Accepts paths under either the durable output zone (output/) or the transient staging zone (tmp/). Both are safe write destinations for extraction legs. Raw data is always rejected.

Use this in place of assert_output_zone() for call sites that write to the staging workspace (tmp/{STUDY}/) before atomic publish to output/{STUDY}/trio_bundle/. Audit files that must land in durable storage should continue to use assert_output_zone().

Raises:

ZoneViolationError – path is outside both output/ and tmp/, or is in the raw data zone.

Return type:

None

Parameters:

path (str | Path)
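A minimal sketch of the containment test behind these guards, using pure string-path semantics under a hypothetical /project root (the real helpers resolve paths against config constants):

```python
from pathlib import PurePosixPath

ROOT = PurePosixPath("/project")   # hypothetical repo root for illustration

class ZoneViolationError(PermissionError):
    """Raised when code attempts to access a forbidden data zone."""

def _is_under(path, base):
    p = PurePosixPath(path)
    return base == p or base in p.parents

def assert_write_zone(path):
    # Raw data is always rejected; otherwise accept output/ or tmp/.
    if _is_under(path, ROOT / "data" / "raw"):
        raise ZoneViolationError(f"raw zone: {path}")
    if not (_is_under(path, ROOT / "output") or _is_under(path, ROOT / "tmp")):
        raise ZoneViolationError(f"outside output/ and tmp/: {path}")
```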

scripts.security.secure_env.validate_paths(paths, *, deny_raw=True, require_clean=False, deny_data_output=False)[source]

Batch-validate a sequence of paths against zone policies.

Parameters:
  • paths (Sequence[str | Path]) – file or directory paths to check.

  • deny_raw (bool) – reject any path under data/raw/.

  • require_clean (bool) – require every path to be under output/{STUDY}/clean/.

  • deny_data_output (bool) – reject any path under data/ (prevents writing processed artifacts into the raw data directory).

Return type:

None

Note

assert_output_zone is always called regardless of flag values — every path must reside under output/.

Raises:

ZoneViolationError – on first violation found.


PHI Scrubber (Step 1.6)

PHI scrubber — structural-field honest-broker catalog for RePORT AI Portal.

Eight structural-field action classes, evaluated in strict priority order (first match wins per field):

  1. keep (keep_fields) — allowlist; short-circuits every other rule. Used to protect clinical lab / medication / time-of-day / categorical indicators from being swept up by broader patterns.

  2. birthdate (birthdate_field) — posture-dependent:

    • safe_harbor (default) → field dropped entirely per HIPAA §164.514(b)(2)(i)(C) + DPDPA. Age fidelity is lost.

    • limited_dataset → field jittered with the same per-subject offset as other dates (SANT method), preserving age-at-event. Requires an IRB-approved protocol + DUA; the module refuses to run in this mode unless authorities/phi_limited_dataset.md exists.

  3. drop (drop_fields) — field removed from every row. Covers names, initials, signatures, staff identifiers, national IDs (Aadhaar / PAN / voter / passport / DL / ration / ESIC / PM-JAY / Nikshay / ABHA), contact info, exact geography, free-text narratives, system timestamps, and batch/scan artefacts.

  4. cap (cap_fields) — numeric values strictly greater than threshold are replaced with label (default age > 89 → “90+”, HIPAA §164.514(b)(2)(i)(C)).

  5. generalize (generalize_fields + generalization_maps) — value-level categorical mapping (e.g. marital status → Married / Single / Other; facility type → Government / Private / Other).

  6. suppress_small_cell (suppress_small_cell_fields) — numeric values strictly greater than small_cell_threshold are clamped to the threshold (ICMR §11.7 k-anonymity proxy for household-contact counts).

  7. date (date_fields) — per-subject deterministic offset in [-max_jitter_days, +max_jitter_days]. Offset = HMAC-SHA256(key, subject_id)[:4] as int mod (2*N+1) - N. SANT-method interval preservation for epidemiological survival / incidence / person-time analyses.

  8. id (id_fields) — replaced with "SUBJ_" + hmac_sha256(key, raw_id).hexdigest()[:12]. Deterministic cross-file linkage preserved; non-reversible without key possession.

Free-text PHI residuals are handled conservatively by dropping narrative fields wholesale. Current narrative fields like *COMMENT, *REMARK, WITHDRAWEXPLAIN, and *SPECIFY are removed before publication; the agent-boundary PHI gate remains defense-in-depth for returned text.

Rule catalog is declared in phi_scrub.yaml (Indo-VAP-calibrated).
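The strict first-match-wins evaluation can be illustrated with a toy dispatcher; the field-name patterns below are hypothetical stand-ins for the compiled phi_scrub.yaml rules, not the shipped catalog:

```python
import re

# (action, compiled field-name pattern) in strict priority order.
RULES = [
    ("keep", re.compile(r"LAB_|MED_", re.I)),
    ("drop", re.compile(r"NAME|COMMENT|REMARK", re.I)),
    ("id",   re.compile(r"SUBJID", re.I)),
]

def action_for(field_name):
    for action, pat in RULES:
        if pat.search(field_name):
            return action        # first match wins; later rules never fire
    return "pass"
```

For example, a hypothetical LAB_NAME field is kept rather than dropped, because the keep allowlist is evaluated before the NAME drop pattern.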

Zone boundary

  • Reads + rewrites tmp/{STUDY}/datasets/*.jsonl in place (write_zone).

  • Optionally writes orphan rows to tmp/{STUDY}/quarantine/{file}.jsonl when a row lacks a resolvable subject_id (write_zone).

  • Emits a single audit envelope at config.AUDIT_SCRUB_REPORT_PATH (output_zone). The audit records counts only — no raw values, no before/after pairs.

Ordering in the pipeline

Runs as Step 1.6 — AFTER Step 1+3 (raw extraction) and BEFORE Step 1.7 (dataset cleanup). This keeps dataset_cleanup_report.json free of raw subject IDs and raw dates, so the dataset-leg audit never contains PHI.

Key management

The HMAC key is a sidecar file at $XDG_CONFIG_HOME/report_ai_portal/phi_key (default ~/.config/report_ai_portal/phi_key). Mode must be 0600. Missing key = hard-fail for developer/operator CLI pipeline runs. Normal users create it through the web UI’s Load Study flow. Developers can bootstrap explicitly:

python -m scripts.security.phi_scrub bootstrap-key

Rotating the key invalidates every previously-scrubbed artifact — full re-ingestion from raw is required. This is a one-way property: deletion of the key forfeits the ability to re-derive the same pseudonyms.

Idempotency

Each scrubbed record gets a _phi_scrubbed: "v1" marker. A second run with the same key is a no-op (the sentinel file tmp/{STUDY}/.phi_scrub_complete short-circuits the orchestrator).

Threat-model summary

  • HMAC-SHA256 with a secret key is non-reversible without key possession.

  • A 12-hex-character (48-bit) collision surface is adequate for single-study cohorts under 100 000 subjects. Larger cohorts should widen the slice.

  • Same (key, subject_id) always yields the same pseudonym → cross-run joins remain stable across re-ingestion.

  • Different machines with different keys → different pseudonyms → hard cross-site joins. This is deliberate: collaborator key distribution is an operational, not pipeline, concern.

class scripts.security.phi_scrub.CapRule(pattern, threshold, label)[source]

Bases: object

Compiled cap rule — pattern + threshold + label.

Each cap_fields entry yields one CapRule. When a row’s field name matches pattern, numeric values strictly greater than threshold are replaced with label. Values ≤ threshold pass through unchanged.

Parameters:
label
matches(name)[source]
Return type:

bool

Parameters:

name (str)

pattern
threshold
class scripts.security.phi_scrub.GeneralizeRule(pattern, mapping_name, mapping)[source]

Bases: object

Compiled generalize rule — pattern + named value mapping.

Each generalize_fields entry pairs a field-name pattern with the name of a value-to-value mapping under generalization_maps. At scrub time the value is lower-cased, looked up in the mapping, and replaced; missing values fall through unchanged (audit event still recorded with count=0 for that row).

Parameters:
mapping
mapping_name
matches(name)[source]
Return type:

bool

Parameters:

name (str)

pattern
class scripts.security.phi_scrub.IdRule(pattern, label)[source]

Bases: object

Compiled id rule — pattern + semantic label.

Each id_fields entry yields one IdRule. When a row’s field name matches pattern, the field value is pseudonymized via pseudo_id() with the attached label. The label is propagated both as the visible output prefix (<LABEL>_<hmac12>) AND as the HMAC domain-separator, so the same raw value under two different labels yields two different pseudonyms.

Keep the label short (3-5 chars, uppercase). It becomes part of every pseudonymized output and of the IRB-facing audit log.

Parameters:
label
matches(name)[source]
Return type:

bool

Parameters:

name (str)

pattern
exception scripts.security.phi_scrub.PHIKeyMissingError[source]

Bases: PHIScrubError

Raised when the sidecar key file is absent.

exception scripts.security.phi_scrub.PHIKeyPermissionError[source]

Bases: PHIScrubError

Raised when the sidecar key file has unsafe permissions.

exception scripts.security.phi_scrub.PHIQuarantineOverflowError[source]

Bases: PHIScrubError

Raised when orphan-row count exceeds the configured threshold.

class scripts.security.phi_scrub.PHIScrubConfig(*, compliance_posture, subject_id_fields, date_patterns, id_patterns, birthdate_pattern, max_jitter_days, orphan_quarantine_threshold, keep_patterns=None, drop_patterns=None, cap_rules=None, generalize_rules=None, suppress_small_cell_patterns=None, age_cap_threshold=89, age_cap_label='90+', small_cell_threshold=5)[source]

Bases: object

Parsed + compiled scrub configuration.

Regex patterns from YAML are compiled once at load time; config is a throwaway struct (not persisted beyond the pipeline run).

Rule priority (first match wins within _scrub_row()):
  1. keep_patterns — allowlist, short-circuits every other rule

  2. birthdate_pattern — posture-dependent drop or jitter

  3. drop_patterns — field removed from row

  4. cap_rules — numeric capped to label

  5. generalize_rules — value mapped to broad category

  6. suppress_small_cell_patterns — numeric clamped to threshold

  7. date_patterns — jitter via SANT

  8. id_patterns — HMAC-SHA256 pseudonymize

Parameters:
age_cap_label
age_cap_threshold
birthdate_pattern
cap_rule_for(name)[source]

Return the first matching CapRule for name, or None.

Return type:

CapRule | None

Parameters:

name (str)

cap_rules
compliance_posture
date_patterns
drop_patterns
field_is_birthdate(name)[source]
Return type:

bool

Parameters:

name (str)

field_is_date(name)[source]

Return True if name matches any date_fields pattern.

Birthdate fields are excluded here — they are handled separately via field_is_birthdate() so Safe Harbor drops can be distinguished from jitter events.

Return type:

bool

Parameters:

name (str)

field_is_drop(name)[source]
Return type:

bool

Parameters:

name (str)

field_is_id(name)[source]

Compatibility shim — True when any id rule matches name.

Return type:

bool

Parameters:

name (str)

field_is_keep(name)[source]

Return True if name matches any keep_fields pattern.

Keep rules short-circuit every other rule — a kept field passes through the scrubber unchanged with no audit event recorded.

Return type:

bool

Parameters:

name (str)

field_is_suppress_small_cell(name)[source]
Return type:

bool

Parameters:

name (str)

generalize_rule_for(name)[source]

Return the first matching GeneralizeRule for name, or None.

Return type:

GeneralizeRule | None

Parameters:

name (str)

generalize_rules
id_label_for(name)[source]

Return the semantic label for name, or None if no rule matches.

First-match wins — the YAML order determines precedence when a field name is ambiguous (e.g. a generic (?:patient|subject)[-_]?id pattern listed AFTER a specific ^SUBJID$ rule keeps the specific rule’s label).

Return type:

str | None

Parameters:

name (str)

id_patterns
keep_patterns
max_jitter_days
orphan_quarantine_threshold
small_cell_threshold
subject_id_fields
suppress_small_cell_patterns
exception scripts.security.phi_scrub.PHIScrubError[source]

Bases: Exception

Base class for PHI scrub errors.

scripts.security.phi_scrub.bootstrap_key(path=None)[source]

Generate a new 32-byte HMAC key and write it to the sidecar location.

Refuses to overwrite an existing key (would silently invalidate every prior pseudonym). Returns the path on success.

Return type:

Path

Parameters:

path (Path | None)

scripts.security.phi_scrub.cap_numeric(value, *, threshold, label)[source]

Cap numeric value to label when strictly greater than threshold.

Returns (new_value, was_capped). Non-numeric / empty values pass through unchanged with was_capped=False. Values ≤ threshold also pass through unchanged — capping affects the tail only.

Used for HIPAA §164.514(b)(2)(i)(C) age-over-89 aggregation and any similarly-shaped numeric-tail collapse rule. Because capping runs per-cell (not per-distribution), it is safe to apply in a streaming scrubber without seeing the rest of the dataset.

Return type:

tuple[Any, bool]

Parameters:
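The documented per-cell semantics reduce to a short function; this sketch assumes float() coercion for the numeric test:

```python
def cap_numeric(value, *, threshold, label):
    # Replace values strictly above threshold; everything else passes
    # through unchanged, including non-numeric and empty values.
    try:
        num = float(value)
    except (TypeError, ValueError):
        return value, False
    if num > threshold:
        return label, True
    return value, False
```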
scripts.security.phi_scrub.date_offset_days(subject_id, *, key, max_days)[source]

Per-subject deterministic offset in [-max_days, +max_days] inclusive.

Algorithm: int.from_bytes(hmac_sha256(key, subject_id)[:4], 'big') % (2*max_days + 1) - max_days.

Return type:

int

Parameters:
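The documented algorithm translates directly to code (string encoding of subject_id is the one assumption made here):

```python
import hashlib
import hmac

def date_offset_days(subject_id, *, key, max_days):
    # First 4 bytes of HMAC-SHA256, big-endian, folded into
    # [-max_days, +max_days] inclusive.
    digest = hmac.new(key, subject_id.encode(), hashlib.sha256).digest()
    return int.from_bytes(digest[:4], "big") % (2 * max_days + 1) - max_days
```

Determinism is the point: the same (key, subject_id) always yields the same offset, so all of a subject's dates shift together and intervals are preserved.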
scripts.security.phi_scrub.generalize_value(value, *, mapping)[source]

Map value to a broader category via mapping (case-insensitive).

Returns (new_value, was_generalized). Non-string / empty values pass through unchanged. Strings not present in the mapping also pass through unchanged — operators must curate the mapping to cover every valid value; unknown values surface as-is so the audit report flags coverage gaps (via the false-count per field).

Return type:

tuple[Any, bool]

Parameters:
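A minimal sketch of the documented lookup-and-fall-through behaviour:

```python
def generalize_value(value, *, mapping):
    # Case-insensitive lookup; non-string, empty, and unmapped values
    # pass through unchanged so coverage gaps surface in the audit.
    if not isinstance(value, str) or not value:
        return value, False
    mapped = mapping.get(value.lower())
    if mapped is None:
        return value, False
    return mapped, True
```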
scripts.security.phi_scrub.load_key(path=None)[source]

Load the HMAC key from the sidecar file.

Raises PHIKeyMissingError if the file is absent and PHIKeyPermissionError if the file mode is not 0600.

Return type:

bytes

Parameters:

path (Path | None)

scripts.security.phi_scrub.load_scrub_config(path=None)[source]

Load + compile the scrub config. Returns None if file is absent.

An absent config is NOT an error — it means phi_scrub is a no-op for this study, and the pipeline continues. This lets users opt in per-study by dropping a YAML file in place.

When compliance_posture: limited_dataset is set, the function also verifies the authority note exists at _LIMITED_DATASET_AUTHORITY.

Loads the full rule set: keep / drop / cap / generalize / suppress / date / id patterns plus generalization_maps, age_cap, and small_cell_threshold constants.

Return type:

PHIScrubConfig | None

Parameters:

path (Path | None)

scripts.security.phi_scrub.pseudo_id(raw_id, *, key, label='ID')[source]

Return <LABEL>_<hmac12hex> with cryptographic domain separation.

The HMAC input is f"{label}:{raw_id}" so the same raw value under different label arguments produces different pseudonyms. This mirrors the domain-separation role of HKDF’s info parameter (RFC 5869 §3.2): if an adversary obtains two datasets where the same person appears under different id categories (e.g. FID and SUBJID), they cannot link records by pseudonym equality.

Same (label, raw_id, key) always yields the same output → in-category longitudinal linkage is preserved across files, which is what the agent needs for cohort-level joins. Different key → disjoint pseudonym namespace.

Parameters:
  • raw_id (str) – the raw identifier string (already stripped by the caller).

  • key (bytes) – 32-byte HMAC key loaded from the sidecar keyfile.

  • label (str) – short semantic category (e.g. "SUBJ", "FAM", "LAB"). Propagated both into the HMAC input (domain separation) and as the visible output prefix (debuggability + IRB-audit clarity).

Return type:

str

Returns:

f"{label}_{hex12}" — the visible prefix mirrors the label so the output is self-describing in audit logs and downstream tools.
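The documented construction (HMAC over the label-prefixed input, 12-hex slice, visible label prefix) reads as:

```python
import hashlib
import hmac

def pseudo_id(raw_id, *, key, label="ID"):
    # Domain separation: the label is inside the HMAC input, so the same
    # raw value under two labels yields two unlinkable pseudonyms.
    msg = f"{label}:{raw_id}".encode()
    hex12 = hmac.new(key, msg, hashlib.sha256).hexdigest()[:12]
    return f"{label}_{hex12}"
```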

scripts.security.phi_scrub.run_scrub(study_name=None)[source]

Orchestrate the scrub: load key + config, walk staging, emit audit.

Return type:

None

Parameters:

study_name (str | None)

Pre-conditions:
  • tmp/{STUDY}/datasets/*.jsonl is populated by Step 1+3.

  • PHI_KEY_PATH exists and is mode 0600 — else hard-fail.

  • A phi_scrub.yaml config is present — else the module no-ops and writes an empty audit (so downstream audit tooling always finds a fourth file).

Post-conditions:
  • Datasets JSONL rewritten in place with scrubbed values + _phi_scrubbed marker.

  • Orphan rows (missing subject_id) land under tmp/{STUDY}/quarantine/.

  • Fourth audit report emitted at config.AUDIT_SCRUB_REPORT_PATH.

  • Sentinel tmp/{STUDY}/.phi_scrub_complete marks the run.

scripts.security.phi_scrub.shift_date(value, offset_days, *, field_name=None)[source]

Parse value, shift by offset_days, re-emit in the same format.

Returns None if the string does not parse as a date. Non-string inputs must be handled by the caller.

Return type:

str | None

Parameters:
  • value (str)

  • offset_days (int)

  • field_name (str | None)
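A minimal sketch assuming ISO-formatted input; the real function re-emits the shifted date in whatever format the value arrived in:

```python
from datetime import datetime, timedelta

def shift_date(value, offset_days, fmt="%Y-%m-%d"):
    # Parse, shift, re-emit; None when the string does not parse.
    try:
        parsed = datetime.strptime(value, fmt)
    except (TypeError, ValueError):
        return None
    return (parsed + timedelta(days=offset_days)).strftime(fmt)
```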

scripts.security.phi_scrub.suppress_small_cell(value, *, threshold)[source]

Clamp numeric value to at most threshold.

Returns (new_value, was_clamped). Non-numeric / empty values pass through unchanged. Values strictly greater than the threshold collapse to the threshold itself (NOT to a label) so downstream numeric analyses remain type-stable.

ICMR §11.7 recommends threshold=5 for household / contact counts in cohort studies where unique household demographics could re-identify a subject. For counts at or below the threshold, the value passes through — small cells here are an analytic concern, not a privacy concern.

Return type:

tuple[Any, bool]

Parameters:

Shared PHI Regex Catalog

Shared PHI regex catalog used by phi_gate and log_hygiene.

Single source of truth for “what does a PHI-like substring look like” so the query-time gate, the log redactor, and the narrative scrub all agree.

Three tiers:

  • Blocking patterns — high-confidence PHI (Aadhaar, PAN, SSN, email, phone, Indian PIN). A blocking hit in any tool return blocks the response.

  • Warn patterns — lower-confidence heuristics (bare NUMERIC_ID, DATE_MDY, PERSON_NAME). Logged but do not block. Over-aggressive in mixed clinical text; surfaced for audit, not enforcement.

  • Subject-ID patterns — Indo-VAP-specific subject-ID shapes (SC\d{4,}, SUBJ-\d+, SUBJID_N). Used to key per-subject HMAC redaction in the log wrapper.

Regulatory anchors: HIPAA §164.514(b)(2)(i)(A-P), DPDPA §2(t), Aadhaar Act §29, SPDI Rule 3, ICMR 2017 §11.4.

scripts.security.phi_patterns.BLOCKING_PATTERNS: list[tuple[str, Pattern[str]]] = [('AADHAAR', re.compile('\\b\\d{4}[\\s\\-\\.]?\\d{4}[\\s\\-\\.]?\\d{4}\\b')), ('PAN', re.compile('\\b[A-Z]{5}\\d{4}[A-Z]\\b')), ('INDIAN_VOTER_ID', re.compile('\\b[A-Z]{3}\\d{7}\\b')), ('INDIAN_DL', re.compile('\\b[A-Z]{2}\\d{2}\\s?\\d{4}\\d{7}\\b')), ('INDIAN_PASSPORT', re.compile('\\b[A-Z]\\d{7}\\b')), ('INDIAN_PHONE', re.compile('(?<!\\d)(?:\\+91[\\s-]?)?[6-9]\\d{9}(?!\\d)')), ('EMAIL', re.compile('\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b')), ('URL', re.compile('\\bhttps?://[^\\s/$.?#].[^\\s]*\\b', re.IGNORECASE)), ('INDIAN_PIN', re.compile('(?i:pin\\s*(?:code)?|postal\\s*code|zip)\\s*[:=\\-]?\\s*\\b(\\d{6})\\b')), ('SSN', re.compile('\\b\\d{3}-\\d{2}-\\d{4}\\b')), ('MRN', re.compile('\\bMRN[-:]?\\s*\\d{6,10}\\b', re.IGNORECASE)), ('IP', re.compile('\\b(?:\\d{1,3}\\.){3}\\d{1,3}\\b')), ('DATE_ISO', re.compile('\\b(?:19|20)\\d{2}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\\d|3[01])(?:[ T]\\d{2}:\\d{2}(?::\\d{2})?)?\\b')), ('PERSON_NAME_PREFIX', re.compile('\\b(?:Mr|Mrs|Ms|Dr|Prof)\\.?\\s+[A-Z][a-z]+(?:\\s+[A-Z][a-z]+){0,2}\\b'))]

High-confidence PHI patterns — a hit blocks the response.

scripts.security.phi_patterns.SUBJECT_ID_PATTERNS: list[Pattern[str]] = [re.compile('\\bSUBJ[-_]?\\d+\\b'), re.compile('\\bSC\\d{4,}\\b'), re.compile('\\bFID\\d*\\b')]

Literal subject-ID substrings that the log wrapper HMAC-redacts per-subject.

scripts.security.phi_patterns.WARN_PATTERNS: list[tuple[str, Pattern[str]]] = [('NUMERIC_ID_SHORT', re.compile('\\b\\d{6,7}\\b')), ('DATE_MDY', re.compile('\\b\\d{1,2}/\\d{1,2}/\\d{2,4}\\b')), ('PERSON_NAME_GENERIC', re.compile('\\b[A-Z][a-z]{2,15}\\s+[A-Z][a-z]{2,15}\\b'))]

Low-confidence PHI heuristics — recorded for audit, do NOT block.

Clinical-Phrase Allowlist

Clinical-phrase allowlist for PHI false-positive suppression.

Three pure functions:

  • is_clinical_phrase() — True for whitelisted clinical / research terms like “Treatment Completed” or “Bacteriologic relapse”.

  • is_clinical_free_text() — True for whole-value clinical notations like “patient expired” or “died on 3/1/2014” that should not be flagged by the generic name-like heuristic.

  • looks_like_real_name() — True when a two-to-four-word capitalized string has at least one token in the common-name lexicon.

The bundled datasets are small seed lists that prevent the most common false positives in Indo-VAP free-text (TB status, treatment outcome, specimen quality). Extend by adding entries directly to the frozen sets below.

scripts.security.phi_allowlist.CLINICAL_PHRASES: frozenset[str] = frozenset({'bact. failure', 'bact. relapse', 'bacteriologic failure', 'bacteriologic relapse', 'clinical failure', 'clinical relapse', 'cure declared', 'cured', 'definite case', "don't know", 'insufficient volume', 'loss to follow up', 'lost to follow up', 'never smoker', 'no, never', 'normal delivery', 'not a case', 'not applicable', 'not available', 'not done', 'not known', 'not reported', 'not tb', 'possible case', 'preterm delivery', 'probable case', 'sample not collected', 'specimen rejected', 'spontaneous abortion', 'treatment completed', 'treatment failure', 'treatment success', 'yes, current smoker', 'yes, former smoker'})

Lower-cased whole-phrase allowlist.

scripts.security.phi_allowlist.CLINICAL_SINGLE_WORDS: frozenset[str] = frozenset({'abnormal', 'advanced', 'apical', 'basal', 'bilateral', 'cavitary', 'cavity', 'completed', 'contaminated', 'contamination', 'culture', 'cured', 'definite', 'ethambutol', 'failure', 'indeterminate', 'insufficient', 'invalid', 'isoniazid', 'jensen', 'lesion', 'liquid', 'lowenstein', 'mdr', 'minimal', 'moderate', 'neelsen', 'negative', 'nonreactive', 'normal', 'pending', 'positive', 'possible', 'probable', 'pyrazinamide', 'reactive', 'rejected', 'relapse', 'rifampicin', 'smear', 'solid', 'streptomycin', 'tb', 'treatment', 'tuberculosis', 'unilateral', 'xdr', 'ziehl'})

Lower-cased single-word clinical vocabulary (used for two-token phrase check).

scripts.security.phi_allowlist.COMMON_FIRST_NAMES: frozenset[str] = frozenset({'aishwarya', 'ananya', 'anil', 'babu', 'barbara', 'geetha', 'gita', 'james', 'john', 'kumar', 'lakshmi', 'linda', 'mahesh', 'mary', 'michael', 'patricia', 'pooja', 'priya', 'rajesh', 'raju', 'ramesh', 'robert', 'sanjay', 'saraswati', 'sunil', 'suresh', 'vijay'})

Small seed — extend by adding entries to this frozenset.

scripts.security.phi_allowlist.COMMON_LAST_NAMES: frozenset[str] = frozenset({'brown', 'gupta', 'iyer', 'johnson', 'jones', 'kumar', 'menon', 'naidu', 'nair', 'patel', 'pillai', 'rao', 'reddy', 'sharma', 'singh', 'smith', 'verma', 'williams'})

Small seed — extend by adding entries to this frozenset.

scripts.security.phi_allowlist.is_clinical_free_text(text)[source]

Return True if the entire value is a recognisable clinical notation.

Catches phrasings the generic name-like heuristic would otherwise flag, e.g. “patient expired” or “died on 3/1/2014”.

Return type:

bool

Parameters:

text (str)

scripts.security.phi_allowlist.is_clinical_phrase(text)[source]

Return True if text is a known clinical / research phrase.

Checks both the exact phrase (lowered) against CLINICAL_PHRASES and whether every whitespace-separated token is in CLINICAL_SINGLE_WORDS.

Return type:

bool

Parameters:

text (str)
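The two checks combine as follows; the seed sets here are tiny illustrative subsets of the shipped frozensets:

```python
CLINICAL_PHRASES = frozenset({"treatment completed", "not done"})
CLINICAL_SINGLE_WORDS = frozenset({"smear", "negative", "culture", "positive"})

def is_clinical_phrase(text):
    # Exact whole-phrase match first, then the all-tokens-clinical check.
    low = text.strip().lower()
    if low in CLINICAL_PHRASES:
        return True
    tokens = low.split()
    return bool(tokens) and all(t in CLINICAL_SINGLE_WORDS for t in tokens)
```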

scripts.security.phi_allowlist.looks_like_real_name(text)[source]

Return True if text looks like a real person name.

A two-to-four-word capitalized string is considered likely a name when at least one token is in COMMON_FIRST_NAMES or COMMON_LAST_NAMES, AND the string is not a clinical phrase.

Return type:

bool

Parameters:

text (str)

PHI Gate (Agent-Boundary)

Query-time PHI gate for the RePORT AI Portal agent boundary.

Uses the shared scripts.security.phi_patterns catalog and the scripts.security.phi_allowlist clinical-phrase allowlist. The allowlist suppresses obvious-false-positive warnings on clinical verbatim like “Treatment Completed” that would otherwise match the generic name-like heuristic.

Presidio NER is intentionally not wired in — comparative benchmarks showed precision around 22.7 % on mixed data where the rule catalog + clinical allowlist reach materially higher precision on the calibrated Indo-VAP field shapes.

The gate is the defence-in-depth layer at the trio-bundle → agent boundary: every @tool function in scripts.ai_assistant.agent_tools runs its return text through phi_gate_check() before the string reaches the LLM, so even if the offline scrub missed a token the live query cannot surface it.

IRB-grade benchmark anchors:
  • Pillar 2.4 — every tool return passes through a PHI gate

  • Pillar 1.5 — narrative-content leak detection

  • Pillar 5.3 — breach-alert emission on blocked responses

exception scripts.security.phi_gate.PHIGateConfigError[source]

Bases: ValueError

Raised when the PHI gate is invoked with malformed input.

class scripts.security.phi_gate.PHIGateResult(blocked, findings)[source]

Bases: object

Outcome of a PHI-gate scan.

blocked is True when any blocking pattern matched. findings is a sorted, unique tuple of category tags recorded across the scan (both blocking and warn-only). Safe to show the operator — the tags are category names like AADHAAR / EMAIL, never raw values.

Parameters:
blocked: bool
findings: tuple[str, ...]
scripts.security.phi_gate.phi_gate_check(texts)[source]

Scan texts for PHI. Returns blocked=True only on high-confidence PHI.

Low-confidence heuristics (bare NUMERIC_ID, DATE_MDY, generic PERSON_NAME) are recorded in findings for audit but do not trigger blocking — they over-fire on legitimate clinical phrases and would block benign agent responses.

Clinical-phrase allowlist (phi_allowlist) is consulted on the warn tier only. Blocking tier always wins.

Return type:

PHIGateResult

Parameters:

texts (str | Sequence[str])
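The two-tier scan can be sketched with a pared-down catalog — one blocking and one warn pattern standing in for the full phi_patterns lists — and a plain tuple standing in for PHIGateResult:

```python
import re

BLOCKING = [("EMAIL", re.compile(r"\b[\w.+-]+@[\w-]+\.[A-Za-z]{2,}\b"))]
WARN = [("DATE_MDY", re.compile(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b"))]

def gate(texts):
    if isinstance(texts, str):
        texts = [texts]
    findings, blocked = set(), False
    for t in texts:
        for tag, pat in BLOCKING:
            if pat.search(t):
                findings.add(tag)
                blocked = True          # high-confidence hit blocks
        for tag, pat in WARN:
            if pat.search(t):
                findings.add(tag)       # recorded for audit, never blocks
    return blocked, tuple(sorted(findings))
```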

k-Anonymity Gate

k-anonymity / small-cell suppression gate for agent-tool responses.

At the trio-bundle → agent boundary, row-level queries can surface equivalence classes (age-band × sex × district × outcome) with very small sample sizes. A response returning one matched row with all sensitive attributes visible defeats the whole scrub — the scrub guarantees de-identification at rest, but k-anon defends against re-identification at query time.

This module provides two utilities:

  • kanon_check() — given a list of equivalence-class records and a k threshold, returns a KAnonResult with blocked set when any class has fewer than k members.

  • suppress_small_cells() — given aggregate counts, replaces any count < k with the string "<5" (or equivalent) so the agent surface never reveals an exact small-cell value.

IRB-grade benchmark anchor: Pillar 1.7 — k-anonymity ≥ 5 enforced on quasi-identifier combos surfaced to the agent; l-diversity ≥ 2 is a tracked design gap (see references.rst). Reference: ICMR 2017 §11.7; NIST SP 800-188 §5.

class scripts.security.kanon_gate.KAnonResult(blocked, smallest_class_size, violating_keys)[source]

Bases: object

Outcome of a k-anonymity check.

blocked is True when at least one equivalence class is smaller than k. smallest_class_size reports the minimum class size observed (or 0 when no classes were supplied). violating_keys is a sorted tuple of equivalence-class keys whose size is below the threshold; each key is a string form of the quasi-identifier tuple, safe to log.

Parameters:
  • blocked (bool)

  • smallest_class_size (int)

  • violating_keys (tuple[str, ...])

blocked: bool
smallest_class_size: int
violating_keys: tuple[str, ...]
class scripts.security.kanon_gate.LDiversityResult(blocked, smallest_diversity, violating_classes)[source]

Bases: object

Outcome of an l-diversity check.

A row set passes l-diversity (l ≥ 2) when every equivalence class (defined by the quasi-identifier tuple) contains at least l distinct values for each sensitive attribute. l = 2 is the smallest meaningful threshold; higher values resist homogeneity attacks more strongly.

blocked is True when at least one (class, sensitive_attr) pair has fewer than l distinct values. violating_classes enumerates which equivalence classes failed and on which attribute.

Parameters:
blocked: bool
smallest_diversity: int
violating_classes: tuple[tuple[str, str], ...]

Tuples of (equivalence_class_key, sensitive_attribute_name) whose distinct-value count fell below l.

scripts.security.kanon_gate.kanon_check(rows, *, quasi_identifiers, k=5)[source]

Return a KAnonResult for the given rows + quasi-identifiers.

Does NOT mutate rows. Counts equivalence classes by the tuple of quasi-identifier values; any class with size < k marks the result as blocked. An empty input returns blocked=False with zero class size — caller decides whether empty is permitted.

Return type:

KAnonResult

Parameters:
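The equivalence-class count behind the check can be sketched as follows; a plain dict stands in for KAnonResult:

```python
from collections import Counter

def kanon_check(rows, *, quasi_identifiers, k=5):
    # Key each row by its quasi-identifier tuple; any class with
    # fewer than k members marks the result blocked.
    classes = Counter(tuple(row.get(q) for q in quasi_identifiers)
                      for row in rows)
    violating = sorted(str(key) for key, n in classes.items() if n < k)
    return {
        "blocked": bool(violating),
        "smallest_class_size": min(classes.values(), default=0),
        "violating_keys": tuple(violating),
    }
```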
scripts.security.kanon_gate.l_diversity_check(rows, *, quasi_identifiers, sensitive_attributes, l_threshold=2)[source]

Verify that every equivalence class has ≥ l_threshold distinct values for every sensitive attribute.

Use AFTER kanon_check() — k-anonymity ensures classes are large enough; l-diversity ensures they aren’t homogeneous on the outcomes that matter (e.g., all 5+ subjects in a class share outcome=DIED). Empty input returns blocked=False.

Raises ValueError if either tuple is empty or l_threshold < 1.

Return type:

LDiversityResult

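The homogeneity check layered on top of k-anonymity can be sketched the same way (a simplified stand-in for the module's implementation):

```python
def l_diversity_sketch(rows, *, quasi_identifiers, sensitive_attributes, l_threshold=2):
    if not quasi_identifiers or not sensitive_attributes or l_threshold < 1:
        raise ValueError("non-empty attribute tuples and l_threshold >= 1 required")
    # Group rows into equivalence classes keyed by the quasi-identifier tuple.
    classes: dict[tuple, list] = {}
    for row in rows:
        key = tuple(str(row.get(q)) for q in quasi_identifiers)
        classes.setdefault(key, []).append(row)
    violating = []
    for key, members in classes.items():
        for attr in sensitive_attributes:
            distinct = {str(m.get(attr)) for m in members}
            if len(distinct) < l_threshold:
                violating.append((str(key), attr))  # homogeneous on this attribute
    return {"blocked": bool(violating), "violating_classes": tuple(sorted(violating))}

# A class of five subjects that all share outcome=DIED passes k=5 but fails l=2.
homogeneous = [{"site": "A", "outcome": "DIED"}] * 5
res = l_diversity_sketch(homogeneous, quasi_identifiers=["site"],
                         sensitive_attributes=["outcome"])
```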
scripts.security.kanon_gate.mask_small_cell(count, *, k=5, label='<5')[source]

Return count if count >= k, else label (default "<5").

Pair with suppress_small_cells() when aggregating cross-tabulations for the agent surface.

Return type:

Any

scripts.security.kanon_gate.suppress_small_cells(counts, *, k=5, label='<5')[source]

Return a new dict where values < k are replaced with label.

Leaves keys untouched. Intended for cross-tab / frequency counts that a tool is about to return to the LLM.

Return type:

dict[Any, Any]

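Both suppression helpers follow directly from their documented behaviour; a self-contained sketch:

```python
def mask_small_cell(count, *, k=5, label="<5"):
    # Single count: pass through when >= k, else the suppression label.
    return count if count >= k else label

def suppress_small_cells(counts, *, k=5, label="<5"):
    # New dict; keys untouched, values below k replaced with the label.
    return {key: mask_small_cell(value, k=k, label=label) for key, value in counts.items()}

crosstab = {"cured": 120, "failed": 11, "died": 3}
safe = suppress_small_cells(crosstab)
# -> {'cured': 120, 'failed': 11, 'died': '<5'}
```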

Utility Modules

Logging System

Canonical centralized logging system for RePORT AI Portal.

This module provides the single supported logging boundary for the single-study, privacy-first, local-first runtime. It exposes:

  • one application logger rooted at report_ai_portal

  • a custom SUCCESS log level between INFO and WARNING

  • rotating file handlers plus filtered console output

  • convenience functions, timing/error decorators, and verbose tree logging

  • log cleanup helpers for retention management

Design rules:

  • Logger instances are obtained via logging.getLogger(...) and cached.

  • Initialization is thread-safe and idempotent.

  • File logging supports rotation via RotatingFileHandler.
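Registering a custom level between INFO (20) and WARNING (30) follows the standard logging.addLevelName pattern. A minimal sketch; the numeric value 25 is an assumption, not confirmed by the source:

```python
import logging

SUCCESS = 25  # assumed value: any integer between INFO (20) and WARNING (30)
logging.addLevelName(SUCCESS, "SUCCESS")

def _success(self, msg, *args, **kwargs):
    # Mirror Logger.info()/Logger.warning(): respect the effective level.
    if self.isEnabledFor(SUCCESS):
        self._log(SUCCESS, msg, args, **kwargs)

logging.Logger.success = _success  # convenience method on every logger

log = logging.getLogger("report_ai_portal.demo")
```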

class scripts.utils.logging_system.CustomFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)[source]

Bases: Formatter

Standard text formatter with explicit SUCCESS label support.

format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Return type:

str

Parameters:

record (LogRecord)

class scripts.utils.logging_system.JSONFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)[source]

Bases: Formatter

JSON formatter for structured audit and machine-readable logging.

format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Return type:

str

Parameters:

record (LogRecord)

class scripts.utils.logging_system.VerboseLogger(logger_module)[source]

Bases: object

Hierarchical helper for DEBUG-mode tree logging.

Parameters:

logger_module (types.ModuleType)

detail(message)[source]
Return type:

None

Parameters:

message (str)

file_processing(filename, total_records=None)[source]
Parameters:
  • filename (str)

  • total_records (int | None)

items_list(label, items, max_show=5)[source]
Return type:

None

metric(label, value)[source]
Return type:

None

step(step_name)[source]
Parameters:

step_name (str)

timing(operation, seconds)[source]
Return type:

None

scripts.utils.logging_system.cleanup_old_logs(max_age_days=None, max_files=None, log_dir=None, dry_run=False, recursive=True, pattern='*.log')[source]

Delete old log files according to age and/or count criteria.

Return type:

dict[str, Any]

Parameters:
  • max_age_days (int | None)

  • max_files (int | None)

  • log_dir (Path | None)

  • dry_run (bool)

  • recursive (bool)

  • pattern (str)

scripts.utils.logging_system.critical(msg, *args, include_log_path=True, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.debug(msg, *args, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.error(msg, *args, include_log_path=True, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.exception(msg, *args, include_log_path=True, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.get_log_file_path()[source]

Return the current main log-file path if initialized.

Return type:

str | None

scripts.utils.logging_system.get_logger(name=None)[source]

Return the root application logger or a child logger.

Return type:

Logger

Parameters:

name (str | None)

scripts.utils.logging_system.get_verbose_logger()[source]

Return the singleton verbose tree logger helper.

Return type:

VerboseLogger

scripts.utils.logging_system.info(msg, *args, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.log_errors(logger_name=None, reraise=True)[source]

Decorator that logs exceptions with stack traces and optional re-raise.

Parameters:
  • logger_name (str | None)

  • reraise (bool)

scripts.utils.logging_system.log_execution_time(operation_name, logger_name=None)[source]

Context manager that logs execution time for an arbitrary block.

Parameters:
  • operation_name (str)

  • logger_name (str | None)

scripts.utils.logging_system.log_time(logger_name=None, level=20)[source]

Decorator that logs function execution time.

Parameters:
  • logger_name (str | None)

  • level (int)
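A timing decorator of this shape can be sketched as follows; this illustrates the pattern, not the module's exact implementation:

```python
import functools
import logging
import time

def log_time(logger_name=None, level=logging.INFO):
    """Decorator factory: log the wrapped function's wall-clock duration."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                logging.getLogger(logger_name or fn.__module__).log(
                    level, "%s completed in %.3fs", fn.__name__, elapsed
                )
        return wrapper
    return decorator

@log_time(logger_name="report_ai_portal.demo")
def busy():
    return sum(range(1000))
```

The try/finally shape means the duration is logged even when the wrapped function raises.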

scripts.utils.logging_system.reset_logging()[source]

Reset main logger state. Primarily for tests.

Return type:

None

scripts.utils.logging_system.setup_logger(name='report_ai_portal', log_level=20, simple_mode=False, verbose=False)[source]

Legacy compatibility wrapper around setup_logging.

Return type:

Logger

scripts.utils.logging_system.setup_logging(module_name='__main__', log_level=None, simple_mode=False, verbose=False, json_format=False, max_bytes=10485760, backup_count=10)[source]

Create and configure the singleton application logger.

Return type:

Logger

Parameters:
  • module_name (str)

  • log_level (str | None)

  • simple_mode (bool)

  • verbose (bool)

  • json_format (bool)

  • max_bytes (int)

  • backup_count (int)

scripts.utils.logging_system.step_progress(msg, *args, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.success(msg, *args, **kwargs)[source]
Return type:

None

scripts.utils.logging_system.warning(msg, *args, include_log_path=False, **kwargs)[source]
Return type:

None


Phase-0 Secure Staging

Hardened AMBER-zone staging helpers for the RePORT AI Portal pipeline.

The pipeline processes raw study data (PHI) in a transient staging workspace under tmp/{STUDY_NAME}/ before atomically publishing the PHI-free trio_bundle/. Staging is the honest-broker AMBER zone — it must carry the strongest defensive posture the local filesystem supports:

  • Restrictive permissions — directory mode 0700 + umask 0077 for every write, so no other OS user can read partial staging output.

  • Secure teardown — on successful completion, each staging file is overwritten with random bytes and fsynced before unlink, reducing the window where deleted staging contents could be recovered via filesystem forensics.

  • In-memory staging (optional) — when the environment sets REPORTALIN_TMPFS_STAGING=1 AND /dev/shm is writable, staging is redirected to a tmpfs mount so raw extracted data never touches the physical disk on the extraction host. Otherwise falls back to the default on-disk staging root resolved by config.

The module is pure filesystem plumbing: it never reads row contents, never logs values, never crosses zone boundaries. Every file operation is wrapped by scripts.security.secure_env.assert_write_zone() so a misconfigured staging root fails fast with a zone violation rather than silently writing outside the allowed area.

IRB-grade benchmark anchors (see docs/sphinx/irb_auditor/):
  • HIPAA §164.310(c) device + media controls

  • NIST SP 800-188 §6.3-§6.5 on transient de-identification workspaces

  • ICMR 2017 §11.5 audit + confidentiality

  • DPDPA 2023 §8(7) erasure

Public API:
scripts.utils.secure_staging.prepare_staging(root, subdirs)[source]

Wipe root and create subdirs with hardened permissions.

Return type:

None

Order of operations:
  1. If root exists, invoke secure_remove_tree() on it so no residue from a prior failed run carries over.

  2. Under scoped_umask(), create root and each subdir with mode 0700.

  3. Zone-guard root via assert_write_zone.

Side effects:
  • Every directory created lands with mode 0700.

  • The process umask is temporarily 0o077 only while creating.

Raises:

ZoneViolationError – when root is outside the allowed write zones.


scripts.utils.secure_staging.resolve_staging_root(default_root, *, study_name)[source]

Return the staging root for this run.

When REPORTALIN_TMPFS_STAGING is truthy AND /dev/shm is writable, returns /dev/shm/report_ai_portal/{STUDY_NAME}. Otherwise returns default_root (the on-disk staging path from config).

The caller is responsible for updating config.STUDY_STAGING_DIR and the per-leg STAGING_*_DIR paths before the extraction leg reads them. This function does not mutate any global state.

Zone guard: callers must pass the returned path to prepare_staging(), which asserts it against the write zone. A misconfigured env override (e.g. REPORTALIN_TMPFS_STAGING=1 on a platform without tmpfs AND a mangled default root) is caught at the prepare_staging call site.

Return type:

Path

Parameters:
  • default_root (Path)

  • study_name (str)
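The documented decision rule is small enough to sketch directly; the exact truthiness parsing of the env var is an assumption:

```python
import os
from pathlib import Path

def resolve_staging_root_sketch(default_root: Path, *, study_name: str) -> Path:
    # tmpfs staging only when explicitly requested AND /dev/shm is writable.
    requested = os.environ.get("REPORTALIN_TMPFS_STAGING", "").strip().lower()
    if requested in {"1", "true", "yes"}:
        shm = Path("/dev/shm")
        if shm.is_dir() and os.access(shm, os.W_OK):
            return shm / "report_ai_portal" / study_name
    return default_root  # on-disk staging path resolved by config
```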

scripts.utils.secure_staging.scoped_umask(mask=63)[source]

Context manager that sets the process umask for the duration of a block.

Yields the previous umask so callers may inspect it. Always restored on exit, even on exception.

Use this to wrap extraction-leg writes into staging so newly-created files land with mode 0600 (or 0700 for directories) rather than the platform default (often 0644 / 0755).

Return type:

Generator[int, None, None]

Parameters:

mask (int)
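The context manager can be reproduced almost verbatim from its description (the default mask=63 is 0o077):

```python
import os
from contextlib import contextmanager

@contextmanager
def scoped_umask(mask: int = 0o077):
    """Set the process umask for the block; restore on exit, even on exception."""
    previous = os.umask(mask)
    try:
        yield previous  # callers may inspect the prior umask
    finally:
        os.umask(previous)
```

Under a 0o077 umask, files created inside the block land with mode 0600 (0o666 & ~0o077) and directories with 0700.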

scripts.utils.secure_staging.secure_remove_tree(root)[source]

Recursively overwrite + delete every file under root, then the tree.

For each regular file found: overwrite with random bytes, fsync, unlink. Empty directories are then removed bottom-up. Non-file entries (symlinks, sockets, pipes) are unlinked without overwrite.

Failures are logged at WARNING and do not abort the teardown — the goal is best-effort secure-delete for the happy path; a partial failure still leaves the caller with an empty tree.

Zone guard: root is asserted against the write zone so a stray invocation on a protected path fails fast.

Return type:

None

Parameters:

root (Path)
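The per-file overwrite step can be sketched as below. This is a best-effort, single-pass illustration; the real module also walks directories bottom-up, handles non-file entries, and zone-guards root:

```python
import os
import tempfile
from pathlib import Path

def secure_delete_file(path: Path) -> None:
    # Overwrite with random bytes, force to stable storage, then unlink.
    size = path.stat().st_size
    with open(path, "r+b") as fh:
        fh.write(os.urandom(size))
        fh.flush()
        os.fsync(fh.fileno())
    path.unlink()

# Demo: create a staging file, then securely remove it.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"transient staging bytes")
staged = Path(tmp.name)
secure_delete_file(staged)
```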

Integrity Helpers (SHA-256)

Integrity helpers — streamed SHA-256 hashing for the pipeline integrity chain.

What. Two pure functions that produce the hex SHA-256 of a file and of an arbitrary byte stream in 64 KiB chunks. Extracted from scripts.extraction.dataset_pipeline and scripts.utils.lineage where the same logic was duplicated.

Why. Every raw input, every staged JSONL, every published trio artifact, and the lineage manifest itself must be hashable with a stable, memory-bounded implementation so the NIST SP 800-188 §5.2 integrity chain holds across stages. A single authoritative helper keeps the hash behaviour identical everywhere and avoids drift when the chunk size or the hash algorithm is revisited.

How. hash_file() opens the path in binary mode, reads 64 KiB at a time, feeds each chunk into a hashlib.sha256 instance, and returns the lowercase hex digest. hash_bytes() is the same but takes an in-memory bytes/bytearray buffer — useful for test fixtures and for hashing small audit payloads without a filesystem round-trip.

scripts.utils.integrity.DEFAULT_CHUNK_SIZE = 65536

Streaming read-chunk size. Matches the 2025 guidance for balanced memory pressure + syscall overhead on modern filesystems.

scripts.utils.integrity.hash_bytes(data)[source]

Return lowercase hex SHA-256 of an in-memory data buffer.

What. SHA-256 hex digest of data.

Why. Lets tests seed known fixtures without a filesystem round-trip and lets audit payloads hash themselves when no file backing exists.

How. Single hashlib.sha256(data).hexdigest() call — the buffer is already in memory so chunking adds no benefit.

Return type:

str

Parameters:

data (bytes | bytearray | memoryview)

scripts.utils.integrity.hash_file(path, *, chunk_size=65536)[source]

Return lowercase hex SHA-256 of path contents, streamed.

What. SHA-256 hex digest of the file at path.

Why. Stable integrity anchor for NIST SP 800-188 §5.2; carried in every extracted record’s _provenance.raw_sha256 and in every lineage_manifest.json input/output entry.

How. Open the path binary, read chunk_size bytes at a time, feed each chunk into hashlib.sha256. Works on arbitrarily large files without exhausting memory.

Return type:

str

Parameters:
  • path (Path)

  • chunk_size (int)
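The documented behaviour maps one-to-one onto hashlib; a faithful sketch:

```python
import hashlib
import tempfile
from pathlib import Path

DEFAULT_CHUNK_SIZE = 65536  # 64 KiB

def hash_bytes(data) -> str:
    # Buffer already in memory: a single hashlib call suffices.
    return hashlib.sha256(data).hexdigest()

def hash_file(path: Path, *, chunk_size: int = DEFAULT_CHUNK_SIZE) -> str:
    # Stream in 64 KiB chunks so arbitrarily large files stay memory-bounded.
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()

# Demo: both helpers agree on the same content.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"abc")
sample = Path(tmp.name)
```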

Lineage Manifest

Per-run lineage manifest for the RePORT AI Portal pipeline.

Regulators auditing a clinical de-identification pipeline want a single artifact that ties every input file to every output file with hashes and timestamps at each transformation step. This module produces that artifact as output/{STUDY}/audit/lineage_manifest.json.

The manifest records:

  • Run metadata — pipeline version, extraction engine, UTC timestamp, compliance posture, study name.

  • Inputs — every raw file that entered the pipeline this run, with SHA-256 + size + mtime.

  • Outputs — every file in the published trio_bundle/ + every audit report, with SHA-256 + size.

  • Steps — per-leg (datasets / dictionary / pdfs) timestamps and rule-action counts (read from existing audit reports; this module does NOT re-compute scrub events).

The manifest carries only counts and hashes — never raw PHI values. Caller must ensure emit_lineage_manifest() runs AFTER _publish_staging so the trio bundle exists and AFTER all audit reports are on disk.

IRB-grade benchmark anchors:
  • NIST SP 800-188 §7 governance + audit

  • FDA 21 CFR Part 11 §11.10(e) audit record requirements

  • ICMR 2017 §11.5 audit + confidentiality

  • CDISC ODM origin/source traceability

exception scripts.utils.lineage.LineageManifestError[source]

Bases: Exception

Raised when the lineage manifest cannot be assembled.

scripts.utils.lineage.emit_lineage_manifest(*, study_name, raw_datasets_dir, raw_dictionary_dir, raw_pdfs_dir, trio_bundle_dir, audit_dir, pipeline_version, compliance_posture, manifest_path, phi_key_fingerprint=None)[source]

Assemble + atomically write the lineage manifest for this run.

Returns the manifest payload (dict) so callers may log a summary.

Zone guard: manifest_path is asserted against the output zone so a mis-configured audit dir fails fast.

Return type:

dict[str, Any]

Parameters:
  • study_name (str)

  • raw_datasets_dir (Path)

  • raw_dictionary_dir (Path | None)

  • raw_pdfs_dir (Path | None)

  • trio_bundle_dir (Path)

  • audit_dir (Path)

  • pipeline_version (str)

  • compliance_posture (str)

  • manifest_path (Path)

  • phi_key_fingerprint (str | None)

scripts.utils.lineage.hash_path(path, *, chunk_size=65536)

Return lowercase hex SHA-256 of path contents, streamed.

What. SHA-256 hex digest of the file at path.

Why. Stable integrity anchor for NIST SP 800-188 §5.2; carried in every extracted record’s _provenance.raw_sha256 and in every lineage_manifest.json input/output entry.

How. Open the path binary, read chunk_size bytes at a time, feed each chunk into hashlib.sha256. Works on arbitrarily large files without exhausting memory.

Return type:

str

Parameters:
  • path (Path)

  • chunk_size (int)
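Manifest assembly rests on two standard techniques: a hash/size/mtime row per file, and write-then-rename for the atomic write. A hedged sketch of both; the field names are illustrative, not the manifest's actual schema:

```python
import hashlib
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path

def input_entry(path: Path) -> dict:
    # One illustrative "inputs" row: hash + size + mtime, never file contents.
    stat = path.stat()
    return {
        "name": path.name,
        "sha256": hashlib.sha256(path.read_bytes()).hexdigest(),
        "size": stat.st_size,
        "mtime_utc": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
    }

def atomic_write_json(payload: dict, dest: Path) -> None:
    # Write a sibling temp file first, then rename into place in one step,
    # so a crash never leaves a half-written manifest at dest.
    tmp = dest.with_name(dest.name + ".tmp")
    tmp.write_text(json.dumps(payload, indent=2, sort_keys=True))
    os.replace(tmp, dest)

workdir = Path(tempfile.mkdtemp())
raw = workdir / "raw.csv"
raw.write_bytes(b"a,b\n")
manifest_path = workdir / "lineage_manifest.json"
atomic_write_json({"inputs": [input_entry(raw)]}, manifest_path)
```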

Log Hygiene (PHI Redactor)

PHI-redacting log filter for the RePORT AI Portal pipeline.

Before the PHI scrub runs (Step 1.6), the pipeline processes raw subject data — raw SUBJIDs, raw dates, raw narrative strings. If any of that content is logged at INFO / DEBUG during extraction or orchestration, it lands in .logs/*.log and becomes a PHI side-channel the scrub does not touch.

This module installs a logging.Filter that redacts likely-PHI substrings from every log record before the handler emits. Specifically:

  • Subject IDs — any literal substring matching the configured subject_id_fields regex catalogue is replaced with a stable HMAC tag <SUBJ_{HMAC[:8]}>. Same-subject redaction is deterministic across a run (the HMAC key is loaded once at filter install time).

  • Common PHI regex classes — Aadhaar, PAN, Indian phone, email, SSN, ISO/M-D-Y dates, Indian PIN-code patterns are replaced with a category tag like <AADHAAR> or <EMAIL>.

Design constraints:

  • No raw values in filter memory — the filter stores only compiled regex + the PHI HMAC key; never a raw value.

  • Fast path for clean messages — the filter short-circuits if the message contains none of the pre-compiled triggers, so the common case pays one substring search per record.

  • Fail-closed per record — on any exception during redaction, the filter replaces the message with a fixed redaction-failure notice. Logs remain useful for operations without passing raw PHI through.

IRB-grade benchmark anchors:
  • ICMR 2017 §11.5 audit + confidentiality

  • HIPAA §164.312(b) audit controls

  • NIST SP 800-188 §6.4 on side-channel closure

class scripts.utils.log_hygiene.PHIRedactingFilter(*, hmac_key, subject_id_patterns=None, generic_patterns=None)[source]

Bases: Filter

Log filter that redacts PHI substrings before the handler emits.

Installed on the root logger by install_phi_redactor(), so every named logger inherits redaction. Two redaction passes:

  1. Subject-ID pass — a caller-supplied list of subject_id_fields regex patterns is matched against the message. Each match is replaced with <SUBJ_{HMAC-SHA256[:8]}> — deterministic per subject within a run, unrecoverable across the filter instance.

  2. Generic pass — the _GENERIC_PATTERNS catalogue catches the common PHI classes (Aadhaar, PAN, email, phone, date, pincode, SSN).

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

Return type:

bool

Parameters:

record (LogRecord)
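The two-pass redaction can be sketched as a standalone logging.Filter. The patterns and tag formats follow the description above; the real catalogue of generic patterns is larger:

```python
import hashlib
import hmac
import logging
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")  # one generic class shown

class RedactingFilterSketch(logging.Filter):
    def __init__(self, hmac_key: bytes, subject_id_re):
        super().__init__()
        self._key = hmac_key            # key only; never a raw value
        self._subject_id_re = subject_id_re

    def _subj_tag(self, match) -> str:
        # Deterministic per subject within a run, unrecoverable without the key.
        digest = hmac.new(self._key, match.group(0).encode(), hashlib.sha256)
        return f"<SUBJ_{digest.hexdigest()[:8]}>"

    def filter(self, record: logging.LogRecord) -> bool:
        try:
            msg = record.getMessage()
            msg = self._subject_id_re.sub(self._subj_tag, msg)  # pass 1: subject IDs
            msg = EMAIL_RE.sub("<EMAIL>", msg)                  # pass 2: generic classes
            record.msg, record.args = msg, ()
        except Exception:
            # Fail closed: never let a redaction error pass raw PHI through.
            record.msg, record.args = "[log message withheld: redaction failed]", ()
        return True  # always emit; only the content changes
```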

scripts.utils.log_hygiene.attach_to_logger(logger, filter_instance)[source]

Attach filter_instance to a specific named logger (belt-and-braces).

logging.Filter is evaluated by the handler on the logger where it is attached, not inherited by child loggers. For defence-in-depth, callers can also attach the filter to each leg logger explicitly.

Return type:

None

scripts.utils.log_hygiene.install_phi_redactor(*, hmac_key, subject_id_patterns=None)[source]

Attach PHIRedactingFilter to the root logger and return it.

Idempotent: if the root logger already has a PHIRedactingFilter installed, the existing filter is returned and no duplicate is added.

Callers must supply an hmac_key — typically the same 32-byte key used by scripts.security.phi_scrub so log redaction and on-disk pseudonyms are joinable by operators with key access.

Return type:

PHIRedactingFilter


Extraction Modules (continued)

Deduplication

Unified deduplication helpers for the RePORT AI Portal extraction pipeline.

This module provides a single place for all duplicate-detection and duplicate-removal logic across the three extraction legs:

  • Dataset / Dictionary (JSONL): duplicate columns inside tabular data (e.g. SUBJID and SUBJID2 that contain identical values).

  • PDF (JSON): duplicate variables within a single form (case-insensitive collisions) and cross-form duplicate variables (the same abbreviation appearing in multiple *_variables.json files).

Most functions in this module are stateless helpers: they accept data, return cleaned data (or a report), and never touch the filesystem. File I/O remains in the caller so that atomic-write semantics are preserved.

Note: remove_within_file_duplicates mutates its input data dict in-place when dry_run=False; see its docstring for the mutation contract.

Usage:
>>> from scripts.extraction.dedup import (
...     clean_duplicate_columns,          # for DataFrames (dataset / dict)
...     remove_within_file_duplicates,    # single form JSON
...     clean_cross_form_duplicates,      # across multiple form JSONs
...     variable_richness_score,          # scoring helper
... )
scripts.extraction.dedup.clean_cross_form_duplicates(form_data)[source]

Remove cross-form duplicate variables from a set of per-form JSON dicts.

Scans all extracted variable JSONs, identifies variables appearing in more than one form, keeps the richest definition, and strips the duplicates from every other form.

Parameters:

form_data (dict[str, dict[str, Any]]) – Mapping of filename to parsed JSON dict for each form. Every dict must have a "variables" key.

Return type:

dict[str, dict[str, Any]]

Returns:

Dict mapping each modified filename to its cleaned data dict. Only files that were actually changed are included.

scripts.extraction.dedup.clean_duplicate_columns(df, *, source_file, sheet)[source]

Remove duplicate columns ending with numeric suffixes from a DataFrame.

Implements intelligent duplicate detection:

  1. Identify columns matching the pattern base_name + optional '_' + digits (e.g. SUBJID2, NAME_3).

  2. Check if the base column (without suffix) exists.

  3. Remove if 100% identical to the base column OR if entirely null.

  4. Keep columns with ANY differing values.

Parameters:
  • df (DataFrame) – pandas DataFrame to clean.

  • source_file (str) – Name of the source file (e.g. "01_Demographics.jsonl"). Recorded verbatim on each drop event.

  • sheet (str | None) – Sheet name for multi-sheet inputs, or None for single-sheet / non-Excel sources. Recorded verbatim on each drop event.

Returns:

  • cleaned_df is a copy of df with duplicate columns removed.

  • drop_events is a list of dicts — one per removed column — with the keys scope (always "dataset-column"), name (the dropped column), file (source_file), sheet (sheet), reason ("100% identical to '<base>'" or "entirely null"), and kept (the base column name, or None for pure-null drops).

Return type:

Tuple of (cleaned_df, drop_events)
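The suffix-matching rule can be sketched without pandas. This is a simplified stand-in in which columns are plain value lists and null means None:

```python
import re

_SUFFIX_RE = re.compile(r"^(?P<base>.+?)_?(?P<suffix>\d+)$")

def find_suffix_duplicates(columns: dict) -> list[dict]:
    """columns maps name -> list of cell values; returns drop events."""
    events = []
    for name, values in columns.items():
        match = _SUFFIX_RE.match(name)
        if not match or match.group("base") not in columns:
            continue  # no base column to compare against: keep the column
        base = match.group("base")
        if all(v is None for v in values):
            events.append({"scope": "dataset-column", "name": name,
                           "reason": "entirely null", "kept": None})
        elif values == columns[base]:
            events.append({"scope": "dataset-column", "name": name,
                           "reason": f"100% identical to '{base}'", "kept": base})
        # any differing non-null value: keep the column

    return events

events = find_suffix_duplicates({
    "SUBJID": ["S1", "S2"], "SUBJID2": ["S1", "S2"],   # identical -> drop SUBJID2
    "NAME": ["a", "b"],     "NAME_3": [None, None],    # all null  -> drop NAME_3
    "AGE": [30, 40],        "AGE2": [30, 41],          # differs   -> keep AGE2
})
```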

scripts.extraction.dedup.remove_within_file_duplicates(data, *, dry_run=False)[source]

Check a single parsed form JSON for duplicate variable abbreviations.

LLM extractions can sometimes produce the same abbreviation twice within a single form (e.g. repeated header fields on multi-page PDFs, or the model listing a variable under two sections). When found, the richest definition (most fields populated) is kept and extras are removed.

This does not touch cross-form duplicates (SUBJID appearing in Form 1A and Form 1B) — that dedup belongs to the registry builder.

Warning

Mutation contract. When dry_run=False, this function mutates data["variables"] in-place via the reference obtained at variables = data.get("variables", {}). The cleaned_data key in the return value is the same object as the input data — not a copy. Callers that depend on result["cleaned_data"] is data aliasing are correct; do not insert copy.deepcopy here. A caller that passes data expecting no side-effect will see silent in-place modification.

Parameters:
  • data (dict[str, Any]) – The parsed *_variables.json dict (must contain a "variables" key).

  • dry_run (bool) – If True, report only — don’t modify the data.

Return type:

dict[str, Any]

Returns:

Dict with duplicates_removed (int), details (list), and optionally cleaned_data (the modified dict, only when not dry_run and changes were made). cleaned_data is the same object as the input data (see mutation contract above).

scripts.extraction.dedup.variable_richness_score(var_data)[source]

Score a variable definition by completeness for dedup tie-breaking.

Returns a tuple (fields_populated, description_length, description) that sorts higher for richer definitions. Used to pick the canonical definition when the same abbreviation appears in multiple forms.

Return type:

tuple[int, int, str]

Parameters:

var_data (dict[str, Any])
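Because the score is an ordinary Python tuple, max() picks the canonical definition directly. A sketch; the exact notion of "populated" is an assumption:

```python
def variable_richness_score(var_data: dict) -> tuple[int, int, str]:
    # (fields populated, description length, description): tuples compare
    # element-by-element, so richer definitions sort higher under max().
    description = str(var_data.get("description") or "")
    populated = sum(1 for value in var_data.values() if value not in (None, "", [], {}))
    return (populated, len(description), description)

candidates = [
    {"abbreviation": "SUBJID", "description": ""},
    {"abbreviation": "SUBJID", "description": "Unique subject identifier", "type": "string"},
]
richest = max(candidates, key=variable_richness_score)
```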

Dataset Cleanup

Dataset cleanup for the staging datasets directory.

Runs on the staging tree (config.STAGING_DATASETS_DIR by default) after raw-data extraction and before promotion to the trio bundle. Only clean, unique datasets survive into the trio bundle.

Responsibilities:
  1. Remove known junk files (test/error artifacts).

  2. Detect structurally-duplicate dataset pairs via schema + row-count comparison.

  3. Merge confirmed duplicates — keep the file with more records (or union if complementary). Remove the duplicate.

  4. Serialize a unified audit report to config.AUDIT_DATASET_REPORT_PATH that combines upstream extraction column-drop events with the junk/duplicate-file events produced here. Audit lives under output/{STUDY}/audit/ and survives the run — it is authoritative.

All removals are logged. No raw-data access occurs — this module only touches the staging tree (tmp/{STUDY}/) for its working files and the output zone (output/{STUDY}/audit/) for its audit envelope.

Usage:
>>> from scripts.extraction.dataset_cleanup import clean_trio_datasets
>>> report = clean_trio_datasets(
...     datasets_dir,
...     extracted_drop_events=[...],
...     study_name="Indo-VAP",
... )
scripts.extraction.dataset_cleanup.clean_trio_datasets(datasets_dir=None, *, extracted_drop_events=None, study_name=None, audit_path=None)[source]

Clean the staging datasets directory and emit a unified audit report.

Removes junk files and merges confirmed structural duplicates from the staging tree, then writes {study, generated_utc, leg, removed[]} atomically to the audit path — combining upstream extraction column drops with this leg’s file-level removals.

Parameters:
  • datasets_dir (Path | None) – Path to the datasets directory. Defaults to config.STAGING_DATASETS_DIR (junk/duplicate scans operate on the staging tree, not the promoted trio bundle).

  • extracted_drop_events (list[dict[str, Any]] | None) – Upstream column-drop events from the extraction leg, each shaped like the unified-audit schema row ({scope, name, file, sheet, reason, kept}). Passed through verbatim into the audit. Defaults to [].

  • study_name (str | None) – Study identifier for the audit envelope. Defaults to config.STUDY_NAME.

  • audit_path (Path | None) – Destination for the unified audit JSON. Defaults to config.AUDIT_DATASET_REPORT_PATH.

Return type:

CleanupReport

Returns:

CleanupReport with details of junk/duplicate actions taken here. The audit file is always written — even when datasets_dir is missing or empty — to guarantee a stable envelope downstream.

AI Assistant Modules

KeyStore

In-memory API-key registry.

Replaces the prior pattern of writing user-entered LLM API keys to os.environ (so any sibling process or sandboxed Python could read them) with a process-local registry held in Streamlit session state.

The trust boundary is straightforward: keys live ONLY here in memory and are passed explicitly to LangChain client constructors via api_key=. The single narrow exception is when the wizard launches the pipeline as a subprocess that needs ANTHROPIC_API_KEY / GOOGLE_API_KEY for vision-API calls — KeyStore.env_for_subprocess() returns a new dict suitable for subprocess.run(env=...) without ever mutating the parent’s os.environ.

See docs/sphinx/developer_guide/sandbox.rst for the threat model.

class scripts.ai_assistant.keystore.KeyStore[source]

Bases: object

Process-local registry of LLM provider API keys.

Instances are intended to be held in streamlit.session_state via get_keystore(); for non-Streamlit contexts (CLI, tests) a fresh instance is fine — the class itself has no global state.

Keys live in a private dict on the instance. They are never written to disk, never copied to os.environ by this class, and never logged (the redaction patterns in scripts.utils.log_hygiene catch any accidental leak in log output).

clear(provider=None)[source]

Forget one provider’s key (or all if provider is omitted).

This only touches the instance’s in-memory dict. It does NOT touch os.environ — if the user pre-set a shell env var, that remains the user’s choice and lives in their shell’s session.

Return type:

None

Parameters:

provider (str | None)

env_for_subprocess(providers)[source]

Build an env dict suitable for subprocess.run(env=...).

Returns a new dict containing {ENV_VAR: key} for each requested provider that has a key set. Providers without a stored key are skipped (the caller decides whether that’s an error). Unknown providers raise ValueError immediately.

This method is the ONLY place keys leave the KeyStore in an env-shaped form. The returned dict is a pure value — neither os.environ nor the KeyStore is mutated.

Return type:

dict[str, str]

Parameters:

providers (Iterable[str])
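The trust boundary is easiest to see in code. A minimal sketch; the provider-to-env-var mapping and the choice to start from a copy of the parent environment are assumptions, not confirmed by the source:

```python
import os
from typing import Iterable

ENV_VARS = {"anthropic": "ANTHROPIC_API_KEY", "google": "GOOGLE_API_KEY"}

class KeyStoreSketch:
    def __init__(self) -> None:
        self._keys: dict[str, str] = {}  # in-memory only; never written anywhere

    def set(self, provider: str, key: str) -> None:
        if provider not in ENV_VARS:
            raise ValueError(f"unknown provider: {provider!r}")
        self._keys[provider] = key

    def env_for_subprocess(self, providers: Iterable[str]) -> dict[str, str]:
        # Pure value: a copy of the parent env plus stored keys; neither
        # os.environ nor the registry is mutated.
        env = dict(os.environ)
        for provider in providers:
            if provider not in ENV_VARS:
                raise ValueError(f"unknown provider: {provider!r}")
            if provider in self._keys:  # providers without a key are skipped
                env[ENV_VARS[provider]] = self._keys[provider]
        return env

store = KeyStoreSketch()
store.set("anthropic", "sk-test-not-a-real-key")
child_env = store.env_for_subprocess(["anthropic", "google"])
```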

get(provider)[source]

Return the stored key for provider or None.

Return type:

str | None

Parameters:

provider (str)

has(provider)[source]
Return type:

bool

Parameters:

provider (str)

set(provider, key)[source]

Store key for provider. Raises if provider is unknown.

Return type:

None

scripts.ai_assistant.keystore.get_keystore()[source]

Return the KeyStore for the current Streamlit session.

In Streamlit: persisted via st.session_state. Outside Streamlit (CLI, scripts): cached on a module global so a single process sees one consistent KeyStore across calls. Unit tests that want isolation should construct KeyStore() directly.

Return type:

KeyStore

scripts.ai_assistant.keystore.provider_slug_for(langchain_provider)[source]

Map a LangChain model_provider string to a KeyStore slug.

Returns None for providers that don’t take an API key (e.g. ollama) so callers can short-circuit.

Return type:

str | None

Parameters:

langchain_provider (str)

Agent Graph

ReAct agent for RePORT AI Portal AI Assistant.

Uses LangChain’s create_agent (built on LangGraph) with MemorySaver for session persistence. The agent autonomously decides which tools to call and how to compose answers.

LLM provider is controlled by config.LLM_PROVIDER / config.LLM_MODEL.

scripts.ai_assistant.agent_graph.get_agent()[source]

Return the compiled ReAct agent (create on first call).

Uses single-agent mode with the full tool set. The deterministic run_study_analysis tool handles multi-step analytical pipelines internally, so even small models only need to make one tool call.

Return type:

CompiledStateGraph

scripts.ai_assistant.agent_graph.get_checkpointer()[source]

Return the module-level MemorySaver (create on first call).

Return type:

InMemorySaver

scripts.ai_assistant.agent_graph.invoke_query(query, *, thread_id='default', callbacks=None)[source]

Invoke the agent and return the final answer text.

Convenience wrapper over stream_query() that collects the full response.

Parameters:
  • query (str) – User question.

  • thread_id (str) – Conversation thread ID for session persistence.

  • callbacks (list[Any] | None) – LangChain callbacks (e.g. TelemetryLogger).

Return type:

str

Note

query must be pre-screened by scripts.ai_assistant.phi_safe.guard_user_prompt() before calling this function. Callers that bypass the guard risk sending raw PHI to the LLM.

Returns:

The agent’s final answer as a string.

scripts.ai_assistant.agent_graph.reset_agent()[source]

Reset the agent and checkpointer (clears all sessions + tool cache).

Return type:

None

scripts.ai_assistant.agent_graph.stream_query(query, *, thread_id='default', callbacks=None)[source]

Stream a query through the ReAct agent.

Parameters:
  • query (str) – User question.

  • thread_id (str) – Conversation thread ID for session persistence.

  • callbacks (list[Any] | None) – LangChain callbacks (e.g. TelemetryLogger).

Return type:

Iterator[dict[str, Any]]

Note

query must be pre-screened by scripts.ai_assistant.phi_safe.guard_user_prompt() before calling this function. Callers that bypass the guard risk sending raw PHI to the LLM.

Yields:

State updates from the agent (contains messages with the response).

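The relationship between the two entry points can be sketched in plain Python: `invoke_query()` is essentially a fold over `stream_query()`'s updates. The update shape assumed here (`{node: {"messages": [...]}}`) follows LangGraph's streaming convention and is an assumption, not the module's verbatim code.

```python
def collect_final_answer(updates):
    """Drain a stream of agent state updates; keep the last message content."""
    final = ""
    for update in updates:                      # one dict per graph step
        for node_state in update.values():
            for msg in node_state.get("messages", []):
                content = msg.get("content", "") if isinstance(msg, dict) \
                    else getattr(msg, "content", "")
                if content:
                    final = content
    return final
```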

Agent Prompts

System prompt for the RePORT AI Portal ReAct agent.

File-Access Validator

Agent-world file-access boundary enforcement.

The production LLM agent’s permitted zones are (2026-04-24 boundary design):

  • Read: TRIO_BUNDLE_DIR (scrubbed, k-anon-gated trio outputs) or AGENT_STATE_DIR (its own analysis outputs and conversations). A small allowlist admits read-only source-tree config files (config/study_knowledge.yaml) that tool implementations need.

  • Write: AGENT_STATE_DIR only.

Everything else — STUDY_AUDIT_DIR (incl. telemetry), RAW_DATA_DIR, LOGS_DIR, STUDY_STAGING_DIR, arbitrary filesystem paths — is hard-rejected with ZoneViolationError (a PermissionError subclass from scripts.security.secure_env).

This module is the chokepoint: every agent-tool file read or write should call validate_agent_read() or validate_agent_write() before touching disk. The existing assert_trio_bundle_zone and assert_output_zone in scripts.security.secure_env remain valid narrower checks — this module layers the expanded agent-runtime zone on top without changing pipeline-side enforcement.

scripts.ai_assistant.file_access.is_agent_readable(path)[source]

Non-raising variant of validate_agent_read() for sentinel checks.

Return type:

bool

Parameters:

path (str | Path)

scripts.ai_assistant.file_access.validate_agent_read(path)[source]

Return the resolved Path if the agent may read it.

Raises:

ZoneViolationError – path is outside the agent’s permitted read zones.

Return type:

Path

Parameters:

path (str | Path)

scripts.ai_assistant.file_access.validate_agent_write(path)[source]

Return the resolved Path if the agent may write it.

Raises:

ZoneViolationError – path is outside AGENT_STATE_DIR.

Return type:

Path

Parameters:

path (str | Path)

scripts.ai_assistant.file_access.validate_sandbox_write(path)[source]

Return the resolved Path if the exec_python sandbox may write to path.

The sandbox runs LLM-generated code — a strictly narrower threat model than tool-code. Writes are scoped to AGENT_OUTPUT_DIR (agent/analysis/) rather than the full AGENT_STATE_DIR.

Uses os.path.commonpath (via _is_within()) so that sibling prefixes like agent/analysis_exfil cannot masquerade as analysis/.

Raises:

ZoneViolationError – path is outside AGENT_OUTPUT_DIR.

Return type:

Path

Parameters:

path (str | Path)

Agent Tools

Structured tool registry for the RePORT AI Portal AI Assistant system.

All read-side tools resolve every path through scripts.ai_assistant.file_access.validate_agent_read — the unified agent-zone chokepoint. The permitted read zone is output/{STUDY}/trio_bundle/ (PHI-scrubbed artifacts) plus output/{STUDY}/agent/ (the agent’s own analysis outputs and conversations). Telemetry lives under audit/ and is off-limits to the agent, as are raw data and staging.

Writes (analysis figures and narratives) are confined to output/{STUDY}/agent/ via validate_agent_write, with a narrower validate_sandbox_write for the exec_python path (LLM-generated code → agent/analysis/ only). The pipeline-side assert_trio_bundle_zone / assert_output_zone helpers are still called as directory-level early-rejects before glob iteration — they layer beneath the unified validator, not instead of it.

Each tool is decorated with @tool so it is automatically registered with the LangGraph ReAct agent.

Tools

  1. search_variables — fuzzy search across the unified variables reference

  2. find_variable_candidates — always-returns-top-k ranked candidates for disambiguation

  3. get_variable_details — full metadata for a specific variable

  4. list_forms — list all CRF forms in the study (from variables.json)

  5. get_form_variables — list all variables belonging to a specific form

  6. query_dataset — structural query on a JSONL dataset

  7. get_dataset_stats — summary statistics for a dataset (record counts, columns)

  8. get_study_overview — high-level study summary (datasets, forms, variables)

  9. run_python_analysis — sandboxed code execution for statistical analysis

  10. cross_reference_variables — cross-reference a variable across datasets + forms

  11. run_study_analysis — deterministic epidemiological analysis

  12. search_pdf_context — keyword search over extracted CRF form text (qualitative Q&A)

Tool Cache

Session-scoped tool result cache for the RePORT AI Portal ReAct agent.

Caches tool call results by (tool_name, args_hash) so that repeated identical tool calls within a session return instantly without re-reading files from disk.

The cache is an ordered-dict LRU with a configurable max size. Clearing the cache (e.g. on :reset) is a single .clear() call.

Usage:

from scripts.ai_assistant.tool_cache import tool_cache

# In a tool function:
hit = tool_cache.get("search_variables", query="tuberculosis")
if hit is not None:
    return hit
result = _expensive_operation()
tool_cache.put("search_variables", result, query="tuberculosis")
return result

# On session reset:
tool_cache.clear()
class scripts.ai_assistant.tool_cache.ToolCache(max_size=256)[source]

Bases: object

LRU cache for tool results, keyed on (tool_name, args_hash).

Parameters:

max_size (int)

clear()[source]

Clear all cached entries (e.g. on session reset).

Return type:

None

get(tool_name, **kwargs)[source]

Look up a cached result. Returns None on miss.

Return type:

str | None

Parameters:
  • tool_name (str)

  • kwargs (Any)

put(tool_name, result, **kwargs)[source]

Store a tool result. Evicts LRU entry if at capacity.

Return type:

None

Parameters:
  • tool_name (str)

  • result (str)

  • kwargs (Any)

property stats: dict[str, int]

Return cache hit/miss statistics.
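The ordered-dict LRU described above fits in a few lines. This is a minimal sketch: hashing the kwargs into a key via sorted JSON is an assumption about the real args_hash, and the stats counters are simplified.

```python
from collections import OrderedDict
import json

class ToolCache:
    """LRU cache for tool results, keyed on (tool_name, args_hash)."""

    def __init__(self, max_size: int = 256):
        self.max_size = max_size
        self._data = OrderedDict()
        self.hits = self.misses = 0

    def _key(self, tool_name, kwargs):
        # Assumed args_hash: deterministic serialisation of the kwargs.
        return (tool_name, json.dumps(kwargs, sort_keys=True))

    def get(self, tool_name, **kwargs):
        key = self._key(tool_name, kwargs)
        if key in self._data:
            self._data.move_to_end(key)      # refresh recency on hit
            self.hits += 1
            return self._data[key]
        self.misses += 1
        return None

    def put(self, tool_name, result, **kwargs):
        self._data[self._key(tool_name, kwargs)] = result
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)   # evict least-recently-used

    def clear(self):
        self._data.clear()
```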

Agent-Boundary PHI Safety

Agent-tool PHI-safety decorator for the RePORT AI Portal agent.

Every @tool in scripts.ai_assistant.agent_tools that surfaces free-text or row-level data to the LLM should route its return through this module. Four enforcement layers:

  • phi_safe_return() — wraps a tool function so its returned string is scanned by scripts.security.phi_gate.phi_gate_check(). A blocking finding replaces the return value with a standard redaction message; warn-only findings pass through with an audit event.

  • guard_rows_with_kanon() — when a tool returns row-level data with quasi-identifiers, callers can opt into k-anonymity enforcement by invoking this helper before packaging the response.

  • guard_user_prompt() — input-side PHI refusal. UI + CLI entry points call this before sending the researcher’s message to the LLM; any blocking-tier PHI (Aadhaar, PAN, email, phone, etc.) in the prompt triggers a friendly refusal and the LLM is never invoked for that turn.

  • sanitise_untrusted_snippet() — wraps an untrusted text snippet (e.g. PDF-extracted content) in a marker envelope and redacts blatant imperative-voice injection phrases before the snippet is surfaced to the LLM. Closes the indirect-prompt-injection vector from PDF text.

All helpers log to the module logger (redacted by the log-hygiene filter when scripts.utils.log_hygiene.install_phi_redactor() has been installed). None print or persist raw row values.

IRB-grade benchmark anchors: Pillar 2.4 (every tool return passes the PHI gate) + Pillar 1.7 (k-anonymity enforcement at surface). Prompt-side gate + PDF snippet sanitiser close the two prompt-injection gaps summarized in docs/sphinx/irb_auditor/conformance.rst.

exception scripts.ai_assistant.phi_safe.PHISafetyError[source]

Bases: Exception

Raised when a configuration mistake would let raw PHI reach the LLM.

class scripts.ai_assistant.phi_safe.UserPromptGuardResult(ok, findings, refusal_message)[source]

Bases: object

Outcome of a user-prompt PHI scan.

ok is True when the prompt is safe to send to the LLM. refusal_message is populated when ok is False — a user-facing sentence the caller should display instead of invoking the agent. findings is a sorted tuple of PHI category labels (safe to log / show — labels are AADHAAR, EMAIL, etc., never raw values).

Parameters:
findings: tuple[str, ...]
ok: bool
refusal_message: str | None
scripts.ai_assistant.phi_safe.guard_rows_with_kanon(rows, *, quasi_identifiers, k=5, tool_name='<unknown>')[source]

Apply k-anonymity check to rows; suppress when classes too small.

Returns (rows_to_surface, kanon_result). When the check blocks, rows_to_surface is an empty list — caller should emit an aggregate-only response or a “too-few-records” message. Non-blocking responses return the original rows unchanged.

This is deliberately conservative: we do not auto-aggregate within this helper (aggregation is the tool’s scientific responsibility); we only gate the row-level surface.

Return type:

tuple[list[Mapping[str, Any]], KAnonResult]

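The gate logic reduces to an equivalence-class size check over the quasi-identifiers. A minimal sketch, returning a plain blocked flag rather than the real KAnonResult:

```python
from collections import Counter

def guard_rows_kanon(rows, quasi_identifiers, k=5):
    """Suppress all rows when any quasi-identifier class has < k members."""
    classes = Counter(
        tuple(row.get(q) for q in quasi_identifiers) for row in rows
    )
    blocked = any(count < k for count in classes.values())
    # Blocking empties the surface; non-blocking passes rows through unchanged.
    return ([], True) if blocked else (list(rows), False)
```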
scripts.ai_assistant.phi_safe.guard_rows_with_kanon_and_ldiv(rows, *, quasi_identifiers, sensitive_attributes=None, k=5, l_threshold=2, tool_name='<unknown>')[source]

Run k-anonymity then (when sensitive_attributes is provided) l-diversity. Returns (rows_to_surface, kanon_result, ldiv_result).

Either gate blocking sets rows_to_surface to an empty list. When sensitive_attributes is None, l-diversity is skipped and the third return value is None — equivalent to the legacy guard_rows_with_kanon() semantics with a richer return shape.

Phase 3.A + 3.B: this is the gate every row-returning tool should call before serialising rows to the LLM. See docs/sphinx/irb_auditor/conformance.rst.

Return type:

tuple[list[Mapping[str, Any]], KAnonResult, LDiversityResult | None]

scripts.ai_assistant.phi_safe.guard_text(text, *, tool_name='<unknown>')[source]

Scan text and return either the original text or a redaction string.

A blocking PHI match replaces the response; warn-only findings log but pass through. Non-string inputs are coerced to str so the decorator can wrap tools that return numeric / json-like content.

Return type:

str

scripts.ai_assistant.phi_safe.guard_user_prompt(text)[source]

Scan the user’s prompt for blocking-tier PHI before LLM invocation.

Called at the UI + CLI entry points. If the prompt contains a high-confidence PHI pattern (Aadhaar, PAN, voter, passport, DL, Indian phone, email, URL, PIN, SSN, MRN, IP, ISO date, title-prefixed name), the guard returns ok=False with a user-facing refusal. The LLM is not invoked for this turn.

Warn-tier heuristics (short numeric IDs, M/D/Y dates, generic two- word names) are not blocked here — they would over-fire on legitimate research prompts (e.g. “show me subjects with SUBJ_12345”). The downstream tool-return gate still catches any residual leak.

Non-string or empty input returns ok=True (nothing to scan).

Return type:

UserPromptGuardResult

Parameters:

text (str)
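A toy sketch of the input-side gate: the pattern table is a two-entry stand-in for the documented blocking tiers, and the result is a plain dict rather than UserPromptGuardResult.

```python
import re

# Illustrative subset of the blocking-tier catalog (email + Aadhaar-shaped).
_BLOCKING = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "AADHAAR": re.compile(r"\b\d{4}\s?\d{4}\s?\d{4}\b"),
}

def guard_user_prompt(text):
    """Scan a prompt for blocking-tier PHI; refuse before the LLM is invoked."""
    findings = tuple(sorted(
        label for label, rx in _BLOCKING.items() if rx.search(text or "")
    ))
    ok = not findings
    refusal = None if ok else "I can't process messages containing personal identifiers."
    return {"ok": ok, "findings": findings, "refusal_message": refusal}
```

Note that warn-tier shapes like SUBJ_12345 deliberately pass, matching the behaviour described above.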

scripts.ai_assistant.phi_safe.phi_safe_return(fn)[source]

Decorator — route the decorated function’s return string through the PHI gate.

Intended for @tool-decorated callables that return strings (LangChain tools). When the return is not a string, guard_text() coerces via str() before scanning.

Example:

@tool
@phi_safe_return
def my_tool(query: str) -> str:
    return expensive_free_text_build(query)
Return type:

TypeVar(F, bound= Callable[..., Any])

Parameters:

fn (F)

scripts.ai_assistant.phi_safe.redact_phi_in_text(text)[source]

Replace PHI-shaped substrings with category tags, returning a safe string.

Shares the blocking + warn catalog with scripts.security.phi_patterns and the log-hygiene filter, so every surface that persists or exports text sees the same substitution rules. Intended for:

  • saving conversation JSON to disk (raw user prompts + assistant replies),

  • exporting conversations to text / markdown,

  • any other “at-rest” path where user content is written somewhere an auditor might later inspect.

Substitution is a plain regex replacement — each hit becomes <LABEL> (e.g. <AADHAAR>). Subject-ID shapes get an HMAC-tagged form <SUBJ_xxxxxxxx> (uses an import-time ephemeral key so the same subject yields the same tag within one process; no cross-process linkage).

Non-string input is coerced to str before redaction; None and empty strings return “” immediately.

Parameters:

text (str)

Return type:

str
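The substitution rules above can be sketched with two patterns: an email to <EMAIL> tag, and the HMAC-tagged subject-ID form. The pattern set and the SUBJ_ shape are illustrative, not the module's full catalog.

```python
import hashlib, hmac, os, re

_KEY = os.urandom(16)  # import-time ephemeral key, as documented: no cross-process linkage

def redact_phi_in_text(text):
    """Replace PHI-shaped substrings with category tags (illustrative subset)."""
    text = re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "<EMAIL>", str(text or ""))

    def _subj(match):
        # Same subject -> same tag within one process, via the ephemeral key.
        tag = hmac.new(_KEY, match.group(0).encode(), hashlib.sha256).hexdigest()[:8]
        return f"<SUBJ_{tag}>"

    return re.sub(r"\bSUBJ_\d+\b", _subj, text)
```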

scripts.ai_assistant.phi_safe.sanitise_traceback(tb)[source]

Return an exception traceback safe to surface to the LLM / UI / logs.

Input may be (a) a pre-formatted traceback string, (b) an exception instance (formatted via traceback.format_exception), or (c) None (returns empty string).

Transformations:

  • Keep only the last _MAX_TRACEBACK_LINES lines (framework frames are usually the tail; stripping the head also drops any caller-line that may have included raw data).

  • Replace any long single-quoted literal ('…', 40+ chars) with '<…>' — catches DataFrame preview fragments, JSON bodies, and repr-style row dumps that pandas / numpy exceptions often embed.

  • Run the output through redact_phi_in_text() so any surviving PHI shape is tagged.

Parameters:

tb (str | BaseException | None)

Return type:

str
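The first two transformations can be sketched as below. The constant value and the regex are assumptions consistent with the description, and the final redact_phi_in_text() pass is noted but omitted:

```python
import re

_MAX_TRACEBACK_LINES = 15   # assumed value for the sketch

def sanitise_traceback(tb: str) -> str:
    tail = tb.splitlines()[-_MAX_TRACEBACK_LINES:]       # 1. keep only the tail
    text = "\n".join(tail)
    text = re.sub(r"'[^']{40,}'", "'<…>'", text)         # 2. drop long quoted literals
    # 3. the real helper then runs redact_phi_in_text() over the result
    return text
```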

scripts.ai_assistant.phi_safe.sanitise_untrusted_snippet(text, *, source_label='untrusted document')[source]

Wrap an untrusted snippet + redact instruction-voice tokens.

Called on any text that is surfaced from a source outside the agent’s control — today, the snippets returned by search_pdf_context. Applies two defences:

  1. Spotlighting. The snippet is wrapped in a marker envelope ([UNTRUSTED BEGIN] / [UNTRUSTED END]) so the LLM can distinguish document content from its own instructions. This is the recognised industry pattern for neutralising indirect prompt injection (see OpenAI “Spotlighting” note, 2024).

  2. Imperative-voice redaction. Known injection phrases (“ignore previous instructions”, “you are now …”, “system:”, etc.) are replaced with [INJECTION-REDACTED]. The list is conservative; false positives on legitimate CRF / protocol text are vanishingly unlikely because that text does not contain imperative-voice meta-instructions.

Non-string input is coerced via str(). Empty input returns "". source_label is surfaced in the wrapper so the LLM knows where the content came from (purely informational).

Parameters:
  • text (str)

  • source_label (str)

Return type:

str
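The two defences compose into a short wrapper. The injection-phrase list here is a small stand-in for the module's conservative catalog:

```python
import re

_INJECTION = re.compile(
    r"ignore\s+(all\s+)?previous\s+instructions|you\s+are\s+now\b|^\s*system:",
    re.IGNORECASE | re.MULTILINE,
)

def sanitise_untrusted_snippet(text, source_label="untrusted document"):
    """Spotlight an untrusted snippet and redact imperative-voice injections."""
    body = _INJECTION.sub("[INJECTION-REDACTED]", str(text))
    return (f"[UNTRUSTED BEGIN source={source_label}]\n"
            f"{body}\n"
            f"[UNTRUSTED END]")
```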

Web UI

RePORT AI Portal Chat UI — entry point.

Launch:

uv run streamlit run scripts/ai_assistant/web_ui.py or uv run python main.py --web

scripts.ai_assistant.web_ui.main()[source]

Streamlit app entry point.

Return type:

None

CLI

Interactive CLI (REPL) for the RePORT AI Portal AI Assistant system.

Commands:

  • :quit / :exit – End the session.

  • :reset – Clear conversation history and start a new thread.

  • :thread – Show current thread ID.

  • :model – Change LLM provider/model interactively.

  • :good / :bad – Rate the last response.

  • :debug – Toggle verbose stream tracing.

scripts.ai_assistant.cli.main()[source]

Entry point for the CLI.

Return type:

None

scripts.ai_assistant.cli.run_repl()[source]

Start the interactive REPL loop.

Return type:

None

Telemetry

Telemetry logger for RePORT AI Portal AI Assistant.

Captures agent events (tool calls, LLM invocations, hallucination detections, feedback) to an append-only JSONL file. All free-text fields are scanned for PHI patterns and masked before writing.

class scripts.utils.telemetry.TelemetryLogger[source]

Bases: BaseCallbackHandler

LangChain callback handler for telemetry event capture.

name: str = 'report_ai_portal_telemetry'
on_custom_event(name, data, **kwargs)[source]

Log custom events (hallucination detection, follow-up, etc.).

Return type:

None

on_llm_end(response, **kwargs)[source]

Log LLM completion events with token usage.

Return type:

None

on_tool_start(serialized, input_str, **kwargs)[source]

Log tool invocation events.

Return type:

None


Analytical Engine

Analytical Engine — Pre-built, deterministic epidemiological analysis modules.

These are pure Python functions — no LLM involvement. They produce the same results regardless of which model or orchestration mode calls them.

class scripts.ai_assistant.analytical_engine.AnalysisResult(cohort_name, outcome, n=0, events=0, univariate=None, multivariate=None, interaction=None, descriptive=None, interactive_figures=<factory>, figures=<factory>, narrative='', caveats='')[source]

Bases: object

Container for all analysis outputs.

Parameters:
  • cohort_name (str)

  • outcome (str)

  • n (int)

  • events (int)

  • univariate (DataFrame | None)

  • multivariate (dict[str, Any] | None)

  • interaction (DataFrame | None)

  • descriptive (dict[str, Any] | None)

  • interactive_figures (list[Path])

  • figures (list[Path])

  • narrative (str)

  • caveats (str)

caveats: str = ''
cohort_name: str
descriptive: dict[str, Any] | None = None
events: int = 0
figures: list[Path]
interaction: DataFrame | None = None
interactive_figures: list[Path]
multivariate: dict[str, Any] | None = None
n: int = 0
narrative: str = ''
outcome: str
univariate: DataFrame | None = None
class scripts.ai_assistant.analytical_engine.CohortBuilder(knowledge, data_dir)[source]

Bases: object

Load, join, and recode datasets into an analytic DataFrame.

build(cohort_id, concepts, outcome, timeout_fn=None)[source]
Return type:

DataFrame

class scripts.ai_assistant.analytical_engine.DescriptiveAnalyzer[source]

Bases: object

Summary statistics and frequency tables.

run(df, predictors)[source]
Return type:

dict[str, Any]

Parameters:
  • df (DataFrame)

  • predictors (list[str])

class scripts.ai_assistant.analytical_engine.InteractionAnalyzer[source]

Bases: object

Logistic regression with interaction terms.

run(df, outcome, factors, moderators)[source]
Return type:

DataFrame

class scripts.ai_assistant.analytical_engine.MultivariateAnalyzer[source]

Bases: object

Backward stepwise logistic regression.

run(df, outcome, predictors, alpha=0.05)[source]
Return type:

dict[str, Any]

class scripts.ai_assistant.analytical_engine.PlotArtifacts(interactive=None, static=None)[source]

Bases: object

Saved artifacts for a generated analysis plot.

Parameters:
  • interactive (Path | None)

  • static (Path | None)

interactive: Path | None
static: Path | None
class scripts.ai_assistant.analytical_engine.PlotGenerator[source]

Bases: object

Generate analysis plots.

PLOT_TYPES: ClassVar[dict[str, str]] = {'interaction_scatter': 'Scatter colored by sex, sized by age', 'interaction_violin': 'Violin paneled by age-group and sex', 'scatter': 'Scatter/strip plot for continuous predictor vs binary outcome', 'violin': 'Violin plot for categorical predictor vs binary outcome'}
generate(df, outcome, predictor, plot_type, save_dir, **kwargs)[source]
Return type:

PlotArtifacts | None

Parameters:
  • df (DataFrame)

  • outcome (str)

  • predictor (str)

  • plot_type (str)

  • save_dir (Path)

  • kwargs (Any)

class scripts.ai_assistant.analytical_engine.ResultInterpreter[source]

Bases: object

Convert statistical output into narrative text.

generate_caveats(df, outcome, cohort_name)[source]
Return type:

str

Parameters:
  • df (DataFrame)

  • outcome (str)

  • cohort_name (str)

interpret_descriptive(stats, outcome, events, cohort_name)[source]
Return type:

str

interpret_interaction(results, cohort_name)[source]
Return type:

str

Parameters:
  • results (DataFrame)

  • cohort_name (str)

interpret_multivariate(result, cohort_name)[source]
Return type:

str

interpret_univariate(results, outcome, cohort_name)[source]
Return type:

str

Parameters:
  • results (DataFrame)

  • outcome (str)

  • cohort_name (str)

class scripts.ai_assistant.analytical_engine.UnivariateAnalyzer[source]

Bases: object

Run univariate logistic regression for each predictor.

run(df, outcome, predictors)[source]
Return type:

DataFrame

Parameters:
  • df (DataFrame)

  • outcome (str)

  • predictors (list[str])

scripts.ai_assistant.analytical_engine.run_full_analysis(knowledge, data_dir, output_dir, cohort_id, outcome=None, predictors=None, analysis_types=None, plot_types=None, timeout=0)[source]

Run a complete analysis pipeline for a single cohort.

Return type:

AnalysisResult


Study Knowledge

Study Knowledge Base — YAML-driven ground truth for variable mappings.

class scripts.ai_assistant.study_knowledge.StudyKnowledge(yaml_path=None)[source]

Bases: object

Provides deterministic lookups for variable mappings, value encodings, dataset relationships, and outcome definitions from study_knowledge.yaml.

Parameters:

yaml_path (Path | None)

get_cohort(cohort_id)[source]
Return type:

dict[str, Any]

Parameters:

cohort_id (str)

get_default_outcome(cohort_id)[source]
Return type:

tuple[str, dict[str, Any]]

Parameters:

cohort_id (str)

get_derived_variable(name, cohort_id)[source]
Return type:

dict[str, Any]

get_join_plan(cohort_id, concepts)[source]
Return type:

list[dict[str, Any]]

get_outcome(cohort_id, outcome_name)[source]
Return type:

dict[str, Any]

Parameters:
  • cohort_id (str)

  • outcome_name (str)

get_value_encoding(column, cohort_id)[source]
Return type:

dict[str, Any]

Parameters:
  • column (str)

  • cohort_id (str)

list_cohorts()[source]
Return type:

list[str]

list_concepts(cohort_id)[source]
Return type:

list[str]

Parameters:

cohort_id (str)

resolve_concept(concept, cohort_id)[source]
Return type:

dict[str, Any]

Parameters:
  • concept (str)

  • cohort_id (str)

property study_description: str
property study_name: str

Web UI Modules

Chat UI: welcome hero, thread rendering, composer, model pill.

scripts.ai_assistant.ui.chat.composer(assistant_slot=None)[source]

Render chat composer; handle submit and streaming.

Return type:

None

Parameters:

assistant_slot (Any | None)

scripts.ai_assistant.ui.chat.hero()[source]
Return type:

None

scripts.ai_assistant.ui.chat.render_thread()[source]
Return type:

Any | None

Disk-backed multi-conversation persistence (JSON files).

Version-aware model allowlist for high-risk actions.

Loading or reloading a study mutates output/{STUDY}/ in place. That pipeline is irreversible without a snapshot restore, so we gate it behind a model quality bar:

  • Anthropic Claude Opus ≥ 4.6

  • Google Gemini Pro ≥ 3.1

  • OpenAI GPT ≥ 5.3

Any model explicitly in the Ollama provider category passes automatically — local models are the user’s own hardware and are assumed operator-approved.

The allowlist uses version comparison, not exact string matching, because model names change. New minor versions are admitted automatically once they meet the floor.

Public API

class scripts.ai_assistant.ui.model_policy.ModelGateResult(allowed, reason)[source]

Bases: object

Outcome of evaluating a model against the study-load allowlist.

Parameters:
allowed: bool
reason: str
scripts.ai_assistant.ui.model_policy.describe_allowlist()[source]

Human-readable summary for UI captions.

Return type:

str

scripts.ai_assistant.ui.model_policy.is_model_allowed_for_study_load(*, provider, model)[source]

Return whether provider/model may trigger a study load/reload.

Rules:

  • Ollama (local) is always allowed — the user controls the runtime.

  • Otherwise, the model must match one of the known family rules and meet the minimum version (floor comparison is tuple-wise).

  • Unknown models are rejected (fail-closed).

Parameters:
  • provider (str)

  • model (str)

Return type:

ModelGateResult
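The tuple-wise floor comparison can be sketched as below. The family-name prefixes and version-parsing regex are assumptions; the floors mirror the list above, and unknown models fail closed.

```python
import re

_FLOORS = {"claude-opus": (4, 6), "gemini-pro": (3, 1), "gpt": (5, 3)}

def is_allowed(provider: str, model: str) -> bool:
    if provider == "ollama":
        return True                       # local models always pass
    for family, floor in _FLOORS.items():
        if model.startswith(family):
            m = re.search(r"(\d+)[.-](\d+)", model[len(family):])
            if m:
                # Tuple comparison: (5, 3) >= (5, 3), (5, 10) >= (5, 3), etc.
                return (int(m.group(1)), int(m.group(2))) >= floor
    return False                          # unknown models are rejected
```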

LLM provider configuration and Ollama model helpers.

scripts.ai_assistant.ui.providers.preferred_or_installed_downgrade(preferred, installed)[source]

Resolve preferred against installed, downgrading when needed.

Returns the preferred tag when it (or its :latest equivalent) is installed. Otherwise walks QWEN3_DOWNGRADE_LADDER from the preferred size downward and returns the first tag present. Returns None when no qwen3 tag is installed — callers should treat that as “ask the operator” rather than silently picking a non-qwen3 tag.

Hardware reality on the current dev box: qwen3:8b OOMs at ~3 GiB free, so a user configuring qwen3:8b gets silently downgraded to qwen3:1.7b rather than an inference-time crash.

Return type:

str | None

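The ladder walk can be sketched as follows. The rung order is an assumption consistent with the description, and the :latest equivalence check is simplified to a comment:

```python
QWEN3_DOWNGRADE_LADDER = ["qwen3:8b", "qwen3:4b", "qwen3:1.7b"]  # assumed rung order

def preferred_or_installed_downgrade(preferred, installed):
    """Return preferred if installed, else the first smaller rung present, else None."""
    installed = set(installed)
    if preferred in installed:
        # (the real helper also treats a ':latest'-tagged install as equivalent)
        return preferred
    if preferred not in QWEN3_DOWNGRADE_LADDER:
        return None                        # only qwen3 rungs are auto-stepped
    start = QWEN3_DOWNGRADE_LADDER.index(preferred)
    for tag in QWEN3_DOWNGRADE_LADDER[start + 1:]:
        if tag in installed:
            return tag
    return None                            # no qwen3 tag installed: ask the operator
```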

Shell: CSS injection, JS bridge, topbar, sidebar.

scripts.ai_assistant.ui.shell.inject_css()[source]
Return type:

None

scripts.ai_assistant.ui.shell.install_bridge()[source]
Return type:

None

scripts.ai_assistant.ui.shell.sidebar()[source]
Return type:

None

scripts.ai_assistant.ui.shell.topbar()[source]
Return type:

None

Session-state bootstrap and conversation helpers for RePORT AI Portal chat UI.

scripts.ai_assistant.ui.state.get_meta(idx)[source]

Return (and lazily create) the meta dict for message at index idx.

Return type:

dict[str, Any]

Parameters:

idx (int)

scripts.ai_assistant.ui.state.init_state()[source]

Initialize session_state idempotently on every Streamlit rerun.

Return type:

None

LLM streaming, message rendering, and response formatting.

Setup wizard: LLM config, pipeline run, 3-step setup flow.

scripts.ai_assistant.ui.wizard.apply_llm_config(provider_label, api_key, model)[source]

Persist provider/model selection + stash the API key in the KeyStore.

The non-secret bits (LLM_PROVIDER, LLM_MODEL) still live in env vars + the config module so the rest of the app can read them at any time. The API key goes into the in-memory KeyStore only — never into os.environ. agent_graph._build_llm reads it from there at client-construction time and passes it as api_key= explicitly.

Return type:

None

Parameters:
  • provider_label (str)

  • api_key (str)

  • model (str)

scripts.ai_assistant.ui.wizard.ensure_llm_config()[source]

Re-apply non-secret LLM env vars on every Streamlit rerun.

The KeyStore is persisted in st.session_state so keys survive reruns automatically — this function only refreshes the non-secret LLM_PROVIDER / LLM_MODEL env vars + module attributes. If the user pasted a key on this rerun cycle it has already been routed through apply_llm_config() → KeyStore.

Return type:

None

scripts.ai_assistant.ui.wizard.inject_wizard_css()[source]

Hide sidebar and center the wizard column.

Return type:

None

scripts.ai_assistant.ui.wizard.render_setup_page()[source]

Render the 3-step setup wizard.

Return type:

None

scripts.ai_assistant.ui.wizard.run_pipeline()[source]

Run the data-extraction pipeline as a subprocess (the “Load Study” flow’s worker).

The pipeline’s PDF-extraction step needs ANTHROPIC_API_KEY / GOOGLE_API_KEY in its env to call vision APIs. Rather than leak those into the parent’s os.environ for the lifetime of the app, we inject them only into this single subprocess call via the KeyStore’s env_for_subprocess helper. The parent’s env stays clean before, during, and after the call.

The PDF orchestrator inside main.py always tries the LLM path first (when a capable provider is configured). If fresh PDF extraction cannot produce a complete result and a reviewed data/snapshots/{STUDY}/ baseline exists, the pipeline restores that baseline over the live trio_bundle/. “Use Existing Study” performs the same restore before chat starts.

Return type:

dict[str, Any]

scripts.ai_assistant.ui.wizard.use_existing_study()[source]

Restore the reviewed snapshot baseline before enabling chat.

Return type:

dict[str, Any]

Extraction Modules (continued)

Build Variables Reference

Build a unified variables.json from the available annotation sources.

Merges two active data sources into a single, canonical reference:

  1. Extraction variables (tmp/extracted_variables/*_variables.json, or trio_bundle/pdfs/*_variables.json when populated) — authoritative for description, coded_options, depends_on, condition, section, section_context, and form-level metadata (form_id, form_name, source_pdf, form_version, form_summary).

  2. Dictionary JSONL (trio_bundle/dictionary/tbl*/*.jsonl) — authoritative for data_type, core_status, and codelist references.

The is_phi / phi_reason / phi_type fields still ship in the output schema for backward compatibility but are always emitted as False / "" / "". PHI scrubbing lives in scripts.security.phi_scrub (Step 1.6 of the pipeline, 8-action catalog) and does not interact with this variables-reference builder — by the time this module reads the trio bundle, the artifacts are already PHI-free.

Output schema (v3, 23 fields per variable):

{
    "variable_name":            str,
    "form_id":                  str,
    "form_name":                str,
    "source_pdf":               str,
    "form_version":             str,
    "form_summary":             str,
    "section":                  str | None,
    "section_context":          str | None,
    "description":              str,
    "coded_options":            dict[str, str] | None,
    "depends_on":               str | None,
    "condition":                str | None,
    "data_type":                str,
    "core_status":              str,
    "is_phi":                   bool,       # always False (PHI scrubbing in phi_scrub.py)
    "phi_reason":               str,        # always ""
    "phi_type":                 "id" | "date" | None,  # always None
    "date_kind":                str | None,
    "anchor_rule":              str | None,
    "suggested_output_variable": str | None,
    "approved_for_transform":   bool | None,
    "date_group_by":            str | None,
    "deidentified_as":          list[str],
}

Usage:

uv run python main.py --build-variables
scripts.extraction.build_variables_reference.build_variables_reference(trio_bundle_dir, output_path, jurisdiction='IN', tmp_dir=None, *, pdf_extractions_dir=None, dictionary_dir=None)[source]

Build unified variables.json from all available annotation sources.

Parameters:
  • trio_bundle_dir (Path) – Root of the trio bundle.

  • output_path (Path) – Full path for the output variables.json.

  • jurisdiction (str) – Retained for backward compatibility (default "IN"); PHI classification has been retired, so this value is ignored.

  • tmp_dir (Path | None) – Optional path to the project tmp/ directory. When provided, tmp/extracted_variables/ is used as a fallback extraction source when the PDF extractions dir is empty.

  • pdf_extractions_dir (Path | None) – Explicit path for PDF extraction JSON files. When omitted, uses config.PDF_EXTRACTIONS_DIR then falls back to trio_bundle_dir / "pdfs".

  • dictionary_dir (Path | None) – Explicit path for dictionary mapping files. When omitted, uses config.DICTIONARY_JSON_OUTPUT_DIR then falls back to trio_bundle_dir / "dictionary".

Returns:

Summary statistics of the build.

Return type:

dict

Cleanup Propagation

Cleanup propagation — prune dictionary + PDF artifacts after dataset drops.

Runs against the staging workspace (tmp/{STUDY_NAME}/{datasets,dictionary,pdfs}/) after scripts.extraction.dataset_cleanup.clean_trio_datasets() completes.

Dictionary and PDF legs carry no PHI and therefore emit no audit report — the prune step is side-effect-only, keeping the dict and PDF schemas aligned with the surviving dataset schema so the LLM sees no dangling references. The dataset leg’s own audit (AUDIT_DATASET_REPORT_PATH) remains the single source of truth for what was removed.

Pruning rule

A variable V is pruned from the dictionary and PDF legs iff it was dropped from at least one dataset and does not appear in any surviving dataset's final JSONL schema. Variables dropped from one dataset but kept in another are NOT pruned.

Comparisons are case-folded; dataset provenance fields (source_file, _provenance, _metadata) are excluded from the surviving-vars set.

scripts.extraction.cleanup_propagation.compute_propagation_set(audit_path, datasets_dir)[source]

Return the case-folded set of variables that should propagate-prune.

Return type:

set[str]

Parameters:
  • audit_path (Path)

  • datasets_dir (Path)

Algorithm:
  1. Load audit_path (the dataset leg’s unified audit). Union all scope == "dataset-column" events’ name into dataset_dropped_vars (case-folded).

  2. Scan every datasets_dir/*.jsonl. Union all row keys (excluding PROVENANCE_FIELDS) into surviving_dataset_vars (case-folded).

  3. Return dataset_dropped_vars - surviving_dataset_vars.

Variables dropped from one dataset but kept in another → excluded from the returned set (they “survive” somewhere). Missing audit or empty datasets dir → empty set.
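The three-step algorithm can be sketched as follows. This is a minimal illustration assuming the unified audit is a JSON document with a top-level events list; the real audit schema may differ:

```python
import json
from pathlib import Path

PROVENANCE_FIELDS = {"source_file", "_provenance", "_metadata"}

def compute_propagation_set(audit_path: Path, datasets_dir: Path) -> set[str]:
    """Sketch of the drop-set computation described above."""
    dataset_files = list(datasets_dir.glob("*.jsonl"))
    # Missing audit or empty datasets dir -> empty set (prune nothing).
    if not audit_path.exists() or not dataset_files:
        return set()

    # 1. Union every scope == "dataset-column" event's name, case-folded.
    audit = json.loads(audit_path.read_text())
    dropped = {
        e["name"].casefold()
        for e in audit.get("events", [])  # "events" key is an assumption
        if e.get("scope") == "dataset-column"
    }

    # 2. Union all row keys from every surviving dataset JSONL.
    surviving: set[str] = set()
    for jsonl in dataset_files:
        for line in jsonl.read_text().splitlines():
            if line.strip():
                surviving |= {
                    k.casefold() for k in json.loads(line)
                    if k not in PROVENANCE_FIELDS
                }

    # 3. Dropped somewhere AND surviving nowhere -> propagate-prune.
    return dropped - surviving
```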

scripts.extraction.cleanup_propagation.prune_dictionary(drop_set, dict_dir)[source]

Walk dict_dir/**/*.jsonl and drop rows in drop_set.

Each row’s _DICT_VAR_KEY value is compared case-folded against drop_set (which callers pass pre-folded — see compute_propagation_set()). Matching rows are removed and the file is rewritten atomically.

Returns the total number of rows removed across all files (for logging). No audit artifact is written — the dictionary leg carries no PHI.

Return type:

int

Parameters:
  • drop_set (set[str])

  • dict_dir (Path)
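The row-filtering pattern can be sketched as follows, with a placeholder key name standing in for the module's _DICT_VAR_KEY:

```python
import json
from pathlib import Path

DICT_VAR_KEY = "variable_name"  # placeholder for the module's _DICT_VAR_KEY

def prune_dictionary(drop_set: set[str], dict_dir: Path) -> int:
    """Drop dictionary rows whose variable name is in the pre-folded drop_set."""
    removed = 0
    for path in dict_dir.rglob("*.jsonl"):
        rows = [json.loads(l) for l in path.read_text().splitlines() if l.strip()]
        kept = [
            r for r in rows
            if str(r.get(DICT_VAR_KEY, "")).casefold() not in drop_set
        ]
        if len(kept) != len(rows):
            removed += len(rows) - len(kept)
            tmp = path.with_suffix(".jsonl.tmp")
            tmp.write_text("".join(json.dumps(r) + "\n" for r in kept))
            tmp.replace(path)  # atomic rewrite of the modified file
    return removed
```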

scripts.extraction.cleanup_propagation.prune_pdfs(drop_set, pdf_dir)[source]

Walk pdf_dir/*_variables.json and drop matching entries.

For each JSON file:

  1. Remove keys from the top-level variables: dict whose key (case-folded) is in drop_set.

  2. For each section in sections: dict, remove matching entries from sections[name]["variables"]: list.

Modified files are rewritten atomically; unmodified files are left alone. Returns the total number of entries removed (vars + section refs). No audit artifact is written — the PDF leg carries no PHI.

Parameters:
  • drop_set (set[str])

  • pdf_dir (Path)

Return type:

int
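The two-step prune can be sketched as follows, assuming section entries are plain variable-name strings (the actual entry shape may differ):

```python
import json
from pathlib import Path

def prune_pdfs(drop_set: set[str], pdf_dir: Path) -> int:
    """Drop matching entries from each *_variables.json file."""
    removed = 0
    for path in pdf_dir.glob("*_variables.json"):
        doc = json.loads(path.read_text())
        # 1. Remove top-level variables keyed by a case-folded match.
        before = len(doc.get("variables", {}))
        doc["variables"] = {
            k: v for k, v in doc.get("variables", {}).items()
            if k.casefold() not in drop_set
        }
        changed = before - len(doc["variables"])
        # 2. Remove matching entries from each section's variables list.
        for section in doc.get("sections", {}).values():
            kept = [v for v in section.get("variables", [])
                    if str(v).casefold() not in drop_set]
            changed += len(section.get("variables", [])) - len(kept)
            section["variables"] = kept
        if changed:
            tmp = path.with_suffix(".json.tmp")
            tmp.write_text(json.dumps(doc, indent=2))
            tmp.replace(path)  # atomic rewrite; untouched files are left alone
            removed += changed
    return removed
```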

scripts.extraction.cleanup_propagation.run_propagation()[source]

Orchestrate the propagation: compute drop set, prune dict + PDF legs.

All paths resolved from config.STAGING_* and config.AUDIT_* — never touches the promoted trio bundle directly. Dict + PDF legs emit no audit report (no PHI); only their prune counts are logged.

Return type:

None

Utility Modules (continued)

Errors

Structured error envelope for RePORT AI Portal.

A single RePORTError dataclass carries enough context (stage, operation, cause, path, hint, traceback) to diagnose failures without trawling logs. Pipeline legs, agent tools, and the UI all wrap raised exceptions through the wrap helper so callers get a uniform, JSON-serialisable envelope.

Public API

  • RePORTError — frozen dataclass with to_dict / to_json / human formatter.

  • wrap() — turn any BaseException into a RePORTError.

  • format_for_user() — short, operator-facing one-liner.

  • format_for_log() — verbose multi-line block (includes traceback).

class scripts.utils.errors.RePORTError(stage, operation, cause, message, path=None, hint=None, traceback=None, timestamp=<factory>)[source]

Bases: object

Structured failure envelope.

Parameters:
  • stage (str)

  • operation (str)

  • cause (str)

  • message (str)

  • path (str | None)

  • hint (str | None)

  • traceback (str | None)

  • timestamp (str)

stage

High-level phase (e.g., "pipeline.dataset", "agent.tool", "ui.load_study").

operation

Specific operation that failed (e.g., "query_dataset", "publish_staging").

cause

The exception class name (e.g., "FileNotFoundError").

message

Short human description (the first line of str(exc)).

path

Optional path the error relates to. Stored as a string.

hint

Optional operator-facing fix suggestion.

traceback

Optional multi-line traceback for logs. Not surfaced to end users.

timestamp

ISO-8601 UTC timestamp the envelope was created.

as_log_block()[source]

Verbose multi-line block suitable for logs.

Return type:

str

as_user_message()[source]

Short, single-line message safe to show end users.

Return type:

str

to_dict()[source]

Return a JSON-serialisable representation.

Return type:

dict[str, Any]

to_json()[source]

Serialise to a compact JSON string.

Return type:

str

scripts.utils.errors.format_for_log(err)[source]

Render a multi-line block including traceback for logs/audit.

Return type:

str

Parameters:

err (RePORTError)

scripts.utils.errors.format_for_user(err)[source]

Render a short operator-facing one-liner.

Return type:

str

Parameters:

err (RePORTError)

scripts.utils.errors.wrap(exc, *, stage, operation, path=None, hint=None, include_traceback=True)[source]

Wrap a raised exception as a RePORTError.

This is the single entry point other modules should use so the envelope stays consistent. The caller supplies stage and operation; the exception’s class and first message line are pulled automatically.

Return type:

RePORTError

Parameters:
  • exc (BaseException)

  • stage (str)

  • operation (str)

  • path (str | None)

  • hint (str | None)

  • include_traceback (bool)

Snapshot Manager

Human-reviewed snapshot baseline helpers.

The snapshot baseline is a full copy of output/{STUDY}/trio_bundle/ saved under data/snapshots/{STUDY}/ after human review. It is the operator-approved fallback for broken or incomplete live bundles.

The active operations are:

  • save the current live trio bundle as the reviewed snapshot baseline;

  • restore the reviewed snapshot baseline over the live trio bundle;

  • check whether a reviewed snapshot baseline exists.
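The save/restore pair can be sketched with shutil, using hypothetical explicit-path parameters in place of the module's config-derived locations:

```python
import shutil
from pathlib import Path

def create_snapshot(live_bundle: Path, snapshot_dir: Path,
                    *, overwrite: bool = False) -> Path:
    """Copy the live trio bundle into the snapshot baseline location."""
    if snapshot_dir.exists():
        if not overwrite:
            raise RuntimeError(f"snapshot already exists: {snapshot_dir}")
        shutil.rmtree(snapshot_dir)
    shutil.copytree(live_bundle, snapshot_dir)
    return snapshot_dir

def restore_snapshot(snapshot_dir: Path, live_bundle: Path) -> Path:
    """Overwrite the live trio bundle with the reviewed baseline."""
    if not snapshot_dir.exists():
        raise RuntimeError(f"no snapshot baseline at {snapshot_dir}")
    if live_bundle.exists():
        shutil.rmtree(live_bundle)
    shutil.copytree(snapshot_dir, live_bundle)
    return live_bundle
```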

exception scripts.utils.snapshots.SnapshotError[source]

Bases: RuntimeError

Raised when a snapshot operation cannot be completed.

scripts.utils.snapshots.create_snapshot(name=None, *, overwrite=False)[source]

Copy the live trio bundle into data/snapshots/{STUDY}/.

Return type:

Path

Parameters:
  • name (str | None)

  • overwrite (bool)

scripts.utils.snapshots.latest_snapshot_name()[source]

Return the study snapshot name, or None if no baseline exists.

Return type:

str | None

scripts.utils.snapshots.list_snapshots()[source]

Return the single reviewed snapshot name when it exists.

Return type:

list[str]

scripts.utils.snapshots.main(argv=None)[source]
Return type:

int

Parameters:

argv (list[str] | None)

scripts.utils.snapshots.resolve_snapshot_name(name)[source]

Compatibility shim: the only active snapshot name is the study name.

Return type:

str

Parameters:

name (str | None)

scripts.utils.snapshots.restore_snapshot(name=None)[source]

Overwrite the live trio bundle with the reviewed snapshot baseline.

Return type:

Path

Parameters:

name (str | None)

scripts.utils.snapshots.snapshot_exists()[source]

Return True when the reviewed snapshot baseline has usable content.

Return type:

bool

Step Cache

Pipeline step caching for fast incremental re-runs.

This module provides file-based caching so that each pipeline step can be skipped when its outputs already exist and its inputs have not changed since the last successful run.

How it works:

  1. Before running a step, is_step_fresh() checks for a manifest file (.<step_name>.manifest.json) stored inside the step’s output directory. The manifest records:

    • SHA-256 content hashes of every input file

    • Artifact version strings that were current when the step ran

    • A timestamp for human convenience

  2. If the manifest exists and every recorded input hash still matches the file on disk (and artifact versions haven’t changed), the step is considered fresh and can be skipped.

  3. After a step completes successfully, save_step_manifest() writes a new manifest capturing the current state of its inputs.

This gives deterministic, content-based cache invalidation with no need for external databases or lock files.

Design rules:

  • Pure-function hashing: only file contents matter, not timestamps.

  • Manifests are hidden dotfiles so they don’t pollute ls output.

  • --force in the CLI always bypasses the cache.

  • Missing output directories always mean “not fresh”.

  • If the manifest itself is corrupt or missing, the step runs.

scripts.utils.step_cache.MANIFEST_VERSION = '1.0.0'

Schema version of the manifest file itself.

scripts.utils.step_cache.hash_directory(directory, *, extensions=None)[source]

Compute per-file SHA-256 hashes for every relevant file in directory.

Files are discovered recursively. Hidden files, __pycache__ dirs, and .pyc files are excluded. If extensions is provided, only files whose suffix is in the set are included (e.g. {".xlsx", ".csv"}).

The returned dict maps relative_path → hex_sha256. Keys are sorted so that the overall dict is deterministic regardless of filesystem walk order.

Parameters:
  • directory (Path) – Root directory to hash.

  • extensions (frozenset[str] | None) – Optional allowlist of file suffixes to include.

Return type:

dict[str, str]

Returns:

Sorted dict of {relative_path: sha256_hex}.

scripts.utils.step_cache.hash_file(path, *, chunk_size=65536)[source]

Return lowercase hex SHA-256 of path contents, streamed.

What. SHA-256 hex digest of the file at path.

Why. Stable integrity anchor for NIST SP 800-188 §5.2; carried in every extracted record’s _provenance.raw_sha256 and in every lineage_manifest.json input/output entry.

How. Open the path in binary mode, read chunk_size bytes at a time, and feed each chunk into hashlib.sha256. Works on arbitrarily large files without exhausting memory.

Return type:

str

Parameters:
  • path (Path)

  • chunk_size (int)
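The streamed approach reads as:

```python
import hashlib
from pathlib import Path

def hash_file(path: Path, *, chunk_size: int = 65536) -> str:
    """Streamed SHA-256: constant memory regardless of file size."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # Read fixed-size chunks so huge files never load fully into memory.
        while chunk := fh.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()  # lowercase hex
```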

scripts.utils.step_cache.is_step_fresh(step_name, output_dir, current_input_hashes, *, artifact_versions=None, required_outputs=None)[source]

Check whether a pipeline step can be skipped.

A step is fresh when ALL of the following hold:

  1. The output directory exists.

  2. A valid manifest file exists inside it.

  3. Every input file hash in the manifest matches the current hash.

  4. No new input files have appeared that weren’t in the manifest.

  5. If artifact_versions is provided, every recorded version matches.

  6. If required_outputs is provided, each named file exists under output_dir.

Parameters:
  • step_name (str) – Pipeline step identifier.

  • output_dir (Path) – Directory where the step writes outputs.

  • current_input_hashes (dict[str, str]) – Live hashes of current input files.

  • artifact_versions (dict[str, str] | None) – If provided, must match recorded versions.

  • required_outputs (list[str] | None) – Optional list of filenames/globs that must exist under output_dir for the step to be considered complete.

Return type:

bool

Returns:

True if the step is fresh and can be safely skipped.

scripts.utils.step_cache.save_step_manifest(step_name, output_dir, input_hashes, *, artifact_versions=None, extra_metadata=None)[source]

Persist a cache manifest after a successful step run.

Parameters:
  • step_name (str) – Short identifier for the pipeline step (e.g. "dictionary").

  • output_dir (Path) – Directory where the step wrote its outputs.

  • input_hashes (dict[str, str]) – {relative_path: sha256} of every input file.

  • artifact_versions (dict[str, str] | None) – Optional artifact version strings to record.

  • extra_metadata (dict[str, Any] | None) – Optional extra data to store (e.g. counts, flags).

Return type:

Path

Returns:

Path to the written manifest file.

Artifact Version Registry

Canonical artifact-version registry for RePORT AI Portal.

This module is the single supported source of truth for generated artifact, schema, prompt, and API version identifiers in the single-study, privacy-first, local-first runtime.

Design rules:

  • Versions use semantic-version strings in MAJOR.MINOR.PATCH form.

  • Each key maps to one artifact contract that can require rebuilds.

  • The public registry exposed to callers is read-only.

  • Callers should read versions from here, not duplicate literals elsewhere.
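The read-only registry pattern behind these rules can be sketched as:

```python
import re
from types import MappingProxyType

_SEMVER = re.compile(r"^\d+\.\d+\.\d+$")  # MAJOR.MINOR.PATCH

_VERSIONS = {"clean_jsonl_schema": "1.0.0"}

# mappingproxy gives callers a live, read-only view of the registry.
VERSIONS = MappingProxyType(_VERSIONS)

def get_version(name: str) -> str:
    if name not in VERSIONS:
        raise ValueError(f"unknown artifact contract: {name!r}")
    return VERSIONS[name]

def validate_versions(versions=None) -> None:
    for key, value in (versions if versions is not None else VERSIONS).items():
        if not key or not _SEMVER.match(value):
            raise ValueError(f"invalid entry: {key!r} -> {value!r}")
```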

exception scripts.artifact_versions.ArtifactVersionError[source]

Bases: ValueError

Raised when artifact-version keys or values are invalid.

scripts.artifact_versions.VERSIONS: Mapping[str, str] = mappingproxy({'clean_jsonl_schema': '1.0.0'})

Read-only artifact version map keyed by artifact contract name.

When a version here changes, previously generated artifacts for that contract should be rebuilt.

scripts.artifact_versions.get_version(name)[source]

Return the registered version for one artifact contract.

Return type:

str

Parameters:

name (str)

scripts.artifact_versions.snapshot_versions()[source]

Return a plain mutable snapshot of the current registry.

Return type:

dict[str, str]

scripts.artifact_versions.validate_versions(versions=None)[source]

Validate artifact version keys and values.

Parameters:

versions (Mapping[str, str] | None) – Mapping to validate. Defaults to the internal registry.

Raises:

ArtifactVersionError – If any key or version value is invalid.

Return type:

None

Sandbox Subprocess

The sandbox subpackage executes LLM-generated code in a fresh subprocess with OS-level rlimits and an in-child AST guard. See Sandbox: Subprocess-Isolated Code Execution for the conceptual overview.

Sandbox Public API

User-facing CLI: re-run a saved analysis .py file against the local trio bundle.

Saved code lives in output/{STUDY}/agent/analysis/code/run_*.py and gets a docstring header explaining how to replicate the run. This module is the replicate step from that header:

python -m scripts.ai_assistant.sandbox.replicate <path_to_saved.py>

Unlike the agent-side sandbox, this runs the code in the current Python process so the user can see output / interact with figures / write files to their working directory normally. The same AST guards still apply (import allow-list, dunder block) as a defense-in-depth check on code that was originally LLM-generated, even if the user has chosen to run it locally.

scripts.ai_assistant.sandbox.replicate.main(path_str)[source]
Return type:

int

Parameters:

path_str (str)

Sandbox Resource Limits

Cross-platform OS-level resource limits for the sandbox subprocess.

Honest about platform asymmetry:

  • Linux: RLIMIT_AS (memory), RLIMIT_NPROC (process count), RLIMIT_CPU (CPU time), and RLIMIT_NOFILE (file descriptors) all enforce reliably.

  • macOS: RLIMIT_CPU and RLIMIT_NOFILE work; RLIMIT_DATA is set on best-effort but not strictly honored; RLIMIT_AS and RLIMIT_NPROC are effectively no-ops on Darwin and we do not pretend otherwise.

The production deployment target is Linux. macOS is the developer environment; the dev-vs-prod gap is documented in docs/sphinx/developer_guide/sandbox.rst.

scripts.ai_assistant.sandbox.limits.make_preexec_fn(*, cpu_seconds, memory_mb, max_procs, max_files)[source]

Build a preexec_fn for subprocess.Popen that applies rlimits in the child process immediately before the new program is launched.

Returns None on Windows (where subprocess.Popen(preexec_fn=...) is not supported); the caller falls back to wall-clock-only protection there.

Return type:

Callable[[], None] | None

Parameters:
  • cpu_seconds (int)

  • memory_mb (int)

  • max_procs (int)

  • max_files (int)
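The preexec_fn construction under the platform rules above can be sketched as follows (error handling simplified):

```python
import os

def make_preexec_fn(*, cpu_seconds: int, memory_mb: int,
                    max_procs: int, max_files: int):
    """Build a preexec_fn applying rlimits in the child before exec."""
    if os.name != "posix":
        return None  # Windows: caller falls back to wall-clock-only limits

    import resource

    def apply_limits() -> None:
        # CPU-time and fd caps enforce reliably on Linux and macOS.
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_seconds, cpu_seconds))
        resource.setrlimit(resource.RLIMIT_NOFILE, (max_files, max_files))
        # Address-space and process caps: effective on Linux, best-effort
        # or no-op on Darwin, so failures are tolerated.
        mem = memory_mb * 1024 * 1024
        try:
            resource.setrlimit(resource.RLIMIT_AS, (mem, mem))
            resource.setrlimit(resource.RLIMIT_NPROC, (max_procs, max_procs))
        except (ValueError, OSError):
            pass

    return apply_limits
```

The returned callable is handed to subprocess.Popen(..., preexec_fn=...); it runs in the forked child, so the parent's own limits are untouched.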

Sandbox Child Runner

Sandbox child process: AST/runtime guards, code execution, figure & code persistence.

Invoked as a subprocess by scripts.ai_assistant.sandbox.__init__:

python -m scripts.ai_assistant.sandbox.runner <spec_path>

spec_path points to a JSON file with the execution spec (code, df_paths, output_dir, persist_code, max_output_bytes, max_figures). The runner writes its result manifest to {output_dir}/_sandbox_result.json and exits with a code summarising the outcome:

  • 0 — success

  • 1 — runtime error in user code (still emits a manifest with stderr)

  • 2 — pre-execution rejection (AST guard, blocked import, blocked builtin)

Stdout and stderr go through subprocess pipes; the parent reads them.

This file deliberately avoids importing the project’s config module so that the child’s read/write zones are only what the spec gives it — keeping the trust boundary explicit and decoupled from runtime config.

exception scripts.ai_assistant.sandbox.runner.SandboxRejectionError[source]

Bases: Exception

Code rejected by AST/runtime guards before or during execution.

scripts.ai_assistant.sandbox.runner.main(spec_path)[source]
Return type:

int

Parameters:

spec_path (str)

Extraction I/O Helpers

Clinical Date Parsing

Variable-aware date parsing for the Indo-VAP clinical dataset.

The Indo-VAP Excel sheets store dates in three distinct ways:

  1. Excel datetime cells — openpyxl / pandas parse these into Python datetime objects automatically. No ambiguity.

  2. Slash-delimited text strings — stored as plain text in the cell. The date order (month-first vs day-first) varies per variable:

    • Most variables use US-style M/D/YYYY or M/D/YY (e.g. "08/12/2014 12:27:48 PM", "7/28/14").

    • Six specific variables use Indian-style D/M/YYYY or D/M/YY (e.g. IC_VISDAT="28/05/2014", IT_IGRADAT="12/12/12").

    The canonical set of day-first variables is maintained in DMY_VARIABLES below.

  3. ISO datetime strings"2014-07-28" or "2014-07-28 00:00:00". Unambiguous; year-month-day order.
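The variable-aware order selection can be sketched as follows. parse_slash_date is a hypothetical reduced helper, not the module's parse_date; ISO handling and time-component metadata are omitted:

```python
from __future__ import annotations
from datetime import datetime

# Day-first variables, per the canonical DMY_VARIABLES set described below.
DMY_VARIABLES = frozenset({"CBC_HBADAT", "CC_VISDAT", "FOA_VISDAT",
                           "FOB_VISDAT", "IC_VISDAT", "IT_IGRADAT"})

def parse_slash_date(value: str, field_name: str | None = None) -> datetime | None:
    """Order-aware parsing of slash-delimited date text."""
    day_first = field_name in DMY_VARIABLES
    formats = ("%d/%m/%Y", "%d/%m/%y") if day_first else ("%m/%d/%Y", "%m/%d/%y")
    # Keep only the date token; any trailing time part is discarded.
    token = value.strip().split()[0] if value.strip() else ""
    for fmt in formats:
        try:
            return datetime.strptime(token, fmt)
        except ValueError:
            continue
    return None
```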

This module provides:

  • DMY_VARIABLES — the canonical frozenset of day-first variable names;

  • is_dmy_variable() — membership check against that set;

  • parse_date() — variable-aware parsing into a ParsedDate;

  • value_looks_like_date() — quick shape check for date-like strings.

All functions are pure (no side effects) and safe to call from any module.

Generated by scanning all 44 raw Excel files (2026-03-25).

scripts.extraction.io.clinical_dates.DMY_VARIABLES: frozenset[str] = frozenset({'CBC_HBADAT', 'CC_VISDAT', 'FOA_VISDAT', 'FOB_VISDAT', 'IC_VISDAT', 'IT_IGRADAT'})

Variable names whose slash-date text strings use D/M/YYYY (day first, Indian).

All other slash-date variables default to M/D/YYYY (month first, US).

class scripts.extraction.io.clinical_dates.ParsedDate(dt, has_time, ampm, format, original)[source]

Bases: NamedTuple

Result of parsing a date string.

Parameters:
  • dt (datetime)

  • has_time (bool)

  • ampm (str | None)

  • format (str)

  • original (str)

ampm: str | None

‘AM’ or ‘PM’ if present in the original, else None.

dt: datetime

The parsed datetime. Time component is always 00:00:00; the original string’s time is intentionally discarded for date-granularity anonymization (see has_time and ampm for source-layout metadata).

format: str

One of ‘iso’, ‘mdy’, ‘dmy’ indicating the detected source format.

has_time: bool

True if the original string included a time component.

original: str

The original (stripped) string that was parsed.

scripts.extraction.io.clinical_dates.is_dmy_variable(field_name)[source]

Return True if field_name is known to use D/M (day-first) date order.

Checks against the canonical DMY_VARIABLES frozenset defined in this module. The comparison is exact (case-sensitive) because the variable names come from Excel headers.

Return type:

bool

Parameters:

field_name (str)

scripts.extraction.io.clinical_dates.parse_date(value, *, field_name=None)[source]

Parse a date/datetime text string into a ParsedDate.

Parameters:
  • value (str) – The raw text string (e.g. "7/28/14", "28/05/2014", "2014-07-28 00:00:00").

  • field_name (str | None) – The column/variable name the value came from. Used to determine M/D vs D/M order for slash-delimited dates. If None, defaults to M/D (US-style).

Return type:

ParsedDate | None

Returns:

A ParsedDate on success, or None if the string cannot be parsed as a date.

Examples:

>>> parse_date("7/28/14")
ParsedDate(dt=datetime(2014, 7, 28), ..., format='mdy', ...)

>>> parse_date("28/05/2014", field_name="IC_VISDAT")
ParsedDate(dt=datetime(2014, 5, 28), ..., format='dmy', ...)

>>> parse_date("2014-07-28 00:00:00")
ParsedDate(dt=datetime(2014, 7, 28), ..., format='iso', ...)
scripts.extraction.io.clinical_dates.value_looks_like_date(value)[source]

Return True if value looks like a date/datetime string.

This is a quick check — it does NOT validate the date components.

Return type:

bool

Parameters:

value (str)

File Discovery

File-discovery helpers for the RePORT AI Portal extraction pipeline.

Provides a single discover_files function that scans a directory for files matching a set of extensions, skipping hidden files, OS junk, and Excel lock files. Returns a sorted, deterministic list of Path objects so repeated runs produce identical ordering.

All three extraction modules (dictionary, dataset, PDF) previously implemented this same logic inline. This module consolidates it into one tested, canonical helper.
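The discovery rules can be sketched as follows (error messages simplified; the real helper also takes junk, label, and not_found_label):

```python
from __future__ import annotations
from pathlib import Path

JUNK = frozenset({".DS_Store", "Thumbs.db", "__MACOSX", "desktop.ini"})

def discover_files(directory: Path,
                   *, extensions: tuple[str, ...] | None = None) -> list[Path]:
    """Sorted, deterministic listing with hidden/junk/lock files skipped."""
    if not directory.is_dir():
        raise FileNotFoundError(f"Not a directory: {directory}")
    found = [
        p for p in sorted(directory.iterdir())  # sorted -> identical ordering
        if p.is_file()
        and not p.name.startswith(".")          # hidden files
        and not p.name.startswith("~$")         # Excel lock files
        and p.name not in JUNK
        and (extensions is None or p.suffix.lower() in extensions)
    ]
    if not found:
        raise ValueError(f"No matching files in {directory}")
    return found
```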

scripts.extraction.io.file_discovery.DEFAULT_JUNK_FILENAMES: frozenset[str] = frozenset({'.DS_Store', 'Thumbs.db', '__MACOSX', 'desktop.ini'})

Filenames unconditionally skipped during discovery.

scripts.extraction.io.file_discovery.SUPPORTED_TABULAR_EXTENSIONS: tuple[str, ...] = ('.xlsx', '.csv')

File extensions recognised as tabular data sources.

scripts.extraction.io.file_discovery.discover_files(directory, *, extensions=None, junk=frozenset({'.DS_Store', 'Thumbs.db', '__MACOSX', 'desktop.ini'}), label='supported', not_found_label=None)[source]

Return a sorted list of non-hidden, non-junk files matching extensions.

Parameters:
  • directory (Path | str) – The directory to scan (non-recursive, immediate children only).

  • extensions (tuple[str, ...] | frozenset[str] | None) – Allowed lowercase extensions (e.g. (".xlsx", ".csv")). When None, all non-junk, non-hidden files are returned.

  • junk (frozenset[str]) – Set of filenames to unconditionally skip.

  • label (str) – Human-readable label used in the FileNotFoundError message (e.g. "Dataset", "Data dictionary").

  • not_found_label (str | None) – Label used in the ValueError when no matching files are found (e.g. "dictionary", "dataset"). Defaults to label.lower() when not supplied.

Return type:

list[Path]

Returns:

Sorted list of Path objects.

Raises:
  • FileNotFoundError – If directory does not exist.

  • ValueError – If no matching files are found.

File I/O Primitives

Canonical atomic file-write helpers for the RePORT AI Portal pipeline.

Every module that persists JSONL, JSON, or plain-text artifacts should use these helpers instead of rolling its own write-to-temp-then-rename dance. The strategy is:

  1. Write to a NamedTemporaryFile in the same directory as the final output (guaranteeing same-filesystem for the rename).

  2. On success, Path.replace() atomically swaps the temp file into place.

  3. On failure, the temp file is cleaned up in a finally block.

This eliminates the risk of half-written files after crashes and avoids the race condition inherent in using a predictable .tmp suffix.
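The three-step strategy can be sketched for the JSON case (the real helper also accepts ensure_ascii and prefix arguments):

```python
import json
from pathlib import Path
from tempfile import NamedTemporaryFile

def atomic_write_json(output_path, payload, *, indent: int = 2) -> None:
    """Write-to-temp-then-replace, as described above."""
    output_path = Path(output_path)
    tmp_path = None
    try:
        # 1. Temp file in the destination directory guarantees the final
        #    rename stays on the same filesystem.
        with NamedTemporaryFile("w", encoding="utf-8",
                                dir=output_path.parent,
                                prefix="report_ai_portal_", suffix=".tmp",
                                delete=False) as fh:
            tmp_path = Path(fh.name)
            json.dump(payload, fh, indent=indent, ensure_ascii=False)
        # 2. Atomic swap into place.
        tmp_path.replace(output_path)
    finally:
        # 3. Clean up on failure; a no-op once the replace has succeeded.
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)
```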

Exported helpers

  • atomic_write_jsonl — write list[dict] as JSONL lines.

  • atomic_write_json — write a single dict as pretty-printed JSON.

  • atomic_write_dataframe_jsonl — write a pandas.DataFrame via DataFrame.to_json(orient="records", lines=True).

scripts.extraction.io.file_io.ATOMIC_WRITE_SUFFIX: str = '.tmp'

Temporary suffix used during atomic writes before final replace.

scripts.extraction.io.file_io.FILE_ENCODING: str = 'utf-8'

Default text encoding for all file operations.

scripts.extraction.io.file_io.JSONL_EXT: str = '.jsonl'

Canonical JSONL file extension.

scripts.extraction.io.file_io.NAMED_TEMP_PREFIX: str = 'report_ai_portal_'

Default prefix for NamedTemporaryFile instances.

scripts.extraction.io.file_io.atomic_write_dataframe_jsonl(output_path, df, *, prefix='report_ai_portal_')[source]

Write a pandas.DataFrame to JSONL atomically.

Uses DataFrame.to_json(orient="records", lines=True) for serialization. Import of pandas is deferred so modules that don’t use DataFrames avoid the import cost.

Parameters:
  • output_path (Path | str) – Final destination path.

  • df (DataFrame) – DataFrame to serialize.

  • prefix (str) – Prefix for the temporary file name.

Return type:

None

scripts.extraction.io.file_io.atomic_write_json(output_path, payload, *, ensure_ascii=False, indent=2, prefix='report_ai_portal_')[source]

Write a single JSON-serializable value atomically.

Parameters:
  • output_path (Path | str) – Final destination path.

  • payload (Any) – JSON-serializable value (dict, list, or scalar).

  • ensure_ascii (bool) – Passed to json.dump.

  • indent (int) – Indentation level for pretty-printing.

  • prefix (str) – Prefix for the temporary file name.

Return type:

None

scripts.extraction.io.file_io.atomic_write_jsonl(output_path, records, *, ensure_ascii=False, sort_keys=False, default=None, prefix='report_ai_portal_')[source]

Write an iterable of dicts as JSONL atomically.

Parameters:
  • output_path (Path | str) – Final destination path.

  • records (Iterable[dict[str, Any]]) – Iterable of JSON-serializable dicts, one per line.

  • ensure_ascii (bool) – Passed to json.dumps.

  • sort_keys (bool) – Passed to json.dumps.

  • default (Any) – Fallback serializer passed to json.dumps.

  • prefix (str) – Prefix for the temporary file name.

Return type:

None

JSONL Reader

Shared JSONL line-parsing helper for RePORT AI Portal.

This module provides the canonical line-level JSONL parser used across the pipeline: trio bundle and downstream processing. Centralizing this eliminates duplicate copies and provides a single place to fix JSON-parsing edge cases.
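The parser's contract can be sketched as:

```python
import json
from pathlib import Path
from typing import Any

class JSONLParseError(ValueError):
    """Malformed line, or valid JSON that is not an object."""

def load_json_object_line(line: str, *, source_path: Path,
                          line_number: int) -> dict[str, Any]:
    """Parse one JSONL line and require a top-level JSON object."""
    try:
        obj = json.loads(line)
    except json.JSONDecodeError as exc:
        raise JSONLParseError(
            f"{source_path}:{line_number}: invalid JSON: {exc}"
        ) from exc
    if not isinstance(obj, dict):
        raise JSONLParseError(
            f"{source_path}:{line_number}: expected a JSON object, "
            f"got {type(obj).__name__}"
        )
    return obj
```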

exception scripts.extraction.io.jsonl_reader.JSONLParseError[source]

Bases: ValueError

Raised when a JSONL line is malformed or not a JSON object.

scripts.extraction.io.jsonl_reader.load_json_object_line(line, *, source_path, line_number)[source]

Parse one JSONL line and require a top-level JSON object.

Parameters:
  • line (str) – Raw line text (should be stripped by caller).

  • source_path (Path) – File the line came from (for error context).

  • line_number (int) – 1-based line number (for error context).

Return type:

dict[str, Any]

Returns:

Parsed JSON object as a dict.

Raises:

JSONLParseError – If the line is not valid JSON or not a dict.

Doc-Freshness Linter

Doc-freshness linter for the RePORT AI Portal.

What. Compares live, source-of-truth values (tool count from ALL_TOOLS, repo version from __version__, action-class count from phi_scrub.yaml) against the prose in README and Sphinx user, IRB/auditor, and developer guides. Also rejects forbidden phrases that indicate retired architecture (vector DB / RAG / Presidio-as-active / “only zone the LLM agent reads” / stale tool counts / stale Make targets) or inaccessible link text (click here / read this article).

Why. Three rounds of freshness sweeps converged the docs to current state, but inline counts and architecture words drift the moment code changes. Doing this in CI means a future PR that adds a 13th tool — or removes one — fails the docs-quality-check stage with a precise pointer to the line(s) that need updating, instead of silently producing stale docs that the next reviewer has to discover from scratch.

How. Two passes:

  1. Live-value comparison — import ALL_TOOLS and __version__, and parse phi_scrub.yaml for action classes, to obtain the current code-owned values. For each live value, look for forbidden patterns (“11 structured-data tools”, “12 callables”, etc.) that contradict it, and report every contradiction.

  2. Forbidden-phrase scan — a curated list of patterns that should NEVER appear in any tracked doc (vector index claims, “only zone the LLM agent reads”, retired Make targets, etc.).

The linter exits non-zero on any finding, which fails CI. Each finding prints path:line: REASON so the dev sees exactly where to look. Disclaimers (“no chunking, no embedding”) are passed through allowlist patterns that match the canonical phrasing.
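The forbidden-phrase pass can be sketched as follows; the patterns here are illustrative, not the module's curated list:

```python
import re
from pathlib import Path

# Illustrative forbidden patterns; the real curated list lives in the module.
FORBIDDEN = [
    (re.compile(r"vector (index|DB|database)", re.I),
     "retired architecture: vector store"),
    (re.compile(r"\bclick here\b", re.I),
     "inaccessible link text"),
]

def scan_text(path: Path, text: str) -> list[str]:
    """Return path:line: REASON findings for every forbidden-phrase hit."""
    findings = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        for pattern, reason in FORBIDDEN:
            if pattern.search(line):
                findings.append(f"{path}:{line_no}: {reason}")
    return findings
```

A CI wrapper would scan every tracked doc and exit 1 when the combined findings list is non-empty.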

class scripts.lint_doc_freshness.Finding(path, line_no, line, reason)[source]

Bases: object

One drift instance: file, line, and the reason it’s stale.

Parameters:
  • path (Path)

  • line_no (int)

  • line (str)

  • reason (str)

line: str
line_no: int
path: Path
reason: str
render()[source]
Return type:

str

scripts.lint_doc_freshness.main()[source]

Run every check and return a process exit code (0 = clean, 1 = drift).

Return type:

int

Indices and Tables