"""Auto-Registry System for Document Loaders.

This module provides automatic registration and discovery of all document loader
sources and loaders. It scans the sources directory and automatically imports
and registers all available source types without manual intervention.
The auto-registry ensures that all 230+ implemented loaders are automatically
available when the system starts, providing a seamless developer experience.
Examples:
Auto-register all sources::
from haive.core.engine.document.loaders import auto_register_all
# Automatically discover and register all sources
auto_register_all()
Check registration status::
from haive.core.engine.document.loaders import get_registration_status
status = get_registration_status()
print(f"Registered {status['total_sources']} sources")
Author: Claude (Haive Document Loader System)
Version: 1.0.0
"""
import importlib
import inspect
import logging
import pkgutil
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
from haive.core.engine.document.loaders.sources.enhanced_registry import enhanced_registry
from haive.core.engine.document.loaders.sources.source_types import BaseSource, SourceCategory
logger = logging.getLogger(__name__)
[docs]
@dataclass
class RegistrationInfo:
    """Record describing one source tracked by the auto-registry.

    Attributes:
        source_name: Name the source is tracked under.
        source_class: The source class itself.
        module_name: Name of the module the source was found in.
        category: Category the source belongs to.
        loaders: Names of the loaders available for this source.
        registration_time: Timestamp of when the source was registered.
    """

    # Field order is part of the constructor signature; keep it stable.
    source_name: str
    source_class: type[BaseSource]
    module_name: str
    category: SourceCategory
    loaders: list[str]
    registration_time: datetime
[docs]
@dataclass
class RegistrationStats:
    """Summary of one registration run.

    Attributes:
        total_modules_scanned: How many modules were scanned.
        total_sources_found: How many source classes were found.
        total_sources_registered: How many sources were registered successfully.
        registration_errors: Error messages collected during the run.
        registration_time: Duration of the run, in seconds.
        categories_covered: How many distinct categories have registered sources.
    """

    # Field order is part of the constructor signature; keep it stable.
    total_modules_scanned: int
    total_sources_found: int
    total_sources_registered: int
    registration_errors: list[str]
    registration_time: float
    categories_covered: int
[docs]
class AutoRegistry:
    """Automatic registry for document loader sources.

    The AutoRegistry discovers the modules of the sources package, imports
    them, and records every source class it finds. This eliminates the need
    for manual registration.

    Features:
        - Automatic module discovery and import
        - Source class detection and validation
        - Duplicate registration prevention
        - Error handling and reporting
        - Registration statistics and monitoring

    Examples:
        Basic auto-registration::

            registry = AutoRegistry()
            stats = registry.register_all_sources()
            print(f"Registered {stats.total_sources_registered} sources")

        With custom filters::

            registry = AutoRegistry()
            count = registry.register_sources_by_category(SourceCategory.LOCAL_FILE)
    """

    def __init__(self, registry=None) -> None:
        """Initialize the AutoRegistry.

        Args:
            registry: Optional custom registry instance. When ``None``, the
                module-level ``enhanced_registry`` is used.
        """
        # Compare against None explicitly: a custom registry that happens to
        # be falsy (e.g. an empty container-like object) must not be silently
        # replaced by the global registry.
        self.registry = enhanced_registry if registry is None else registry
        # Tracking records keyed by source name.
        self.registered_sources: dict[str, RegistrationInfo] = {}
        # Human-readable messages for every failure seen so far.
        self.registration_errors: list[str] = []
        self._sources_dir = Path(__file__).parent / "sources"
[docs]
def discover_source_modules(self) -> list[str]:
    """Collect the module names found directly in the sources package.

    Sub-packages are skipped. If importing or walking the package fails, the
    error is logged and appended to ``self.registration_errors``; whatever
    was collected before the failure is still returned.

    Returns:
        Fully qualified names of the discovered modules.

    Examples:
        Find all source modules::

            registry = AutoRegistry()
            modules = registry.discover_source_modules()
            print(f"Found {len(modules)} source modules")
    """
    package_name = "haive.core.engine.document.loaders.sources"
    found: list[str] = []

    try:
        package = importlib.import_module(package_name)
        walker = pkgutil.iter_modules(package.__path__, package_name + ".")
        for _finder, qualified_name, is_package in walker:
            # Only plain modules are of interest: skip sub-packages and
            # anything named like a package initializer.
            if is_package or qualified_name.endswith("__init__"):
                continue
            found.append(qualified_name)
            logger.debug(f"Discovered source module: {qualified_name}")
    except Exception as e:
        error_msg = f"Failed to discover source modules: {e}"
        logger.exception(error_msg)
        self.registration_errors.append(error_msg)

    logger.info(f"Discovered {len(found)} source modules")
    return found
[docs]
def import_source_module(self, module_name: str) -> Any | None:
    """Import a module by name without letting failures propagate.

    Any exception raised during the import is logged as a warning and
    appended to ``self.registration_errors``.

    Args:
        module_name: Full module name to import.

    Returns:
        The imported module, or ``None`` when the import failed.

    Examples:
        Import specific module::

            registry = AutoRegistry()
            module = registry.import_source_module(
                "haive.core.engine.document.loaders.sources.file_sources"
            )
    """
    loaded = None
    try:
        loaded = importlib.import_module(module_name)
        logger.debug(f"Successfully imported {module_name}")
    except Exception as e:
        failure = f"Failed to import {module_name}: {e}"
        logger.warning(failure)
        self.registration_errors.append(failure)
        loaded = None
    return loaded
[docs]
def find_source_classes(self, module: Any) -> list[tuple[str, type[BaseSource]]]:
    """Return the source classes exposed by a module.

    A class is reported when one of its direct bases is a subclass of
    ``BaseSource``, it is not ``BaseSource`` itself, its name does not start
    with an underscore, and it has a ``source_type`` attribute. Errors raised
    while scanning are logged and appended to ``self.registration_errors``;
    classes collected before the error are still returned.

    Args:
        module: Imported module to scan.

    Returns:
        List of (class_name, class_type) tuples.

    Examples:
        Find sources in module::

            registry = AutoRegistry()
            module = registry.import_source_module("...")
            classes = registry.find_source_classes(module)
            print(f"Found {len(classes)} source classes")
    """
    matches = []

    try:
        for attr_name, candidate in inspect.getmembers(module, inspect.isclass):
            derives_from_base = hasattr(candidate, "__bases__") and any(
                issubclass(parent, BaseSource) for parent in candidate.__bases__
            )
            if not derives_from_base:
                continue
            # Exclude the base class itself, private names, and classes that
            # do not declare a source_type.
            if (
                candidate != BaseSource
                and not attr_name.startswith("_")
                and hasattr(candidate, "source_type")
            ):
                matches.append((attr_name, candidate))
                logger.debug(f"Found source class: {attr_name}")
    except Exception as e:
        error_msg = f"Error scanning module {module.__name__}: {e}"
        logger.warning(error_msg)
        self.registration_errors.append(error_msg)

    return matches
[docs]
def validate_source_class(self, source_class: type[BaseSource]) -> bool:
    """Check that a source class carries the attributes the registry needs.

    The class must have ``source_type`` and ``category`` attributes,
    ``source_type`` must be a ``str``, and ``category`` must be a
    ``SourceCategory``. Every failed check is logged as a warning. Any
    unexpected exception during the checks is logged and treated as a
    validation failure.

    Args:
        source_class: Source class to validate.

    Returns:
        True if the source class is valid, False otherwise.

    Examples:
        Validate source class::

            registry = AutoRegistry()
            valid = registry.validate_source_class(PDFSource)
            print(f"Source valid: {valid}")
    """
    try:
        # Required attributes must exist before their types are inspected.
        for attr in ("source_type", "category"):
            if not hasattr(source_class, attr):
                logger.warning(f"Source {source_class.__name__} missing {attr}")
                return False

        if not isinstance(source_class.source_type, str):
            logger.warning(
                f"Source {source_class.__name__} has invalid source_type"
            )
            return False

        if not isinstance(source_class.category, SourceCategory):
            logger.warning(f"Source {source_class.__name__} has invalid category")
            return False

        return True
    except Exception as e:
        logger.warning(f"Error validating {source_class.__name__}: {e}")
        return False
[docs]
def register_source_class(
    self, source_name: str, source_class: type[BaseSource], module_name: str
) -> bool:
    """Record a single source class in this auto-registry.

    The class is validated first. A name that is already tracked is left
    untouched and reported as success. Otherwise a ``RegistrationInfo`` is
    stored under ``source_name``. This method does not add the class to
    ``self.registry``; it only queries that registry for the loaders of the
    source type (an error during that query yields an empty loader list).

    Args:
        source_name: Name to register the source under.
        source_class: Source class to register.
        module_name: Module where the source is defined.

    Returns:
        True if the source is tracked after the call, False if validation or
        registration failed.

    Examples:
        Register single source::

            registry = AutoRegistry()
            success = registry.register_source_class(
                "pdf", PDFSource, "file_sources"
            )
    """
    try:
        is_valid = self.validate_source_class(source_class)
        if not is_valid:
            error_msg = f"Source class {source_class.__name__} failed validation"
            logger.warning(error_msg)
            self.registration_errors.append(error_msg)
            return False

        already_tracked = source_name in self.registered_sources
        if already_tracked:
            logger.debug(f"Source {source_name} already registered, skipping")
            return True

        declared_type = getattr(source_class, "source_type", source_name)
        declared_category = getattr(source_class, "category", SourceCategory.UNKNOWN)

        # Loader lookup is best-effort: a failure leaves the list empty.
        try:
            loader_map = self.registry.get_source_loaders(declared_type)
            loader_names = list(loader_map.keys())
        except Exception:
            loader_names = []

        # Only tracking happens here; the registry entry itself is expected
        # to have been created by decorators elsewhere.
        self.registered_sources[source_name] = RegistrationInfo(
            source_name=source_name,
            source_class=source_class,
            module_name=module_name,
            category=declared_category,
            loaders=loader_names,
            registration_time=datetime.now(),
        )
        logger.debug(f"Registered source: {source_name} from {module_name}")
        return True
    except Exception as e:
        error_msg = f"Failed to register {source_name}: {e}"
        logger.exception(error_msg)
        self.registration_errors.append(error_msg)
        return False
[docs]
def register_module_sources(self, module_name: str) -> int:
    """Import one module and register every source class found in it.

    Each class is registered under its ``source_type`` attribute, falling
    back to the lower-cased class name when that attribute is absent.

    Args:
        module_name: Module name to process.

    Returns:
        Number of sources registered from this module (0 if the module could
        not be imported).

    Examples:
        Register all sources from file_sources module::

            registry = AutoRegistry()
            count = registry.register_module_sources(
                "haive.core.engine.document.loaders.sources.file_sources"
            )
            print(f"Registered {count} sources")
    """
    module = self.import_source_module(module_name)
    if module is None:
        return 0

    outcomes = [
        self.register_source_class(
            getattr(source_class, "source_type", class_name.lower()),
            source_class,
            module_name,
        )
        for class_name, source_class in self.find_source_classes(module)
    ]
    registered_count = sum(1 for succeeded in outcomes if succeeded)

    logger.info(f"Registered {registered_count} sources from {module_name}")
    return registered_count
[docs]
def register_all_sources(self) -> RegistrationStats:
    """Register all discovered sources automatically.

    Clears ``self.registration_errors``, discovers the source modules, then
    imports and scans each module exactly once and registers the classes
    found in that scan. Scanning once matters: a second scan of the same
    module would record any scan error a second time.

    Returns:
        RegistrationStats with detailed information about the process.

    Examples:
        Auto-register everything::

            registry = AutoRegistry()
            stats = registry.register_all_sources()
            print(f"Scanned: {stats.total_modules_scanned} modules")
            print(f"Found: {stats.total_sources_found} sources")
            print(f"Registered: {stats.total_sources_registered} sources")
            print(f"Errors: {len(stats.registration_errors)}")
    """
    start_time = datetime.now()
    self.registration_errors.clear()
    logger.info("Starting automatic source registration...")

    modules = self.discover_source_modules()
    total_modules_scanned = len(modules)
    total_sources_found = 0
    total_sources_registered = 0

    for module_name in modules:
        try:
            module = self.import_source_module(module_name)
            if module is None:
                continue

            source_classes = self.find_source_classes(module)
            total_sources_found += len(source_classes)

            # Register straight from the scan result instead of importing
            # and scanning the module a second time.
            registered_count = 0
            for class_name, source_class in source_classes:
                source_name = getattr(
                    source_class, "source_type", class_name.lower()
                )
                if self.register_source_class(
                    source_name, source_class, module_name
                ):
                    registered_count += 1
            logger.info(f"Registered {registered_count} sources from {module_name}")
            total_sources_registered += registered_count
        except Exception as e:
            error_msg = f"Error processing module {module_name}: {e}"
            logger.exception(error_msg)
            self.registration_errors.append(error_msg)

    registration_time = (datetime.now() - start_time).total_seconds()

    # Distinct categories across everything tracked so far.
    categories_covered = len(
        {info.category for info in self.registered_sources.values()}
    )

    stats = RegistrationStats(
        total_modules_scanned=total_modules_scanned,
        total_sources_found=total_sources_found,
        total_sources_registered=total_sources_registered,
        # Copy so later mutations of the live error list do not alter stats.
        registration_errors=self.registration_errors.copy(),
        registration_time=registration_time,
        categories_covered=categories_covered,
    )

    logger.info(
        f"Auto-registration completed: {total_sources_registered}/{total_sources_found} "
        f"sources registered from {total_modules_scanned} modules in {registration_time:.2f}s"
    )
    if self.registration_errors:
        logger.warning(
            f"Registration completed with {len(self.registration_errors)} errors"
        )

    return stats
[docs]
def register_sources_by_category(self, category: SourceCategory) -> int:
    """Register only the sources whose category equals ``category``.

    All source modules are discovered, imported and scanned; classes with a
    different ``category`` attribute are ignored.

    Args:
        category: SourceCategory to register.

    Returns:
        Number of sources registered.

    Examples:
        Register only file sources::

            registry = AutoRegistry()
            count = registry.register_sources_by_category(SourceCategory.LOCAL_FILE)
            print(f"Registered {count} file sources")
    """
    matched = 0

    for module_name in self.discover_source_modules():
        module = self.import_source_module(module_name)
        if module is None:
            continue

        for class_name, source_class in self.find_source_classes(module):
            belongs = (
                getattr(source_class, "category", SourceCategory.UNKNOWN) == category
            )
            if not belongs:
                continue
            # Prefer the declared source_type; fall back to the class name.
            source_name = getattr(source_class, "source_type", class_name.lower())
            if self.register_source_class(source_name, source_class, module_name):
                matched += 1

    logger.info(f"Registered {matched} sources for category {category.value}")
    return matched
[docs]
def get_registration_status(self) -> dict[str, Any]:
    """Summarize what this auto-registry currently tracks.

    Returns:
        Dictionary with the keys ``total_sources``, ``categories_count``,
        ``category_breakdown`` (category value -> count), ``total_errors``,
        ``recent_registrations`` (up to ten entries, newest first) and
        ``last_updated`` (ISO timestamp of this call).

    Examples:
        Check registration status::

            registry = AutoRegistry()
            status = registry.get_registration_status()
            print(f"Total sources: {status['total_sources']}")
            print(f"Categories: {status['categories_count']}")
            print(f"Recent registrations: {status['recent_registrations']}")
    """
    entries = list(self.registered_sources.values())

    # Tally sources per category value.
    breakdown = {}
    for entry in entries:
        key = entry.category.value
        if key in breakdown:
            breakdown[key] += 1
        else:
            breakdown[key] = 1

    # Newest registrations first, capped at ten.
    newest_first = sorted(
        entries, key=lambda entry: entry.registration_time, reverse=True
    )
    recent = []
    for entry in newest_first[:10]:
        recent.append(
            {
                "name": entry.source_name,
                "category": entry.category.value,
                "loaders": len(entry.loaders),
                "time": entry.registration_time.isoformat(),
            }
        )

    return {
        "total_sources": len(self.registered_sources),
        "categories_count": len(breakdown),
        "category_breakdown": breakdown,
        "total_errors": len(self.registration_errors),
        "recent_registrations": recent,
        "last_updated": datetime.now().isoformat(),
    }
[docs]
def list_sources_by_category(self) -> dict[SourceCategory, list[str]]:
    """Group the names of all tracked sources by their category.

    Returns:
        Dictionary mapping each category to the sorted list of source names
        registered under it.

    Examples:
        List sources by category::

            registry = AutoRegistry()
            by_category = registry.list_sources_by_category()
            for category, sources in by_category.items():
                print(f"{category.value}: {', '.join(sources)}")
    """
    grouped = {}
    for entry in self.registered_sources.values():
        grouped.setdefault(entry.category, []).append(entry.source_name)

    # Names are sorted in place within every category.
    for names in grouped.values():
        names.sort()

    return grouped
[docs]
def get_source_info(self, source_name: str) -> RegistrationInfo | None:
    """Look up the tracking record of a registered source.

    Args:
        source_name: Name of the source to get info for.

    Returns:
        The stored RegistrationInfo, or None if the name is not tracked.

    Examples:
        Get source details::

            registry = AutoRegistry()
            info = registry.get_source_info("pdf")
            if info:
                print(f"Module: {info.module_name}")
                print(f"Loaders: {info.loaders}")
    """
    record = self.registered_sources.get(source_name)
    return record
[docs]
def validate_all_registrations(self) -> dict[str, Any]:
    """Re-run validation on every tracked source and report the outcome.

    A source counts as invalid both when ``validate_source_class`` returns a
    falsy value and when it raises.

    Returns:
        Validation report with the keys ``total_sources``, ``valid_count``,
        ``invalid_count``, ``valid_sources``, ``invalid_sources`` and
        ``validation_errors``.

    Examples:
        Validate registrations::

            registry = AutoRegistry()
            report = registry.validate_all_registrations()
            print(f"Valid: {report['valid_count']}")
            print(f"Invalid: {report['invalid_count']}")
    """
    passed = []
    failed = []
    problems = []

    for source_name, info in self.registered_sources.items():
        try:
            is_valid = self.validate_source_class(info.source_class)
        except Exception as e:
            failed.append(source_name)
            problems.append(f"Error validating {source_name}: {e}")
            continue

        if is_valid:
            passed.append(source_name)
        else:
            failed.append(source_name)
            problems.append(f"Source {source_name} failed validation")

    return {
        "total_sources": len(self.registered_sources),
        "valid_count": len(passed),
        "invalid_count": len(failed),
        "valid_sources": passed,
        "invalid_sources": failed,
        "validation_errors": problems,
    }
# Global auto-registry instance, created with the default registry and used by
# the module-level convenience functions in this file.
auto_registry = AutoRegistry()
[docs]
def auto_register_all() -> RegistrationStats:
    """Run a full registration pass on the global ``auto_registry``.

    Returns:
        RegistrationStats with detailed information.

    Examples:
        Auto-register everything::

            from haive.core.engine.document.loaders import auto_register_all
            stats = auto_register_all()
            print(f"Registered {stats.total_sources_registered} sources")
    """
    stats = auto_registry.register_all_sources()
    return stats
[docs]
def get_registration_status() -> dict[str, Any]:
    """Return the registration status of the global ``auto_registry``.

    Lazy registration is triggered first via ``ensure_registration``.

    Returns:
        Dictionary with registration information.

    Examples:
        Check status::

            from haive.core.engine.document.loaders import get_registration_status
            status = get_registration_status()
            print(f"Total sources: {status['total_sources']}")
    """
    ensure_registration()
    status = auto_registry.get_registration_status()
    return status
[docs]
def list_available_sources() -> list[str]:
    """Return the names of all sources tracked by the global registry.

    Lazy registration is triggered first via ``ensure_registration``.

    Returns:
        List of source type names.

    Examples:
        List sources::

            from haive.core.engine.document.loaders import list_available_sources
            sources = list_available_sources()
            print(f"Available: {', '.join(sources)}")
    """
    ensure_registration()
    tracked = auto_registry.registered_sources
    return list(tracked.keys())
[docs]
def get_sources_by_category(category: SourceCategory) -> list[str]:
    """Return the source names registered under one category.

    Lazy registration is triggered first via ``ensure_registration``.

    Args:
        category: SourceCategory to filter by.

    Returns:
        List of source names in the category; empty when the category has no
        registered sources.

    Examples:
        Get file sources::

            from haive.core.engine.document.loaders import get_sources_by_category
            from haive.core.engine.document.loaders.sources.source_types import SourceCategory
            file_sources = get_sources_by_category(SourceCategory.LOCAL_FILE)
            print(f"File sources: {file_sources}")
    """
    ensure_registration()
    grouped = auto_registry.list_sources_by_category()
    names = grouped.get(category, [])
    return names
# Lazy auto-registration - only register when needed
_registration_done = False
_stats = None


def ensure_registration() -> Any:
    """Run auto-registration once, on first use (lazy loading).

    The first successful run stores its statistics in module state; later
    calls return those stored statistics without registering again. A failed
    run is logged and not marked as done, so the next call tries again.

    Returns:
        The statistics of the completed registration run, or ``None`` when
        the attempt made by this call failed.
    """
    global _registration_done, _stats

    if not _registration_done:
        logger.info("Starting lazy auto-registration...")
        try:
            _stats = auto_register_all()
            _registration_done = True
            logger.info(
                f"Lazy registration completed: {_stats.total_sources_registered} sources "
                f"from {_stats.total_modules_scanned} modules"
            )
        except Exception as e:
            logger.exception(f"Lazy auto-registration failed: {e}")
            return None

    return _stats
# Don't auto-register on import - wait until needed. Importing this module
# only emits this debug message; sources are registered on the first call to
# ensure_registration().
logger.debug("Auto-registry initialized (lazy loading enabled)")
# Export main functions. Note that ensure_registration is not part of this
# list.
__all__ = [
"AutoRegistry",
"RegistrationInfo",
"RegistrationStats",
"auto_register_all",
"auto_registry",
"get_registration_status",
"get_sources_by_category",
"list_available_sources",
]