dataflow.api.game_socketΒΆ

WebSocket server for game state streaming with Supabase integration.

This module provides a general-purpose WebSocket server that can stream game state for any agent-based game in the Haive framework. It supports:

  • Real-time state updates

  • Supabase persistence integration with RLS

  • Player move submissions

  • AI move requests

  • Multiple game types

  • Authentication and user management

The WebSocket server can be integrated with any game agent implementation that follows the standard Haive agent interface.

AttributesΒΆ

ClassesΒΆ

GameSocketFactory

Factory for creating game-specific socket servers.

GameSocketServer

General-purpose WebSocket server for game state streaming.

Module ContentsΒΆ

class dataflow.api.game_socket.GameSocketFactoryΒΆ

Factory for creating game-specific socket servers.

This class creates specialized socket servers for different game types, with appropriate message handling and state management for each game.

Examples

# Create a chess socket server chess_socket = GameSocketFactory.create_chess_socket(app)

# Or create a custom socket server custom_socket = GameSocketFactory.create_socket(

app, agent_class=ChessAgent, state_schema=ChessState, route_prefix=”/ws/chess”

)

static create_chess_socket(app: fastapi.FastAPI) GameSocketServerΒΆ

Create a chess-specific socket server.

Parameters:

app – The FastAPI application

Returns:

A configured GameSocketServer instance for chess

static create_connect4_socket(app: fastapi.FastAPI) GameSocketServerΒΆ

Create a Connect4-specific socket server.

Parameters:

app – The FastAPI application

Returns:

A configured GameSocketServer instance for Connect4

static create_socket(app: fastapi.FastAPI, agent_class: type[haive.core.engine.agent.agent.Agent], state_schema: type[haive.core.schema.state_schema.StateSchema], route_prefix: str = '/ws/games') GameSocketServerΒΆ

Create a game socket server for any agent and state schema.

Parameters:
  • app – The FastAPI application

  • agent_class – The agent class for the game

  • state_schema – The state schema for the game

  • route_prefix – The URL prefix for WebSocket routes

Returns:

A configured GameSocketServer instance

static create_tic_tac_toe_socket(app: fastapi.FastAPI) GameSocketServerΒΆ

Create a Tic Tac Toe-specific socket server.

Parameters:

app – The FastAPI application

Returns:

A configured GameSocketServer instance for Tic Tac Toe

class dataflow.api.game_socket.GameSocketServer(app: fastapi.FastAPI, agent_class: type[haive.core.engine.agent.agent.Agent], state_schema: type[haive.core.schema.state_schema.StateSchema], route_prefix: str = '/ws/games')ΒΆ

General-purpose WebSocket server for game state streaming.

This class provides a WebSocket server that can be integrated with any game agent implementation to stream game state updates in real-time. It handles connection management, message routing, and state updates.

appΒΆ

The FastAPI application to add routes to

Type:

FastAPI

agent_classΒΆ

The agent class for the game

Type:

Type[Agent]

state_schemaΒΆ

The state schema for the game

Type:

Type[StateSchema]

active_connectionsΒΆ

Set of active WebSocket connections

Type:

Set[WebSocket]

connection_thread_mapΒΆ

Map of connections to thread IDs

Type:

Dict[WebSocket, str]

agentsΒΆ

Map of thread IDs to agent instances

Type:

Dict[str, Agent]

async broadcast_to_thread(thread_id: str, message: dict[str, Any])ΒΆ

Broadcast a message to all connections for a thread.

cleanup(thread_id: str | None = None)ΒΆ

Clean up resources.

get_or_create_agent(thread_id: str, config_overrides: dict[str, Any] | None = None) haive.core.engine.agent.agent.AgentΒΆ

Get or create an agent for a thread ID.

register_connection(websocket: fastapi.WebSocket, thread_id: str)ΒΆ

Register a WebSocket connection.

unregister_connection(websocket: fastapi.WebSocket)ΒΆ

Unregister a WebSocket connection.

active_connections: set[fastapi.WebSocket]ΒΆ
agent_classΒΆ
agents: dict[str, haive.core.engine.agent.agent.Agent]ΒΆ
appΒΆ
connection_thread_map: dict[fastapi.WebSocket, str]ΒΆ
route_prefix = '/ws/games'ΒΆ
state_schemaΒΆ
dataflow.api.game_socket.appΒΆ
dataflow.api.game_socket.loggerΒΆ
dataflow.api.game_socket.module_path = b'.'ΒΆ