Source code for haive.core.contracts.orchestrator

"""Orchestrator for contract enforcement.

This module provides the central orchestrator that ensures all contracts
are respected during execution, providing the control layer over the
dynamic runtime system.
"""

from __future__ import annotations

import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from haive.core.contracts.boundaries import AccessPermissions, BoundedState, StateView
from haive.core.contracts.engine_contracts import EngineContract, EngineInterface
from haive.core.contracts.node_contracts import ContractualNode, ContractViolation, NodeContract

logger = logging.getLogger(__name__)


[docs] class Orchestrator: """Orchestrates execution with contract enforcement. This is the central coordinator that ensures all contracts are respected during execution, providing the control layer over the dynamic runtime system. Attributes: components: Registered components (engines and nodes) contracts: Contracts for each component access_rules: Derived access permissions execution_log: Log of all executions _dependency_graph: Component dependency graph Examples: >>> # Create orchestrator >>> orchestrator = Orchestrator() >>> >>> # Register components >>> orchestrator.register_engine("llm", llm_engine) >>> orchestrator.register_node(validation_node) >>> >>> # Execute with contract enforcement >>> state = BoundedState(initial_data) >>> result = orchestrator.execute("llm", state) """ def __init__(self): """Initialize orchestrator.""" self.components: Dict[str, Any] = {} self.contracts: Dict[str, Union[EngineContract, NodeContract]] = {} self.access_rules: Dict[str, AccessPermissions] = {} self.execution_log: List[Dict[str, Any]] = [] self._dependency_graph: Dict[str, List[str]] = {} self._execution_order: List[str] = []
[docs] def register_engine( self, name: str, engine: EngineInterface, auto_register_state: bool = True ) -> None: """Register engine with its contract. Args: name: Engine identifier engine: Engine implementing EngineInterface auto_register_state: Whether to auto-register with state Examples: >>> orchestrator.register_engine("llm", ContractualLLMEngine()) >>> orchestrator.register_engine("retriever", retriever_engine) """ self.components[name] = engine self.contracts[name] = engine.get_contract() # Derive access rules from contract permissions = self._derive_permissions_from_contract(engine.get_contract()) self.access_rules[name] = permissions # Update dependency graph self._update_dependency_graph(name, []) logger.info( f"Registered engine '{name}' with contract: " f"inputs={len(engine.get_contract().inputs)}, " f"outputs={len(engine.get_contract().outputs)}" )
[docs] def register_node( self, node: ContractualNode, auto_register_state: bool = True ) -> None: """Register node with its contract. Args: node: Node with contract auto_register_state: Whether to auto-register with state Examples: >>> validation_node = ContractualNode("validator", contract, validate_fn) >>> orchestrator.register_node(validation_node) """ self.components[node.name] = node self.contracts[node.name] = node.contract # Derive access rules from node contract permissions = self._derive_permissions_from_node_contract(node.contract) self.access_rules[node.name] = permissions # Update dependency graph self._update_dependency_graph(node.name, node.contract.dependencies) logger.info( f"Registered node '{node.name}' with contract: " f"inputs={node.contract.inputs}, " f"outputs={node.contract.outputs}, " f"dependencies={node.contract.dependencies}" )
[docs] def execute( self, component_name: str, state: BoundedState, validate_only: bool = False ) -> Any: """Execute component with full contract enforcement. Args: component_name: Component to execute state: Bounded state container validate_only: Only validate, don't execute Returns: Execution result Raises: ContractViolation: If any contract violated ValueError: If component not found Examples: >>> # Execute with enforcement >>> result = orchestrator.execute("llm", state) >>> >>> # Validate only >>> orchestrator.execute("llm", state, validate_only=True) """ if component_name not in self.components: available = list(self.components.keys()) raise ValueError( f"Component '{component_name}' not registered. " f"Available: {available}" ) component = self.components[component_name] contract = self.contracts[component_name] # Register component with state if not already if component_name not in state._access_rules: state.register_component(component_name, self.access_rules[component_name]) # Get bounded view for component state_view = state.get_view_for(component_name) # Pre-execution validation self._validate_pre_execution(component, component_name, state) if validate_only: logger.info(f"Validation passed for component '{component_name}'") return {"validation": "passed"} # Execute with monitoring start_time = datetime.now() try: if isinstance(component, ContractualNode): result = component(state_view) elif isinstance(component, EngineInterface): # Convert state view to dict for engine state_dict = self._state_view_to_dict(state_view, component.get_contract()) result = component.execute(state_dict) else: raise TypeError(f"Unknown component type: {type(component)}") execution_time = (datetime.now() - start_time).total_seconds() # Post-execution validation self._validate_post_execution(component, component_name, result, state) # Log successful execution self._log_execution(component_name, execution_time, "success") return result except Exception as e: execution_time = (datetime.now() - start_time).total_seconds() self._log_execution(component_name, execution_time, "failed", str(e)) raise
[docs] def execute_chain( self, component_names: List[str], state: BoundedState, stop_on_error: bool = True ) -> Dict[str, Any]: """Execute chain of components in sequence. Args: component_names: Components to execute in order state: Bounded state container stop_on_error: Whether to stop on first error Returns: Combined results from all components Examples: >>> # Execute pipeline >>> results = orchestrator.execute_chain( ... ["retriever", "llm", "validator"], ... state ... ) """ results = {} executed = [] for name in component_names: try: # Check dependencies if name in self._dependency_graph: missing = [ dep for dep in self._dependency_graph[name] if dep not in executed ] if missing: raise ContractViolation({ "component": name, "type": "dependency", "details": f"Missing dependencies: {missing}" }) # Execute component result = self.execute(name, state) results[name] = result executed.append(name) # Update state with results if dict if isinstance(result, dict): for key, value in result.items(): if key in self.access_rules[name].writable: state._data[key] = value except Exception as e: logger.error(f"Component '{name}' failed: {e}") if stop_on_error: raise results[name] = {"error": str(e)} return results
[docs] def validate_composition( self, components: List[str] ) -> List[str]: """Validate that components can be composed. Args: components: List of component names in execution order Returns: List of compatibility issues Examples: >>> issues = orchestrator.validate_composition(["llm", "validator"]) >>> if issues: ... print(f"Issues found: {issues}") """ issues = [] available_fields = set() for i, current in enumerate(components): if current not in self.contracts: issues.append(f"Component '{current}' not registered") continue contract = self.contracts[current] # Check inputs are available if isinstance(contract, EngineContract): required = contract.get_required_inputs() elif isinstance(contract, NodeContract): required = contract.inputs else: required = [] if i > 0: # Not first component missing = set(required) - available_fields if missing: issues.append( f"Component '{current}' requires {missing} " f"but previous components don't provide them" ) # Add outputs to available if isinstance(contract, EngineContract): outputs = contract.get_guaranteed_outputs() elif isinstance(contract, NodeContract): outputs = contract.outputs else: outputs = [] available_fields.update(outputs) # Add transform targets if node if isinstance(contract, NodeContract): available_fields.update(contract.transforms.values()) return issues
[docs] def get_execution_plan( self, components: List[str] ) -> List[Dict[str, Any]]: """Get execution plan for components. Args: components: Component names Returns: Execution plan with details """ plan = [] for i, name in enumerate(components): if name not in self.contracts: plan.append({ "step": i + 1, "component": name, "status": "not_registered" }) continue contract = self.contracts[name] if isinstance(contract, EngineContract): inputs = contract.get_required_inputs() outputs = contract.get_guaranteed_outputs() dependencies = [] else: inputs = contract.inputs outputs = contract.outputs dependencies = contract.dependencies plan.append({ "step": i + 1, "component": name, "type": "engine" if isinstance(contract, EngineContract) else "node", "inputs": inputs, "outputs": outputs, "dependencies": dependencies }) return plan
[docs] def get_execution_summary(self) -> Dict[str, Any]: """Get summary of all executions. Returns: Execution statistics """ total = len(self.execution_log) successful = sum(1 for e in self.execution_log if e["status"] == "success") failed = total - successful avg_duration = 0 if successful > 0: durations = [ e["duration"] for e in self.execution_log if e["status"] == "success" ] avg_duration = sum(durations) / len(durations) # Component statistics component_stats = {} for log in self.execution_log: comp = log["component"] if comp not in component_stats: component_stats[comp] = {"success": 0, "failed": 0} if log["status"] == "success": component_stats[comp]["success"] += 1 else: component_stats[comp]["failed"] += 1 return { "total_executions": total, "successful": successful, "failed": failed, "success_rate": successful / total if total > 0 else 0, "average_duration": avg_duration, "registered_components": len(self.components), "component_stats": component_stats, "recent_failures": [ e for e in self.execution_log[-10:] if e["status"] == "failed" ] }
def _validate_pre_execution( self, component: Any, name: str, state: BoundedState ) -> None: """Validate pre-execution conditions. Args: component: Component to validate name: Component name state: Current state Raises: ContractViolation: If validation fails """ state_snapshot = state.snapshot() if isinstance(component, EngineInterface): # Validate inputs if not component.validate_input(state_snapshot): raise ContractViolation({ "component": name, "phase": "pre-execution", "details": "Input validation failed" }) # Check preconditions unmet = component.check_preconditions(state_snapshot) if unmet: raise ContractViolation({ "component": name, "phase": "preconditions", "details": f"Unmet preconditions: {unmet}" }) def _validate_post_execution( self, component: Any, name: str, result: Any, state: BoundedState ) -> None: """Validate post-execution conditions. Args: component: Component that executed name: Component name result: Execution result state: Current state Raises: ContractViolation: If validation fails """ if isinstance(component, EngineInterface): # Validate output if not component.validate_output(result): raise ContractViolation({ "component": name, "phase": "post-execution", "details": "Output validation failed" }) # Check postconditions state_snapshot = state.snapshot() unmet = component.check_postconditions(state_snapshot) if unmet: raise ContractViolation({ "component": name, "phase": "postconditions", "details": f"Unmet postconditions: {unmet}" }) def _derive_permissions_from_contract( self, contract: EngineContract ) -> AccessPermissions: """Derive access permissions from engine contract. Args: contract: Engine contract Returns: Access permissions """ permissions = AccessPermissions() # Inputs are readable for field in contract.inputs: permissions.readable.add(field.name) # Outputs are writable for field in contract.outputs: permissions.writable.add(field.name) # Side effects need write access for field in contract.side_effects: permissions.writable.add(field) return permissions def _derive_permissions_from_node_contract( self, contract: NodeContract ) -> AccessPermissions: """Derive access permissions from node contract. Args: contract: Node contract Returns: Access permissions """ permissions = AccessPermissions() # Inputs are readable permissions.readable.update(contract.inputs) # Outputs are writable permissions.writable.update(contract.outputs) # Transforms need both read and write for source, target in contract.transforms.items(): permissions.readable.add(source) permissions.writable.add(target) return permissions def _state_view_to_dict( self, state_view: StateView, contract: EngineContract ) -> Dict[str, Any]: """Convert state view to dict for engine. Args: state_view: State view contract: Engine contract Returns: Dictionary with required fields """ result = {} # Get required inputs for field in contract.inputs: if field.required: try: result[field.name] = state_view.get(field.name) except: if field.default is not None: result[field.name] = field.default elif state_view.has_permission("read", field.name): result[field.name] = state_view.get(field.name, field.default) return result def _update_dependency_graph( self, component: str, dependencies: List[str] ) -> None: """Update component dependency graph. Args: component: Component name dependencies: Component dependencies """ self._dependency_graph[component] = dependencies # Update execution order self._update_execution_order() def _update_execution_order(self) -> None: """Update topological execution order based on dependencies.""" # Simple topological sort visited = set() order = [] def visit(node: str): if node in visited: return visited.add(node) if node in self._dependency_graph: for dep in self._dependency_graph[node]: if dep in self._dependency_graph: visit(dep) order.append(node) for component in self._dependency_graph: visit(component) self._execution_order = order def _log_execution( self, component: str, duration: float, status: str, error: Optional[str] = None ) -> None: """Log execution details. Args: component: Component name duration: Execution duration status: Execution status error: Error message if failed """ log_entry = { "timestamp": datetime.now().isoformat(), "component": component, "duration": duration, "status": status } if error: log_entry["error"] = error self.execution_log.append(log_entry) if status == "success": logger.info(f"Component '{component}' executed successfully in {duration:.3f}s") else: logger.error(f"Component '{component}' failed after {duration:.3f}s: {error}")