#!/usr/bin/env python3
"""
RePORTaLiN Main Pipeline
========================
Central entry point for the clinical data processing pipeline, orchestrating:
- Data dictionary loading and validation
- Excel to JSONL extraction with type conversion
- PHI/PII de-identification with country-specific compliance
This module provides a complete end-to-end pipeline with comprehensive error handling,
progress tracking, and flexible configuration via command-line arguments.
Public API
----------
Exports two functions via ``__all__``:
- ``main``: Main pipeline orchestrator
- ``run_step``: Pipeline step executor with error handling
Key Features
------------
- **Multi-Step Pipeline**: Dictionary → Extraction → De-identification
- **Flexible Execution**: Skip individual steps or run complete pipeline
- **Country Compliance**: Support for 14 countries (US, IN, ID, BR, etc.)
- **Error Recovery**: Comprehensive error handling with detailed logging
- **Version Tracking**: Built-in version management
Pipeline Steps
--------------
**Step 0: Data Dictionary Loading (Optional)**
- Processes Excel data dictionary files
- Splits multi-table sheets automatically
- Outputs JSONL format with metadata
**Step 1: Data Extraction (Default)**
- Converts Excel files to JSONL format
- Dual output: original and cleaned versions
- Type conversion and validation
- Progress tracking with real-time feedback
**Step 2: De-identification (Opt-in)**
- PHI/PII detection and pseudonymization
- Country-specific regulations (HIPAA, GDPR, DPDPA, etc.)
- Encrypted mapping storage
- Date shifting with interval preservation
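Usage
-----
A minimal sketch of common invocations, assuming the module is saved as
``main.py`` (adjust the path to your checkout)::

    python main.py                              # Steps 0 and 1
    python main.py --skip-dictionary            # Step 1 only
    python main.py --enable-deidentification    # Steps 0, 1, and 2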
Error Handling
--------------
Errors are handled at several levels:
1. **Step-level Errors**: Each step is wrapped in try/except
2. **Validation Errors**: Invalid results cause immediate exit
3. **Logging**: All errors logged with full stack traces
4. **Exit Codes**: Non-zero exit on any failure
Return Codes:
- 0: Success
- 1: Pipeline failure (any step)
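When driving the pipeline from another process, the exit code is the failure
signal. A minimal sketch, assuming the entry point is saved as ``main.py``::

    import subprocess
    import sys

    # returncode is 0 on success and 1 if any step fails (see Return Codes).
    result = subprocess.run([sys.executable, "main.py", "--skip-dictionary"])
    if result.returncode != 0:
        sys.exit("RePORTaLiN pipeline failed; check the log for details")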
See Also
--------
**User Documentation:**
- :doc:`user_guide/quickstart` - Quick start guide with basic examples
- :doc:`user_guide/usage` - Advanced usage patterns and workflows
- :doc:`user_guide/configuration` - Configuration and command-line options
- :doc:`developer_guide/architecture` - Technical architecture details
**API Reference:**
- :mod:`scripts.load_dictionary` - Data dictionary processing
- :mod:`scripts.extract_data` - Data extraction
- :mod:`scripts.deidentify` - De-identification
- :mod:`config` - Configuration settings
"""
import argparse
import logging
import sys
from typing import Callable, Any
from pathlib import Path
from scripts.load_dictionary import load_study_dictionary
from scripts.extract_data import extract_excel_to_jsonl
from scripts.deidentify import deidentify_dataset, DeidentificationConfig
from scripts.utils import logging as log
import config
try:
import argcomplete
ARGCOMPLETE_AVAILABLE = True
except ImportError:
ARGCOMPLETE_AVAILABLE = False
from __version__ import __version__
__all__ = ['main', 'run_step']
def run_step(step_name: str, func: Callable[[], Any]) -> Any:
"""
    Execute a pipeline step with error handling and logging.
Args:
step_name: Name of the pipeline step
func: Callable function to execute
    Returns:
        The function's return value. Exits the process with code 1 if the
        function raises an exception, returns ``False``, or returns a dict
        with a non-empty ``'errors'`` list.
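
    Example (mirrors the call made in :func:`main`; note that it exits the
    process rather than raising on failure)::

        run_step("Step 1: Extracting Raw Data to JSONL", extract_excel_to_jsonl)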
"""
try:
log.info(f"--- {step_name} ---")
result = func()
# Check if result indicates failure
if isinstance(result, bool) and not result:
log.error(f"{step_name} failed.")
sys.exit(1)
elif isinstance(result, dict) and result.get('errors'):
log.error(f"{step_name} completed with {len(result['errors'])} errors.")
sys.exit(1)
log.success(f"{step_name} completed successfully.")
return result
except Exception as e:
log.error(f"Error in {step_name}: {e}", exc_info=True)
sys.exit(1)
def main() -> None:
"""
    Main entry point orchestrating dictionary loading, data extraction, and de-identification.
Command-line Arguments:
--skip-dictionary: Skip data dictionary loading
--skip-extraction: Skip data extraction
--enable-deidentification: Enable de-identification (disabled by default)
--skip-deidentification: Skip de-identification even if enabled
--no-encryption: Disable encryption for de-identification mappings
-c, --countries: Country codes (e.g., IN US ID BR) or ALL
-v, --verbose: Enable verbose (DEBUG level) logging
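
    Example (a sketch; ``argparse`` reads ``sys.argv``, so arguments can be
    injected when calling ``main()`` from Python)::

        import sys
        sys.argv = ["RePORTaLiN", "-c", "IN", "US", "--verbose"]
        main()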
"""
parser = argparse.ArgumentParser(
prog='RePORTaLiN',
description='Clinical data processing pipeline with de-identification support.',
epilog="""
Examples:
%(prog)s # Run complete pipeline
%(prog)s --skip-dictionary # Skip dictionary, run extraction
%(prog)s --enable-deidentification # Run pipeline with de-identification
%(prog)s -c IN US --verbose # Multi-country with debug logging
For detailed documentation, see the Sphinx docs or README.md
""",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}',
help="Show program version and exit")
parser.add_argument('--skip-dictionary', action='store_true',
help="Skip data dictionary loading (Step 0)")
parser.add_argument('--skip-extraction', action='store_true',
help="Skip data extraction (Step 1)")
parser.add_argument('--skip-deidentification', action='store_true',
help="Skip de-identification of extracted data (Step 2)")
parser.add_argument('--enable-deidentification', action='store_true',
help="Enable PHI/PII de-identification with encryption")
parser.add_argument('--no-encryption', action='store_true',
help="Disable encryption for mappings (testing only)")
parser.add_argument('-c', '--countries', nargs='+', metavar='CODE',
help="Country codes (IN US ID BR etc.) or ALL. Default: IN")
parser.add_argument('-v', '--verbose', action='store_true',
help="Enable verbose (DEBUG) logging with detailed context")
parser.add_argument('--simple', action='store_true',
help="Enable simple logging (INFO level, minimal details)")
# Enable shell completion if available
if ARGCOMPLETE_AVAILABLE:
argcomplete.autocomplete(parser)
args = parser.parse_args()
    # Log level precedence: --verbose (DEBUG) > --simple (INFO, minimal console output) > config.LOG_LEVEL
if args.verbose:
log_level = logging.DEBUG
elif args.simple:
log_level = logging.INFO
else:
log_level = config.LOG_LEVEL
log.setup_logger(name=config.LOG_NAME, log_level=log_level, simple_mode=args.simple)
log.info("Starting RePORTaLiN pipeline...")
# Validate configuration and warn about missing files
config_warnings = config.validate_config()
if config_warnings:
for warning in config_warnings:
log.warning(warning)
# Don't exit on warnings, just inform the user
log.info("Proceeding despite configuration warnings. Some features may not work.")
# Ensure required directories exist
config.ensure_directories()
# Display startup banner
print("\n" + "=" * 70)
print("RePORTaLiN - Report India Clinical Study Data Pipeline")
print("=" * 70 + "\n")
if not args.skip_dictionary:
run_step("Step 0: Loading Data Dictionary",
lambda: load_study_dictionary(
file_path=config.DICTIONARY_EXCEL_FILE,
json_output_dir=config.DICTIONARY_JSON_OUTPUT_DIR
))
else:
log.info("--- Skipping Step 0: Data Dictionary Loading ---")
if not args.skip_extraction:
run_step("Step 1: Extracting Raw Data to JSONL", extract_excel_to_jsonl)
else:
log.info("--- Skipping Step 1: Data Extraction ---")
# De-identification step (opt-in for now)
if args.enable_deidentification and not args.skip_deidentification:
def run_deidentification():
# Input directory contains original/ and cleaned/ subdirectories
input_dir = Path(config.CLEAN_DATASET_DIR)
# Output to dedicated deidentified directory within results
output_dir = Path(config.RESULTS_DIR) / "deidentified" / config.DATASET_NAME
log.info(f"De-identifying dataset: {input_dir} -> {output_dir}")
log.info(f"Processing both 'original' and 'cleaned' subdirectories...")
# Parse countries argument
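            # A value of None defers to the downstream default (IN; see the
            # --countries help text).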
countries = None
if args.countries:
if "ALL" in [c.upper() for c in args.countries]:
countries = ["ALL"]
else:
countries = [c.upper() for c in args.countries]
# Configure de-identification
deid_config = DeidentificationConfig(
enable_encryption=not args.no_encryption,
enable_date_shifting=True,
enable_validation=True,
log_level=config.LOG_LEVEL,
countries=countries,
enable_country_patterns=True
)
# Log configuration
country_display = countries or ["IN (default)"]
log.info(f"Countries: {', '.join(country_display)}")
# Run de-identification (will process subdirectories recursively)
stats = deidentify_dataset(
input_dir=input_dir,
output_dir=output_dir,
config=deid_config,
process_subdirs=True # Enable recursive processing
)
# Build consolidated completion message
completion_msg = (
f"De-identification complete:\n"
f" Texts processed: {stats.get('texts_processed', 0)}\n"
f" Total detections: {stats.get('total_detections', 0)}\n"
f" Countries: {', '.join(stats.get('countries', ['N/A']))}\n"
f" Unique mappings: {stats.get('total_mappings', 0)}\n"
f" Output structure:\n"
f" - {output_dir}/original/ (de-identified original files)\n"
f" - {output_dir}/cleaned/ (de-identified cleaned files)"
)
log.info(completion_msg)
return stats
run_step("Step 2: De-identifying PHI/PII", run_deidentification)
elif args.skip_deidentification:
log.info("--- Skipping Step 2: De-identification ---")
else:
log.info("--- De-identification disabled (use --enable-deidentification to enable) ---")
log.info("RePORTaLiN pipeline finished.")
if __name__ == "__main__":
main()