Source code for haive.core.contracts.boundaries

"""State boundaries with controlled access.

This module implements BoundedState and StateView to provide controlled
access to state with explicit permissions, maintaining flexibility while
adding runtime guarantees.
"""

from __future__ import annotations

import copy
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Set

from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)


[docs] class AccessPermissions(BaseModel): """Define what fields a component can access. This provides fine-grained control over state access, allowing different components to have different levels of access to state fields. Attributes: readable: Fields component can read from state writable: Fields component can write to state append_only: Fields component can append to but not overwrite compute_only: Fields component can derive from but not store Examples: >>> # LLM engine permissions >>> llm_permissions = AccessPermissions( ... readable={"messages", "temperature"}, ... writable={"response"}, ... append_only={"conversation_history"} ... ) >>> >>> # Tool node permissions >>> tool_permissions = AccessPermissions( ... readable={"tool_calls", "tools"}, ... writable={"tool_results"}, ... compute_only={"context"} ... ) """ readable: Set[str] = Field( default_factory=set, description="Fields this component can read" ) writable: Set[str] = Field( default_factory=set, description="Fields this component can write" ) append_only: Set[str] = Field( default_factory=set, description="Fields this component can only append to" ) compute_only: Set[str] = Field( default_factory=set, description="Fields this component can use for computation but not modify" )
[docs] def merge(self, other: AccessPermissions) -> AccessPermissions: """Merge with another set of permissions. Args: other: Other permissions to merge with Returns: New AccessPermissions with combined permissions """ return AccessPermissions( readable=self.readable.union(other.readable), writable=self.writable.union(other.writable), append_only=self.append_only.union(other.append_only), compute_only=self.compute_only.union(other.compute_only) )
[docs] def validate_consistency(self) -> List[str]: """Check for permission conflicts. Returns: List of consistency issues found """ issues = [] # Check for fields that are both writable and append_only both = self.writable.intersection(self.append_only) if both: issues.append(f"Fields cannot be both writable and append_only: {both}") # Check for fields that are compute_only but also writable compute_write = self.compute_only.intersection(self.writable) if compute_write: issues.append(f"Fields cannot be compute_only and writable: {compute_write}") return issues
[docs] class StateView: """Filtered view of state with access control. This provides a component with controlled access to state, enforcing permissions at runtime while maintaining flexibility. Each component gets its own view with specific permissions. Attributes: _state: Reference to actual state (not copied) _permissions: Access permissions for this view _access_log: Log of all access attempts _component_name: Name of component using this view """ def __init__( self, state: Dict[str, Any], permissions: AccessPermissions, component_name: str = "unknown" ): """Initialize state view with permissions. Args: state: Reference to actual state (not copied) permissions: Access permissions for this view component_name: Name of component using this view """ self._state = state self._permissions = permissions self._component_name = component_name self._access_log: List[Dict[str, Any]] = [] self._mutations: List[Dict[str, Any]] = []
[docs] def get(self, field: str, default: Any = None) -> Any: """Get field value with permission check. Args: field: Field name to retrieve default: Default value if field not found or not accessible Returns: Field value or default (deep copied for safety) Raises: PermissionError: If field not readable Examples: >>> view.get("messages") # Returns deep copy >>> view.get("private_field") # Raises PermissionError >>> view.get("optional", []) # Returns default if not found """ if field not in self._permissions.readable: self._log_access_violation("read", field) raise PermissionError( f"Component '{self._component_name}' cannot read field '{field}'" ) self._log_access("read", field) value = self._state.get(field, default) # Deep copy to prevent indirect mutation return copy.deepcopy(value)
[docs] def set(self, field: str, value: Any) -> None: """Set field value with permission check. Args: field: Field name to set value: Value to set Raises: PermissionError: If field not writable Examples: >>> view.set("response", "Hello") # If writable >>> view.set("readonly", "value") # Raises PermissionError """ if field not in self._permissions.writable: self._log_access_violation("write", field) raise PermissionError( f"Component '{self._component_name}' cannot write field '{field}'" ) # Track mutation for rollback capability old_value = self._state.get(field, None) self._mutations.append({ "field": field, "old_value": copy.deepcopy(old_value), "new_value": copy.deepcopy(value), "timestamp": datetime.now().isoformat() }) self._log_access("write", field) self._state[field] = value
[docs] def append(self, field: str, item: Any) -> None: """Append to list field with permission check. Args: field: Field name containing list item: Item to append Raises: PermissionError: If field not appendable TypeError: If field is not a list Examples: >>> view.append("history", {"event": "processed"}) >>> view.append("immutable_list", "item") # Raises PermissionError """ if field not in self._permissions.append_only: self._log_access_violation("append", field) raise PermissionError( f"Component '{self._component_name}' cannot append to field '{field}'" ) if field not in self._state: self._state[field] = [] if not isinstance(self._state[field], list): raise TypeError(f"Field '{field}' is not a list, cannot append") self._log_access("append", field) self._state[field].append(item) # Track append as mutation self._mutations.append({ "field": field, "operation": "append", "item": copy.deepcopy(item), "timestamp": datetime.now().isoformat() })
[docs] def compute_from(self, fields: List[str]) -> Dict[str, Any]: """Get values for computation without storage permission. Args: fields: Fields to retrieve for computation Returns: Dictionary of field values (deep copied) Raises: PermissionError: If any field not compute_only Examples: >>> # Get fields for derived computation >>> data = view.compute_from(["embeddings", "weights"]) >>> result = complex_computation(data) >>> # Cannot store result back to those fields """ result = {} for field in fields: if field not in self._permissions.compute_only: self._log_access_violation("compute", field) raise PermissionError( f"Component '{self._component_name}' cannot compute from field '{field}'" ) self._log_access("compute", field) result[field] = copy.deepcopy(self._state.get(field)) return result
[docs] def has_permission(self, operation: str, field: str) -> bool: """Check if operation is permitted on field. Args: operation: Operation type (read, write, append, compute) field: Field name Returns: True if operation is permitted """ if operation == "read": return field in self._permissions.readable elif operation == "write": return field in self._permissions.writable elif operation == "append": return field in self._permissions.append_only elif operation == "compute": return field in self._permissions.compute_only else: return False
[docs] def get_accessible_fields(self) -> Dict[str, Set[str]]: """Get all accessible fields by operation type. Returns: Dictionary mapping operation to field sets """ return { "readable": self._permissions.readable.copy(), "writable": self._permissions.writable.copy(), "append_only": self._permissions.append_only.copy(), "compute_only": self._permissions.compute_only.copy() }
[docs] def get_mutations(self) -> List[Dict[str, Any]]: """Get list of mutations made through this view. Returns: List of mutation records """ return copy.deepcopy(self._mutations)
def _log_access(self, operation: str, field: str) -> None: """Log successful access. Args: operation: Type of operation field: Field accessed """ self._access_log.append({ "timestamp": datetime.now().isoformat(), "component": self._component_name, "operation": operation, "field": field, "status": "success" }) def _log_access_violation(self, operation: str, field: str) -> None: """Log access violation. Args: operation: Type of operation attempted field: Field attempted to access """ self._access_log.append({ "timestamp": datetime.now().isoformat(), "component": self._component_name, "operation": operation, "field": field, "status": "denied" }) logger.warning( f"Access denied: Component '{self._component_name}' " f"attempted {operation} on field '{field}'" )
[docs] def get_access_log(self) -> List[Dict[str, Any]]: """Get access log for this view. Returns: List of access log entries """ return copy.deepcopy(self._access_log)
[docs] class BoundedState: """State container with access boundaries. This maintains the actual state and provides controlled views to different components based on their access permissions. Supports versioning, checkpointing, and rollback. Attributes: _data: The actual state data _access_rules: Permission rules for each component _version: Current state version _history: Checkpoint history for rollback _global_access_log: Consolidated access log """ def __init__(self, initial_data: Optional[Dict[str, Any]] = None): """Initialize bounded state. Args: initial_data: Initial state data """ self._data = initial_data or {} self._access_rules: Dict[str, AccessPermissions] = {} self._version = 0 self._history: List[Dict[str, Any]] = [] self._global_access_log: List[Dict[str, Any]] = [] self._active_views: Dict[str, StateView] = {} # Create initial checkpoint self.checkpoint("Initial state")
[docs] def register_component( self, name: str, permissions: AccessPermissions ) -> None: """Register component with access permissions. Args: name: Component identifier permissions: Access permissions for component Raises: ValueError: If permissions have conflicts Examples: >>> state = BoundedState() >>> state.register_component("llm", llm_permissions) >>> state.register_component("tools", tool_permissions) """ # Validate permissions consistency issues = permissions.validate_consistency() if issues: raise ValueError(f"Permission conflicts for '{name}': {issues}") self._access_rules[name] = permissions logger.info( f"Registered component '{name}' with permissions: " f"read={len(permissions.readable)}, write={len(permissions.writable)}, " f"append={len(permissions.append_only)}, compute={len(permissions.compute_only)}" )
[docs] def get_view_for(self, component_name: str) -> StateView: """Get filtered state view for component. Args: component_name: Name of component requesting view Returns: StateView with appropriate permissions Raises: ValueError: If component not registered Examples: >>> llm_view = state.get_view_for("llm") >>> messages = llm_view.get("messages") >>> llm_view.set("response", "Generated response") """ if component_name not in self._access_rules: raise ValueError( f"Component '{component_name}' not registered. " f"Available: {list(self._access_rules.keys())}" ) permissions = self._access_rules[component_name] view = StateView(self._data, permissions, component_name) # Track active views for monitoring self._active_views[component_name] = view return view
[docs] def update_permissions( self, component_name: str, permissions: AccessPermissions ) -> None: """Update permissions for existing component. Args: component_name: Component to update permissions: New permissions Raises: ValueError: If component not registered """ if component_name not in self._access_rules: raise ValueError(f"Component '{component_name}' not registered") old_permissions = self._access_rules[component_name] self._access_rules[component_name] = permissions logger.info( f"Updated permissions for '{component_name}': " f"readable {len(old_permissions.readable)}→{len(permissions.readable)}, " f"writable {len(old_permissions.writable)}→{len(permissions.writable)}" )
[docs] def snapshot(self) -> Dict[str, Any]: """Get immutable snapshot of current state. Returns: Deep copy of current state Examples: >>> snapshot = state.snapshot() >>> # Modifications to snapshot don't affect state >>> snapshot["temp"] = "value" >>> assert "temp" not in state._data """ return copy.deepcopy(self._data)
[docs] def checkpoint(self, description: str = "") -> int: """Create checkpoint in history. Args: description: Optional checkpoint description Returns: Version number of checkpoint Examples: >>> version = state.checkpoint("Before processing") >>> # Make changes... >>> state.rollback(version) # Restore to checkpoint """ self._version += 1 checkpoint = { "version": self._version, "timestamp": datetime.now().isoformat(), "description": description, "state": copy.deepcopy(self._data), "access_rules": copy.deepcopy(self._access_rules) } self._history.append(checkpoint) logger.info(f"Created checkpoint v{self._version}: {description}") return self._version
[docs] def rollback(self, version: int) -> None: """Rollback to previous version. Args: version: Version number to rollback to Raises: ValueError: If version not found Examples: >>> state.rollback(3) # Rollback to version 3 >>> state.rollback(state._version - 1) # Rollback one version """ for checkpoint in self._history: if checkpoint["version"] == version: self._data = copy.deepcopy(checkpoint["state"]) self._access_rules = copy.deepcopy(checkpoint["access_rules"]) self._version = version logger.info( f"Rolled back to version {version}: {checkpoint['description']}" ) return available = [c["version"] for c in self._history] raise ValueError( f"Version {version} not found. Available: {available}" )
[docs] def get_history(self) -> List[Dict[str, Any]]: """Get checkpoint history. Returns: List of checkpoint summaries (without full state) """ return [ { "version": c["version"], "timestamp": c["timestamp"], "description": c["description"], "state_size": len(str(c["state"])) } for c in self._history ]
[docs] def merge_access_logs(self) -> List[Dict[str, Any]]: """Merge access logs from all active views. Returns: Consolidated access log sorted by timestamp """ all_logs = [] for component_name, view in self._active_views.items(): logs = view.get_access_log() all_logs.extend(logs) # Sort by timestamp all_logs.sort(key=lambda x: x["timestamp"]) return all_logs
[docs] def get_access_summary(self) -> Dict[str, Any]: """Get summary of access patterns. Returns: Summary statistics of field access """ logs = self.merge_access_logs() # Count by operation operations = {} for log in logs: op = log["operation"] operations[op] = operations.get(op, 0) + 1 # Count by field fields = {} for log in logs: field = log["field"] fields[field] = fields.get(field, 0) + 1 # Count violations violations = sum(1 for log in logs if log["status"] == "denied") return { "total_accesses": len(logs), "operations": operations, "most_accessed_fields": sorted( fields.items(), key=lambda x: x[1], reverse=True )[:10], "violations": violations, "violation_rate": violations / len(logs) if logs else 0 }
[docs] def validate_state_consistency(self) -> List[str]: """Check state consistency across components. Returns: List of consistency issues found """ issues = [] # Check for orphaned fields (no component can access) all_accessible = set() for permissions in self._access_rules.values(): all_accessible.update(permissions.readable) all_accessible.update(permissions.writable) all_accessible.update(permissions.append_only) all_accessible.update(permissions.compute_only) orphaned = set(self._data.keys()) - all_accessible if orphaned: issues.append(f"Orphaned fields with no access rules: {orphaned}") # Check for write conflicts (multiple writers) write_counts = {} for component, permissions in self._access_rules.items(): for field in permissions.writable: if field not in write_counts: write_counts[field] = [] write_counts[field].append(component) conflicts = { field: writers for field, writers in write_counts.items() if len(writers) > 1 } if conflicts: issues.append(f"Multiple writers for fields: {conflicts}") return issues
def __repr__(self) -> str: """String representation of bounded state.""" return ( f"BoundedState(version={self._version}, " f"fields={len(self._data)}, " f"components={len(self._access_rules)}, " f"checkpoints={len(self._history)})" )