"""Document Processing Components.
This module provides document processing capabilities including chunking and
content transformation that integrate with the DocumentEngine.
The processors handle:
- Content normalization
- Document chunking strategies
- Metadata extraction
- Format conversion
"""
import logging
from typing import Any
from langchain_core.documents import Document as LCDocument
from haive.core.engine.document.config import (
ChunkingStrategy,
DocumentChunk,
DocumentFormat,
ProcessedDocument,
)
logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Base class for document processing operations.

    Stores arbitrary keyword configuration on ``self.config`` and defines
    the :meth:`process` contract that concrete processors must implement.
    """

    def __init__(self, **kwargs) -> None:
        """Initialize the processor.

        Args:
            **kwargs: Arbitrary configuration options; kept verbatim on
                ``self.config`` for subclasses to consult.
        """
        self.config = kwargs

    def process(self, document: LCDocument) -> ProcessedDocument:
        """Process a document.

        Args:
            document: Document to process.

        Returns:
            Processed document.

        Raises:
            NotImplementedError: Always, in this base class; subclasses
                must override with a concrete implementation.
        """
        raise NotImplementedError("Subclasses must implement process method")
class ChunkingProcessor(DocumentProcessor):
    """Processor for chunking documents into smaller pieces.

    Supports several strategies (fixed-size, paragraph, sentence,
    recursive, semantic); unknown strategies fall back to fixed-size.
    """

    def __init__(
        self,
        chunking_strategy: ChunkingStrategy = ChunkingStrategy.RECURSIVE,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        **kwargs,
    ):
        """Initialize the chunking processor.

        Args:
            chunking_strategy: Strategy used to split documents.
            chunk_size: Target size of chunks in characters.
            chunk_overlap: Overlap between consecutive chunks in characters.
            **kwargs: Additional configuration forwarded to DocumentProcessor.
        """
        super().__init__(**kwargs)
        self.chunking_strategy = chunking_strategy
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_text(
        self,
        text: str,
        strategy: ChunkingStrategy,
        chunk_size: int,
        chunk_overlap: int,
        metadata: dict[str, Any],
    ) -> list[DocumentChunk]:
        """Chunk text according to the specified strategy.

        Args:
            text: Text to chunk.
            strategy: Chunking strategy to apply.
            chunk_size: Target size of chunks in characters.
            chunk_overlap: Overlap between chunks (fixed-size/recursive only).
            metadata: Base metadata copied into every chunk.

        Returns:
            List of document chunks; empty for ``ChunkingStrategy.NONE``.
        """
        if strategy == ChunkingStrategy.NONE:
            return []

        if strategy == ChunkingStrategy.FIXED_SIZE:
            pieces = self._chunk_fixed_size(text, chunk_size, chunk_overlap)
        elif strategy == ChunkingStrategy.PARAGRAPH:
            pieces = self._chunk_by_paragraph(text, chunk_size)
        elif strategy == ChunkingStrategy.SENTENCE:
            pieces = self._chunk_by_sentence(text, chunk_size)
        elif strategy == ChunkingStrategy.RECURSIVE:
            pieces = self._chunk_recursive(text, chunk_size, chunk_overlap)
        elif strategy == ChunkingStrategy.SEMANTIC:
            pieces = self._chunk_semantic(text, chunk_size)
        else:
            # Unknown strategy: default to fixed-size chunking.
            pieces = self._chunk_fixed_size(text, chunk_size, chunk_overlap)

        # Wrap the raw strings in DocumentChunk objects with per-chunk
        # metadata. Note: loop variable renamed from `chunk_text`, which
        # shadowed this method's own name.
        doc_chunks = []
        for i, piece in enumerate(pieces):
            chunk_metadata = metadata.copy()
            chunk_metadata.update(
                {
                    "chunk_index": i,
                    "chunk_strategy": strategy.value,
                    "chunk_size": len(piece),
                }
            )
            doc_chunks.append(
                DocumentChunk(
                    content=piece,
                    metadata=chunk_metadata,
                    chunk_index=i,
                    chunk_id=f"{metadata.get('source', 'unknown')}_{i}",
                )
            )
        return doc_chunks

    def _chunk_fixed_size(self, content: str, size: int, overlap: int) -> list[str]:
        """Chunk content into fixed-size pieces with overlap.

        The window advances by ``max(size - overlap, 1)`` characters; the
        ``max(..., 1)`` guard prevents an infinite loop when
        ``overlap >= size`` (the previous ``start = end - overlap`` form
        never advanced in that case).
        """
        step = max(size - overlap, 1)
        chunks = []
        start = 0
        while start < len(content):
            end = start + size
            chunks.append(content[start:end])
            if end >= len(content):
                break
            start += step
        return chunks

    def _chunk_by_paragraph(self, content: str, max_size: int) -> list[str]:
        """Chunk content by paragraphs (split on blank lines).

        Paragraphs are packed greedily up to ``max_size``; a single
        paragraph longer than ``max_size`` becomes its own oversized chunk.
        """
        paragraphs = content.split("\n\n")
        chunks = []
        current_chunk = ""
        for paragraph in paragraphs:
            # +2 accounts for the "\n\n" separator re-inserted on join.
            if len(current_chunk) + len(paragraph) + 2 <= max_size:
                if current_chunk:
                    current_chunk += "\n\n"
                current_chunk += paragraph
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = paragraph
        if current_chunk:
            chunks.append(current_chunk)
        return chunks

    def _chunk_by_sentence(self, content: str, max_size: int) -> list[str]:
        """Chunk content by sentences.

        Uses a naive regex split on terminal punctuation; note the
        terminators themselves are discarded by the split.
        """
        import re

        sentences = re.split(r"[.!?]+", content)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            # +1 accounts for the joining space.
            if len(current_chunk) + len(sentence) + 1 <= max_size:
                if current_chunk:
                    current_chunk += " "
                current_chunk += sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = sentence
        if current_chunk:
            chunks.append(current_chunk)
        return chunks

    def _chunk_recursive(self, content: str, size: int, overlap: int) -> list[str]:
        """Chunk content recursively using progressively finer separators."""
        separators = ["\n\n", "\n", " ", ""]

        def split_recursive(text: str, separators: list[str]) -> list[str]:
            """Split *text* on the first applicable separator.

            Pieces still larger than ``size`` are re-split with the
            remaining (finer) separators; when no separators are left,
            falls back to fixed-size character chunking.

            Args:
                text: Text to split.
                separators: Separators to try, coarsest first.

            Returns:
                List of chunks, each at most ``size`` characters except
                where character-level splitting was exhausted.
            """
            if len(text) <= size:
                return [text]
            if not separators:
                # Fall back to character splitting
                return self._chunk_fixed_size(text, size, overlap)
            separator = separators[0]
            remaining_separators = separators[1:]
            if separator not in text:
                return split_recursive(text, remaining_separators)
            splits = text.split(separator)
            chunks = []
            current_chunk = ""
            for split in splits:
                test_chunk = (
                    current_chunk + (separator if current_chunk else "") + split
                )
                if len(test_chunk) <= size:
                    current_chunk = test_chunk
                else:
                    if current_chunk:
                        chunks.append(current_chunk)
                    if len(split) > size:
                        # Split is too large, recursively split it
                        chunks.extend(split_recursive(split, remaining_separators))
                        current_chunk = ""
                    else:
                        current_chunk = split
            if current_chunk:
                chunks.append(current_chunk)
            return chunks

        return split_recursive(content, separators)

    def _chunk_semantic(self, content: str, size: int) -> list[str]:
        """Chunk content semantically (placeholder implementation).

        True semantic chunking would require NLP/embedding support; for
        now this delegates to paragraph chunking.
        """
        return self._chunk_by_paragraph(content, size)
class ContentNormalizer(DocumentProcessor):
    """Processor for normalizing document content."""

    def __init__(
        self,
        normalize_whitespace: bool = True,
        remove_extra_newlines: bool = True,
        strip_content: bool = True,
        **kwargs,
    ):
        """Initialize the content normalizer.

        Args:
            normalize_whitespace: Whether to collapse runs of spaces/tabs
                into a single space.
            remove_extra_newlines: Whether to collapse blank-line runs into
                a single blank line.
            strip_content: Whether to strip leading/trailing whitespace.
            **kwargs: Additional configuration forwarded to DocumentProcessor.
        """
        super().__init__(**kwargs)
        self.normalize_whitespace = normalize_whitespace
        self.remove_extra_newlines = remove_extra_newlines
        self.strip_content = strip_content

    def normalize_content(self, content: str) -> str:
        """Normalize document content.

        Args:
            content: Content to normalize.

        Returns:
            Normalized content (the input unchanged if empty).
        """
        if not content:
            return content

        import re

        normalized = content
        # Strip leading/trailing whitespace
        if self.strip_content:
            normalized = normalized.strip()
        # Collapse runs of spaces/tabs into one space. The previous
        # pattern (r" " -> " ") was a no-op; newlines are deliberately
        # excluded here because remove_extra_newlines handles them.
        if self.normalize_whitespace:
            normalized = re.sub(r"[ \t]+", " ", normalized)
        # Collapse runs of blank lines into a single blank line.
        if self.remove_extra_newlines:
            normalized = re.sub(r"\n\s*\n", "\n\n", normalized)
        return normalized
# Export processing components
# NOTE(review): "FormatDetector" and "MetadataExtractor" were listed here but
# are not defined anywhere in this module, which made `from ... import *`
# raise. Removed pending confirmation that they are not defined elsewhere.
__all__ = [
    "ChunkingProcessor",
    "ContentNormalizer",
    "DocumentProcessor",
]