dataflow.registry.models

Models for the Haive Registry System.

This module defines the core data models used by the registry system to represent different types of entities, configurations, dependencies, and other components. These models provide a structured way to store and retrieve information about various components in the Haive ecosystem.

The models use Pydantic for validation, serialization, and deserialization, ensuring type safety and consistent data structures throughout the system.

Classes:

EntityType: Enumeration of entity types that can be registered ConfigType: Enumeration of configuration types for registry items DependencyType: Enumeration of dependency relationships between entities ImportStatus: Enumeration of import operation status values RegistryItem: Base model for all registry entries Configuration: Model for configuration data associated with registry items GraphDefinition: Model for graph structure definitions (nodes and edges) Dependency: Model for dependency relationships between registry items EnvironmentVar: Model for environment variable requirements ImportLogItem: Model for logging import operations

Example

Creating registry models:

>>> from haive.dataflow.registry.models import RegistryItem, EntityType
>>> from datetime import datetime
>>>
>>> # Create a new registry item
>>> item = RegistryItem(
...     name="TextClassifier",
...     type=EntityType.AGENT,
...     description="Classifies text into categories",
...     module_path="haive.agents.classifiers",
...     class_name="TextClassifierAgent",
...     created_at=datetime.now()
... )
>>>
>>> # Access properties
>>> print(f"Registry item: {item.name} ({item.type})")
>>> print(f"Created at: {item.created_at}")

Classes

ConfigType

Types of configurations.

Configuration

Configuration for a registry item.

Dependency

Dependency relationship between registry items.

DependencyType

Types of dependencies between entities.

EntityType

Types of entities that can be registered.

EnvironmentVar

Environment variable requirement for a registry item.

GraphDefinition

Graph definition for a registry item.

ImportLogItem

Log entry for import operations.

ImportStatus

Import operation status.

MCPPromptDefinition

Definition of an MCP prompt template.

MCPResourceDefinition

Definition of an MCP resource.

MCPServerConfig

Configuration for an MCP server.

MCPServerHealth

Health status of an MCP server.

MCPToolDefinition

Definition of an MCP tool.

MCPTransport

Transport types for MCP servers.

RegistryItem

Base model for registry items.

Module Contents

class dataflow.registry.models.ConfigType

Bases: str, enum.Enum

Types of configurations.

ENGINE = 'engine'
GRAPH = 'graph'
INPUT_SCHEMA = 'input_schema'
NODE = 'node'
OUTPUT_SCHEMA = 'output_schema'
PROMPT = 'prompt'
STATE_SCHEMA = 'state_schema'
class dataflow.registry.models.Configuration(/, **data: Any)

Bases: pydantic.BaseModel

Configuration for a registry item.

This model represents configuration data associated with a registry item. Configurations can include schema definitions, initialization parameters, prompt templates, and other settings needed for component operation.

id

Unique identifier for the configuration

Type:

str

registry_id

ID of the associated registry item

Type:

str

config_type

Type of configuration (STATE_SCHEMA, INPUT_SCHEMA, etc.)

Type:

ConfigType

config_data

The actual configuration data

Type:

dict

created_at

Timestamp when the configuration was created

Type:

datetime, optional

updated_at

Timestamp when the configuration was last updated

Type:

datetime, optional

Example

>>> from haive.dataflow.registry.models import Configuration, ConfigType
>>> from datetime import datetime
>>>
>>> config = Configuration(
...     registry_id="tool-123",
...     config_type=ConfigType.INPUT_SCHEMA,
...     config_data={
...         "type": "object",
...         "properties": {
...             "text": {"type": "string", "description": "Text to process"},
...             "max_length": {"type": "integer", "default": 100}
...         },
...         "required": ["text"]
...     },
...     created_at=datetime.now()
... )
config_data: dict[str, Any]
config_type: ConfigType
created_at: datetime.datetime | None = None
id: str = None
registry_id: str
updated_at: datetime.datetime | None = None
class dataflow.registry.models.Dependency(/, **data: Any)

Bases: pydantic.BaseModel

Dependency relationship between registry items.

This model represents a dependency relationship between two registry items, such as a tool requiring a specific engine, or an agent extending another agent. It captures the type of relationship and the entities involved.

id

Unique identifier for the dependency

Type:

str

registry_id

ID of the registry item that has the dependency

Type:

str

dependent_id

ID of the registry item that is depended upon

Type:

str

dependency_type

Type of dependency relationship

Type:

DependencyType

created_at

Timestamp when the dependency was created

Type:

datetime, optional

Example

>>> from haive.dataflow.registry.models import Dependency, DependencyType
>>> from datetime import datetime
>>>
>>> dependency = Dependency(
...     registry_id="tool-123",
...     dependent_id="engine-456",
...     dependency_type=DependencyType.REQUIRES,
...     created_at=datetime.now()
... )
created_at: datetime.datetime | None = None
dependency_type: DependencyType
dependent_id: str
id: str = None
registry_id: str
class dataflow.registry.models.DependencyType

Bases: str, enum.Enum

Types of dependencies between entities.

EXTENDS = 'extends'
REQUIRES = 'requires'
USES = 'uses'
class dataflow.registry.models.EntityType

Bases: str, enum.Enum

Types of entities that can be registered.

AGENT = 'agent'
ENGINE = 'engine'
GAME = 'game'
LLM_MODEL = 'llm_model'
LLM_PROVIDER = 'llm_provider'
MCP_CLIENT = 'mcp_client'
MCP_PROMPT = 'mcp_prompt'
MCP_RESOURCE = 'mcp_resource'
MCP_SERVER = 'mcp_server'
MCP_TOOL = 'mcp_tool'
TOOL = 'tool'
TOOLKIT = 'toolkit'
class dataflow.registry.models.EnvironmentVar(/, **data: Any)

Bases: pydantic.BaseModel

Environment variable requirement for a registry item.

This model represents an environment variable that a component requires or can use. It tracks whether the variable is required, and can provide a default value for optional variables.

id

Unique identifier for the environment variable entry

Type:

str

registry_id

ID of the registry item that requires this variable

Type:

str

env_name

Name of the environment variable (e.g., “OPENAI_API_KEY”)

Type:

str

is_required

Whether the variable is required for the component to function

Type:

bool

default_value

Default value for the variable if not provided

Type:

str, optional

created_at

Timestamp when the entry was created

Type:

datetime, optional

Example

>>> from haive.dataflow.registry.models import EnvironmentVar
>>> from datetime import datetime
>>>
>>> env_var = EnvironmentVar(
...     registry_id="agent-123",
...     env_name="OPENAI_API_KEY",
...     is_required=True,
...     created_at=datetime.now()
... )
>>>
>>> optional_var = EnvironmentVar(
...     registry_id="tool-456",
...     env_name="DEBUG_LEVEL",
...     is_required=False,
...     default_value="INFO",
...     created_at=datetime.now()
... )
created_at: datetime.datetime | None = None
default_value: str | None = None
env_name: str
id: str = None
is_required: bool = False
registry_id: str
class dataflow.registry.models.GraphDefinition(/, **data: Any)

Bases: pydantic.BaseModel

Graph definition for a registry item.

This model represents a graph structure definition for a component, such as an agent or engine. It stores the nodes and edges that define the component’s execution flow or structure.

id

Unique identifier for the graph definition

Type:

str

registry_id

ID of the associated registry item

Type:

str

nodes

List of node definitions in the graph

Type:

List[Dict[str, Any]]

edges

List of edge definitions connecting nodes

Type:

List[Dict[str, Any]]

created_at

Timestamp when the graph was created

Type:

datetime, optional

updated_at

Timestamp when the graph was last updated

Type:

datetime, optional

Example

>>> from haive.dataflow.registry.models import GraphDefinition
>>> from datetime import datetime
>>>
>>> graph = GraphDefinition(
...     registry_id="agent-123",
...     nodes=[
...         {"id": "node1", "type": "input", "config": {...}},
...         {"id": "node2", "type": "processing", "config": {...}},
...         {"id": "node3", "type": "output", "config": {...}}
...     ],
...     edges=[
...         {"source": "node1", "target": "node2"},
...         {"source": "node2", "target": "node3"}
...     ],
...     created_at=datetime.now()
... )
created_at: datetime.datetime | None = None
edges: list[dict[str, Any]] = None
id: str = None
nodes: list[dict[str, Any]] = None
registry_id: str
updated_at: datetime.datetime | None = None
class dataflow.registry.models.ImportLogItem(/, **data: Any)

Bases: pydantic.BaseModel

Log entry for import operations.

This model represents a log entry for an import operation, recording the details of a single entity import attempt. It tracks the status, any error messages, and the session it belongs to.

id

Unique identifier for the log entry

Type:

str

import_session

ID of the import session this log belongs to

Type:

str

entity_name

Name of the entity being imported

Type:

str

entity_type

Type of the entity being imported

Type:

str

status

Status of the import operation (SUCCESS, FAILURE)

Type:

ImportStatus

message

Optional message or details about the import

Type:

str, optional

traceback

Error traceback if the import failed

Type:

str, optional

Example

>>> from haive.dataflow.registry.models import ImportLogItem, ImportStatus
>>> from datetime import datetime
>>>
>>> log_entry = ImportLogItem(
...     import_session="session-123",
...     entity_name="gpt-4",
...     entity_type="llm_model",
...     status=ImportStatus.SUCCESS,
...     message="Successfully imported model"
... )
>>>
>>> error_log = ImportLogItem(
...     import_session="session-123",
...     entity_name="invalid-model",
...     entity_type="llm_model",
...     status=ImportStatus.FAILURE,
...     message="Failed to import model",
...     traceback="ImportError: Model not found"
... )
created_at: datetime.datetime | None = None
entity_name: str
entity_type: str
id: str = None
import_session: str
message: str | None = None
status: ImportStatus
traceback: str | None = None
class dataflow.registry.models.ImportStatus

Bases: str, enum.Enum

Import operation status.

FAILURE = 'failure'
SUCCESS = 'success'
class dataflow.registry.models.MCPPromptDefinition(/, **data: Any)

Bases: pydantic.BaseModel

Definition of an MCP prompt template.

This model represents a prompt template provided by an MCP server, including variables, instructions, and metadata for prompt execution.

name

Prompt name

Type:

str

description

Prompt description

Type:

str

server_name

Name of the MCP server providing this prompt

Type:

str

template

Prompt template string

Type:

str

variables

Template variable definitions

Type:

list[dict[str, Any]]

instructions

Usage instructions

Type:

str, optional

examples

Example inputs/outputs

Type:

list[dict[str, Any]]

tags

Tags for categorization

Type:

list[str]

Example

>>> prompt = MCPPromptDefinition(
...     name="code_review",
...     description="Review code for best practices",
...     server_name="github",
...     template="Review this code: {code}",
...     variables=[{"name": "code", "type": "string", "required": True}],
...     tags=["code", "review"]
... )
description: str
examples: list[dict[str, Any]] = None
instructions: str | None = None
name: str
server_name: str
tags: list[str] = None
template: str
variables: list[dict[str, Any]] = None
class dataflow.registry.models.MCPResourceDefinition(/, **data: Any)

Bases: pydantic.BaseModel

Definition of an MCP resource.

This model represents a resource provided by an MCP server, such as files, datasets, or other data sources that can be accessed by LLMs.

name

Resource name

Type:

str

uri

Resource URI

Type:

str

server_name

Name of the MCP server providing this resource

Type:

str

mime_type

MIME type of the resource

Type:

str, optional

description

Resource description

Type:

str, optional

annotations

Additional metadata

Type:

dict[str, Any]

size

Resource size in bytes

Type:

int, optional

last_modified

Last modification timestamp

Type:

datetime, optional

Example

>>> resource = MCPResourceDefinition(
...     name="project_docs",
...     uri="file:///project/docs/",
...     server_name="filesystem",
...     mime_type="text/markdown",
...     description="Project documentation files"
... )
annotations: dict[str, Any] = None
description: str | None = None
last_modified: datetime.datetime | None = None
mime_type: str | None = None
name: str
server_name: str
size: int | None = None
uri: str
class dataflow.registry.models.MCPServerConfig(/, **data: Any)

Bases: pydantic.BaseModel

Configuration for an MCP server.

This model represents the configuration needed to connect to and interact with an MCP (Model Context Protocol) server, including connection details, authentication, and capabilities.

name

Unique name for the MCP server

Type:

str

transport

Communication transport type

Type:

MCPTransport

command

Command to run the server (for stdio transport)

Type:

str, optional

args

Arguments for the server command

Type:

list[str]

env

Environment variables for the server

Type:

dict[str, str]

url

URL for HTTP/SSE transports

Type:

str, optional

capabilities

List of server capabilities

Type:

list[str]

auth_config

Authentication configuration

Type:

dict[str, Any], optional

health_check_interval

Seconds between health checks

Type:

int

timeout

Connection timeout in seconds

Type:

int

max_retries

Maximum connection retry attempts

Type:

int

Example

>>> config = MCPServerConfig(
...     name="filesystem",
...     transport=MCPTransport.STDIO,
...     command="npx",
...     args=["-y", "@modelcontextprotocol/server-filesystem"],
...     capabilities=["file_read", "file_write", "directory_list"]
... )
args: list[str] = None
auth_config: dict[str, Any] | None = None
capabilities: list[str] = None
command: str | None = None
env: dict[str, str] = None
health_check_interval: int = 30
max_retries: int = 3
name: str
timeout: int = 10
transport: MCPTransport
url: str | None = None
class dataflow.registry.models.MCPServerHealth(/, **data: Any)

Bases: pydantic.BaseModel

Health status of an MCP server.

This model tracks the health and performance metrics of an MCP server, including connection status, response times, and error rates.

server_name

Name of the MCP server

Type:

str

is_healthy

Whether the server is healthy

Type:

bool

last_check

Last health check timestamp

Type:

datetime

response_time_ms

Average response time in milliseconds

Type:

float, optional

error_count

Number of recent errors

Type:

int

uptime_seconds

Server uptime in seconds

Type:

float, optional

capabilities_available

Currently available capabilities

Type:

list[str]

error_details

Details of recent errors

Type:

str, optional

Example

>>> health = MCPServerHealth(
...     server_name="filesystem",
...     is_healthy=True,
...     last_check=datetime.now(),
...     response_time_ms=50.0,
...     error_count=0,
...     capabilities_available=["file_read", "file_write"]
... )
capabilities_available: list[str] = None
error_count: int = 0
error_details: str | None = None
is_healthy: bool
last_check: datetime.datetime
response_time_ms: float | None = None
server_name: str
uptime_seconds: float | None = None
class dataflow.registry.models.MCPToolDefinition(/, **data: Any)

Bases: pydantic.BaseModel

Definition of an MCP tool.

This model represents a tool provided by an MCP server, including its name, description, schema, and metadata needed for execution.

name

Tool name

Type:

str

description

Tool description

Type:

str

server_name

Name of the MCP server providing this tool

Type:

str

schema

JSON schema for tool parameters

Type:

dict[str, Any]

input_schema

Input validation schema

Type:

dict[str, Any], optional

output_schema

Output validation schema

Type:

dict[str, Any], optional

tags

Tags for categorization

Type:

list[str]

version

Tool version

Type:

str

Example

>>> tool = MCPToolDefinition(
...     name="read_file",
...     description="Read contents of a file",
...     server_name="filesystem",
...     schema={"type": "object", "properties": {"path": {"type": "string"}}},
...     tags=["filesystem", "read"]
... )
description: str
input_schema: dict[str, Any] | None = None
name: str
output_schema: dict[str, Any] | None = None
server_name: str
tags: list[str] = None
tool_schema: dict[str, Any] = None
version: str = '1.0.0'
class dataflow.registry.models.MCPTransport

Bases: str, enum.Enum

Transport types for MCP servers.

HTTP = 'http'
SSE = 'sse'
STDIO = 'stdio'
class dataflow.registry.models.RegistryItem(/, **data: Any)

Bases: pydantic.BaseModel

Base model for registry items.

This model represents a component registered in the registry system, such as an agent, tool, engine, or game. It stores essential metadata about the component, including its type, location, and description.

id

Unique identifier for the registry item

Type:

str

name

Human-readable name for the component

Type:

str

type

Type of the component (AGENT, TOOL, etc.)

Type:

EntityType

description

Detailed description of the component

Type:

str, optional

module_path

Python module path where the component is defined

Type:

str, optional

class_name

Class name of the component within the module

Type:

str, optional

created_at

Timestamp when the item was created

Type:

datetime, optional

updated_at

Timestamp when the item was last updated

Type:

datetime, optional

metadata

Additional metadata about the component

Type:

dict

Example

>>> from haive.dataflow.registry.models import RegistryItem, EntityType
>>> from datetime import datetime
>>>
>>> item = RegistryItem(
...     name="TextSummarizer",
...     type=EntityType.TOOL,
...     description="Summarizes long text documents",
...     module_path="haive.tools.summarizers",
...     class_name="TextSummarizerTool",
...     created_at=datetime.now(),
...     metadata={"version": "1.0", "author": "Haive Team"}
... )
class_name: str | None = None
created_at: datetime.datetime | None = None
description: str | None = None
id: str = None
metadata: dict[str, Any] = None
module_path: str | None = None
name: str
type: EntityType
updated_at: datetime.datetime | None = None