"""Telemetry logger for RePORT AI Portal AI Assistant.
Captures agent events (tool calls, LLM invocations, hallucination detections,
feedback) to an append-only JSONL file. All free-text fields are scanned for
PHI patterns and masked before writing.
"""
from __future__ import annotations
import contextlib
import json
import logging
import os
import tempfile
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import config
from scripts.security.phi_patterns import BLOCKING_PATTERNS, WARN_PATTERNS
# Optional langchain dependency: when langchain-core is absent the handler
# degrades to a plain object (see _TelemetryBase below), so importing this
# module never hard-requires langchain.
_HAS_LANGCHAIN = False
_AIMessage: type | None = None
try:
    from langchain_core.callbacks.base import (
        BaseCallbackHandler as _LCBaseHandler,
    )
    from langchain_core.messages import AIMessage as _LCAIMessage
    _HAS_LANGCHAIN = True
    _AIMessage = _LCAIMessage
except ImportError:
    pass
__all__ = ["TelemetryLogger"]
logger = logging.getLogger(__name__)
# Single source of truth for PHI patterns is scripts.security.phi_patterns.
# Using the shared catalog keeps telemetry aligned with the query-time gate
# and log_hygiene (which log_hygiene.py itself flags as the correct path).
# Order matters only in that every compiled regex is applied in sequence by
# _mask_phi; blocking patterns are listed before warn patterns.
_TELEMETRY_MASK_PATTERNS = [pat for _, pat in BLOCKING_PATTERNS] + [pat for _, pat in WARN_PATTERNS]
def _mask_phi(text: str) -> str:
    """Mask potential PHI in *text*.

    Runs the input through every compiled regex in the shared PHI catalog,
    replacing each match with the literal marker ``[REDACTED]`` so no PHI
    fragment can reach the telemetry sink.
    """
    masked = text
    for phi_regex in _TELEMETRY_MASK_PATTERNS:
        masked = phi_regex.sub("[REDACTED]", masked)
    return masked
def _append_event(event: dict[str, Any]) -> None:
    """Append a telemetry *event* as one JSON line to the configured sink.

    The previous implementation wrote the line to a temp file, fsynced the
    temp, then wrote the line a second time to the sink with an ordinary
    buffered append and discarded the temp — so the sink append was neither
    atomic nor durable, and the temp-file dance was dead code. It also
    created the sink with the process umask (typically world-readable) and
    only chmod'ed it afterwards, leaving a brief exposure window.

    This version opens the sink directly with ``O_APPEND`` (each whole-line
    ``write()`` is an atomic append on POSIX) and creates it with mode
    ``0o600`` so the file is never world-readable, then fsyncs the sink
    itself so the event survives a crash immediately after logging.

    Args:
        event: JSON-serializable mapping; non-serializable values are
            stringified via ``default=str``.

    Raises:
        OSError: if the sink directory or file cannot be written. Callers
            are expected to catch and downgrade this (see TelemetryLogger).
    """
    sink_path = Path(config.TELEMETRY_SINK)
    sink_path.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(event, default=str, ensure_ascii=False) + "\n"
    # O_APPEND + a single write() gives an atomic append; mode 0o600 at
    # creation closes the create-then-chmod permission race. (The mode is
    # still masked by umask, which can only make it stricter.)
    fd = os.open(sink_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(line)
        fh.flush()
        os.fsync(fh.fileno())
    # Idempotent tighten for sinks created before this fix (or by other
    # tooling) with a looser mode.
    sink_path.chmod(0o600)
# Subclass LangChain's callback handler when the optional dependency is
# installed; otherwise fall back to plain `object` so the class definition
# below never fails at import time.
_TelemetryBase = _LCBaseHandler if _HAS_LANGCHAIN else object  # type: ignore[misc]
class TelemetryLogger(_TelemetryBase):  # type: ignore[misc,valid-type]
    """LangChain callback handler for telemetry event capture.

    When langchain-core is installed this subclasses ``BaseCallbackHandler``
    and can be registered on a chain/agent; otherwise it degrades to a plain
    object so importing this module never hard-requires langchain.

    NOTE(review): the stray ``[docs]`` lines in the previous revision were
    Sphinx HTML-export artifacts (bare expression statements that raise
    ``NameError`` at import time) and have been removed.
    """

    # Handler name surfaced to LangChain's callback manager.
    name: str = "report_ai_portal_telemetry"

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        """Log LLM completion events with token usage.

        Token counts are read from ``AIMessage.usage_metadata`` when present.
        If several generations carry usage metadata, the last one seen wins
        (preserved from the original implementation). Telemetry write
        failures are logged at DEBUG and swallowed — observability must
        never break the request path.
        """
        event: dict[str, Any] = {
            "type": "llm_end",
            "timestamp": datetime.now(tz=UTC).isoformat(),
        }
        # Extract token usage from AIMessage.usage_metadata when available.
        if _HAS_LANGCHAIN and hasattr(response, "generations"):
            for gen_list in response.generations:
                for gen in gen_list:
                    if (
                        _AIMessage is not None
                        and isinstance(gen.message, _AIMessage)
                        and gen.message.usage_metadata  # type: ignore[attr-defined]
                    ):
                        event["tokens"] = {
                            "input": gen.message.usage_metadata.get(  # type: ignore[attr-defined]
                                "input_tokens", 0
                            ),
                            "output": gen.message.usage_metadata.get(  # type: ignore[attr-defined]
                                "output_tokens", 0
                            ),
                        }
        try:
            _append_event(event)
        except Exception:
            logger.debug("Telemetry write failed", exc_info=True)

    def on_custom_event(
        self,
        name: str,
        data: Any,
        **kwargs: Any,
    ) -> None:
        """Log custom events (hallucination detection, follow-up, etc.).

        String payload values are truncated to 500 chars and PHI-masked;
        primitives pass through; everything else is force-stringified,
        truncated, and masked so a repr-heavy object cannot land raw PHI
        in the JSONL sink.
        """
        event: dict[str, Any] = {
            "type": f"custom:{name}",
            "timestamp": datetime.now(tz=UTC).isoformat(),
        }
        if isinstance(data, dict):
            for key, value in data.items():
                if isinstance(value, str):
                    event[key] = _mask_phi(value[:500])
                elif isinstance(value, (int, float, bool)) or value is None:
                    event[key] = value
                else:
                    # Non-primitive payloads (exceptions, tracebacks, nested
                    # dicts, dataframe previews) are force-stringified + PHI-
                    # masked + length-capped so a caller that hands us a
                    # repr-heavy object cannot land raw PHI in the JSONL sink.
                    event[key] = _mask_phi(str(value)[:500])
        try:
            _append_event(event)
        except Exception:
            logger.debug("Telemetry write failed", exc_info=True)