Source code for haive.core.utils.haive_discovery.base_analyzer
"""Base analyzer class and common functionality for component analysis."""importinspectimportloggingimportrefromabcimportABC,abstractmethodfromtypingimportAnyfrompydanticimportBaseModel,ConfigDict,create_modelfromhaive.core.utils.haive_discovery.component_infoimportComponentInfologger=logging.getLogger(__name__)
[docs]classComponentAnalyzer(ABC):"""Abstract base class for component analyzers."""
[docs]@abstractmethoddefcan_analyze(self,obj:Any)->bool:"""Check if this analyzer can handle the given object."""
[docs]@abstractmethoddefanalyze(self,obj:Any,module_path:str)->ComponentInfo:"""Analyze the object and return component info."""
[docs]defcreate_tool(self,component_info:ComponentInfo)->Any|None:"""Convert component to a StructuredTool if possible."""returnNone
[docs]defcreate_engine_config(self,component_info:ComponentInfo)->Any|None:"""Create a Haive engine config if possible."""returnNone
[docs]defsafe_get_name(self,obj:Any,default_prefix:str="Component")->str:"""Safely get the name of an object."""try:ifhasattr(obj,"__name__"):returnobj.__name__ifhasattr(obj,"name"):returnobj.namereturnf"{default_prefix}_{id(obj)}"exceptException:returnf"{default_prefix}_{id(obj)}"
[docs]defsafe_get_class_name(self,obj:Any)->str:"""Safely get the class name of an object."""try:ifhasattr(obj,"__name__"):returnobj.__name__returntype(obj).__name__exceptException:return"UnknownClass"
[docs]defdetect_env_vars(self,source_code:str)->list[str]:"""Detect environment variables in source code."""ifnotsource_code:return[]patterns=[r'os\.environ\.get\(["\']([A-Za-z0-9_]+)["\']',r'os\.getenv\(["\']([A-Za-z0-9_]+)["\']',r'os\.environ\[["\']([A-Za-z0-9_]+)["\']',r'getenv\(["\']([A-Za-z0-9_]+)["\']',r'["\']([A-Z][A-Z0-9_]+_(?:KEY|TOKEN|SECRET|PASSWORD|ID|URL|URI|ENDPOINT|CREDENTIALS))["\']',]env_vars=set()forpatterninpatterns:matches=re.findall(pattern,source_code)env_vars.update(matches)returnsorted(env_vars)
[docs]defget_source_code(self,obj:Any)->str:"""Extract source code from object."""try:returninspect.getsource(obj)except(TypeError,OSError):ifhasattr(obj,"__wrapped__"):try:returninspect.getsource(obj.__wrapped__)except(TypeError,OSError):passreturn""
[docs]defextract_schema(self,obj:Any)->dict[str,Any]:"""Extract schema information from object."""try:ifhasattr(obj,"args_schema")andobj.args_schema:# LangChain tool schemaifhasattr(obj.args_schema,"model_json_schema"):returnobj.args_schema.model_json_schema()ifhasattr(obj.args_schema,"schema"):returnobj.args_schema.schema()# Try to create schema from __init__ signatureifhasattr(obj,"__init__"):sig=inspect.signature(obj.__init__)fields={}forname,paraminsig.parameters.items():ifname=="self":continueparam_type=(param.annotationifparam.annotation!=inspect._emptyelse"Any")default=param.defaultifparam.default!=inspect._emptyelseNonefields[name]={"type":str(param_type),"default":str(default)ifdefaultisnotNoneelseNone,}return{"properties":fields}exceptExceptionase:logger.warning(f"Error extracting schema: {e}")return{}
[docs]defcreate_pydantic_model(self,cls:type,force_serializable:bool=False)->type[BaseModel]:"""Create a Pydantic model from a class signature."""try:sig=inspect.signature(cls.__init__)fields={}forname,paraminsig.parameters.items():ifname=="self":continueparam_type=(param.annotationifparam.annotation!=inspect._emptyelseAny)default=param.defaultifparam.default!=inspect._emptyelse...# Handle complex types that might cause issuestry:ifhasattr(param_type,"__origin__"):# For generic types like Dict[str, Tuple[Type, Any]],# simplify to basic typestype_str=str(param_type)if"Dict"intype_str:param_type=dictelif"List"intype_str:param_type=listelif"Optional"intype_str:param_type=AnyexceptException:# If type inspection fails, use Any as fallbackparam_type=Anyfields[name]=(param_type,default)config=(ConfigDict(arbitrary_types_allowed=True)ifforce_serializableelseNone)returncreate_model(f"{cls.__name__}Args",__config__=config,**fields,)exceptExceptionase:logger.warning(f"Error creating Pydantic model for {cls.__name__}: {e}")# Return a minimal model with just basic fields as fallbacktry:returncreate_model(f"{cls.__name__}Args",__config__=ConfigDict(arbitrary_types_allowed=True),args=(Any,None),kwds=(Any,None),)exceptExceptionasfallback_error:logger.warning(f"Fallback model creation also failed: {fallback_error}")returncreate_model(f"{cls.__name__}Args")