dataflow.api.routes.agent_routes

WebSocket and REST API endpoints for agent interactions.

This module provides WebSocket-based communication with Haive agents, enabling real-time interactions, streaming responses, and persistent conversation state. It also includes REST endpoints for agent management and configuration.

The WebSocket protocol supports different message types for user messages, agent responses, status updates, and error handling. Connections are managed per thread, allowing multiple concurrent agent sessions.

Key components: - WebSocket connection manager for handling multiple clients - Message types and formats for structured communication - Authentication and authorization using Supabase - Agent configuration and customization options - Streaming response support for real-time feedback

Typical usage example:

# Client-side WebSocket example import websockets import json import asyncio

async def connect_to_agent():

uri = “ws://localhost:8000/api/ws/agent/chat?token=YOUR_AUTH_TOKEN” async with websockets.connect(uri) as websocket:

# Send initial configuration await websocket.send(json.dumps({

“type”: “config”, “content”: {

“agent_name”: “TextAnalyzer”, “provider”: “openai”, “model”: “gpt-4”, “stream”: True

}

}))

# Send a message to the agent await websocket.send(json.dumps({

“type”: “message”, “content”: “Analyze this text for sentiment”

}))

# Receive streaming responses while True:

response = json.loads(await websocket.recv()) if response[“type”] == “response”:

print(response[“content”])

elif response[“type”] == “state_complete”:

break

asyncio.run(connect_to_agent())

Attributes

Classes

AgentChatConfig

Configuration for agent chat sessions via WebSocket.

ConnectionManager

Manages WebSocket connections for agent chat sessions.

WSMessage

WebSocket message format for agent communication.

WSMessageType

WebSocket message types for agent communication.

Functions

configure_agent(...)

Configure agent with LLM settings.

get_user_from_token(→ str | None)

Validate JWT token and return user ID.

load_agent_config(...)

Load agent configuration from package.

reset_thread(thread_id[, user_id])

Reset/clear a chat thread.

websocket_chat_endpoint(websocket, agent_name[, ...])

WebSocket endpoint for real-time chat with an agent.

Module Contents

class dataflow.api.routes.agent_routes.AgentChatConfig(/, **data: Any)

Bases: pydantic.BaseModel

Configuration for agent chat sessions via WebSocket.

This model defines the configuration options for an agent chat session, including which agent to use, LLM settings, and behavior options like streaming and persistence.

agent_name

Name of the agent to use for this chat session

provider

LLM provider to use (e.g., AZURE, OPENAI, ANTHROPIC)

model

Specific model to use from the provider

temperature

Sampling temperature for response generation (0.0-1.0)

system_prompt

Optional override for the agent’s system prompt

persistent

Whether to persist chat state between messages

stream

Whether to stream responses incrementally

extra_params

Additional provider-specific parameters

Examples

>>> config = AgentChatConfig(
...     agent_name="TextAnalyzer",
...     provider=LLMProvider.OPENAI,
...     model="gpt-4",
...     temperature=0.5,
...     system_prompt="You are an expert text analyst.",
...     stream=True
... )
agent_name: str = None
buffer_chunks: bool = None
chunk_size: int = None
extra_params: dict[str, Any] | None = None
model: str = None
persistent: bool = None
progressive_updates: bool = None
provider: haive.core.models.llm.provider_types.LLMProvider = None
stream: bool = None
stream_format: str = None
stream_mode: str = None
system_prompt: str | None = None
temperature: float = None
class dataflow.api.routes.agent_routes.ConnectionManager

Manages WebSocket connections for agent chat sessions.

This class provides functionality for managing WebSocket connections, organizing them by thread, and handling connection lifecycle events. It supports multiple concurrent connections to the same thread, enabling features like shared sessions and observers.

The connection manager maintains: - Active WebSocket connections grouped by thread ID - Metadata for each thread (configuration, state, etc.) - Thread-safe operations with asyncio locks

active_connections

Dictionary mapping thread IDs to lists of WebSocket connections

thread_metadata

Dictionary mapping thread IDs to metadata dictionaries

_lock

Asyncio lock for thread-safe operations on shared data structures

async broadcast_to_thread(thread_id: str, message: WSMessage)

Broadcast message to all connections in a thread.

async connect(websocket: fastapi.WebSocket, thread_id: str, user_id: str) bool

Connect a WebSocket to a thread.

This method accepts a new WebSocket connection and associates it with the specified thread. If this is the first connection to the thread, it also initializes the thread metadata.

Parameters:
  • websocket – The WebSocket connection to add

  • thread_id – The ID of the thread to connect to

  • user_id – The ID of the user making the connection

Returns:

True if the connection was successful, False otherwise

Return type:

bool

Raises:

WebSocketDisconnect – If the connection cannot be established

Examples

>>> manager = ConnectionManager()
>>> success = await manager.connect(websocket, "thread-123", "user-456")
>>> if success:
...     print("Connection established")
async disconnect(websocket: fastapi.WebSocket, thread_id: str)

Disconnect a WebSocket from a thread.

async update_activity(thread_id: str)

Update last activity timestamp for a thread.

active_connections: dict[str, list[fastapi.WebSocket]]
thread_metadata: dict[str, dict[str, Any]]
class dataflow.api.routes.agent_routes.WSMessage(/, **data: Any)

Bases: pydantic.BaseModel

WebSocket message format for agent communication.

This model defines the standard format for all messages exchanged over the WebSocket connection. It provides a consistent structure with metadata for message handling and tracking.

type

The type of message (from WSMessageType enum)

content

The actual message content (type depends on message type)

thread_id

Optional ID for persistent chat threads

stream_index

Optional index for streaming response chunks

timestamp

When the message was created (defaults to current time)

Examples

>>> message = WSMessage(
...     type=WSMessageType.MESSAGE,
...     content="Analyze this text",
...     thread_id="thread-123"
... )
>>> json_str = message.json()
class Config
json_encoders
content: Any = None
stream_index: int | None = None
thread_id: str | None = None
timestamp: datetime.datetime = None
type: WSMessageType = None
class dataflow.api.routes.agent_routes.WSMessageType

Bases: str, enum.Enum

WebSocket message types for agent communication.

This enumeration defines the different types of messages that can be exchanged over the WebSocket connection. Each type has a specific purpose and expected format.

MESSAGE

User message sent to the agent

RESPONSE

Agent response sent back to the user

STATUS

System status updates (connection, processing, etc.)

ERROR

Error messages for exception handling

STATE

Incremental agent state updates during processing

STATE_COMPLETE

Final agent state after processing completes

ERROR = 'error'
MESSAGE = 'message'
RESPONSE = 'response'
STATE = 'state'
STATE_COMPLETE = 'state_complete'
STATUS = 'status'
async dataflow.api.routes.agent_routes.configure_agent(config: haive.core.engine.base.agent_config.AgentConfig, chat_config: AgentChatConfig) haive.core.engine.base.agent_config.AgentConfig

Configure agent with LLM settings.

dataflow.api.routes.agent_routes.get_user_from_token(token: str) str | None

Validate JWT token and return user ID.

async dataflow.api.routes.agent_routes.load_agent_config(agent_name: str, user_id: str, thread_id: str) haive.core.engine.base.agent_config.AgentConfig | None

Load agent configuration from package.

async dataflow.api.routes.agent_routes.reset_thread(thread_id: str, user_id: str = Depends(require_auth))

Reset/clear a chat thread.

async dataflow.api.routes.agent_routes.websocket_chat_endpoint(websocket: fastapi.WebSocket, agent_name: str, token: str = Query(..., description='JWT authentication token'), thread_id: str | None = Query(None, description='Existing thread ID for persistence'), config: str | None = Query(None, description='JSON encoded chat configuration'))

WebSocket endpoint for real-time chat with an agent.

Parameters:
  • websocket – WebSocket connection

  • agent_name – Name of the agent to chat with

  • token – JWT authentication token

  • thread_id – Optional thread ID for persistent chat

  • config – Optional JSON-encoded chat configuration

dataflow.api.routes.agent_routes.logger
dataflow.api.routes.agent_routes.manager
dataflow.api.routes.agent_routes.router