Extending RePORTaLiN

For Developers: Customizing and Extending the Pipeline

This guide explains how to extend and customize RePORTaLiN’s functionality through its modular architecture, public APIs, and extension points.

Changed in version 0.3.0: Added configuration module utilities (ensure_directories(), validate_config()). See Working with Configuration Module for new features.

Changed in version 0.3.0: Logging module enhanced with better type hints, optimized performance, and explicit public API.

Changed in version 0.3.0: Data extraction module enhanced with explicit public API (6 exports), comprehensive usage examples, and verified type safety. See Working with Data Extraction Module for public API details.

Changed in version 0.3.0: Data dictionary module enhanced with explicit public API (2 exports), comprehensive usage examples, and algorithm documentation. See Working with Data Dictionary Module for public API details.

Working with Data Dictionary Module

Added in version 0.3.0.

The scripts/load_dictionary.py module provides a well-defined public API for processing data dictionary Excel files.

Using the Public API

The module exports 2 functions via __all__:

  1. load_study_dictionary - High-level function using config defaults

  2. process_excel_file - Low-level function for custom workflows

Best Practice: Use the public API

# Recommended: Use public API
from scripts.load_dictionary import (
    load_study_dictionary,
    process_excel_file
)

# High-level usage with config defaults
success = load_study_dictionary()

# Custom file processing
success = process_excel_file(
    excel_path="data/custom_dictionary.xlsx",
    output_dir="results/custom_output",
    preserve_na=True
)

Extending with Custom Processing

from typing import Callable, Optional

from scripts.load_dictionary import process_excel_file

def custom_dictionary_processor(
    excel_path: str,
    output_dir: str,
    custom_validation: Optional[Callable[[str], None]] = None
) -> bool:
    """Process dictionary with custom validation."""

    # Process with standard function
    success = process_excel_file(excel_path, output_dir)

    if success and custom_validation:
        # Apply custom post-processing
        custom_validation(output_dir)

    return success

# Use custom processor
def validate_output(output_dir: str):
    """Custom validation logic."""
    print(f"Validating output in {output_dir}")
    # Add your validation logic here

custom_dictionary_processor(
    "data/dictionary.xlsx",
    "results/output",
    validate_output
)

Understanding Multi-Table Detection

The module’s table detection algorithm (a simplified sketch of steps 1-3 follows this list):

  1. Identifies horizontal strips (separated by empty rows)

  2. Within each strip, identifies vertical sections (separated by empty columns)

  3. Extracts each non-empty section as a separate table

  4. Deduplicates column names by appending numeric suffixes

  5. Checks for “ignore below” markers and segregates subsequent tables

  6. Adds metadata fields (__sheet__, __table__)

  7. Saves to JSONL with proper directory structure
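
The following sketch illustrates steps 1-3 in isolation. It is a simplified illustration, not the module’s actual implementation, and assumes the sheet was read with header=None so that raw rows and columns can be inspected:

# Simplified sketch of steps 1-3 (not the module's actual implementation)
from typing import List
import pandas as pd

def split_into_tables(df: pd.DataFrame) -> List[pd.DataFrame]:
    """Split a raw sheet into candidate tables using empty rows and columns."""
    tables = []
    # 1. Horizontal strips: runs of consecutive non-empty rows
    non_empty_rows = df.notna().any(axis=1)
    strip_id = (non_empty_rows != non_empty_rows.shift()).cumsum()
    for _, strip in df[non_empty_rows].groupby(strip_id[non_empty_rows]):
        # 2. Vertical sections: runs of consecutive non-empty columns
        non_empty_cols = strip.notna().any(axis=0)
        section_id = (non_empty_cols != non_empty_cols.shift()).cumsum()
        for _, cols in non_empty_cols[non_empty_cols].groupby(section_id[non_empty_cols]):
            section = strip[cols.index]
            # 3. Keep each non-empty section as a separate table
            if not section.dropna(how="all").empty:
                tables.append(section)
    return tables

The real module additionally deduplicates column names, handles “ignore below” markers, attaches the __sheet__ and __table__ metadata fields, and writes JSONL (steps 4-7).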

Type Safety Benefits

The module has return type hints on all functions:

  • All functions have return type annotations

  • IDEs provide better autocomplete and error detection

  • Static analysis tools can verify return types

  • Documentation is clear about expected outputs

See scripts.load_dictionary module for complete API reference.

Working with Data Extraction Module

Added in version 0.3.0.

The scripts/extract_data.py module provides a well-defined public API for Excel to JSONL conversion.

Using the Public API

The module exports 6 functions via __all__:

  1. extract_excel_to_jsonl - Batch process all Excel files

  2. process_excel_file - Process a single Excel file

  3. find_excel_files - Find Excel files in a directory

  4. convert_dataframe_to_jsonl - Convert DataFrame to JSONL

  5. clean_record_for_json - Clean record for JSON serialization

  6. clean_duplicate_columns - Remove duplicate columns

Best Practice: Use the public API

# Recommended: Use public API
from scripts.extract_data import (
    extract_excel_to_jsonl,
    process_excel_file,
    find_excel_files
)

# Batch processing
extract_excel_to_jsonl(
    input_dir="data/dataset/Indo-vap",
    output_dir="results/dataset/Indo-vap"
)

# Single file processing
result = process_excel_file(
    "data/file.xlsx",
    "results/output"
)
print(f"Processed {result['records']} records")

Extending with Custom Conversions

import pandas as pd
from scripts.extract_data import (
    clean_record_for_json,
    convert_dataframe_to_jsonl
)

def custom_dataframe_processor(df: pd.DataFrame) -> pd.DataFrame:
    """Apply custom transformations before conversion."""
    # Custom logic here
    df = df.dropna(subset=['required_column'])
    df['new_column'] = df['old_column'] * 2
    return df

# Use with standard conversion
df = pd.read_excel("input.xlsx")
df = custom_dataframe_processor(df)
convert_dataframe_to_jsonl(df, "output.jsonl", "input.xlsx")
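
Records can also be cleaned individually before serialization. The following is a minimal sketch that assumes clean_record_for_json takes a dict-like record and returns a JSON-safe dictionary; check the API reference for the exact signature:

import json
import pandas as pd
from scripts.extract_data import clean_record_for_json

df = pd.read_excel("input.xlsx")

with open("output.jsonl", "w") as f:
    for record in df.to_dict(orient="records"):
        # Assumed behavior: non-JSON-safe values (NaN, timestamps, etc.) are normalized
        cleaned = clean_record_for_json(record)
        f.write(json.dumps(cleaned) + "\n")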

Type Safety Benefits

The module has complete type hint coverage:

  • All functions have parameter and return type annotations

  • IDEs provide better autocomplete and error detection

  • Static analysis tools (mypy, pyright) can verify correctness

  • Documentation is clear about expected inputs/outputs

See scripts.extract_data module for complete API reference.

Working with Configuration Module

Added in version 0.3.0.

The enhanced config.py module provides utilities for robust configuration management.

Using Configuration Utilities

Best Practice: Validate at startup

# main.py or your script

from config import validate_config, ensure_directories
import logging

def main():
    # Validate configuration first
    warnings = validate_config()
    if warnings:
        logging.warning("Configuration issues detected:")
        for warning in warnings:
            logging.warning(f"  {warning}")

    # Ensure directories exist
    ensure_directories()

    # Continue with your pipeline...

Adding Custom Configuration Validation

# custom_validator.py

from typing import List
from config import validate_config
import os

def validate_custom_config() -> List[str]:
    """Extend configuration validation with custom checks."""
    warnings = validate_config()  # Get base warnings

    # Add custom checks
    custom_paths = [
        "/path/to/custom/resource",
        "/path/to/another/file"
    ]

    for path in custom_paths:
        if not os.path.exists(path):
            warnings.append(f"Custom resource not found: {path}")

    return warnings
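
The extended validator can then be called at startup in place of validate_config():

import logging
from custom_validator import validate_custom_config

for warning in validate_custom_config():
    logging.warning(warning)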

Using Constants in Extensions

from typing import Optional

from config import DEFAULT_DATASET_NAME, DATASET_SUFFIXES

def process_dataset(folder_name: Optional[str] = None):
    """Process a dataset with fallback to default."""
    name = folder_name or DEFAULT_DATASET_NAME
    print(f"Processing dataset: {name}")

# Check if folder has recognized suffix
def has_dataset_suffix(folder_name: str) -> bool:
    """Check if folder name has a dataset suffix."""
    return any(folder_name.endswith(suffix) for suffix in DATASET_SUFFIXES)

Adding New Output Formats

Example: Adding CSV Export

  1. Create the conversion function:

# scripts/extract_data.py

def convert_dataframe_to_csv(
    df: pd.DataFrame,
    output_file: str,
    **kwargs
) -> None:
    """
    Convert DataFrame to CSV format.

    Args:
        df: DataFrame to convert
        output_file: Path to output CSV file
        **kwargs: Additional arguments for to_csv()
    """
    df.to_csv(output_file, index=False, **kwargs)
  2. Add command-line option:

# main.py

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--format',
        choices=['jsonl', 'csv', 'parquet'],
        default='jsonl',
        help='Output format'
    )
    args = parser.parse_args()

    # Use format in extraction
    if args.format == 'csv':
        extract_excel_to_csv(...)
    elif args.format == 'jsonl':
        extract_excel_to_jsonl(...)
  3. Update documentation:

Add usage examples and update user guide.
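
The --format choices above also include parquet; a converter following the same pattern is sketched below (note that to_parquet() requires a Parquet engine such as pyarrow or fastparquet):

# scripts/extract_data.py
import pandas as pd  # already imported at module level in extract_data.py

def convert_dataframe_to_parquet(
    df: pd.DataFrame,
    output_file: str,
    **kwargs
) -> None:
    """
    Convert DataFrame to Parquet format.

    Args:
        df: DataFrame to convert
        output_file: Path to output Parquet file
        **kwargs: Additional arguments for to_parquet()
    """
    df.to_parquet(output_file, index=False, **kwargs)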

Adding Data Transformations

Example: Adding Data Validation

# scripts/validators.py

from typing import Any, Dict, List
import pandas as pd
from scripts.utils import logging as log

class DataValidator:
    """Validate data against rules."""

    def __init__(self, rules: Dict[str, Any]):
        """
        Initialize validator with rules.

        Args:
            rules: Dictionary of validation rules
        """
        self.rules = rules

    def validate_dataframe(self, df: pd.DataFrame) -> List[str]:
        """
        Validate DataFrame against rules.

        Args:
            df: DataFrame to validate

        Returns:
            List of validation errors
        """
        errors = []

        # Check required columns
        if 'required_columns' in self.rules:
            missing = set(self.rules['required_columns']) - set(df.columns)
            if missing:
                errors.append(f"Missing columns: {missing}")

        # Check data types
        if 'column_types' in self.rules:
            for col, dtype in self.rules['column_types'].items():
                if col in df.columns:
                    if not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
                        errors.append(
                            f"Column {col} has wrong type: "
                            f"{df[col].dtype} (expected {dtype})"
                        )

        # Check value ranges
        if 'value_ranges' in self.rules:
            for col, (min_val, max_val) in self.rules['value_ranges'].items():
                if col in df.columns:
                    if df[col].min() < min_val or df[col].max() > max_val:
                        errors.append(
                            f"Column {col} has values outside range "
                            f"[{min_val}, {max_val}]"
                        )

        return errors

Usage:

# In extract_data.py
from pathlib import Path

from scripts.validators import DataValidator

def process_excel_file_with_validation(input_file, output_dir, rules):
    """Process file with validation."""
    df = pd.read_excel(input_file)

    # Validate
    validator = DataValidator(rules)
    errors = validator.validate_dataframe(df)

    if errors:
        log.warning(f"Validation errors in {input_file}:")
        for error in errors:
            log.warning(f"  - {error}")

    # Continue with extraction (derive the output path from the input file name)
    output_file = str(Path(output_dir) / f"{Path(input_file).stem}.jsonl")
    convert_dataframe_to_jsonl(df, output_file, input_file)
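
An example rules dictionary matching the checks implemented above (the column names and ranges are illustrative):

rules = {
    'required_columns': ['subject_id', 'visit_date'],  # illustrative column names
    'column_types': {'subject_id': 'int64'},
    'value_ranges': {'age': (0, 120)},
}

process_excel_file_with_validation(
    "data/file.xlsx",
    "results/output",
    rules
)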

Adding Custom Logging

Changed in version 0.3.0: Logging module enhanced with better type hints, optimized performance, and explicit public API.

Understanding the Logging Module

The scripts.utils.logging module provides a robust logging infrastructure with:

  • Thread-safe: No shared mutable state

  • Optimized: No unnecessary record copying

  • Type-safe: Comprehensive type hints throughout

  • Well-defined API: Explicit __all__ declaration

Public API:

from scripts.utils.logging import (
    # Setup functions (3)
    setup_logger,      # Initialize logging system
    get_logger,        # Get logger instance
    get_log_file_path, # Get current log file path

    # Logging functions (6)
    debug,             # Log debug messages
    info,              # Log info messages
    warning,           # Log warnings
    error,             # Log errors
    critical,          # Log critical errors
    success,           # Log success messages (custom level)

    # Constants (1)
    SUCCESS,           # SUCCESS level constant (25)
)
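
A typical setup at program start looks like the following sketch; it assumes setup_logger() accepts defaults, so check the API reference for its actual parameters:

from scripts.utils.logging import setup_logger, info, success, get_log_file_path

setup_logger()  # assumed to work with defaults
info("Pipeline started")
success("Extraction step complete")
print(f"Logs written to: {get_log_file_path()}")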

Logging Best Practices for Extensions

  1. Use the public API only:

    # Good: Use public API
    from scripts.utils.logging import info, success, error
    
    info("Processing data")
    success("Processing complete")
    
    # Avoid: Don't access private internals
    from scripts.utils.logging import _logger  # Don't do this
    
  2. Don’t mutate log records:

    # Good: Create custom formatter without mutation
    class MyFormatter(logging.Formatter):
        def format(self, record: logging.LogRecord) -> str:
            # Don't modify record; work with formatted string
            formatted = super().format(record)
            return f"[CUSTOM] {formatted}"
    
    # Bad: Mutating record (not thread-safe)
    class BadFormatter(logging.Formatter):
        def format(self, record: logging.LogRecord) -> str:
            record.msg = f"[CUSTOM] {record.msg}"  # Don't mutate!
            return super().format(record)
    
  3. Use proper exception handling:

    from scripts.utils.logging import error, info
    
    try:
        risky_operation()
        info("Operation completed")
    except ValueError as e:
        error(f"Invalid value: {e}", exc_info=True)
    except Exception as e:
        error(f"Unexpected error: {e}", exc_info=True)
        raise
    

Example: Adding Email Notifications

# scripts/utils/notifications.py

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging

class EmailHandler(logging.Handler):
    """Send log messages via email."""

    def __init__(
        self,
        smtp_server: str,
        from_addr: str,
        to_addrs: list,
        subject: str = "RePORTaLiN Log"
    ):
        """
        Initialize email handler.

        Args:
            smtp_server: SMTP server address
            from_addr: Sender email address
            to_addrs: List of recipient addresses
            subject: Email subject line
        """
        super().__init__()
        self.smtp_server = smtp_server
        self.from_addr = from_addr
        self.to_addrs = to_addrs
        self.subject = subject

    def emit(self, record: logging.LogRecord) -> None:
        """Send log record via email."""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.from_addr
            msg['To'] = ', '.join(self.to_addrs)
            msg['Subject'] = f"{self.subject} - {record.levelname}"

            body = self.format(record)
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(self.smtp_server)
            server.send_message(msg)
            server.quit()
        except Exception as e:
            # Don't let email failure crash the app
            print(f"Failed to send email: {e}")

Usage:

# In logging.py or main.py
import logging

from scripts.utils.notifications import EmailHandler

# Add email handler for errors
email_handler = EmailHandler(
    smtp_server='smtp.example.com',
    from_addr='reportalin@example.com',
    to_addrs=['admin@example.com'],
    subject='RePORTaLiN Error'
)
email_handler.setLevel(logging.ERROR)

# Attach to the logger your pipeline uses (root logger shown here)
logger = logging.getLogger()
logger.addHandler(email_handler)

Adding Database Support

Example: PostgreSQL Output

# scripts/database.py

import pandas as pd
from sqlalchemy import create_engine
from typing import Optional
from scripts.utils import logging as log

class DatabaseExporter:
    """Export data to database."""

    def __init__(self, connection_string: str):
        """
        Initialize database connection.

        Args:
            connection_string: SQLAlchemy connection string
        """
        self.engine = create_engine(connection_string)

    def export_dataframe(
        self,
        df: pd.DataFrame,
        table_name: str,
        if_exists: str = 'append'
    ) -> int:
        """
        Export DataFrame to database table.

        Args:
            df: DataFrame to export
            table_name: Target table name
            if_exists: What to do if table exists ('append', 'replace', 'fail')

        Returns:
            Number of rows exported
        """
        try:
            df.to_sql(
                table_name,
                self.engine,
                if_exists=if_exists,
                index=False
            )
            log.success(f"Exported {len(df)} rows to {table_name}")
            return len(df)
        except Exception as e:
            log.error(f"Failed to export to database: {e}")
            raise

    def close(self):
        """Close database connection."""
        self.engine.dispose()

Usage:

# In extract_data.py
from pathlib import Path

from scripts.database import DatabaseExporter

def extract_to_database(input_dir, connection_string):
    """Extract data directly to database."""
    db = DatabaseExporter(connection_string)

    for excel_file in find_excel_files(input_dir):
        df = pd.read_excel(excel_file)
        table_name = Path(excel_file).stem
        db.export_dataframe(df, table_name)

    db.close()
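
Called with a standard SQLAlchemy connection URL (the credentials and database name below are placeholders):

extract_to_database(
    "data/dataset/Indo-vap",
    "postgresql+psycopg2://user:password@localhost:5432/reportalin"  # placeholder URL
)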

Adding Parallel Processing

Example: Process Files in Parallel

# scripts/parallel.py

from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Callable
from pathlib import Path
from tqdm import tqdm
from scripts.utils import logging as log

def process_files_parallel(
    files: List[Path],
    process_func: Callable,
    max_workers: int = 4,
    **kwargs
) -> List[dict]:
    """
    Process files in parallel.

    Args:
        files: List of files to process
        process_func: Function to apply to each file
        max_workers: Maximum number of parallel workers
        **kwargs: Additional arguments for process_func

    Returns:
        List of results from processing each file
    """
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(process_func, file, **kwargs): file
            for file in files
        }

        # Process completed tasks
        with tqdm(total=len(files), desc="Processing files") as pbar:
            for future in as_completed(future_to_file):
                file = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)
                    log.info(f"Completed {file}")
                except Exception as e:
                    log.error(f"Failed to process {file}: {e}")
                finally:
                    pbar.update(1)

    return results

Usage:

# In extract_data.py
from scripts.parallel import process_files_parallel

def extract_excel_to_jsonl_parallel(input_dir, output_dir, max_workers=4):
    """Extract files in parallel."""
    files = find_excel_files(input_dir)

    results = process_files_parallel(
        files,
        process_excel_file,
        max_workers=max_workers,
        output_dir=output_dir
    )

    total_records = sum(r.get('records', 0) for r in results)
    log.success(f"Processed {len(results)} files, {total_records} records")

Adding Custom Table Detection

Example: Custom Split Logic

# scripts/custom_split.py

import pandas as pd
from typing import List, Tuple

class CustomTableSplitter:
    """Custom table splitting logic."""

    def split_by_header_rows(
        self,
        df: pd.DataFrame,
        header_pattern: str
    ) -> List[pd.DataFrame]:
        """
        Split DataFrame at rows matching header pattern.

        Args:
            df: DataFrame to split
            header_pattern: Pattern to identify header rows

        Returns:
            List of DataFrames split at header rows
        """
        tables = []
        current_table = []

        for idx, row in df.iterrows():
            # Check if row matches header pattern
            if any(header_pattern in str(val) for val in row):
                if current_table:
                    # Save previous table
                    tables.append(pd.DataFrame(current_table))
                    current_table = []
                # Start new table with this row as header
                current_table = [row]
            else:
                current_table.append(row)

        # Add last table
        if current_table:
            tables.append(pd.DataFrame(current_table))

        return tables
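
Usage (a sketch; the file name and header pattern are illustrative, and the sheet is read with header=None so header rows remain visible as data):

import pandas as pd
from scripts.custom_split import CustomTableSplitter

# Read without headers so header rows stay in the data
df = pd.read_excel("data/multi_table_sheet.xlsx", header=None)

splitter = CustomTableSplitter()
tables = splitter.split_by_header_rows(df, header_pattern="Variable Name")
print(f"Found {len(tables)} tables")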

Adding Plugin System

Example: Plugin Architecture

# scripts/plugins.py

from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import importlib
import os

import pandas as pd

class ProcessorPlugin(ABC):
    """Base class for processor plugins."""

    @abstractmethod
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Process DataFrame.

        Args:
            df: Input DataFrame

        Returns:
            Processed DataFrame
        """
        pass

class PluginManager:
    """Manage and load plugins."""

    def __init__(self, plugin_dir: str = "plugins"):
        """
        Initialize plugin manager.

        Args:
            plugin_dir: Directory containing plugins
        """
        self.plugin_dir = plugin_dir
        self.plugins: Dict[str, ProcessorPlugin] = {}

    def load_plugins(self):
        """Load all plugins from plugin directory."""
        if not os.path.exists(self.plugin_dir):
            return

        for file in os.listdir(self.plugin_dir):
            if file.endswith('.py') and not file.startswith('_'):
                module_name = file[:-3]
                try:
                    module = importlib.import_module(
                        f"{self.plugin_dir}.{module_name}"
                    )
                    # Look for Plugin class
                    if hasattr(module, 'Plugin'):
                        plugin = module.Plugin()
                        self.plugins[module_name] = plugin
                except Exception as e:
                    print(f"Failed to load plugin {module_name}: {e}")

    def apply_plugins(
        self,
        df: pd.DataFrame,
        plugin_names: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """
        Apply plugins to DataFrame.

        Args:
            df: DataFrame to process
            plugin_names: List of plugin names to apply (None = all)

        Returns:
            Processed DataFrame
        """
        if plugin_names is None:
            plugin_names = self.plugins.keys()

        for name in plugin_names:
            if name in self.plugins:
                df = self.plugins[name].process(df)

        return df

Example Plugin:

# plugins/normalize_names.py

import pandas as pd
from scripts.plugins import ProcessorPlugin

class Plugin(ProcessorPlugin):
    """Normalize column names."""

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize column names to lowercase with underscores."""
        df.columns = [
            col.lower().replace(' ', '_')
            for col in df.columns
        ]
        return df

Usage:

from scripts.plugins import PluginManager

# Load and apply plugins
manager = PluginManager()
manager.load_plugins()

df = pd.read_excel('data.xlsx')
df = manager.apply_plugins(df, ['normalize_names'])

Configuration File Support

Example: YAML Configuration

# scripts/config_loader.py

import yaml
from pathlib import Path
from typing import Dict, Any

class ConfigLoader:
    """Load configuration from YAML file."""

    def __init__(self, config_file: str = "config.yaml"):
        """
        Initialize config loader.

        Args:
            config_file: Path to configuration file
        """
        self.config_file = Path(config_file)
        self.config: Dict[str, Any] = {}

    def load(self) -> Dict[str, Any]:
        """
        Load configuration from file.

        Returns:
            Configuration dictionary
        """
        if self.config_file.exists():
            with open(self.config_file, 'r') as f:
                self.config = yaml.safe_load(f) or {}  # an empty file yields None
        return self.config

    def get(self, key: str, default: Any = None) -> Any:
        """
        Get configuration value.

        Args:
            key: Configuration key (supports dot notation)
            default: Default value if key not found

        Returns:
            Configuration value
        """
        keys = key.split('.')
        value = self.config

        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

Example config.yaml:

# config.yaml

pipeline:
  input_dir: data/dataset/Indo-vap
  output_dir: results/dataset/Indo-vap

processing:
  parallel: true
  max_workers: 4

validation:
  enabled: true
  rules:
    required_columns:
      - id
      - date
    column_types:
      id: int64
      date: datetime64

logging:
  level: INFO
  file: .logs/reportalin.log
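
Loading this file and reading nested keys with dot notation:

from scripts.config_loader import ConfigLoader

loader = ConfigLoader("config.yaml")
loader.load()

max_workers = loader.get("processing.max_workers", 4)
input_dir = loader.get("pipeline.input_dir")
print(f"Using {max_workers} workers on {input_dir}")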

Adding New Country Regulations

RePORTaLiN supports country-specific data privacy regulations for de-identification. You can add support for new countries by extending the country_regulations module.

Example: Adding a New Country

  1. Define the regulation function:

# scripts/utils/country_regulations.py

def get_new_country_regulation() -> CountryRegulation:
    """New Country - Data Protection Act."""
    return CountryRegulation(
        country_code="XX",  # ISO 3166-1 alpha-2 code
        country_name="New Country",
        regulation_name="Data Protection Act",
        regulation_acronym="DPA",
        common_fields=get_common_fields(),
        specific_fields=[
            DataField(
                name="national_id",
                display_name="National ID Number",
                field_type=DataFieldType.IDENTIFIER,
                privacy_level=PrivacyLevel.CRITICAL,
                required=False,
                pattern=r'^\d{10}$',  # Regex pattern
                description="10-digit National ID",
                examples=["1234567890"],
                country_specific=True
            ),
            DataField(
                name="health_card",
                display_name="Health Insurance Card",
                field_type=DataFieldType.MEDICAL,
                privacy_level=PrivacyLevel.CRITICAL,
                required=False,
                pattern=r'^HC-\d{8}$',
                description="Health card number",
                examples=["HC-12345678"],
                country_specific=True
            ),
        ],
        description="Brief description of the regulation",
        requirements=[
            "Key requirement 1",
            "Key requirement 2",
            "Data protection impact assessment required",
            "Breach notification within X hours",
        ]
    )
  2. Register the country in the registry:

# In CountryRegulationManager class
_REGISTRY: Dict[str, callable] = {
    "US": get_us_regulation,
    "IN": get_india_regulation,
    # ... existing countries ...
    "XX": get_new_country_regulation,  # Add your country
}
  3. Test the implementation (see Testing Your Country Regulation below).

  4. Update documentation:

Add the new country to:
  • docs/sphinx/user_guide/country_regulations.rst

  • README.md

  • CLI help text in scripts/deidentify.py

Field Types and Privacy Levels

When defining country-specific fields, use appropriate types:

DataFieldType Options:
  • PERSONAL_NAME: First/last/middle names

  • IDENTIFIER: National IDs, SSN, etc.

  • CONTACT: Phone, email, address

  • DEMOGRAPHIC: Age, gender, ethnicity

  • LOCATION: City, state, postal code

  • MEDICAL: Health card, MRN, insurance

  • FINANCIAL: Tax IDs, bank accounts

  • BIOMETRIC: Fingerprints, facial data

  • CUSTOM: Other sensitive data

PrivacyLevel Options (1-5):
  • PUBLIC: Publicly available information

  • LOW: Low sensitivity (e.g., gender)

  • MEDIUM: Medium sensitivity (e.g., city)

  • HIGH: High sensitivity PII (e.g., phone)

  • CRITICAL: Critical sensitive PII (e.g., SSN, health data)

Regex Pattern Guidelines

When defining detection patterns:

  1. Be Specific: Avoid overly broad patterns that might cause false positives.

  2. Use Anchors: Use ^ and $ to match entire strings:

    pattern=r'^\d{3}-\d{2}-\d{4}$'  # US SSN
    pattern=r'^\d{12}$'              # Indian Aadhaar (without spaces)
    
  3. Handle Variations: Account for different formats:

    # With or without separators
    pattern=r'^\d{3}-\d{2}-\d{4}$|^\d{9}$'
    
    # With or without spaces
    pattern=r'^\d{4}\s?\d{4}\s?\d{4}$'
    
  4. Use Character Classes: Use \d for digits, [A-Z] for uppercase letters:

    pattern=r'^[A-Z]{2}\d{6}[A-D]$'  # UK National Insurance
    
  5. Test Thoroughly: Test patterns with real and synthetic data:

    # Test the pattern
    import re
    pattern = re.compile(r'^\d{3}-\d{2}-\d{4}$')
    assert pattern.match("123-45-6789")
    assert not pattern.match("123456789")
    

Testing Your Country Regulation

  1. Unit Test:

# test_country_regulations.py

def test_new_country_regulation():
    """Test new country regulation."""
    manager = CountryRegulationManager(countries=["XX"])

    # Verify it loads
    assert "XX" in manager.regulations

    # Verify fields
    reg = manager.regulations["XX"]
    assert len(reg.specific_fields) > 0

    # Test detection patterns
    patterns = manager.get_detection_patterns()
    assert "national_id" in patterns
  2. Integration Test:

def test_deidentification_with_new_country():
    """Test de-identification with new country."""
    config = DeidentificationConfig(
        countries=["XX"],
        enable_country_patterns=True,
        enable_encryption=False
    )

    engine = DeidentificationEngine(config=config)

    text = "Patient ID: 1234567890, Health Card: HC-12345678"
    deidentified = engine.deidentify_text(text)

    # Verify identifiers are removed
    assert "1234567890" not in deidentified
    assert "HC-12345678" not in deidentified
  3. Manual Testing:

# Test with command line
python3 -m scripts.utils.country_regulations --countries XX --show-fields

# Test de-identification with sample text
python3 -c "from scripts.deidentify import DeidentificationEngine, DeidentificationConfig; \
config = DeidentificationConfig(countries=['XX']); \
engine = DeidentificationEngine(config=config); \
print(engine.deidentify_text('Patient John Doe, ID: 1234567890'))"

Common Pitfalls

  1. Overlapping Patterns: Ensure patterns don’t conflict with other countries.

  2. Locale-Specific Formats: Account for different date/number formats.

  3. Special Characters: Properly escape regex special characters.

  4. Performance: Avoid extremely complex regex patterns that slow processing.

  5. False Positives: Test with diverse data to minimize false detections.

Regulatory Compliance Considerations

When adding a new country:

  1. Research the Regulation: Thoroughly understand the legal requirements.

  2. Consult Legal Experts: Ensure your implementation meets legal standards.

  3. Document Requirements: List all key requirements in the regulation object.

  4. Stay Updated: Monitor for regulatory changes and updates.

  5. Provide References: Link to official regulatory documentation.

Warning

Adding country-specific regulations does not guarantee legal compliance. Always consult with legal counsel familiar with the jurisdiction.

Best Practices for Extensions

  1. Follow Existing Patterns

    Study existing code and follow the same patterns.

  2. Add Tests

    Always add tests for new functionality.

  3. Update Documentation

    Document new features in user and developer guides.

  4. Maintain Backward Compatibility

    Don’t break existing functionality.

  5. Use Type Hints

    Add type hints to all new functions.

  6. Log Appropriately

    Use the centralized logging system.

  7. Handle Errors Gracefully

    Don’t let errors crash the pipeline.

See Also