"""Service and Application Loaders for Document Engine.
This module implements loaders for various services and applications including
Notion, Obsidian, Slack, and other productivity tools.
"""
import logging
from pathlib import Path
from langchain_core.document_loaders.base import BaseLoader
from haive.core.engine.document.loaders.sources.implementation import (
CredentialType,
EnhancedSource,
SourceType,
)
logger = logging.getLogger(__name__)
class NotionSource(EnhancedSource):
    """Notion workspace source.

    Builds a ``NotionDBLoader`` for a single Notion database, authenticated
    with an API key fetched from the credential manager under the name
    ``"notion"``.
    """

    # NOTE(review): the other API-backed sources in this module (Slack,
    # Confluence) declare SourceType.WEB_API. LOCAL_DIRECTORY is preserved
    # here unchanged -- confirm it is intentional.
    source_type: SourceType = SourceType.LOCAL_DIRECTORY

    def __init__(
        self,
        database_id: str | None = None,
        page_ids: list[str] | None = None,
        **kwargs,
    ):
        """Initialize the source with a fixed ``notion://workspace`` path.

        Args:
            database_id: ID of the Notion database to load. When omitted,
                ``create_loader`` cannot build a loader and returns ``None``.
            page_ids: Page IDs to associate with this source. Stored as a
                list (empty when omitted); ``create_loader`` does not
                currently read it.
            **kwargs: Forwarded unchanged to ``EnhancedSource.__init__``.
        """
        super().__init__(source_path="notion://workspace", **kwargs)
        self.database_id = database_id
        self.page_ids = page_ids or []

    def can_handle(self, path: str) -> bool:
        """Return True for ``notion://`` paths or paths containing ``notion.so``."""
        return path.startswith("notion://") or "notion.so" in path

    def get_confidence_score(self, path: str) -> float:
        """Return 0.9 when ``can_handle(path)`` is true, otherwise 0.0."""
        if not self.can_handle(path):
            return 0.0
        return 0.9

    def requires_authentication(self) -> bool:
        """Notion always requires authentication."""
        return True

    def get_credential_requirements(self) -> list[CredentialType]:
        """Return the accepted credential types: an API key."""
        return [CredentialType.API_KEY]

    def _resolve_api_key(self) -> str | None:
        """Return the stored Notion API key, or None if none is usable."""
        if not self.credential_manager:
            return None
        cred = self.credential_manager.get_credential("notion")
        # Only API-key credentials are accepted; any other type is ignored.
        if cred and cred.credential_type == CredentialType.API_KEY:
            return cred.value
        return None

    def create_loader(self) -> BaseLoader | None:
        """Create a ``NotionDBLoader`` for the configured database.

        Returns:
            A loader when both an API key and ``database_id`` are available;
            ``None`` when the key is missing, no database is configured, the
            loader class cannot be imported, or construction fails. Every
            failure is logged rather than raised.
        """
        try:
            from langchain_community.document_loaders import NotionDBLoader

            notion_key = self._resolve_api_key()
            if not notion_key:
                logger.error("Notion API key required")
                return None

            if self.database_id:
                # Load from the specific database.
                return NotionDBLoader(
                    integration_token=notion_key,
                    database_id=self.database_id,
                )

            # Loading from a directory export would need a local path to the
            # Notion export, which this source does not carry.
            logger.warning("NotionDirectoryLoader requires local export path")
            return None
        except ImportError:
            # The import above comes from langchain_community, so that is the
            # package to install (the previous hint named notion-client).
            logger.warning(
                "Notion loaders not available. "
                "Install with: pip install langchain-community"
            )
            return None
        except Exception as e:
            logger.exception("Failed to create Notion loader: %s", e)
            return None
class ObsidianSource(EnhancedSource):
    """Source backed by an Obsidian vault on the local filesystem."""

    source_type: SourceType = SourceType.LOCAL_DIRECTORY

    def __init__(self, vault_path: str, encoding: str = "utf-8", **kwargs):
        """Record the vault location and text encoding.

        Args:
            vault_path: Path to the vault; also used as the source path
                passed to ``EnhancedSource.__init__``.
            encoding: Encoding handed to ``ObsidianLoader``.
            **kwargs: Forwarded unchanged to ``EnhancedSource.__init__``.
        """
        super().__init__(source_path=vault_path, **kwargs)
        self.vault_path = vault_path
        self.encoding = encoding

    def can_handle(self, path: str) -> bool:
        """Return True when ``path`` is a directory holding an ``.obsidian`` entry.

        Any exception raised while inspecting the path is treated as
        "cannot handle".
        """
        try:
            vault_root = Path(path)
            if not vault_root.is_dir():
                return False
            # An Obsidian vault is recognised by its ".obsidian" entry.
            return vault_root.joinpath(".obsidian").exists()
        except Exception:
            return False

    def get_confidence_score(self, path: str) -> float:
        """Return 0.95 for a path this source can handle, otherwise 0.0."""
        return 0.95 if self.can_handle(path) else 0.0

    def create_loader(self) -> BaseLoader | None:
        """Build an ``ObsidianLoader`` for the vault.

        Returns:
            The loader, or ``None`` (after logging) when the loader class
            cannot be imported or construction fails.
        """
        try:
            from langchain_community.document_loaders import ObsidianLoader

            loader_options = {
                "path": self.vault_path,
                "encoding": self.encoding,
            }
            return ObsidianLoader(**loader_options)
        except ImportError:
            logger.warning("ObsidianLoader not available")
            return None
        except Exception as e:
            logger.exception(f"Failed to create Obsidian loader: {e}")
            return None
class SlackSource(EnhancedSource):
    """Source for a Slack workspace, read from an export archive."""

    source_type: SourceType = SourceType.WEB_API

    def __init__(
        self,
        channel_id: str | None = None,
        export_path: str | None = None,
        **kwargs,
    ):
        """Set up the source with a fixed ``slack://workspace`` path.

        Args:
            channel_id: Slack channel ID. Stored on the instance;
                ``create_loader`` does not currently read it.
            export_path: Path handed to ``SlackDirectoryLoader`` as
                ``zip_path``. When falsy, no loader can be created.
            **kwargs: Forwarded unchanged to ``EnhancedSource.__init__``.
        """
        super().__init__(source_path="slack://workspace", **kwargs)
        self.channel_id = channel_id
        self.export_path = export_path

    def can_handle(self, path: str) -> bool:
        """Return True for ``slack://`` paths or paths containing ``slack.com``."""
        if path.startswith("slack://"):
            return True
        return "slack.com" in path

    def get_confidence_score(self, path: str) -> float:
        """Return 0.9 for a path this source can handle, otherwise 0.0."""
        return 0.9 if self.can_handle(path) else 0.0

    def requires_authentication(self) -> bool:
        """Authentication is needed only when no export path was given."""
        has_export = self.export_path is not None
        return not has_export

    def get_credential_requirements(self) -> list[CredentialType]:
        """Return the accepted credential types: OAuth2 or an access token."""
        accepted = [CredentialType.OAUTH2, CredentialType.ACCESS_TOKEN]
        return accepted

    def create_loader(self) -> BaseLoader | None:
        """Build a ``SlackDirectoryLoader`` from the export archive.

        Returns:
            The loader when ``export_path`` is set; ``None`` (after logging)
            when it is not, when the loader class cannot be imported, or
            when construction fails.
        """
        try:
            if not self.export_path:
                # Loading via the Slack API is not implemented.
                logger.warning("Slack API loader not yet implemented")
                return None

            from langchain_community.document_loaders import SlackDirectoryLoader

            archive = self.export_path
            return SlackDirectoryLoader(zip_path=archive)
        except ImportError:
            logger.warning("Slack loaders not available")
            return None
        except Exception as e:
            logger.exception(f"Failed to create Slack loader: {e}")
            return None
class GutenbergSource(EnhancedSource):
    """Project Gutenberg book source.

    A book can be identified either by its full plain-text URL or by its
    numeric Gutenberg ID.
    """

    source_type: SourceType = SourceType.WEB_URL

    def __init__(
        self, book_url: str | None = None, book_id: int | None = None, **kwargs
    ):
        """Initialize the source from a URL or a book ID.

        Args:
            book_url: URL of the book. When given it is used as the source
                path and takes precedence over ``book_id``.
            book_id: Numeric Gutenberg ID, used when ``book_url`` is not
                given. The source path then becomes
                ``gutenberg://book/<book_id>``.
            **kwargs: Forwarded unchanged to ``EnhancedSource.__init__``.
        """
        # NOTE(review): with neither argument the path is the literal
        # "gutenberg://book/None"; kept for backward compatibility.
        source_path = book_url or f"gutenberg://book/{book_id}"
        super().__init__(source_path=source_path, **kwargs)
        self.book_url = book_url
        self.book_id = book_id

    def can_handle(self, path: str) -> bool:
        """Return True for ``gutenberg://`` paths or paths containing ``gutenberg.org``."""
        return "gutenberg.org" in path or path.startswith("gutenberg://")

    def get_confidence_score(self, path: str) -> float:
        """Return 0.9 when ``can_handle(path)`` is true, otherwise 0.0."""
        if not self.can_handle(path):
            return 0.0
        return 0.9

    def _resolve_book_url(self) -> str | None:
        """Return the URL to load: the explicit one, else one built from the ID."""
        if self.book_url:
            return self.book_url
        if self.book_id is not None:
            # Plain-text download location used by gutenberg.org for a book ID.
            return (
                f"https://www.gutenberg.org/cache/epub/"
                f"{self.book_id}/pg{self.book_id}.txt"
            )
        return None

    def create_loader(self) -> BaseLoader | None:
        """Create a ``GutenbergLoader`` for the configured book.

        Previously a source created with only ``book_id`` could never
        produce a loader; the URL is now derived from the ID.

        Returns:
            The loader, or ``None`` (after logging) when neither a URL nor
            an ID is configured, the loader class cannot be imported, or
            construction fails.
        """
        try:
            from langchain_community.document_loaders import GutenbergLoader

            url = self._resolve_book_url()
            if url:
                return GutenbergLoader(url)

            logger.warning("GutenbergLoader requires book URL or book ID")
            return None
        except ImportError:
            logger.warning("GutenbergLoader not available")
            return None
        except Exception as e:
            logger.exception("Failed to create Gutenberg loader: %s", e)
            return None
class ConfluenceSource(EnhancedSource):
    """Atlassian Confluence source."""

    source_type: SourceType = SourceType.WEB_API

    def __init__(
        self,
        url: str,
        space_key: str | None = None,
        page_ids: list[str] | None = None,
        **kwargs,
    ):
        """Initialize the source for a Confluence site.

        Args:
            url: Base URL of the Confluence site; also used as the source
                path passed to ``EnhancedSource.__init__``.
            space_key: Space key passed to ``ConfluenceLoader``.
            page_ids: Page IDs passed to ``ConfluenceLoader``. Stored as a
                list (empty when omitted).
            **kwargs: Forwarded unchanged to ``EnhancedSource.__init__``.
        """
        super().__init__(source_path=url, **kwargs)
        self.url = url
        self.space_key = space_key
        self.page_ids = page_ids or []

    def can_handle(self, path: str) -> bool:
        """Return True for paths containing ``confluence`` or ``atlassian.net``."""
        return "confluence" in path or "atlassian.net" in path

    def get_confidence_score(self, path: str) -> float:
        """Return 0.85 when ``can_handle(path)`` is true, otherwise 0.0."""
        if not self.can_handle(path):
            return 0.0
        return 0.85

    def requires_authentication(self) -> bool:
        """Confluence always requires authentication."""
        return True

    def get_credential_requirements(self) -> list[CredentialType]:
        """Return the accepted credential types: username/password or API key."""
        return [CredentialType.USERNAME_PASSWORD, CredentialType.API_KEY]

    def _resolve_auth_kwargs(self) -> dict[str, str]:
        """Return ConfluenceLoader auth keyword arguments, or {} if none usable."""
        if not self.credential_manager:
            return {}
        cred = self.credential_manager.get_credential("confluence")
        if not cred:
            return {}

        if cred.credential_type == CredentialType.USERNAME_PASSWORD:
            # Value is assumed to be "username:password"; split on the first
            # colon only so the secret itself may contain colons.
            username, sep, secret = cred.value.partition(":")
            if sep and username and secret:
                return {"username": username, "api_key": secret}
            return {}

        if cred.credential_type == CredentialType.API_KEY and cred.value:
            # A bare key has no username, so it cannot use the
            # username/api_key pair. Pass it as a token instead.
            # NOTE(review): Confluence Cloud API tokens may still need a
            # username -- supply those as a USERNAME_PASSWORD credential in
            # "email:token" form; confirm against the deployment.
            return {"token": cred.value}

        return {}

    def create_loader(self) -> BaseLoader | None:
        """Create a ``ConfluenceLoader`` for the configured site.

        Previously an API_KEY credential was always rejected because no
        username accompanied it, although ``get_credential_requirements``
        lists API_KEY as accepted. It is now passed through as ``token``.

        Returns:
            The loader, or ``None`` (after logging) when no usable
            credential is found, the loader class cannot be imported, or
            construction fails.
        """
        try:
            from langchain_community.document_loaders import ConfluenceLoader

            auth_kwargs = self._resolve_auth_kwargs()
            if not auth_kwargs:
                logger.error("Confluence credentials required")
                return None

            return ConfluenceLoader(
                url=self.url,
                space_key=self.space_key,
                page_ids=self.page_ids,
                **auth_kwargs,
            )
        except ImportError:
            logger.warning(
                "ConfluenceLoader not available. "
                "Install with: pip install atlassian-python-api"
            )
            return None
        except Exception as e:
            logger.exception("Failed to create Confluence loader: %s", e)
            return None
class ReadTheDocsSource(EnhancedSource):
    """Read the Docs documentation source."""

    source_type: SourceType = SourceType.WEB_URL

    def __init__(self, project_url: str, features: list[str] | None = None, **kwargs):
        """Initialize the source for a Read the Docs project.

        Args:
            project_url: URL of the project; also used as the source path
                passed to ``EnhancedSource.__init__``.
            features: Stored on the instance, defaulting to
                ``["page_content", "metadata"]``. ``create_loader`` does
                not currently read it.
            **kwargs: Forwarded unchanged to ``EnhancedSource.__init__``.
        """
        super().__init__(source_path=project_url, **kwargs)
        self.project_url = project_url
        self.features = features or ["page_content", "metadata"]

    def can_handle(self, path: str) -> bool:
        """Return True for paths containing ``readthedocs.io`` or ``readthedocs.org``."""
        return "readthedocs.io" in path or "readthedocs.org" in path

    def get_confidence_score(self, path: str) -> float:
        """Return 0.9 when ``can_handle(path)`` is true, otherwise 0.0."""
        if not self.can_handle(path):
            return 0.0
        return 0.9

    def create_loader(self) -> BaseLoader | None:
        """Validate the project URL; no loader is created yet.

        ``ReadTheDocsLoader`` expects a local path to already-downloaded
        docs, which this source does not have, so this method only checks
        that a project name can be extracted from the URL and logs the
        outcome.

        Returns:
            Always ``None``.
        """
        try:
            import re

            # A project name is the first host label before ".readthedocs",
            # e.g. https://project.readthedocs.io/ -> project
            if re.search(r"https?://([^.]+)\.readthedocs", self.project_url) is None:
                logger.error("Could not extract project name from URL")
                return None

            # The docs would have to be downloaded first; that step is not
            # implemented.
            logger.warning("ReadTheDocsLoader requires local path to downloaded docs")
            return None
        except ImportError:
            logger.warning("ReadTheDocsLoader not available")
            return None
        except Exception as e:
            logger.exception("Failed to create Read the Docs loader: %s", e)
            return None
# Public API of this module: the service/application source classes defined
# above, listed alphabetically.
__all__ = [
    "ConfluenceSource",
    "GutenbergSource",
    "NotionSource",
    "ObsidianSource",
    "ReadTheDocsSource",
    "SlackSource",
]