haive.core.engine.document.loaders.auto_loader¶
Ultimate Auto-Loader for Document Sources.
This module provides the ultimate auto-loader functionality that can automatically detect, instantiate, and load documents from any source type. It integrates with the enhanced registry and path analyzer to provide seamless document loading.
The AutoLoader is the main entry point for users who want to load documents without manually configuring source types and loaders.
Examples
Basic auto-loading:
from haive.core.engine.document.loaders import AutoLoader
# Auto-detect and load from any source
loader = AutoLoader()
documents = loader.load("https://example.com/docs")
With preferences:
# Prefer quality over speed
loader = AutoLoader(preference="quality")
documents = loader.load("s3://bucket/documents/")
Bulk loading:
# Load entire directory/bucket/site
loader = AutoLoader()
documents = loader.load_all("/path/to/documents")
Author: Claude (Haive Document Loader System)
Version: 1.0.0
Classes¶
AutoLoader: Ultimate automatic document loader with 230+ langchain_community integrations.
AutoLoaderConfig: Configuration model for the AutoLoader system.
BulkLoadingResult: Comprehensive result container for bulk document loading operations.
LoadingResult: Comprehensive result container for single-source document loading operations.
Functions¶
aload_document: Convenience function to load documents asynchronously.
load_document: Convenience function to load documents automatically.
load_documents_bulk: Convenience function to load multiple documents.
Module Contents¶
- class haive.core.engine.document.loaders.auto_loader.AutoLoader(config=None, registry=None, path_analyzer=None)[source]¶
Ultimate automatic document loader with 230+ langchain_community integrations.
The AutoLoader is the primary interface for loading documents from any source type. It automatically detects source types, selects optimal loaders, and provides comprehensive loading capabilities with enterprise-grade features.
This class implements the complete document loading pipeline including: source detection, loader selection, document loading, metadata enrichment, error handling, retry logic, caching, and concurrent processing.
- config¶
Configuration controlling loader behavior.
- Type:
AutoLoaderConfig
- registry¶
Registry of available document loaders.
- Type:
EnhancedRegistry
- path_analyzer¶
Component for analyzing and detecting source types.
- Type:
PathAnalyzer
- Supported Sources:
Local Files: PDF, DOCX, TXT, CSV, JSON, XML, code files, archives
Web Sources: HTML pages, APIs, documentation sites, social media
Databases: PostgreSQL, MySQL, MongoDB, Redis, Elasticsearch
Cloud Storage: S3, Google Cloud, Azure Blob, Google Drive, Dropbox
Business Platforms: Salesforce, HubSpot, Zendesk, Jira, Confluence
Communication: Slack, Discord, Teams, email systems
Specialized: Government data, healthcare, finance, education
- Key Methods:
load(): Load documents from a single source
load_documents(): Load from multiple sources (standard langchain method)
load_bulk(): Bulk loading with detailed results
load_all(): Recursive loading from directories/websites
aload(): Async loading for high-performance scenarios
Examples
Basic document loading:
loader = AutoLoader()
docs = loader.load("document.pdf")  # Single document
docs = loader.load_documents(["file1.pdf", "file2.txt"])  # Multiple
Advanced configuration:
config = AutoLoaderConfig(
    preference=LoaderPreference.QUALITY,
    max_concurrency=20,
    enable_caching=True,
    enable_metadata=True,
)
loader = AutoLoader(config)
docs = loader.load("https://complex-site.com")
Enterprise bulk loading:
sources = [
    "/shared/reports/quarterly.pdf",
    "s3://company-docs/policies/",
    "https://wiki.company.com/procedures",
    {"path": "salesforce://attachments", "auth": "token"},
]
result = loader.load_bulk(sources)
print(f"Loaded {result.total_documents} documents")
High-performance async loading:
async def process_sources():
    docs = await loader.aload_documents([
        "https://api.service.com/docs",
        "postgres://db/knowledge_base",
        "gs://bucket/research-papers/",
    ])
    return docs
Recursive directory processing:
# Load all documents from directory tree
docs = loader.load_all("/company/documents/")

# Scrape entire documentation site
docs = loader.load_all("https://docs.framework.com", max_depth=3)
- Performance Features:
Concurrent loading with configurable worker limits
Intelligent caching with TTL support
Adaptive retry logic with exponential backoff
Progress tracking for bulk operations
Memory-efficient streaming for large datasets
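The adaptive retry behavior listed above can be sketched with a small stand-alone helper. load_with_retry and flaky_loader below are illustrative assumptions, not the loader's actual implementation:

```python
import time

def load_with_retry(load_fn, source, attempts=3, base_delay=0.01):
    """Call load_fn(source), retrying with exponential backoff (hypothetical sketch)."""
    last_error = None
    for attempt in range(attempts):
        try:
            return load_fn(source)
        except Exception as exc:  # real code would catch narrower error types
            last_error = exc
            if attempt < attempts - 1:
                # delay doubles each attempt: 0.01s, 0.02s, 0.04s, ...
                time.sleep(base_delay * (2 ** attempt))
    raise last_error

calls = {"n": 0}

def flaky_loader(source):
    """Stub loader that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return [f"document from {source}"]

docs = load_with_retry(flaky_loader, "https://example.com")
```

Real retry logic would typically also add jitter and distinguish retryable from fatal errors.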
- Error Handling:
Graceful degradation for unsupported sources
Detailed error reporting with source tracking
Automatic fallback to alternative loaders
Comprehensive logging for debugging
- Thread Safety:
This class is thread-safe and can be used safely in concurrent environments. Internal state is properly synchronized.
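Because the class is documented as thread-safe, a single instance can be shared across worker threads. A minimal sketch with concurrent.futures, using a stub load function in place of a real AutoLoader:

```python
from concurrent.futures import ThreadPoolExecutor

def load(source):
    # Stand-in for a shared AutoLoader().load(source) call.
    return [f"doc from {source}"]

sources = ["a.pdf", "b.txt", "c.html"]

# pool.map preserves input order while loading sources concurrently
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(load, sources))

# Flatten per-source lists into one document list
all_docs = [doc for docs in results for doc in docs]
```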
See also
AutoLoaderConfig: Configuration options
LoadingResult: Detailed loading results
BulkLoadingResult: Bulk operation results
LoaderPreference: Quality vs speed preferences
Initialize the AutoLoader with optional configuration and components.
Creates a new AutoLoader instance with the specified configuration. If no configuration is provided, uses sensible defaults optimized for balanced performance and quality.
- Parameters:
config (Optional[AutoLoaderConfig]) – Configuration object controlling loader behavior including concurrency, preferences, caching, and retry settings. If None, uses default balanced configuration.
registry (Optional[Any]) – Custom enhanced registry instance containing document loader mappings. If None, uses the global enhanced registry with all 230+ registered loaders.
path_analyzer (Optional[PathAnalyzer]) – Custom path analyzer for source type detection. If None, uses the default PathAnalyzer instance.
Examples
Default initialization:
loader = AutoLoader() # Uses balanced defaults
Custom configuration:
config = AutoLoaderConfig(
    preference=LoaderPreference.QUALITY,
    max_concurrency=5,
    enable_caching=True,
)
loader = AutoLoader(config)
Advanced with custom components:
custom_registry = MyCustomRegistry()
custom_analyzer = MyPathAnalyzer()
loader = AutoLoader(
    config=my_config,
    registry=custom_registry,
    path_analyzer=custom_analyzer,
)
Note
The AutoLoader automatically triggers source registration on first use. This process scans for available loaders and may take a few seconds on initial startup.
- async aload(path_or_url, **kwargs)[source]¶
Asynchronously load documents from any source.
- Parameters:
path_or_url (str) – Path, URL, or connection string to load from
**kwargs – Additional parameters passed to the source and loader
- Returns:
List of loaded Document objects
- Return type:
list[langchain_core.documents.Document]
Examples
Async document loading:
async def load_docs():
    documents = await loader.aload("https://example.com")
    return documents

documents = asyncio.run(load_docs())
- async aload_bulk(sources, **kwargs)[source]¶
Asynchronously load documents from multiple sources.
- Parameters:
sources (list[str | dict[str, Any]]) – List of sources to load from
**kwargs – Additional parameters passed to each source and loader
- Returns:
BulkLoadingResult with aggregated results
- Return type:
BulkLoadingResult
- async aload_documents(sources, **kwargs)[source]¶
Asynchronously load documents from multiple sources (standard langchain plural method name).
This is the async version of load_documents() that takes a list of sources and returns a flattened list of all documents.
- Parameters:
sources (list[str | dict[str, Any]]) – List of sources to load from
**kwargs – Additional parameters passed to each source and loader
- Returns:
Flattened list of Document objects from all sources
- Return type:
list[langchain_core.documents.Document]
Examples
Async load from multiple sources:
loader = AutoLoader()
docs = await loader.aload_documents([
    "document1.pdf",
    "document2.txt",
    "https://example.com",
])
- create_source_instance(source_info, path_or_url, **kwargs)[source]¶
Create a source instance for the detected source type.
- Parameters:
source_info (haive.core.engine.document.loaders.path_analyzer.SourceInfo) – Source information from detection
path_or_url (str) – Original path or URL
**kwargs – Additional parameters for source creation
- Returns:
Configured source instance
- Raises:
ValueError – If source cannot be created
- Return type:
haive.core.engine.document.loaders.sources.source_types.BaseSource
Examples
Create and configure source:
info = loader.detect_source("s3://bucket/file.pdf")
source = loader.create_source_instance(
    info,
    "s3://bucket/file.pdf",
    aws_access_key_id="key",
    aws_secret_access_key="secret",
)
- detect_source(path_or_url)[source]¶
Detect source type and get source information.
- Parameters:
path_or_url (str) – Path, URL, or connection string to analyze
- Returns:
SourceInfo containing detected source details
- Raises:
ValueError – If source type cannot be detected
- Return type:
haive.core.engine.document.loaders.path_analyzer.SourceInfo
Examples
Detect file source:
info = loader.detect_source("/path/to/document.pdf")
print(f"Source type: {info.source_type}")
print(f"Category: {info.category}")
Detect web source:
info = loader.detect_source("https://example.com")
print(f"Capabilities: {info.capabilities}")
- get_best_loader(source_info)[source]¶
Get the best loader for a source based on preferences.
- Parameters:
source_info (haive.core.engine.document.loaders.path_analyzer.SourceInfo) – Source information from detection
- Returns:
Tuple of (loader_name, loader_config)
- Raises:
ValueError – If no suitable loader is found
- Return type:
tuple[str, dict[str, Any]]
Examples
Get quality-focused loader:
config = AutoLoaderConfig(preference="quality")
loader = AutoLoader(config)
info = loader.detect_source("document.pdf")
loader_name, loader_config = loader.get_best_loader(info)
- get_capabilities(source_type)[source]¶
Get capabilities for a specific source type.
- Parameters:
source_type (str) – Name of the source type
- Returns:
List of capabilities supported by the source
- Return type:
list[haive.core.engine.document.loaders.sources.source_types.LoaderCapability]
Examples
Check source capabilities:
caps = loader.get_capabilities("pdf")
if LoaderCapability.BULK_LOADING in caps:
    print("Supports bulk loading")
- get_supported_sources()[source]¶
Get information about all supported source types.
Examples
List all supported sources:
sources = loader.get_supported_sources()
for source_type, info in sources.items():
    print(f"{source_type}: {info['description']}")
- load(path_or_url, **kwargs)[source]¶
Load documents from any source with automatic detection and optimization.
This is the primary interface for single-source document loading. The method performs automatic source type detection, intelligent loader selection based on configured preferences, and returns a list of loaded Document objects.
The loading process includes:
1. Source type detection and analysis
2. Best loader selection based on preference and capabilities
3. Source instance creation with provided parameters
4. Document loading with retry logic and error handling
5. Optional metadata enrichment and caching
- Parameters:
path_or_url (str) – Path, URL, or connection string to load from. Supports local files, web URLs, database connections, cloud storage URIs, and API endpoints. Examples:
- "/path/to/file.pdf" (local file)
- "https://example.com/doc.html" (web page)
- "postgresql://user:pass@host/db" (database)
- "s3://bucket/key" (cloud storage)
**kwargs – Additional parameters passed to the source and loader. Common parameters include:
- extract_images (bool): Whether to extract images from documents
- chunk_size (int): Text splitting chunk size
- timeout (int): Override default timeout
- headers (dict): HTTP headers for web requests
- query (str): SQL query for database sources
- recursive (bool): Recursive processing for directories
- Returns:
- List of loaded Document objects. Each Document contains:
page_content (str): Extracted text content
metadata (dict): Source metadata, extraction info, and enrichments
- Return type:
List[Document]
- Raises:
ValueError – If the source cannot be detected, is unsupported, or if required parameters are missing for the detected source type.
TimeoutError – If loading exceeds the configured timeout limit.
ConnectionError – If unable to connect to remote sources (web, database, API).
FileNotFoundError – If local files or directories do not exist.
PermissionError – If insufficient permissions to access the source.
Examples
Basic local file loading:
loader = AutoLoader()
docs = loader.load("/documents/report.pdf")
print(f"Loaded {len(docs)} pages")
Web page with custom parameters:
docs = loader.load(
    "https://docs.example.com/api",
    headers={"Authorization": "Bearer token"},
    timeout=120,
)
Database with custom query:
docs = loader.load(
    "postgresql://user:pass@localhost:5432/knowledge",
    query="SELECT title, content FROM articles WHERE published = true",
    chunk_size=2000,
)
Cloud storage with credentials:
docs = loader.load(
    "s3://company-docs/policies/security.pdf",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret",
    region_name="us-east-1",
)
High-quality extraction:
config = AutoLoaderConfig(preference=LoaderPreference.QUALITY)
loader = AutoLoader(config)
docs = loader.load("complex_document.pdf", extract_images=True)
Note
Results are automatically cached if caching is enabled in configuration
Metadata enrichment adds source tracking information when enabled
The method is thread-safe and can be called concurrently
For multiple sources, consider using load_documents() or load_bulk()
See also
load_documents(): Load from multiple sources (standard langchain method)
load_bulk(): Bulk loading with detailed result information
load_all(): Recursive loading from directories or websites
aload(): Asynchronous version for high-performance scenarios
- load_all(path_or_url, **kwargs)[source]¶
Load all documents from a source recursively.
This method uses the “scrape_all” capability of sources to load all available documents from directories, websites, databases, etc.
- Parameters:
path_or_url (str) – Path, URL, or connection string to load from
**kwargs – Additional parameters for recursive loading
- Returns:
List of all documents found in the source
- Return type:
list[langchain_core.documents.Document]
Examples
Load entire directory:
documents = loader.load_all("/path/to/documents/")
Scrape entire website:
documents = loader.load_all("https://docs.example.com")
Load all tables from database:
documents = loader.load_all("postgresql://user:pass@host/db")
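For a local directory, the recursive traversal load_all performs can be approximated with os.walk. collect_files and the extension filter below are illustrative assumptions, not the loader's real discovery logic:

```python
import os
import tempfile

def collect_files(root, extensions=(".pdf", ".txt", ".md")):
    """Walk a directory tree and collect candidate files (hypothetical sketch)."""
    paths = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.lower().endswith(extensions):
                paths.append(os.path.join(dirpath, name))
    return sorted(paths)

# Demo on a throwaway directory tree
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "sub"))
for name in ("a.txt", os.path.join("sub", "b.pdf"), os.path.join("sub", "skip.bin")):
    open(os.path.join(root, name), "w").close()

found = collect_files(root)  # picks up a.txt and sub/b.pdf, skips skip.bin
```

Each collected path would then be fed through the normal detect-and-load pipeline.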
- load_bulk(sources, **kwargs)[source]¶
Load documents from multiple sources concurrently.
- Parameters:
sources (list[str | dict[str, Any]]) – List of sources to load from
**kwargs – Additional parameters passed to each source and loader
- Returns:
BulkLoadingResult with aggregated results
- Return type:
BulkLoadingResult
Examples
Bulk load multiple sources:
sources = [
    "file1.pdf",
    "file2.docx",
    {"path": "https://example.com", "timeout": 60},
]
result = loader.load_bulk(sources)
print(f"Total documents: {result.total_documents}")
With progress tracking:
def progress_callback(completed, total):
    print(f"Progress: {completed}/{total}")

result = loader.load_bulk(sources, progress_callback=progress_callback)
- load_detailed(path_or_url, **kwargs)[source]¶
Load documents with detailed result information.
- Parameters:
path_or_url (str) – Path, URL, or connection string to load from
**kwargs – Additional parameters passed to the source and loader
- Returns:
LoadingResult with documents and detailed metadata
- Return type:
LoadingResult
Examples
Get detailed loading information:
result = loader.load_detailed("/path/to/document.pdf")
print(f"Loaded {len(result.documents)} documents")
print(f"Using loader: {result.loader_used}")
print(f"Loading time: {result.loading_time:.2f}s")
print(f"Source type: {result.source_info.source_type}")
- load_documents(sources, **kwargs)[source]¶
Load documents from multiple sources with standard langchain interface.
This method implements the standard langchain convention for loading documents from multiple sources. It processes all sources concurrently, handles errors gracefully, and returns a flattened list of all successfully loaded documents.
This is the recommended method for loading from multiple sources as it follows langchain conventions and provides seamless integration with existing langchain workflows and chains.
- Parameters:
sources (List[Union[str, Dict[str, Any]]]) – List of sources to load from. Each source can be either:
- str: Simple path, URL, or connection string
- Dict[str, Any]: Configuration dict with source-specific parameters. Must contain either 'path' or 'url' key, plus optional parameters.
**kwargs – Default parameters applied to ALL sources and loaders. These are overridden by source-specific parameters in dict sources. Common parameters:
- max_workers (int): Override concurrency for this operation
- timeout (int): Timeout per source
- extract_images (bool): Extract images from documents
- chunk_size (int): Text splitting chunk size
- Returns:
Flattened list of Document objects from all successful source loads. Failed sources are silently skipped. Each Document contains page_content and metadata with source tracking information.
- Return type:
List[Document]
Examples
Basic multi-source loading:
loader = AutoLoader()
docs = loader.load_documents([
    "/reports/quarterly.pdf",
    "/docs/manual.docx",
    "https://company.com/policies.html",
])
print(f"Loaded {len(docs)} total documents")
Mixed source types with configurations:
docs = loader.load_documents([
    # Simple string sources
    "local_file.pdf",
    "https://simple-site.com",
    # Complex configured sources
    {
        "path": "complex_document.pdf",
        "extract_images": True,
        "chunk_size": 2000,
    },
    {
        "url": "https://api.service.com/docs",
        "headers": {"Authorization": "Bearer token"},
        "timeout": 120,
    },
    {
        "path": "s3://bucket/document.pdf",
        "aws_access_key_id": "key",
        "aws_secret_access_key": "secret",
    },
])
Enterprise data aggregation:
enterprise_sources = [
    "/shared/reports/2024/",  # Directory
    "https://wiki.company.com/procedures",
    "postgresql://db/knowledge_base",
    "salesforce://contracts",
    "sharepoint://policies/",
]
docs = loader.load_documents(enterprise_sources)
With global parameters:
docs = loader.load_documents(
    ["doc1.pdf", "doc2.pdf", "doc3.pdf"],
    extract_images=True,  # Applied to all sources
    chunk_size=1500,      # Applied to all sources
    max_workers=10,       # Override concurrency
)
- Performance:
Sources are processed concurrently based on max_concurrency setting
Failed sources don’t stop processing of other sources
Results are automatically cached if caching is enabled
Memory usage is optimized through document streaming
- Error Handling:
Individual source failures are logged but don’t stop processing
Failed sources are excluded from results
Use load_bulk() for detailed error information per source
Network timeouts and connection errors are handled gracefully
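The skip-and-continue behavior described above can be sketched as follows. load_documents_sketch and fake_load are hypothetical stand-ins for the real method and loaders:

```python
def load_documents_sketch(sources, load_fn):
    """Flatten documents from all sources; record failures instead of raising (sketch)."""
    documents, failed = [], []
    for source in sources:
        try:
            documents.extend(load_fn(source))
        except Exception as exc:
            # In the real loader this is logged; processing continues.
            failed.append((source, str(exc)))
    return documents, failed

def fake_load(source):
    """Stub loader that rejects one source type."""
    if source.endswith(".bad"):
        raise ValueError("unsupported source")
    return [f"doc:{source}"]

docs, failed = load_documents_sketch(["a.pdf", "x.bad", "b.txt"], fake_load)
```

load_documents() returns only the flattened docs list; the per-source failure detail shown here is what load_bulk() surfaces via BulkLoadingResult.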
- Langchain Compatibility:
This method follows the standard langchain DocumentLoader interface:
- Method name: load_documents() (plural)
- Return type: List[Document]
- Behavior: Load from multiple sources, return flattened results
- Integration: Works seamlessly with langchain chains and workflows
See also
load(): Load from a single source
load_bulk(): Get detailed results and error information
load_all(): Recursive loading from directories/websites
aload_documents(): Async version for high-performance scenarios
Note
For detailed loading results including error information and per-source statistics, use load_bulk() instead.
- validate_credentials(source_type, **credentials)[source]¶
Validate credentials for a source type.
- Parameters:
source_type (str) – Name of the source type
**credentials – Credential parameters to validate
- Returns:
True if credentials are valid
- Return type:
bool
Examples
Validate database credentials:
valid = loader.validate_credentials(
    "postgresql",
    host="localhost",
    username="user",
    password="pass",
)
- class haive.core.engine.document.loaders.auto_loader.AutoLoaderConfig(/, **data)[source]¶
Bases: pydantic.BaseModel
Configuration model for the AutoLoader system.
This class defines all configuration options for the AutoLoader, allowing fine-tuned control over loading behavior, performance characteristics, and operational parameters.
- preference¶
Loading preference balancing speed vs quality. Options: SPEED, QUALITY, BALANCED. Default: BALANCED.
- Type:
LoaderPreference
- max_concurrency¶
Maximum number of concurrent loading operations. Range: 1-100. Default: 10.
- Type:
int
- default_chunk_size¶
Default chunk size for text splitting. Range: 100-10000. Default: 1000.
- Type:
int
- credential_manager¶
Custom credential manager instance. Default: None.
- Type:
Optional[Any]
Examples
Basic quality-focused configuration:
config = AutoLoaderConfig(
    preference=LoaderPreference.QUALITY,
    max_concurrency=5,
    timeout=600,
    enable_metadata=True,
)
High-performance configuration with caching:
config = AutoLoaderConfig(
    preference=LoaderPreference.SPEED,
    max_concurrency=50,
    enable_caching=True,
    cache_ttl=7200,
    retry_attempts=1,
)
Balanced configuration for production:
config = AutoLoaderConfig(
    preference=LoaderPreference.BALANCED,
    max_concurrency=20,
    timeout=300,
    enable_caching=True,
    enable_metadata=True,
)
- Raises:
ValidationError – If any configuration values are outside valid ranges.
- Parameters:
data (Any)
Note
Higher concurrency improves performance but increases resource usage. Enable caching for repeated document access patterns. Quality preference may be slower but provides better text extraction.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
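The documented range checks (max_concurrency 1-100, default_chunk_size 100-10000) can be illustrated with a plain dataclass. LoaderConfig below is a hypothetical stand-in, not the real pydantic model, and raises ValueError where pydantic would raise ValidationError:

```python
from dataclasses import dataclass

@dataclass
class LoaderConfig:
    """Minimal stand-in mirroring AutoLoaderConfig's documented ranges."""
    preference: str = "balanced"
    max_concurrency: int = 10
    default_chunk_size: int = 1000

    def __post_init__(self):
        # Enforce the documented ranges at construction time
        if not 1 <= self.max_concurrency <= 100:
            raise ValueError("max_concurrency must be in 1-100")
        if not 100 <= self.default_chunk_size <= 10000:
            raise ValueError("default_chunk_size must be in 100-10000")

config = LoaderConfig(max_concurrency=20)  # valid
```

Passing an out-of-range value, e.g. LoaderConfig(max_concurrency=0), fails at construction rather than at load time.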
- class haive.core.engine.document.loaders.auto_loader.BulkLoadingResult(/, **data)[source]¶
Bases: pydantic.BaseModel
Comprehensive result container for bulk document loading operations.
This Pydantic model provides detailed information about bulk loading operations, including individual source results, aggregate statistics, error tracking, and performance metrics across all sources.
- Parameters:
data (Any)
- total_documents¶
Total number of documents successfully loaded across all sources. Sum of documents from all successful LoadingResults.
- Type:
int
- results¶
List of individual LoadingResult objects, one for each source that was processed (both successful and failed). Provides detailed per-source information including errors.
- Type:
List[LoadingResult]
- failed_sources¶
List of tuples containing (source_identifier, error_message) for sources that failed to load. Allows easy identification of problematic sources.
- Type:
List[Tuple[str, str]]
- total_time¶
Total elapsed time for the entire bulk operation in seconds, including all concurrent processing and overhead.
- Type:
float
- summary¶
Dictionary containing aggregate statistics:
- total_sources (int): Number of sources processed
- successful_loads (int): Number of sources loaded successfully
- failed_loads (int): Number of sources that failed
- success_rate (float): Percentage of successful loads
- avg_loading_time (float): Average time per source
- total_errors (int): Total number of errors encountered
- Type:
Dict[str, Any]
Examples
Analyzing bulk loading results:
sources = ["doc1.pdf", "doc2.pdf", "invalid.pdf"]
result = loader.load_bulk(sources)

print(f"Loaded {result.total_documents} documents")
print(f"Success rate: {result.summary['success_rate']:.1f}%")
print(f"Total time: {result.total_time:.2f}s")

if result.failed_sources:
    print("Failed sources:")
    for source, error in result.failed_sources:
        print(f"  {source}: {error}")
Processing individual results:
for i, loading_result in enumerate(result.results):
    source = sources[i]
    if loading_result.errors:
        print(f"{source} failed: {loading_result.errors}")
    else:
        docs = len(loading_result.documents)
        time = loading_result.loading_time
        print(f"{source}: {docs} docs in {time:.2f}s")
Performance analysis:
print("Performance Summary:")
print(f"  Total sources: {result.summary['total_sources']}")
print(f"  Average time per source: {result.summary['avg_loading_time']:.2f}s")
print(f"  Concurrent efficiency: {result.summary['total_sources'] * result.summary['avg_loading_time'] / result.total_time:.1f}x")
Note
This class is returned by load_bulk() and aload_bulk() methods. For simple flattened document lists, use load_documents() instead.
- class haive.core.engine.document.loaders.auto_loader.LoadingResult(/, **data)[source]¶
Bases: pydantic.BaseModel
Comprehensive result container for single-source document loading operations.
This Pydantic model encapsulates all information about a document loading operation, including the loaded documents, source analysis results, performance metrics, and any errors encountered during the process.
- Parameters:
data (Any)
- documents¶
List of successfully loaded Document objects. Each Document contains page_content (str) and metadata (dict). Empty list if loading failed.
- Type:
List[Document]
- source_info¶
Detailed information about the detected source including source type, category, confidence score, and capabilities.
- Type:
SourceInfo
- loader_used¶
Name of the specific loader that was selected and used for this operation (e.g., “pypdf”, “beautiful_soup”, “csv”).
- Type:
str
- loading_time¶
Total time taken for the loading operation in seconds, including source detection, loader instantiation, and document extraction.
- Type:
float
- metadata¶
Additional metadata collected during loading including loader configuration, extraction settings, and performance info.
- Type:
Dict[str, Any]
- errors¶
List of error messages encountered during loading. Empty list indicates successful loading without errors.
- Type:
List[str]
Examples
Successful loading result:
result = loader.load_detailed("document.pdf")
print(f"Loaded {len(result.documents)} documents")
print(f"Source: {result.source_info.source_type}")
print(f"Loader: {result.loader_used}")
print(f"Time: {result.loading_time:.2f}s")
Error handling:
result = loader.load_detailed("invalid.pdf")
if result.errors:
    print(f"Loading failed: {result.errors}")
else:
    print(f"Success: {len(result.documents)} documents")
Note
This class is returned by load_detailed() and is included in BulkLoadingResult for individual source results in bulk operations.
- async haive.core.engine.document.loaders.auto_loader.aload_document(path_or_url, **kwargs)[source]¶
Convenience function to load documents asynchronously.
- Parameters:
path_or_url (str) – Path, URL, or connection string to load from
**kwargs – Additional parameters
- Returns:
List of loaded documents
- Return type:
list[langchain_core.documents.Document]
Examples
Async document loading:
from haive.core.engine.document.loaders import aload_document

documents = await aload_document("https://example.com")
- haive.core.engine.document.loaders.auto_loader.load_document(path_or_url, **kwargs)[source]¶
Convenience function to load documents automatically.
- Parameters:
path_or_url (str) – Path, URL, or connection string to load from
**kwargs – Additional parameters
- Returns:
List of loaded documents
- Return type:
list[langchain_core.documents.Document]
Examples
Quick document loading:
from haive.core.engine.document.loaders import load_document

documents = load_document("file.pdf")
documents = load_document("https://example.com")
- haive.core.engine.document.loaders.auto_loader.load_documents_bulk(sources, **kwargs)[source]¶
Convenience function to load multiple documents.
- Parameters:
sources (list[str | dict[str, Any]]) – List of sources to load from
**kwargs – Additional parameters passed to each source and loader
- Returns:
Flattened list of all loaded documents
- Return type:
list[langchain_core.documents.Document]
Examples
Bulk loading:
from haive.core.engine.document.loaders import load_documents_bulk

documents = load_documents_bulk([
    "file1.pdf",
    "file2.docx",
    "https://example.com",
])