dataflow.api.db¶

Attributes¶

Classes¶

DatabaseManager

Manages database operations for the agent registry.

Module Contents¶

class dataflow.api.db.DatabaseManager(connection_params: dict[str, Any])¶

Manages database operations for the agent registry.

close() None¶

Close the database connection.

connect() bool¶

Connect to the database.

Returns:

True if connection successful, False otherwise

create_schema() bool¶

Create the AI schema if it doesn’t exist.

Returns:

True if successful, False otherwise

create_tables() bool¶

Create the necessary tables in the AI schema.

Returns:

True if successful, False otherwise

get_agent_configs(agent_type: str | None = None) list[dict[str, Any]]¶

Get agent configurations from the database.

Parameters:

agent_type – Optional agent type to filter by

Returns:

List of agent configuration dictionaries

register_agent_config(name: str, class_obj: Any, agent_type: str = 'agent') bool¶

Register an agent configuration in the database.

Parameters:
  • name – Agent name

  • class_obj – Agent configuration class

  • agent_type – Type of agent (e.g., ‘agent’, ‘game’)

Returns:

True if successful, False otherwise

connection = None¶
connection_params¶
schema_name = 'ai'¶
dataflow.api.db.logger¶