"""Path analysis for automatic source detection.
This module provides comprehensive path analysis to automatically detect
the type of document source from a path string. Critical for auto-loading.
"""
import mimetypes
import re
from enum import Enum
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse
from pydantic import BaseModel, Field
from haive.core.engine.document.loaders.sources.source_types import LoaderCapability
class PathType(str, Enum):
    """Primary path type classification.

    Subclasses ``str`` so members compare equal to their plain string
    values (e.g. ``PathType.LOCAL_FILE == "local_file"``) and serialize
    naturally in Pydantic models.
    """

    LOCAL_FILE = "local_file"  # single file on the local filesystem
    LOCAL_DIRECTORY = "local_directory"  # directory on the local filesystem
    URL_HTTP = "url_http"  # remote resource over plain HTTP
    URL_HTTPS = "url_https"  # remote resource over HTTPS
    DATABASE_URI = "database_uri"  # database connection URI (postgresql://, mysql://, ...)
    CLOUD_STORAGE = "cloud_storage"  # cloud object storage URI (s3://, gs://, ...)
    UNKNOWN = "unknown"  # could not be classified
class SourceCategory(str, Enum):
    """Source category classification.

    NOTE(review): a different ``SourceCategory`` (with members such as
    ``FILE_DOCUMENT`` and ``WEB_SCRAPING``) is imported from
    ``source_types`` inside ``convert_to_source_info`` and shadows this
    enum there — confirm which of the two callers are expected to use.
    """

    LOCAL = "local"  # local filesystem source
    REMOTE = "remote"  # remote URL source
    DATABASE = "database"  # database-backed source
    CLOUD = "cloud"  # cloud object storage source
    UNKNOWN = "unknown"  # unclassified source
class FileCategory(str, Enum):
    """High-level file category.

    Derived from a file's extension via
    ``PathAnalyzer.EXTENSION_CATEGORIES``; used for coarse loader routing.
    """

    DOCUMENT = "document"  # text documents (.pdf, .docx, .md, ...)
    DATA = "data"  # structured data files (.csv, .json, .parquet, ...)
    CODE = "code"  # source code files (.py, .js, .go, ...)
    IMAGE = "image"  # raster/vector images (.png, .jpg, .svg, ...)
    AUDIO = "audio"  # audio files
    VIDEO = "video"  # video files
    ARCHIVE = "archive"  # compressed archives (.zip, .tar, ...)
    UNKNOWN = "unknown"  # extension not recognized
class SourceInfo(BaseModel):
    """Comprehensive information about a detected document source.

    This Pydantic model contains the complete results of source detection and
    analysis, providing all information needed for optimal loader selection
    and configuration. Created by the PathAnalyzer during the source
    detection phase.

    NOTE(review): the ``category`` annotation resolves to this module's
    ``SourceCategory`` enum, while the examples below (and
    ``convert_to_source_info``) use the ``SourceCategory`` imported from
    ``source_types`` with members like ``FILE_DOCUMENT`` — verify the two
    enums are the same class, otherwise validation of ``category`` may fail.

    Attributes:
        source_type (str): Specific source type identifier used for loader
            selection. Examples: 'pdf', 'web', 'csv', 'postgresql', 's3',
            'sharepoint'. This maps directly to registered loader
            implementations.
        category (SourceCategory): High-level classification of the source
            type. Used for capability grouping and fallback logic.
        confidence (float): Detection confidence score from 0.0 to 1.0.
            Higher values indicate more certain detection. Values below 0.5
            may trigger additional validation or fallback detection methods.
        metadata (Dict[str, Any]): Rich metadata collected during analysis.
            Contains source-specific information such as:
            - file_extension: File extension for local files
            - mime_type: Detected MIME type
            - estimated_size: Estimated content size
            - url_components: Parsed URL components for web sources
            - database_type: Database system type for database sources
        capabilities (Optional[List[LoaderCapability]]): List of supported
            capabilities for this source type. Used for loader filtering
            and feature availability checks. None if not determined.

    Examples:
        PDF file detection result::

            source_info = SourceInfo(
                source_type="pdf",
                category=SourceCategory.FILE_DOCUMENT,
                confidence=0.95,
                metadata={
                    "file_extension": ".pdf",
                    "mime_type": "application/pdf",
                    "estimated_size": 1024000
                },
                capabilities=[
                    LoaderCapability.TEXT_EXTRACTION,
                    LoaderCapability.METADATA_EXTRACTION
                ]
            )

        Web source detection result::

            source_info = SourceInfo(
                source_type="web",
                category=SourceCategory.WEB_SCRAPING,
                confidence=0.90,
                metadata={
                    "protocol": "https",
                    "domain": "docs.example.com",
                    "url_components": {"scheme": "https", "host": "docs.example.com"}
                },
                capabilities=[
                    LoaderCapability.WEB_SCRAPING,
                    LoaderCapability.BULK_LOADING
                ]
            )

    Usage:
        This class is primarily used internally by the AutoLoader system
        for source detection and loader selection. Users typically don't
        create SourceInfo instances directly but receive them in
        LoadingResult objects and through the detect_source() method.

    See Also:
        - PathAnalyzer: Creates SourceInfo instances
        - LoadingResult: Contains SourceInfo for completed operations
        - SourceCategory: Enumeration of source categories
        - LoaderCapability: Enumeration of loader capabilities
    """

    source_type: str = Field(
        description="Specific source type identifier used for loader selection"
    )
    category: "SourceCategory" = Field(
        description="High-level classification of the source type"
    )
    confidence: float = Field(
        ge=0.0, le=1.0, description="Detection confidence score from 0.0 to 1.0"
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Rich metadata collected during analysis"
    )
    capabilities: list["LoaderCapability"] | None = Field(
        default=None, description="List of supported capabilities for this source type"
    )

    class Config:
        # Allow non-Pydantic types (e.g. LoaderCapability) as field values.
        arbitrary_types_allowed = True
class PathAnalysisResult(BaseModel):
    """Result of comprehensive path analysis.

    Produced by :meth:`PathAnalyzer.analyze`. Exactly one of the
    ``is_local`` / ``is_remote`` / ``is_database`` / ``is_cloud`` flags is
    set by the analyzer, and only the corresponding optional fields are
    populated (e.g. ``bucket_name`` only for cloud storage).
    """

    original_path: str = Field(description="Original path that was analyzed")
    path_type: PathType = Field(description="Primary path type classification")

    # Local file info
    is_local: bool = Field(default=False, description="Whether this is a local path")
    is_file: bool = Field(default=False, description="Whether this is a file")
    is_directory: bool = Field(default=False, description="Whether this is a directory")
    file_exists: bool = Field(default=False, description="Whether the file exists")
    file_extension: str | None = Field(
        default=None, description="File extension if applicable"
    )
    file_category: FileCategory | None = Field(
        default=None, description="High-level file category"
    )
    mime_type: str | None = Field(default=None, description="Detected MIME type")
    file_size: int | None = Field(default=None, ge=0, description="File size in bytes")

    # URL info
    is_remote: bool = Field(default=False, description="Whether this is a remote URL")
    url_components: dict[str, Any] | None = Field(
        default=None, description="Parsed URL components"
    )
    domain: str | None = Field(default=None, description="Domain name for URLs")

    # Database info
    is_database: bool = Field(
        default=False, description="Whether this is a database URI"
    )
    database_type: str | None = Field(default=None, description="Type of database")

    # Cloud storage info
    is_cloud: bool = Field(default=False, description="Whether this is cloud storage")
    cloud_provider: str | None = Field(default=None, description="Cloud provider name")
    bucket_name: str | None = Field(default=None, description="Storage bucket name")
    object_key: str | None = Field(
        default=None, description="Object key/path in storage"
    )

    # Confidence (1.0 = certain, e.g. an existing directory; lower values
    # reflect guesses about nonexistent paths)
    confidence: float = Field(
        default=0.0, ge=0.0, le=1.0, description="Analysis confidence score"
    )

    class Config:
        # Allow non-Pydantic types in url_components values.
        arbitrary_types_allowed = True
class PathAnalyzer:
    """Analyzes paths to determine source type and characteristics.

    Stateless: every method is a classmethod. Detection is attempted in a
    fixed order — URL, database URI, cloud storage, then local path —
    because a URL or URI would otherwise be misread as a (nonexistent)
    local path.
    """

    # File extension to category mapping
    EXTENSION_CATEGORIES = {
        # Documents
        ".pdf": FileCategory.DOCUMENT,
        ".doc": FileCategory.DOCUMENT,
        ".docx": FileCategory.DOCUMENT,
        ".odt": FileCategory.DOCUMENT,
        ".rtf": FileCategory.DOCUMENT,
        ".tex": FileCategory.DOCUMENT,
        ".txt": FileCategory.DOCUMENT,
        ".md": FileCategory.DOCUMENT,
        ".markdown": FileCategory.DOCUMENT,
        ".rst": FileCategory.DOCUMENT,
        # Data
        ".csv": FileCategory.DATA,
        ".json": FileCategory.DATA,
        ".jsonl": FileCategory.DATA,
        ".xml": FileCategory.DATA,
        ".yaml": FileCategory.DATA,
        ".yml": FileCategory.DATA,
        ".toml": FileCategory.DATA,
        ".xls": FileCategory.DATA,
        ".xlsx": FileCategory.DATA,
        ".parquet": FileCategory.DATA,
        # Code
        ".py": FileCategory.CODE,
        ".js": FileCategory.CODE,
        ".ts": FileCategory.CODE,
        ".java": FileCategory.CODE,
        ".cpp": FileCategory.CODE,
        ".c": FileCategory.CODE,
        ".h": FileCategory.CODE,
        ".go": FileCategory.CODE,
        ".rs": FileCategory.CODE,
        ".rb": FileCategory.CODE,
        # Images
        ".jpg": FileCategory.IMAGE,
        ".jpeg": FileCategory.IMAGE,
        ".png": FileCategory.IMAGE,
        ".gif": FileCategory.IMAGE,
        ".bmp": FileCategory.IMAGE,
        ".svg": FileCategory.IMAGE,
        ".webp": FileCategory.IMAGE,
        # Audio (added so FileCategory.AUDIO is reachable)
        ".mp3": FileCategory.AUDIO,
        ".wav": FileCategory.AUDIO,
        ".flac": FileCategory.AUDIO,
        ".ogg": FileCategory.AUDIO,
        ".m4a": FileCategory.AUDIO,
        # Video (added so FileCategory.VIDEO is reachable)
        ".mp4": FileCategory.VIDEO,
        ".avi": FileCategory.VIDEO,
        ".mkv": FileCategory.VIDEO,
        ".mov": FileCategory.VIDEO,
        ".webm": FileCategory.VIDEO,
        # Archive
        ".zip": FileCategory.ARCHIVE,
        ".tar": FileCategory.ARCHIVE,
        ".gz": FileCategory.ARCHIVE,
        ".rar": FileCategory.ARCHIVE,
        ".7z": FileCategory.ARCHIVE,
    }

    # Known service domains; matched as exact host or subdomain in
    # _analyze_url (NOT raw substring).
    SERVICE_PATTERNS = {
        "github.com": "github",
        "gitlab.com": "gitlab",
        "youtube.com": "youtube",
        "youtu.be": "youtube",
        "wikipedia.org": "wikipedia",
        "arxiv.org": "arxiv",
        "huggingface.co": "huggingface",
        "kaggle.com": "kaggle",
    }

    # Database URI scheme -> canonical database type
    DATABASE_SCHEMES = {
        "postgresql": "postgresql",
        "postgres": "postgresql",
        "mysql": "mysql",
        "sqlite": "sqlite",
        "mongodb": "mongodb",
        "redis": "redis",
        "clickhouse": "clickhouse",
    }

    # Cloud storage URI scheme -> provider name
    CLOUD_SCHEMES = {
        "s3": "aws",
        "gs": "gcp",
        "azure": "azure",
        "wasb": "azure",
        "wasbs": "azure",
    }

    @classmethod
    def analyze(cls, path: str | Path) -> PathAnalysisResult:
        """Perform comprehensive path analysis.

        Args:
            path: Path string, ``pathlib.Path``, URL, database URI, or
                cloud-storage URI.

        Returns:
            PathAnalysisResult describing the detected source.
        """
        path_str = str(path)
        # Remote/URI forms first; whatever remains is treated as local.
        if cls._looks_like_url(path_str):
            return cls._analyze_url(path_str)
        if cls._looks_like_database_uri(path_str):
            return cls._analyze_database_uri(path_str)
        if cls._looks_like_cloud_storage(path_str):
            return cls._analyze_cloud_storage(path_str)
        return cls._analyze_local_path(path_str)

    @classmethod
    def _looks_like_url(cls, path: str) -> bool:
        """Check if path looks like an HTTP(S) URL."""
        return bool(re.match(r"^https?://", path, re.IGNORECASE))

    @classmethod
    def _looks_like_database_uri(cls, path: str) -> bool:
        """Check if path looks like a database URI."""
        try:
            parsed = urlparse(path)
            return parsed.scheme in cls.DATABASE_SCHEMES
        except Exception:
            # urlparse can raise ValueError on malformed input (bad ports etc.)
            return False

    @classmethod
    def _looks_like_cloud_storage(cls, path: str) -> bool:
        """Check if path looks like a cloud storage URI."""
        try:
            parsed = urlparse(path)
            return parsed.scheme in cls.CLOUD_SCHEMES
        except Exception:
            return False

    @classmethod
    def _analyze_local_path(cls, path: str) -> PathAnalysisResult:
        """Analyze a local file system path.

        Existing paths are inspected directly; nonexistent paths are
        classified heuristically (extension present -> file, else directory)
        with reduced confidence.
        """
        path_obj = Path(path)
        result = PathAnalysisResult(
            original_path=path,
            path_type=PathType.LOCAL_FILE,
            is_local=True,
            confidence=0.9,
        )
        if path_obj.exists():
            result.file_exists = True
            if path_obj.is_file():
                result.is_file = True
                result.path_type = PathType.LOCAL_FILE
                # Get file info
                result.file_extension = path_obj.suffix.lower()
                result.file_size = path_obj.stat().st_size
                result.file_category = cls.EXTENSION_CATEGORIES.get(
                    result.file_extension, FileCategory.UNKNOWN
                )
                mime_type, _ = mimetypes.guess_type(path)
                result.mime_type = mime_type
            elif path_obj.is_dir():
                result.is_directory = True
                result.path_type = PathType.LOCAL_DIRECTORY
                result.confidence = 1.0
        elif "." in path_obj.name:
            # Nonexistent but has an extension: most likely a file path.
            result.is_file = True
            result.file_extension = path_obj.suffix.lower()
            result.file_category = cls.EXTENSION_CATEGORIES.get(
                result.file_extension, FileCategory.UNKNOWN
            )
            # Fix: also guess the MIME type here so nonexistent files carry
            # the same metadata as existing ones.
            mime_type, _ = mimetypes.guess_type(path)
            result.mime_type = mime_type
            result.confidence = 0.7
        else:
            # Assume directory if no extension
            result.is_directory = True
            result.path_type = PathType.LOCAL_DIRECTORY
            result.confidence = 0.6
        return result

    @classmethod
    def _analyze_url(cls, url: str) -> PathAnalysisResult:
        """Analyze an HTTP(S) URL."""
        parsed = urlparse(url)
        result = PathAnalysisResult(
            original_path=url,
            path_type=(
                PathType.URL_HTTPS if parsed.scheme == "https" else PathType.URL_HTTP
            ),
            is_remote=True,
            confidence=1.0,
        )
        # Extract URL components
        result.url_components = {
            "scheme": parsed.scheme,
            "netloc": parsed.netloc,
            "path": parsed.path,
            "params": parsed.params,
            "query": parse_qs(parsed.query),
            "fragment": parsed.fragment,
        }
        result.domain = parsed.netloc
        # Check for known services. Fix: match on the host (exact domain or
        # subdomain) rather than a raw substring of netloc, so e.g.
        # "mygithub.common.example" or "user@github.com.evil" is not
        # misdetected. parsed.hostname is lowercased with port/userinfo
        # stripped.
        host = (parsed.hostname or "").lower()
        for pattern, service in cls.SERVICE_PATTERNS.items():
            if host == pattern or host.endswith("." + pattern):
                result.url_components["service"] = service
                break
        # Try to determine file type from the URL path's extension
        if parsed.path:
            path_obj = Path(parsed.path)
            if path_obj.suffix:
                result.file_extension = path_obj.suffix.lower()
                result.file_category = cls.EXTENSION_CATEGORIES.get(
                    result.file_extension, FileCategory.UNKNOWN
                )
        return result

    @classmethod
    def _analyze_database_uri(cls, uri: str) -> PathAnalysisResult:
        """Analyze a database URI (e.g. ``postgresql://user@host:5432/db``)."""
        parsed = urlparse(uri)
        result = PathAnalysisResult(
            original_path=uri,
            path_type=PathType.DATABASE_URI,
            is_database=True,
            confidence=1.0,
        )
        # Normalize scheme aliases (e.g. "postgres" -> "postgresql");
        # unknown schemes pass through unchanged.
        result.database_type = cls.DATABASE_SCHEMES.get(parsed.scheme, parsed.scheme)
        # Extract connection components (password deliberately omitted)
        result.url_components = {
            "scheme": parsed.scheme,
            "host": parsed.hostname,
            "port": parsed.port,
            "database": parsed.path.lstrip("/") if parsed.path else None,
            "username": parsed.username,
        }
        return result

    @classmethod
    def _analyze_cloud_storage(cls, uri: str) -> PathAnalysisResult:
        """Analyze a cloud storage URI (e.g. ``s3://bucket/key``)."""
        parsed = urlparse(uri)
        result = PathAnalysisResult(
            original_path=uri,
            path_type=PathType.CLOUD_STORAGE,
            is_cloud=True,
            confidence=1.0,
        )
        result.cloud_provider = cls.CLOUD_SCHEMES.get(parsed.scheme, parsed.scheme)
        # netloc is the bucket; the remaining path (sans leading "/") is the key
        if parsed.netloc:
            result.bucket_name = parsed.netloc
            if parsed.path:
                result.object_key = parsed.path.lstrip("/")
        return result
def analyze_path(path: str | Path) -> PathAnalysisResult:
    """Convenience function for path analysis.

    Thin module-level wrapper around :meth:`PathAnalyzer.analyze`.

    Args:
        path: Path string, URL, database URI, or cloud-storage URI.

    Returns:
        PathAnalysisResult describing the detected source.
    """
    return PathAnalyzer.analyze(path)
def convert_to_source_info(analysis: PathAnalysisResult) -> "SourceInfo":
    """Convert PathAnalysisResult to SourceInfo for compatibility.

    Args:
        analysis: PathAnalysisResult from path analysis

    Returns:
        SourceInfo object with detected information
    """
    # Import here to avoid circular imports. NOTE: this SourceCategory
    # (with FILE_DOCUMENT, WEB_SCRAPING, ...) shadows the module-level enum
    # of the same name for the rest of this function.
    from haive.core.engine.document.loaders.sources.source_types import (
        LoaderCapability,
        SourceCategory,
    )

    # Map PathType to SourceCategory
    category_mapping = {
        PathType.LOCAL_FILE: SourceCategory.FILE_DOCUMENT,
        PathType.LOCAL_DIRECTORY: SourceCategory.FILE_DOCUMENT,
        PathType.URL_HTTP: SourceCategory.WEB_SCRAPING,
        PathType.URL_HTTPS: SourceCategory.WEB_SCRAPING,
        PathType.DATABASE_URI: SourceCategory.DATABASE_SQL,
        PathType.CLOUD_STORAGE: SourceCategory.CLOUD_STORAGE,
        PathType.UNKNOWN: SourceCategory.UNKNOWN,
    }

    # Determine source type from file extension or path type
    source_type = "unknown"
    if analysis.file_extension:
        source_type = analysis.file_extension.lower().lstrip(".")
    elif analysis.path_type in (PathType.URL_HTTP, PathType.URL_HTTPS):
        source_type = "web"
    elif analysis.path_type == PathType.DATABASE_URI:
        # Fix: these fields always exist (defaulting to None), so the old
        # getattr(..., "database") fallback never applied and source_type
        # could end up None where SourceInfo requires a str. Use `or`.
        source_type = analysis.database_type or "database"
    elif analysis.path_type == PathType.CLOUD_STORAGE:
        source_type = analysis.cloud_provider or "cloud"

    # Basic capabilities based on source type
    capabilities = []
    if source_type in ["pdf", "doc", "docx", "txt"]:
        capabilities.extend(
            [LoaderCapability.TEXT_EXTRACTION, LoaderCapability.METADATA_EXTRACTION]
        )
    elif source_type == "web":
        capabilities.extend(
            [LoaderCapability.WEB_SCRAPING, LoaderCapability.BULK_LOADING]
        )
    elif analysis.is_database:
        # Fix: the old check was `"database" in source_type`, which never
        # matched concrete types like "postgresql"; use the analysis flag.
        capabilities.extend([LoaderCapability.BULK_LOADING, LoaderCapability.FILTERING])

    return SourceInfo(
        source_type=source_type,
        category=category_mapping.get(analysis.path_type, SourceCategory.UNKNOWN),
        confidence=analysis.confidence,
        metadata={
            "original_path": analysis.original_path,
            "path_type": analysis.path_type.value,
            "file_extension": analysis.file_extension,
            # Fix: PathAnalysisResult stores size as `file_size`; the old
            # lookup of a nonexistent "estimated_size" attribute was always
            # None.
            "estimated_size": analysis.file_size,
            "mime_type": analysis.mime_type,
        },
        capabilities=capabilities,
    )
# Module-level variant of the PathAnalyzer.analyze_path classmethod: analyze
# a path and return a SourceInfo directly.
def analyze_path_to_source_info(path: str | Path) -> "SourceInfo":
    """Analyze path and return SourceInfo directly.

    Combines :meth:`PathAnalyzer.analyze` with :func:`convert_to_source_info`
    in a single call.

    Args:
        path: Path to analyze

    Returns:
        SourceInfo object with detected source information
    """
    analysis = PathAnalyzer.analyze(path)
    return convert_to_source_info(analysis)
# Attach an `analyze_path` classmethod to PathAnalyzer so callers can get a
# SourceInfo directly (analyze() itself returns a PathAnalysisResult).
# A named function replaces the former `classmethod(lambda ...)` assignment
# (PEP 8 E731): same behavior, but debuggable and documentable.
def _path_analyzer_analyze_path(cls, path: str | Path) -> "SourceInfo":
    """Analyze *path* with ``cls.analyze`` and convert to SourceInfo."""
    return convert_to_source_info(cls.analyze(path))


PathAnalyzer.analyze_path = classmethod(_path_analyzer_analyze_path)
# Public API of this module. Fix: SourceCategory is defined here and used in
# public signatures, but was missing from the export list.
__all__ = [
    "FileCategory",
    "PathAnalysisResult",
    "PathAnalyzer",
    "PathType",
    "SourceCategory",
    "SourceInfo",
    "analyze_path",
    "analyze_path_to_source_info",
    "convert_to_source_info",
]