dataflow.core¶
Core Registry System for Haive.
This module provides the central registry system for managing LLM models, embeddings, and other entity types in the system.
Attributes¶
Classes¶
Types of configuration that can be associated with entities. |
|
Types of dependencies between entities. |
|
Types of entities that can be registered. |
|
Status of an import operation. |
|
Core registry system for managing entities. |
Module Contents¶
- class dataflow.core.ConfigType¶
-
Types of configuration that can be associated with entities.
- CONNECTION = 'connection'¶
- CUSTOM = 'custom'¶
- INIT = 'init'¶
- SCHEMA = 'schema'¶
- WORKFLOW = 'workflow'¶
- class dataflow.core.DependencyType¶
-
Types of dependencies between entities.
- CONFLICTS = 'conflicts'¶
- EXTENDS = 'extends'¶
- RECOMMENDS = 'recommends'¶
- REQUIRES = 'requires'¶
- class dataflow.core.EntityType¶
-
Types of entities that can be registered.
- AGENT = 'agent'¶
- CUSTOM = 'custom'¶
- DATA_SOURCE = 'data_source'¶
- EMBEDDING = 'embedding'¶
- EMBEDDING_PROVIDER = 'embedding_provider'¶
- LLM = 'llm'¶
- LLM_PROVIDER = 'llm_provider'¶
- TOOL = 'tool'¶
- WORKFLOW = 'workflow'¶
- class dataflow.core.ImportStatus¶
-
Status of an import operation.
- FAILURE = 'failure'¶
- PARTIAL = 'partial'¶
- SKIPPED = 'skipped'¶
- SUCCESS = 'success'¶
- class dataflow.core.RegistrySystem¶
Core registry system for managing entities.
The registry system is a centralized repository for tracking and managing entities such as LLM models, embeddings, agents, tools, etc.
It provides both in-memory storage and database persistence via Supabase.
- add_configuration(registry_id: str, config_type: ConfigType, config_data: Any) str | None¶
Add a configuration to an entity.
- Parameters:
registry_id – ID of the registered entity
config_type – Type of configuration
config_data – Configuration data
- Returns:
ID of the configuration or None on failure
- add_dependency(registry_id: str, dependent_id: str, dependency_type: DependencyType) str | None¶
Add a dependency between two entities.
- Parameters:
registry_id – ID of the entity that depends on another
dependent_id – ID of the entity being depended on
dependency_type – Type of dependency
- Returns:
ID of the dependency or None on failure
- add_environment_var(var_name: str, provider_name: str, is_required: bool = True, description: str | None = None) str | None¶
Add an environment variable to the registry.
- Parameters:
var_name – Name of the environment variable
provider_name – Provider this environment variable is for
is_required – Whether the environment variable is required
description – Optional description
- Returns:
ID of the environment variable entry or None on failure
- add_import_log(import_session: str, entity_name: str, entity_type: str, status: ImportStatus, message: str | None = None, traceback_str: str | None = None) None¶
Add an import log entry.
- Parameters:
import_session – Import session identifier
entity_name – Name of the entity being imported
entity_type – Type of entity
status – Import status
message – Optional message
traceback_str – Optional traceback string
- check_environment_var(var_name: str) bool¶
Check if an environment variable is set.
- Parameters:
var_name – Name of environment variable to check
- Returns:
True if the environment variable is set, False otherwise
- get_available_providers(entity_type: EntityType | None = None) list[dict[str, Any]]¶
Get all available providers.
- Parameters:
entity_type – Optional entity type to filter providers by (e.g., LLM_PROVIDER)
- Returns:
List of provider data with availability info
- get_entities_by_type(entity_type: EntityType) list[dict[str, Any]]¶
Get all entities of a specific type.
- Parameters:
entity_type – Type of entities to retrieve
- Returns:
List of entity data
- get_entity(entity_id: str) dict[str, Any] | None¶
Get an entity by ID.
- Parameters:
entity_id – ID of the entity
- Returns:
Entity data or None if not found
- get_environment_vars(provider_name: str | None = None) list[dict[str, Any]]¶
Get environment variables, optionally filtered by provider.
- Parameters:
provider_name – Optional provider to filter by
- Returns:
List of environment variable data
- register_entity(name: str, entity_type: EntityType, description: str | None = None, metadata: dict[str, Any] | None = None) str¶
Register a new entity in the registry.
- Parameters:
name – Name of the entity
entity_type – Type of entity
description – Optional description
metadata – Optional metadata dictionary
- Returns:
ID of the registered entity
- search_entities(query: str, entity_type: EntityType | None = None, metadata_filter: dict[str, Any] | None = None) list[dict[str, Any]]¶
Search for entities based on a query.
- Parameters:
query – Search query
entity_type – Optional entity type to filter by
metadata_filter – Optional metadata filter
- Returns:
List of matching entities
- dataflow.core.logger¶
- dataflow.core.registry_system¶