# Source code for haive.core.engine.document.loaders.cache_manager
"""Cache manager for document loader registry.This module provides caching functionality to speed up document loaderinitialization by avoiding repeated scanning of 230+ loader modules."""importhashlibimportjsonimportloggingimportpicklefromdatetimeimportdatetime,timedeltafrompathlibimportPathfromtypingimportAnylogger=logging.getLogger(__name__)
[docs]classRegistryCacheManager:"""Manages caching of document loader registry data. This significantly speeds up imports by caching: - Discovered source modules - Registered loaders and their configurations - Source type mappings - Loader capabilities Cache is invalidated when: - Source files are modified - Package version changes - Cache expires (default 7 days) - User explicitly clears cache """def__init__(self,cache_dir:Path|None=None,cache_ttl_days:int=7,use_memory_cache:bool=True,):"""Initialize cache manager. Args: cache_dir: Directory for cache files (default: ~/.cache/haive/loaders) cache_ttl_days: Cache time-to-live in days use_memory_cache: Whether to use in-memory caching for current session """self.cache_dir=cache_diror(Path.home()/".cache"/"haive"/"loaders")self.cache_ttl=timedelta(days=cache_ttl_days)self.use_memory_cache=use_memory_cache# File pathsself.registry_cache_file=self.cache_dir/"registry_cache.pkl"self.metadata_file=self.cache_dir/"cache_metadata.json"# In-memory cache for current sessionself._memory_cache:dict[str,Any]={}self._cache_loaded=False# Ensure cache directory existsself.cache_dir.mkdir(parents=True,exist_ok=True)
[docs]defget_cached_registry(self)->dict[str,Any]|None:"""Get cached registry data if valid. Returns: Cached registry data or None if cache is invalid/missing """# Check memory cache firstifself.use_memory_cacheandself._cache_loaded:returnself._memory_cache.get("registry")# Check disk cacheifnotself._is_cache_valid():logger.debug("Cache is invalid or missing")returnNonetry:withopen(self.registry_cache_file,"rb")asf:data=pickle.load(f)# Load into memory cacheifself.use_memory_cache:self._memory_cache["registry"]=dataself._cache_loaded=Truelogger.info("Loaded registry from cache")returndataexceptExceptionase:logger.warning(f"Failed to load cache: {e}")returnNone
[docs]defsave_registry_cache(self,registry_data:dict[str,Any],source_files:set[Path]|None=None)->bool:"""Save registry data to cache. Args: registry_data: Registry data to cache source_files: Set of source files that were scanned Returns: True if cache was saved successfully """try:# Save registry datawithopen(self.registry_cache_file,"wb")asf:pickle.dump(registry_data,f,protocol=pickle.HIGHEST_PROTOCOL)# Save metadatametadata={"created_at":datetime.now().isoformat(),"haive_version":self._get_haive_version(),"source_files_hash":self._calculate_files_hash(source_files),"stats":{"total_sources":len(registry_data.get("sources",{})),"total_loaders":sum(len(info.get("loaders",{}))forinfoinregistry_data.get("sources",{}).values()),},}withopen(self.metadata_file,"w")asf:json.dump(metadata,f,indent=2)# Update memory cacheifself.use_memory_cache:self._memory_cache["registry"]=registry_dataself._cache_loaded=Truelogger.info(f"Saved registry cache with {metadata['stats']['total_sources']} sources "f"and {metadata['stats']['total_loaders']} loaders")returnTrueexceptExceptionase:logger.exception(f"Failed to save cache: {e}")returnFalse
[docs]defclear_cache(self)->None:"""Clear all cache files and memory cache."""# Clear disk cacheifself.registry_cache_file.exists():self.registry_cache_file.unlink()ifself.metadata_file.exists():self.metadata_file.unlink()# Clear memory cacheself._memory_cache.clear()self._cache_loaded=Falselogger.info("Cleared registry cache")
def_is_cache_valid(self)->bool:"""Check if cache is valid. Returns: True if cache exists and is valid """# Check if cache files existifnotself.registry_cache_file.exists()ornotself.metadata_file.exists():returnFalsetry:# Load metadatawithopen(self.metadata_file)asf:metadata=json.load(f)# Check cache agecreated_at=datetime.fromisoformat(metadata["created_at"])ifdatetime.now()-created_at>self.cache_ttl:logger.debug("Cache expired")returnFalse# Check version compatibilitycurrent_version=self._get_haive_version()ifmetadata.get("haive_version")!=current_version:logger.debug(f"Version mismatch: {metadata.get('haive_version')} != {current_version}")returnFalse# TODO: Check if source files have been modified# This would require tracking file modification timesreturnTrueexceptExceptionase:logger.debug(f"Cache validation failed: {e}")returnFalsedef_get_haive_version(self)->str:"""Get current haive version."""try:importhaivereturngetattr(haive,"__version__","unknown")exceptBaseException:return"unknown"def_calculate_files_hash(self,source_files:set[Path]|None)->str:"""Calculate hash of source files for change detection."""ifnotsource_files:return""# Sort files for consistent hashingsorted_files=sorted(source_files)# Create hash of file paths and modification timeshasher=hashlib.md5()forfile_pathinsorted_files:iffile_path.exists():hasher.update(str(file_path).encode())hasher.update(str(file_path.stat().st_mtime).encode())returnhasher.hexdigest()
[docs]defget_cache_info(self)->dict[str,Any]:"""Get information about current cache status. Returns: Dictionary with cache information """info={"cache_dir":str(self.cache_dir),"cache_exists":self.registry_cache_file.exists(),"memory_cache_loaded":self._cache_loaded,"memory_cache_size":len(self._memory_cache),}ifself.metadata_file.exists():try:withopen(self.metadata_file)asf:metadata=json.load(f)info["metadata"]=metadatainfo["cache_valid"]=self._is_cache_valid()exceptBaseException:info["metadata"]=Noneinfo["cache_valid"]=Falseelse:info["metadata"]=Noneinfo["cache_valid"]=Falsereturninfo
# Global cache manager instance_cache_manager=RegistryCacheManager()
def get_cache_manager() -> RegistryCacheManager:
    """Get the global cache manager instance.

    Returns:
        The module-level RegistryCacheManager singleton.
    """
    return _cache_manager
def clear_loader_cache() -> None:
    """Clear the document loader cache.

    Use this when you've installed new packages or made changes
    to loader implementations.
    """
    _cache_manager.clear_cache()
def get_cache_status() -> dict[str, Any]:
    """Get current cache status information.

    Returns:
        Dictionary produced by RegistryCacheManager.get_cache_info on the
        global cache manager instance.
    """
    return _cache_manager.get_cache_info()