Source code for haive.core.engine.document.loaders.specific.database
"""Database Loaders for Document Engine.This module implements database loaders for MongoDB, PostgreSQL, and other databasesadapted for the document engine framework."""importloggingfromtypingimportAnyfromurllib.parseimporturlparsefromlangchain_core.document_loaders.baseimportBaseLoaderfromhaive.core.engine.document.loaders.sources.implementationimport(CredentialType,DatabaseSource,)logger=logging.getLogger(__name__)
def can_handle(self, path: str) -> bool:
    """Check whether ``path`` is a MongoDB connection string.

    The URL scheme decides the answer. Besides ``mongodb`` and the ``mongo``
    alias, the DNS seed-list form ``mongodb+srv`` is recognised as well.

    Args:
        path: Candidate connection string.

    Returns:
        True when the scheme is a MongoDB scheme, False otherwise (including
        when ``path`` cannot be parsed as a URL).
    """
    mongo_schemes = ("mongodb", "mongodb+srv", "mongo")
    try:
        scheme = urlparse(path).scheme
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed netlocs (e.g. an unclosed
        # IPv6 bracket) and Type/AttributeError for non-string input.
        return False
    return scheme in mongo_schemes
def get_confidence_score(self, path: str) -> float:
    """Return how confident this source is that it can load ``path``.

    Args:
        path: Candidate connection string.

    Returns:
        0.9 when ``can_handle`` accepts ``path`` as a MongoDB connection,
        otherwise 0.0.
    """
    return 0.9 if self.can_handle(path) else 0.0
def requires_authentication(self) -> bool:
    """Report that MongoDB sources are treated as needing credentials.

    Returns:
        Always True.
    """
    return True
def create_loader(self) -> BaseLoader | None:
    """Create a MongoDB loader from this source's connection settings.

    Credentials already embedded in ``self.connection_string`` are kept
    verbatim. When the string lacks a username/password pair and a
    credential manager is set, the ``"mongodb"`` credential (expected in the
    form ``"username:password"``) is percent-encoded and injected into the
    URI. URI options given in the query string are preserved.

    The database name is taken from ``self.database_name`` and falls back
    to the path component of the connection string.

    Returns:
        A configured ``MongodbLoader``, or ``None`` when the loader class
        cannot be imported or any other error occurs (missing database name
        included); the failure is logged in both cases.
    """
    try:
        from urllib.parse import quote_plus

        from langchain_community.document_loaders import MongodbLoader

        parsed = urlparse(self.connection_string)

        # MongoDB URIs are expected to start with "mongodb://" or
        # "mongodb+srv://"; translate the "mongo" alias accepted by
        # can_handle so the driver does not reject the URI.
        scheme = "mongodb" if parsed.scheme == "mongo" else parsed.scheme

        # Keep the original authority untouched by default. Rebuilding it
        # from hostname/port would strip IPv6 brackets and lower-case it.
        netloc = parsed.netloc

        has_url_credentials = bool(parsed.username and parsed.password)
        if not has_url_credentials and self.credential_manager:
            cred = self.credential_manager.get_credential("mongodb")
            if (
                cred
                and cred.credential_type == CredentialType.USERNAME_PASSWORD
                and ":" in cred.value
            ):
                # Stored format is "username:password"; split on the first
                # colon only so the password may itself contain colons.
                username, password = cred.value.split(":", 1)
                if username and password:
                    # Drop any partial userinfo and percent-encode the
                    # stored values so "@", ":" or "/" cannot break the URI.
                    host_part = parsed.netloc.rpartition("@")[2]
                    netloc = (
                        f"{quote_plus(username)}:{quote_plus(password)}"
                        f"@{host_part}"
                    )

        # The database is passed separately as db_name, so the path is left
        # out of the URI; connection options in the query string are kept.
        connection_uri = f"{scheme}://{netloc}"
        if parsed.query:
            connection_uri += f"/?{parsed.query}"

        db_name = self.database_name or parsed.path.lstrip("/")
        if not db_name:
            raise ValueError("Database name is required")

        return MongodbLoader(
            connection_string=connection_uri,
            db_name=db_name,
            collection_name=self.collection_name,
            filter_criteria=self.filter_criteria,
        )
    except ImportError:
        logger.warning(
            "MongodbLoader not available. Install with: pip install pymongo"
        )
        return None
    except Exception as e:
        # Best-effort factory: callers receive None instead of an exception.
        logger.exception("Failed to create MongoDB loader: %s", e)
        return None
def can_handle(self, path: str) -> bool:
    """Check whether ``path`` is a PostgreSQL connection string.

    The URL scheme decides the answer. ``postgresql`` and ``postgres`` are
    accepted, with or without a SQLAlchemy driver suffix such as
    ``postgresql+psycopg2``.

    Args:
        path: Candidate connection string.

    Returns:
        True when the scheme names PostgreSQL, False otherwise (including
        when ``path`` cannot be parsed as a URL).
    """
    try:
        scheme = urlparse(path).scheme
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed netlocs (e.g. an unclosed
        # IPv6 bracket) and Type/AttributeError for non-string input.
        return False
    # "dialect+driver" -> compare the dialect part only.
    dialect = scheme.partition("+")[0]
    return dialect in ("postgresql", "postgres")
def get_confidence_score(self, path: str) -> float:
    """Return how confident this source is that it can load ``path``.

    Args:
        path: Candidate connection string.

    Returns:
        0.9 when ``can_handle`` accepts ``path`` as a PostgreSQL connection,
        otherwise 0.0.
    """
    return 0.9 if self.can_handle(path) else 0.0
def requires_authentication(self) -> bool:
    """Report that PostgreSQL sources are treated as needing credentials.

    Returns:
        Always True.
    """
    return True
def create_loader(self) -> BaseLoader | None:
    """Create a SQL loader for a PostgreSQL source.

    Credentials already embedded in ``self.connection_string`` are kept
    verbatim. When the string lacks a username/password pair and a
    credential manager is set, the ``"postgresql"`` credential (expected in
    the form ``"username:password"``) is percent-encoded and injected into
    the URL. The path and query string of the connection string are kept.

    The SQL to run is ``self.query`` if set, otherwise
    ``SELECT * FROM <self.table_name>``.

    Returns:
        A configured ``SQLDatabaseLoader``, or ``None`` when a required
        package cannot be imported or any other error occurs (neither query
        nor table name given, engine creation failure, ...); the failure is
        logged in both cases.
    """
    try:
        from urllib.parse import quote_plus

        from langchain_community.document_loaders.sql_database import (
            SQLDatabaseLoader,
        )
        from langchain_community.utilities import SQLDatabase
        from sqlalchemy import create_engine

        # Decide what to run before any engine is created.
        if self.query:
            query = self.query
        elif self.table_name:
            # NOTE(review): table_name is interpolated into SQL unquoted;
            # it must come from trusted configuration, never from user input.
            query = f"SELECT * FROM {self.table_name}"
        else:
            raise ValueError("Either query or table_name must be provided")

        parsed = urlparse(self.connection_string)

        # SQLAlchemy registers the dialect as "postgresql"; map the legacy
        # "postgres" alias while keeping an optional "+driver" suffix.
        dialect, plus, driver = parsed.scheme.partition("+")
        if dialect == "postgres":
            dialect = "postgresql"
        scheme = f"{dialect}{plus}{driver}"

        # Keep the original authority untouched by default. Rebuilding it
        # from hostname/port would strip IPv6 brackets and lower-case it.
        netloc = parsed.netloc

        has_url_credentials = bool(parsed.username and parsed.password)
        if not has_url_credentials and self.credential_manager:
            cred = self.credential_manager.get_credential("postgresql")
            if (
                cred
                and cred.credential_type == CredentialType.USERNAME_PASSWORD
                and ":" in cred.value
            ):
                # Stored format is "username:password"; split on the first
                # colon only so the password may itself contain colons.
                username, password = cred.value.split(":", 1)
                if username and password:
                    # Drop any partial userinfo and percent-encode the
                    # stored values so "@", ":" or "/" cannot break the URL.
                    host_part = parsed.netloc.rpartition("@")[2]
                    netloc = (
                        f"{quote_plus(username)}:{quote_plus(password)}"
                        f"@{host_part}"
                    )

        # Swap in the normalised scheme/authority; path and query (for
        # example "?sslmode=require") are carried over unchanged.
        connection_uri = parsed._replace(scheme=scheme, netloc=netloc).geturl()

        engine = create_engine(connection_uri)

        # SQLDatabaseLoader expects a langchain SQLDatabase wrapper rather
        # than a raw Engine. With no page_content_mapper supplied, the
        # loader's default mapping is used for the row content.
        return SQLDatabaseLoader(query=query, db=SQLDatabase(engine))
    except ImportError:
        logger.warning(
            "SQLDatabaseLoader not available. Install with: pip install sqlalchemy"
        )
        return None
    except Exception as e:
        # Best-effort factory: callers receive None instead of an exception.
        logger.exception("Failed to create PostgreSQL loader: %s", e)
        return None