# Source code for haive.core.engine.document.loaders.auto_registry

"""Auto-Registry System for Document Loaders.

This module provides automatic registration and discovery of all document loader
sources and loaders. It scans the sources directory and automatically imports
and registers all available source types without manual intervention.

The auto-registry ensures that all 230+ implemented loaders are automatically
available when the system starts, providing a seamless developer experience.

Examples:
    Auto-register all sources::

        from haive.core.engine.document.loaders import auto_register_all

        # Automatically discover and register all sources
        auto_register_all()

    Check registration status::

        from haive.core.engine.document.loaders import get_registration_status

        status = get_registration_status()
        print(f"Registered {status['total_sources']} sources")

Author: Claude (Haive Document Loader System)
Version: 1.0.0
"""

import importlib
import inspect
import logging
import pkgutil
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

from haive.core.engine.document.loaders.sources.enhanced_registry import enhanced_registry
from haive.core.engine.document.loaders.sources.source_types import BaseSource, SourceCategory

logger = logging.getLogger(__name__)


@dataclass
class RegistrationInfo:
    """Record kept by the auto-registry for one tracked source.

    Attributes:
        source_name: Name the source is tracked under.
        source_class: The source class itself.
        module_name: Module the source class was found in.
        category: Category declared by the source class.
        loaders: Names of the loaders available for this source.
        registration_time: Timestamp taken when the record was created.
    """

    # Identity of the source.
    source_name: str
    source_class: type[BaseSource]
    module_name: str

    # Classification and capabilities.
    category: SourceCategory
    loaders: list[str]

    # Bookkeeping.
    registration_time: datetime
@dataclass
class RegistrationStats:
    """Summary of one registration run.

    Attributes:
        total_modules_scanned: How many modules were scanned.
        total_sources_found: How many source classes were found.
        total_sources_registered: How many sources were registered successfully.
        registration_errors: Error messages collected during the run.
        registration_time: Duration of the run (filled from
            ``timedelta.total_seconds()`` by ``register_all_sources``).
        categories_covered: Number of distinct categories among registered sources.
    """

    # Counters gathered while scanning and registering.
    total_modules_scanned: int
    total_sources_found: int
    total_sources_registered: int

    # Diagnostics.
    registration_errors: list[str]
    registration_time: float
    categories_covered: int
class AutoRegistry:
    """Automatic registry for document loader sources.

    The AutoRegistry walks the ``sources`` package, imports every module in it,
    detects the source classes those modules expose and records them, so that
    no manual registration step is needed.

    Features:
        - Automatic module discovery and import
        - Source class detection and validation
        - Duplicate registration prevention
        - Error collection and reporting
        - Registration statistics

    Examples:
        Basic auto-registration::

            registry = AutoRegistry()
            stats = registry.register_all_sources()
            print(f"Registered {stats.total_sources_registered} sources")

        Restricting to one category::

            registry = AutoRegistry()
            count = registry.register_sources_by_category(SourceCategory.LOCAL_FILE)
    """

    def __init__(self, registry: Any | None = None) -> None:
        """Initialize the AutoRegistry.

        Args:
            registry: Optional custom registry instance. When ``None`` the
                module-level ``enhanced_registry`` is used.
        """
        # Compare against None explicitly: a custom registry that happens to be
        # falsy (e.g. an empty container defining ``__len__``) must be honoured
        # rather than silently replaced by the global one.
        self.registry = enhanced_registry if registry is None else registry
        self.registered_sources: dict[str, RegistrationInfo] = {}
        self.registration_errors: list[str] = []
        self._sources_dir = Path(__file__).parent / "sources"

    def discover_source_modules(self) -> list[str]:
        """Discover all source modules in the sources package.

        Sub-packages are skipped; only plain modules are returned. Any failure
        is logged and appended to ``registration_errors`` instead of raised.

        Returns:
            Fully qualified module names to import.

        Examples:
            Find all source modules::

                registry = AutoRegistry()
                modules = registry.discover_source_modules()
                print(f"Found {len(modules)} source modules")
        """
        modules: list[str] = []
        sources_package = "haive.core.engine.document.loaders.sources"

        try:
            sources_pkg = importlib.import_module(sources_package)

            # The prefix makes iter_modules yield fully qualified names.
            for _finder, name, ispkg in pkgutil.iter_modules(
                sources_pkg.__path__, sources_package + "."
            ):
                if not ispkg and not name.endswith("__init__"):
                    modules.append(name)
                    logger.debug(f"Discovered source module: {name}")
        except Exception as e:
            error_msg = f"Failed to discover source modules: {e}"
            logger.exception(error_msg)
            self.registration_errors.append(error_msg)

        logger.info(f"Discovered {len(modules)} source modules")
        return modules

    def import_source_module(self, module_name: str) -> Any | None:
        """Import a source module safely.

        Args:
            module_name: Full module name to import.

        Returns:
            The imported module, or ``None`` if the import failed (the failure
            is logged and appended to ``registration_errors``).

        Examples:
            Import a specific module::

                registry = AutoRegistry()
                module = registry.import_source_module(
                    "haive.core.engine.document.loaders.sources.file_sources"
                )
        """
        try:
            module = importlib.import_module(module_name)
        except Exception as e:
            error_msg = f"Failed to import {module_name}: {e}"
            logger.warning(error_msg)
            self.registration_errors.append(error_msg)
            return None

        logger.debug(f"Successfully imported {module_name}")
        return module

    def find_source_classes(self, module: Any) -> list[tuple[str, type[BaseSource]]]:
        """Find all source classes exposed by a module.

        A class qualifies when one of its direct bases is a ``BaseSource``
        subclass, it is not ``BaseSource`` itself, its name is public (no
        leading underscore) and it has a ``source_type`` attribute.

        Args:
            module: Imported module to scan.

        Returns:
            List of ``(class_name, class_type)`` tuples.

        Examples:
            Find sources in a module::

                registry = AutoRegistry()
                module = registry.import_source_module("...")
                classes = registry.find_source_classes(module)
                print(f"Found {len(classes)} source classes")
        """
        source_classes: list[tuple[str, type[BaseSource]]] = []

        try:
            # NOTE(review): getmembers also returns classes merely imported
            # into ``module``; they are reported here as well.
            for name, obj in inspect.getmembers(module, inspect.isclass):
                if (
                    hasattr(obj, "__bases__")
                    and any(issubclass(base, BaseSource) for base in obj.__bases__)
                    and obj is not BaseSource
                    and not name.startswith("_")
                    and hasattr(obj, "source_type")
                ):
                    source_classes.append((name, obj))
                    logger.debug(f"Found source class: {name}")
        except Exception as e:
            # Do not let a missing ``__name__`` mask the original error.
            module_label = getattr(module, "__name__", repr(module))
            error_msg = f"Error scanning module {module_label}: {e}"
            logger.warning(error_msg)
            self.registration_errors.append(error_msg)

        return source_classes

    def validate_source_class(self, source_class: type[BaseSource]) -> bool:
        """Validate that a source class is properly configured.

        The class must expose ``source_type`` (a ``str``) and ``category``
        (a ``SourceCategory``). Problems are logged as warnings.

        Args:
            source_class: Source class to validate.

        Returns:
            True if the source class is valid, False otherwise.

        Examples:
            Validate a source class::

                registry = AutoRegistry()
                valid = registry.validate_source_class(PDFSource)
                print(f"Source valid: {valid}")
        """
        try:
            for attr in ("source_type", "category"):
                if not hasattr(source_class, attr):
                    logger.warning(f"Source {source_class.__name__} missing {attr}")
                    return False

            if not isinstance(source_class.source_type, str):
                logger.warning(
                    f"Source {source_class.__name__} has invalid source_type"
                )
                return False

            if not isinstance(source_class.category, SourceCategory):
                logger.warning(f"Source {source_class.__name__} has invalid category")
                return False

            return True
        except Exception as e:
            # Attribute access on a class can run arbitrary descriptor code.
            logger.warning(f"Error validating {source_class.__name__}: {e}")
            return False

    def register_source_class(
        self, source_name: str, source_class: type[BaseSource], module_name: str
    ) -> bool:
        """Record a single source class in ``registered_sources``.

        Args:
            source_name: Name to register the source under.
            source_class: Source class to register.
            module_name: Module where the source was found.

        Returns:
            True if the source is now tracked (including the case where it was
            already tracked), False if validation or registration failed.

        Examples:
            Register a single source::

                registry = AutoRegistry()
                success = registry.register_source_class(
                    "pdf", PDFSource, "file_sources"
                )
        """
        try:
            if not self.validate_source_class(source_class):
                error_msg = f"Source class {source_class.__name__} failed validation"
                logger.warning(error_msg)
                self.registration_errors.append(error_msg)
                return False

            if source_name in self.registered_sources:
                logger.debug(f"Source {source_name} already registered, skipping")
                return True

            source_type = getattr(source_class, "source_type", source_name)
            category = getattr(source_class, "category", SourceCategory.UNKNOWN)

            # Loader lookup is best-effort: a registry without loaders for this
            # source type must not block tracking of the source itself.
            try:
                loaders = list(self.registry.get_source_loaders(source_type).keys())
            except Exception as e:
                logger.debug(f"No loaders resolved for {source_type}: {e}")
                loaders = []

            # Only bookkeeping happens here; presumably the actual registry
            # entry is created by decorators at import time (not visible here).
            self.registered_sources[source_name] = RegistrationInfo(
                source_name=source_name,
                source_class=source_class,
                module_name=module_name,
                category=category,
                loaders=loaders,
                registration_time=datetime.now(),
            )

            logger.debug(f"Registered source: {source_name} from {module_name}")
            return True
        except Exception as e:
            error_msg = f"Failed to register {source_name}: {e}"
            logger.exception(error_msg)
            self.registration_errors.append(error_msg)
            return False

    def _register_classes(
        self,
        source_classes: list[tuple[str, type[BaseSource]]],
        module_name: str,
    ) -> int:
        """Register already-discovered classes and return how many succeeded."""
        registered_count = 0
        for class_name, source_class in source_classes:
            # Prefer the declared source_type; fall back to the class name.
            source_name = getattr(source_class, "source_type", class_name.lower())
            if self.register_source_class(source_name, source_class, module_name):
                registered_count += 1
        return registered_count

    def register_module_sources(self, module_name: str) -> int:
        """Register all sources from a specific module.

        Args:
            module_name: Module name to process.

        Returns:
            Number of sources registered from this module (0 if the module
            could not be imported).

        Examples:
            Register all sources from the file_sources module::

                registry = AutoRegistry()
                count = registry.register_module_sources(
                    "haive.core.engine.document.loaders.sources.file_sources"
                )
                print(f"Registered {count} sources")
        """
        module = self.import_source_module(module_name)
        if module is None:
            return 0

        registered_count = self._register_classes(
            self.find_source_classes(module), module_name
        )
        logger.info(f"Registered {registered_count} sources from {module_name}")
        return registered_count

    def register_all_sources(self) -> RegistrationStats:
        """Register all discovered sources automatically.

        Clears ``registration_errors`` first, then discovers, imports, scans
        and registers every source module. Each module is imported and scanned
        exactly once.

        Returns:
            RegistrationStats describing the run.

        Examples:
            Auto-register everything::

                registry = AutoRegistry()
                stats = registry.register_all_sources()
                print(f"Scanned: {stats.total_modules_scanned} modules")
                print(f"Found: {stats.total_sources_found} sources")
                print(f"Registered: {stats.total_sources_registered} sources")
                print(f"Errors: {len(stats.registration_errors)}")
        """
        start_time = datetime.now()
        self.registration_errors.clear()

        logger.info("Starting automatic source registration...")

        modules = self.discover_source_modules()
        total_modules_scanned = len(modules)
        total_sources_found = 0
        total_sources_registered = 0

        for module_name in modules:
            try:
                module = self.import_source_module(module_name)
                if module is None:
                    continue

                # Scan once and reuse the result for both counting and
                # registering, so scan errors are not recorded twice.
                source_classes = self.find_source_classes(module)
                total_sources_found += len(source_classes)

                registered_count = self._register_classes(source_classes, module_name)
                logger.info(
                    f"Registered {registered_count} sources from {module_name}"
                )
                total_sources_registered += registered_count
            except Exception as e:
                error_msg = f"Error processing module {module_name}: {e}"
                logger.exception(error_msg)
                self.registration_errors.append(error_msg)

        registration_time = (datetime.now() - start_time).total_seconds()
        categories_covered = len(
            {info.category for info in self.registered_sources.values()}
        )

        stats = RegistrationStats(
            total_modules_scanned=total_modules_scanned,
            total_sources_found=total_sources_found,
            total_sources_registered=total_sources_registered,
            # Copy so later runs clearing the list do not mutate these stats.
            registration_errors=self.registration_errors.copy(),
            registration_time=registration_time,
            categories_covered=categories_covered,
        )

        logger.info(
            f"Auto-registration completed: {total_sources_registered}/{total_sources_found} "
            f"sources registered from {total_modules_scanned} modules in {registration_time:.2f}s"
        )

        if self.registration_errors:
            logger.warning(
                f"Registration completed with {len(self.registration_errors)} errors"
            )

        return stats

    def register_sources_by_category(self, category: SourceCategory) -> int:
        """Register sources from a specific category only.

        Args:
            category: SourceCategory to register.

        Returns:
            Number of sources registered.

        Examples:
            Register only file sources::

                registry = AutoRegistry()
                count = registry.register_sources_by_category(SourceCategory.LOCAL_FILE)
                print(f"Registered {count} file sources")
        """
        registered_count = 0

        for module_name in self.discover_source_modules():
            module = self.import_source_module(module_name)
            if module is None:
                continue

            matching = [
                (class_name, source_class)
                for class_name, source_class in self.find_source_classes(module)
                if getattr(source_class, "category", SourceCategory.UNKNOWN) == category
            ]
            registered_count += self._register_classes(matching, module_name)

        logger.info(
            f"Registered {registered_count} sources for category {category.value}"
        )
        return registered_count

    def get_registration_status(self) -> dict[str, Any]:
        """Get current registration status and statistics.

        Returns:
            Dictionary with the keys ``total_sources``, ``categories_count``,
            ``category_breakdown``, ``total_errors``, ``recent_registrations``
            (the 10 most recent, newest first) and ``last_updated``.

        Examples:
            Check registration status::

                registry = AutoRegistry()
                status = registry.get_registration_status()
                print(f"Total sources: {status['total_sources']}")
                print(f"Categories: {status['categories_count']}")
        """
        category_counts: dict[Any, int] = {}
        for info in self.registered_sources.values():
            category = info.category.value
            category_counts[category] = category_counts.get(category, 0) + 1

        recent_registrations = sorted(
            self.registered_sources.values(),
            key=lambda x: x.registration_time,
            reverse=True,
        )[:10]

        recent_list = [
            {
                "name": info.source_name,
                "category": info.category.value,
                "loaders": len(info.loaders),
                "time": info.registration_time.isoformat(),
            }
            for info in recent_registrations
        ]

        return {
            "total_sources": len(self.registered_sources),
            "categories_count": len(category_counts),
            "category_breakdown": category_counts,
            "total_errors": len(self.registration_errors),
            "recent_registrations": recent_list,
            "last_updated": datetime.now().isoformat(),
        }

    def list_sources_by_category(self) -> dict[SourceCategory, list[str]]:
        """List all registered sources grouped by category.

        Returns:
            Dictionary mapping each category to its alphabetically sorted
            source names.

        Examples:
            List sources by category::

                registry = AutoRegistry()
                by_category = registry.list_sources_by_category()
                for category, sources in by_category.items():
                    print(f"{category.value}: {', '.join(sources)}")
        """
        by_category: dict[SourceCategory, list[str]] = {}
        for info in self.registered_sources.values():
            by_category.setdefault(info.category, []).append(info.source_name)

        for names in by_category.values():
            names.sort()

        return by_category

    def get_source_info(self, source_name: str) -> RegistrationInfo | None:
        """Get detailed information about a registered source.

        Args:
            source_name: Name of the source to look up.

        Returns:
            The RegistrationInfo, or ``None`` if the source is not tracked.

        Examples:
            Get source details::

                registry = AutoRegistry()
                info = registry.get_source_info("pdf")
                if info:
                    print(f"Module: {info.module_name}")
                    print(f"Loaders: {info.loaders}")
        """
        return self.registered_sources.get(source_name)

    def validate_all_registrations(self) -> dict[str, Any]:
        """Re-validate every tracked source.

        Returns:
            Report with the keys ``total_sources``, ``valid_count``,
            ``invalid_count``, ``valid_sources``, ``invalid_sources`` and
            ``validation_errors``.

        Examples:
            Validate registrations::

                registry = AutoRegistry()
                report = registry.validate_all_registrations()
                print(f"Valid: {report['valid_count']}")
                print(f"Invalid: {report['invalid_count']}")
        """
        valid_sources: list[str] = []
        invalid_sources: list[str] = []
        validation_errors: list[str] = []

        for source_name, info in self.registered_sources.items():
            try:
                if self.validate_source_class(info.source_class):
                    valid_sources.append(source_name)
                else:
                    invalid_sources.append(source_name)
                    validation_errors.append(f"Source {source_name} failed validation")
            except Exception as e:
                invalid_sources.append(source_name)
                validation_errors.append(f"Error validating {source_name}: {e}")

        return {
            "total_sources": len(self.registered_sources),
            "valid_count": len(valid_sources),
            "invalid_count": len(invalid_sources),
            "valid_sources": valid_sources,
            "invalid_sources": invalid_sources,
            "validation_errors": validation_errors,
        }
# Global auto-registry instance auto_registry = AutoRegistry()
def auto_register_all() -> RegistrationStats:
    """Run a full registration pass on the global ``auto_registry``.

    Returns:
        RegistrationStats produced by ``AutoRegistry.register_all_sources``.

    Examples:
        Auto-register everything::

            from haive.core.engine.document.loaders import auto_register_all

            stats = auto_register_all()
            print(f"Registered {stats.total_sources_registered} sources")
    """
    stats = auto_registry.register_all_sources()
    return stats
def get_registration_status() -> dict[str, Any]:
    """Report the registration status of the global ``auto_registry``.

    Lazy registration is triggered first via ``ensure_registration``.

    Returns:
        Dictionary with registration information.

    Examples:
        Check status::

            from haive.core.engine.document.loaders import get_registration_status

            status = get_registration_status()
            print(f"Total sources: {status['total_sources']}")
    """
    ensure_registration()
    status = auto_registry.get_registration_status()
    return status
def list_available_sources() -> list[str]:
    """Return the names of every source tracked by the global registry.

    Lazy registration is triggered first via ``ensure_registration``.

    Returns:
        List of source type names.

    Examples:
        List sources::

            from haive.core.engine.document.loaders import list_available_sources

            sources = list_available_sources()
            print(f"Available: {', '.join(sources)}")
    """
    ensure_registration()
    # Iterating a dict yields its keys, in insertion order.
    return [source_name for source_name in auto_registry.registered_sources]
def get_sources_by_category(category: SourceCategory) -> list[str]:
    """Return the tracked source names that belong to one category.

    Lazy registration is triggered first via ``ensure_registration``.

    Args:
        category: SourceCategory to filter by.

    Returns:
        List of source names in the category; empty if none are tracked.

    Examples:
        Get file sources::

            from haive.core.engine.document.loaders import get_sources_by_category
            from haive.core.engine.document.loaders.sources.source_types import SourceCategory

            file_sources = get_sources_by_category(SourceCategory.LOCAL_FILE)
            print(f"File sources: {file_sources}")
    """
    ensure_registration()
    grouped = auto_registry.list_sources_by_category()
    if category in grouped:
        return grouped[category]
    return []
# Lazy auto-registration state: nothing is scanned until a caller needs it.
_registration_done = False
_stats = None


def ensure_registration() -> Any:
    """Run auto-registration once, on first use.

    Returns:
        The stats from the first successful run (cached for later calls), or
        ``None`` if registration raised; a failed attempt is not cached, so
        the next call tries again.
    """
    global _registration_done, _stats

    if not _registration_done:
        logger.info("Starting lazy auto-registration...")
        try:
            _stats = auto_register_all()
            _registration_done = True
            logger.info(
                f"Lazy registration completed: {_stats.total_sources_registered} sources "
                f"from {_stats.total_modules_scanned} modules"
            )
        except Exception as e:
            logger.exception(f"Lazy auto-registration failed: {e}")
            return None

    return _stats


# Importing this module must stay cheap: registration is deferred.
logger.debug("Auto-registry initialized (lazy loading enabled)")

# Public API of this module.
__all__ = [
    "AutoRegistry",
    "RegistrationInfo",
    "RegistrationStats",
    "auto_register_all",
    "auto_registry",
    "get_registration_status",
    "get_sources_by_category",
    "list_available_sources",
]