Source code for haive.core.utils.haive_discovery.retriever_analyzers
"""Analyzers for retriever and vector store components."""
import importlib
import inspect
import logging
from datetime import datetime
from typing import Any
from haive.core.utils.haive_discovery.base_analyzer import ComponentAnalyzer
from haive.core.utils.haive_discovery.component_info import ComponentInfo
logger = logging.getLogger(__name__)
# Check for LangChain availability
try:
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
LANGCHAIN_AVAILABLE = True
except ImportError:
logger.warning("LangChain not available. Tool features will be limited.")
LANGCHAIN_AVAILABLE = False
[docs]
class RetrieverAnalyzer(ComponentAnalyzer):
"""Analyzer for retrievers."""
[docs]
def can_analyze(self, obj: Any) -> bool:
# More robust check
try:
if not inspect.isclass(obj):
return False
# Check for Retriever in name
if hasattr(obj, "__name__") and "Retriever" in obj.__name__:
return True
# Check for retriever-like methods
return bool(
hasattr(obj, "get_relevant_documents") or hasattr(obj, "retrieve")
)
except Exception as e:
logger.debug(f"Error checking if can analyze: {e}")
return False
[docs]
def analyze(self, obj: Any, module_path: str) -> ComponentInfo:
info = ComponentInfo(
name=self.safe_get_name(obj, "Retriever"),
component_type="retriever",
module_path=module_path,
class_name=self.safe_get_class_name(obj),
description=inspect.getdoc(obj) or "",
source_code=self.get_source_code(obj),
env_vars=self.detect_env_vars(self.get_source_code(obj)),
schema=self.extract_schema(obj),
metadata={},
timestamp=datetime.now().isoformat(),
)
if LANGCHAIN_AVAILABLE:
info.tool_instance = self.create_tool(info)
info.engine_config = self.create_engine_config(info)
return info
[docs]
def create_tool(self, component_info: ComponentInfo) -> Any | None:
"""Convert retriever to a StructuredTool."""
if not LANGCHAIN_AVAILABLE:
return None
try:
# Import the class
module = importlib.import_module(component_info.module_path)
retriever_class = getattr(module, component_info.class_name)
# Create args model
args_model = self.create_pydantic_model(
retriever_class, force_serializable=True
)
# Add query fields
class RetrieverArgs(args_model):
query: str = Field(description="Query to search for")
k: int | None = Field(
default=4, description="Number of documents to retrieve"
)
def retriever_function(**kwargs) -> dict[str, Any]:
"""Retrieve documents."""
try:
query = kwargs.pop("query")
k = kwargs.pop("k", 4)
# Filter kwargs
filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None}
instance = retriever_class(**filtered_kwargs)
# Retrieve documents
if hasattr(instance, "get_relevant_documents"):
documents = instance.get_relevant_documents(query)
elif hasattr(instance, "retrieve"):
documents = instance.retrieve(query)
else:
return {
"error": "Retriever doesn't have expected methods",
"success": False,
}
documents = documents[:k]
return {
"num_documents": len(documents),
"query": query,
"documents": [
{
"content": doc.page_content,
"metadata": doc.metadata,
"score": getattr(doc, "score", None),
}
for doc in documents
],
}
except Exception as e:
return {"error": str(e), "success": False}
# Create safe tool name
tool_name = f"retrieve_{component_info.name.lower().replace(' ', '_').replace('-', '_')}"
tool_name = "".join(
c if c.isalnum() or c == "_" else "_" for c in tool_name
)
return StructuredTool.from_function(
func=retriever_function,
name=tool_name,
description=f"Retrieve documents using {component_info.class_name}",
args_schema=RetrieverArgs,
)
except Exception as e:
logger.warning(f"Error creating tool: {e}")
return None
[docs]
def create_engine_config(
self, component_info: ComponentInfo
) -> dict[str, Any] | None:
"""Create a Haive RetrieverEngine config."""
try:
return {
"engine_type": "retriever",
"retriever_class": component_info.class_name,
"module_path": component_info.module_path,
"description": component_info.description,
"env_vars": component_info.env_vars,
"schema": component_info.schema,
}
except Exception as e:
logger.warning(f"Error creating engine config: {e}")
return None
[docs]
class VectorStoreAnalyzer(ComponentAnalyzer):
"""Analyzer for vector stores."""
[docs]
def can_analyze(self, obj: Any) -> bool:
try:
if not inspect.isclass(obj):
return False
# Check name
if hasattr(obj, "__name__") and "VectorStore" in obj.__name__:
return True
# Check methods
return bool(
hasattr(obj, "similarity_search") or hasattr(obj, "add_documents")
)
except Exception as e:
logger.debug(f"Error checking if can analyze: {e}")
return False
[docs]
def analyze(self, obj: Any, module_path: str) -> ComponentInfo:
info = ComponentInfo(
name=self.safe_get_name(obj, "VectorStore"),
component_type="vector_store",
module_path=module_path,
class_name=self.safe_get_class_name(obj),
description=inspect.getdoc(obj) or "",
source_code=self.get_source_code(obj),
env_vars=self.detect_env_vars(self.get_source_code(obj)),
schema=self.extract_schema(obj),
metadata={},
timestamp=datetime.now().isoformat(),
)
if LANGCHAIN_AVAILABLE:
info.tool_instance = self.create_tool(info)
info.engine_config = self.create_engine_config(info)
return info
[docs]
def create_tool(self, component_info: ComponentInfo) -> Any | None:
"""Convert vector store to a StructuredTool."""
if not LANGCHAIN_AVAILABLE:
return None
try:
class VectorStoreArgs(BaseModel):
query: str = Field(description="Query to search for")
k: int | None = Field(
default=4, description="Number of documents to retrieve"
)
filter: dict[str, Any] | None = Field(
default=None, description="Metadata filter"
)
def vectorstore_search(**kwargs) -> dict[str, Any]:
"""Search vector store placeholder."""
return {
"message": f"Vector store {component_info.class_name} search placeholder",
"query": kwargs.get("query"),
"k": kwargs.get("k", 4),
"note": "This is a placeholder. Actual implementation requires instantiated vector store.",
}
# Create safe tool name
tool_name = f"search_{component_info.name.lower().replace(' ', '_').replace('-', '_')}"
tool_name = "".join(
c if c.isalnum() or c == "_" else "_" for c in tool_name
)
return StructuredTool.from_function(
func=vectorstore_search,
name=tool_name,
description=f"Search documents using {component_info.class_name}",
args_schema=VectorStoreArgs,
)
except Exception as e:
logger.warning(f"Error creating tool: {e}")
return None
[docs]
def create_engine_config(
self, component_info: ComponentInfo
) -> dict[str, Any] | None:
"""Create a Haive VectorStoreEngine config."""
try:
return {
"engine_type": "vector_store",
"vector_store_class": component_info.class_name,
"module_path": component_info.module_path,
"description": component_info.description,
"env_vars": component_info.env_vars,
"schema": component_info.schema,
}
except Exception as e:
logger.warning(f"Error creating engine config: {e}")
return None