Source code for scripts.security.kanon_gate

"""k-anonymity / small-cell suppression gate for agent-tool responses.

At the trio-bundle -> agent boundary, row-level queries can surface
equivalence classes (age-band x sex x district x outcome) with very
small sample sizes. A response returning one matched row with all
sensitive attributes visible defeats the whole scrub — the scrub
guarantees de-identification at rest, but k-anon defends against
re-identification at query time.

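This module provides four utilities:

* :func:`kanon_check` — given row-level records, a tuple of
  quasi-identifier columns and a *k* threshold, groups the rows into
  equivalence classes and returns a :class:`KAnonResult` with
  ``blocked`` set when any class has fewer than *k* members.
* :func:`l_diversity_check` — given the same rows plus a tuple of
  sensitive attributes, returns an :class:`LDiversityResult` with
  ``blocked`` set when any equivalence class has fewer than *l*
  distinct values for a sensitive attribute.
* :func:`mask_small_cell` — replaces a single count < *k* with a
  suppression label.
* :func:`suppress_small_cells` — given aggregate counts, replaces any
  count < *k* with a suppression label (``"<5"`` at the default *k* of
  5) so the agent surface never reveals an exact small-cell value.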

IRB-grade benchmark anchor: Pillar 1.7 — k-anonymity ≥ 5 enforced on
quasi-identifier combos surfaced to the agent; l-diversity ≥ 2 is a
tracked design gap (see references.rst).
Reference: ICMR 2017 §11.7; NIST SP 800-188 §5.
"""

from __future__ import annotations

import logging
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger(__name__)

# Public API of this module: the two result dataclasses plus the four
# check / suppression helpers. Names not listed here are private.
__all__ = [
    "KAnonResult",
    "LDiversityResult",
    "kanon_check",
    "l_diversity_check",
    "mask_small_cell",
    "suppress_small_cells",
]


# Default k-anonymity threshold: equivalence classes or aggregate cells
# smaller than this are flagged or masked.
_DEFAULT_K = 5
# Suppression label matching the default threshold: a count below 5 is
# shown as this string instead of its exact value.
_SUPPRESSED_LABEL = "<5"


[docs] @dataclass(frozen=True, slots=True) class KAnonResult: """Outcome of a k-anonymity check. ``blocked`` is ``True`` when at least one equivalence class is smaller than *k*. ``smallest_class_size`` reports the minimum class size observed (or 0 when no classes were supplied). ``violating_keys`` is a sorted tuple of equivalence-class keys whose size is below the threshold; each key is a string form of the quasi-identifier tuple, safe to log. """ blocked: bool smallest_class_size: int violating_keys: tuple[str, ...]
def _key_to_str(key: tuple[Any, ...]) -> str: return "|".join("" if v is None else str(v) for v in key)
def kanon_check(
    rows: Iterable[Mapping[str, Any]],
    *,
    quasi_identifiers: tuple[str, ...],
    k: int = _DEFAULT_K,
) -> KAnonResult:
    """Check *rows* for k-anonymity over the given quasi-identifiers.

    Rows are grouped into equivalence classes keyed by the tuple of their
    quasi-identifier values (a column missing from a row contributes
    ``None``). The result is ``blocked`` as soon as any class holds fewer
    than *k* rows. *rows* is only read, never modified.

    Args:
        rows: Row-level records to inspect.
        quasi_identifiers: Column names that together form the
            equivalence-class key; must not be empty.
        k: Minimum acceptable class size; must be at least 1.

    Returns:
        A :class:`KAnonResult`. For empty input the result is
        ``blocked=False`` with ``smallest_class_size=0``; whether an
        empty response is acceptable is left to the caller.

    Raises:
        ValueError: If ``k < 1`` or *quasi_identifiers* is empty.
    """
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    if not quasi_identifiers:
        raise ValueError("quasi_identifiers must be non-empty")

    # Tally how many rows share each quasi-identifier combination.
    class_sizes: dict[tuple[Any, ...], int] = {}
    for record in rows:
        signature = tuple(record.get(name) for name in quasi_identifiers)
        class_sizes.setdefault(signature, 0)
        class_sizes[signature] += 1

    if not class_sizes:
        return KAnonResult(blocked=False, smallest_class_size=0, violating_keys=())

    floor = min(class_sizes.values())
    offenders = [
        _key_to_str(signature)
        for signature, members in class_sizes.items()
        if members < k
    ]
    offenders.sort()

    too_small = floor < k
    if too_small:
        # Only sizes and counts are logged here, never the class keys.
        logger.warning(
            "kanon_check: smallest class %d < k=%d (%d violating equivalence classes)",
            floor,
            k,
            len(offenders),
        )

    return KAnonResult(
        blocked=too_small,
        smallest_class_size=floor,
        violating_keys=tuple(offenders),
    )
[docs] @dataclass(frozen=True, slots=True) class LDiversityResult: """Outcome of an l-diversity check. A row set passes l-diversity (l ≥ 2) when every equivalence class (defined by the quasi-identifier tuple) contains at least *l* distinct values for each sensitive attribute. l = 2 is the smallest meaningful threshold; higher values resist homogeneity attacks more strongly. ``blocked`` is ``True`` when at least one (class, sensitive_attr) pair has fewer than *l* distinct values. ``violating_classes`` enumerates which equivalence classes failed and on which attribute. """ blocked: bool smallest_diversity: int violating_classes: tuple[tuple[str, str], ...] """Tuples of ``(equivalence_class_key, sensitive_attribute_name)`` whose distinct-value count fell below *l*."""
def l_diversity_check(
    rows: Iterable[Mapping[str, Any]],
    *,
    quasi_identifiers: tuple[str, ...],
    sensitive_attributes: tuple[str, ...],
    l_threshold: int = 2,
) -> LDiversityResult:
    """Verify that every equivalence class has ≥ ``l_threshold`` distinct
    values for every sensitive attribute.

    Use AFTER :func:`kanon_check` — k-anonymity ensures classes are large
    enough; l-diversity ensures they aren't homogeneous on the outcomes
    that matter (e.g., all 5+ subjects in a class share
    ``outcome=DIED``).

    Rows are grouped by the tuple of their quasi-identifier values. A
    column missing from a row is read as ``None``; for a sensitive
    attribute that ``None`` is recorded like any other value and
    therefore counts towards the class's diversity.

    Args:
        rows: Row-level records to inspect; only read, never modified.
        quasi_identifiers: Column names forming the equivalence-class
            key; must not be empty.
        sensitive_attributes: Column names whose per-class diversity is
            checked; must not be empty.
        l_threshold: Minimum number of distinct values required per
            (class, attribute) pair; must be at least 1.

    Returns:
        An :class:`LDiversityResult`. ``smallest_diversity`` is the
        minimum distinct-value count actually observed across all
        (class, attribute) pairs, whether or not the check passed.
        Empty input returns ``blocked=False`` with
        ``smallest_diversity=0``.

    Raises:
        ValueError: If either tuple is empty or ``l_threshold < 1``.
    """
    if l_threshold < 1:
        raise ValueError(f"l_threshold must be >= 1, got {l_threshold}")
    if not quasi_identifiers:
        raise ValueError("quasi_identifiers must be non-empty")
    if not sensitive_attributes:
        raise ValueError("sensitive_attributes must be non-empty")

    # class key -> {sensitive attribute -> distinct values seen}
    classes: dict[tuple[Any, ...], dict[str, set[Any]]] = {}
    for row in rows:
        key = tuple(row.get(col) for col in quasi_identifiers)
        bucket = classes.get(key)
        if bucket is None:
            # Build the per-attribute sets only for a class's first row.
            bucket = classes[key] = {attr: set() for attr in sensitive_attributes}
        for attr in sensitive_attributes:
            bucket[attr].add(row.get(attr))

    if not classes:
        return LDiversityResult(blocked=False, smallest_diversity=0, violating_classes=())

    # Report the true observed minimum. Seeding the minimum with
    # l_threshold would clamp it, so a passing result could never show
    # how much headroom the data has.
    smallest = min(
        len(values) for bucket in classes.values() for values in bucket.values()
    )
    violations = sorted(
        (_key_to_str(key), attr)
        for key, bucket in classes.items()
        for attr, values in bucket.items()
        if len(values) < l_threshold
    )

    blocked = bool(violations)
    if blocked:
        logger.warning(
            "l_diversity_check: smallest diversity %d < l=%d (%d violating "
            "(class, attribute) pairs)",
            smallest,
            l_threshold,
            len(violations),
        )

    return LDiversityResult(
        blocked=blocked,
        smallest_diversity=smallest,
        violating_classes=tuple(violations),
    )
def mask_small_cell(
    count: int,
    *,
    k: int = _DEFAULT_K,
    label: str | None = None,
) -> Any:
    """Return *count* if ``count >= k``, else the suppression *label*.

    Pair with :func:`suppress_small_cells` when aggregating cross-
    tabulations for the agent surface.

    Args:
        count: The exact cell count.
        k: Suppression threshold; counts below it are masked.
        label: Replacement for a masked count. When omitted, the label
            is derived from *k*: ``"<5"`` at the default threshold,
            otherwise ``f"<{k}"``. A fixed ``"<5"`` would misdescribe a
            masked cell whenever *k* is not 5 (e.g. a count of 7 with
            ``k=10``).

    Returns:
        *count* unchanged when it is at least *k*, otherwise the label.
    """
    if count >= k:
        return count
    if label is None:
        # Keep the historical constant at the default threshold so the
        # default output is unchanged.
        label = _SUPPRESSED_LABEL if k == _DEFAULT_K else f"<{k}"
    return label
def suppress_small_cells(
    counts: Mapping[Any, int],
    *,
    k: int = _DEFAULT_K,
    label: str | None = None,
) -> dict[Any, Any]:
    """Return a new dict where values < *k* are replaced with *label*.

    Leaves keys untouched and does not modify *counts*. Intended for
    cross-tab / frequency counts that a tool is about to return to the
    LLM.

    Args:
        counts: Mapping from cell key to exact count.
        k: Suppression threshold; counts below it are masked.
        label: Replacement for masked counts. When omitted, the label is
            derived from *k*: ``"<5"`` at the default threshold,
            otherwise ``f"<{k}"``, so the label never contradicts the
            threshold in force.

    Returns:
        A new ``dict`` with the same keys, each value either the
        original count or the suppression label.
    """
    if label is None:
        # Resolve the label here and pass it explicitly, so the result
        # does not depend on mask_small_cell's own default.
        label = _SUPPRESSED_LABEL if k == _DEFAULT_K else f"<{k}"
    return {key: mask_small_cell(val, k=k, label=label) for key, val in counts.items()}