The MCP module (ouroboros.mcp) provides Model Context Protocol integration for both consuming external MCP servers and exposing Ouroboros as an MCP server.
from ouroboros.mcp import (
# Errors
MCPError,
MCPClientError,
MCPServerError,
MCPAuthError,
MCPTimeoutError,
MCPConnectionError,
MCPProtocolError,
MCPResourceNotFoundError,
MCPToolError,
# Types
TransportType,
ContentType,
MCPServerConfig,
MCPToolDefinition,
MCPToolParameter,
MCPToolResult,
MCPContentItem,
MCPResourceDefinition,
MCPResourceContent,
MCPPromptDefinition,
MCPPromptArgument,
MCPCapabilities,
MCPServerInfo,
MCPRequest,
MCPResponse,
)
# Client
from ouroboros.mcp.client import (
MCPClient,
MCPClientAdapter,
MCPClientManager,
)
# Server
from ouroboros.mcp.server import (
MCPServer,
ToolHandler,
ResourceHandler,
MCPServerAdapter,
)
# Tools
from ouroboros.mcp.tools import (
ToolRegistry,
OUROBOROS_TOOLS,
)
# Resources
from ouroboros.mcp.resources import (
OUROBOROS_RESOURCES,
)

MCP transport type for server connections.
class TransportType(StrEnum):
STDIO = "stdio"
SSE = "sse"
STREAMABLE_HTTP = "streamable-http"
HTTP = "http"

Type of content in an MCP response.
class ContentType(StrEnum):
TEXT = "text"
IMAGE = "image"
RESOURCE = "resource"

Configuration for connecting to an MCP server.
@dataclass(frozen=True, slots=True)
class MCPServerConfig:
name: str # Unique name for the connection
transport: TransportType # Transport type
command: str | None = None # Command for stdio transport
args: tuple[str, ...] = () # Command arguments
url: str | None = None # URL for SSE/HTTP transport
env: dict[str, str] = {} # Environment variables
timeout: float = 30.0 # Connection timeout (seconds)
headers: dict[str, str] = {} # HTTP headers for SSE/HTTP

# STDIO transport
config = MCPServerConfig(
name="my-server",
transport=TransportType.STDIO,
command="npx",
args=("-y", "@my/mcp-server"),
env={"API_KEY": "xxx"},
)
# SSE transport
config = MCPServerConfig(
name="remote-server",
transport=TransportType.SSE,
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer xxx"},
)
# HTTP transport (alias for streamable-http, used by Claude Code)
http_config = MCPServerConfig(
name="github-mcp",
transport=TransportType.HTTP,
url="http://localhost:3000/mcp",
)

Definition of an MCP tool.
@dataclass(frozen=True, slots=True)
class MCPToolDefinition:
name: str # Unique tool name
description: str # Human-readable description
parameters: tuple[MCPToolParameter, ...] = () # Tool parameters
server_name: str | None = None # Server providing this tool

Convert to JSON Schema for tool input.
A single parameter for an MCP tool.
@dataclass(frozen=True, slots=True)
class MCPToolParameter:
name: str # Parameter name
type: ToolInputType # JSON Schema type
description: str = "" # Description
required: bool = True # Is required
default: Any = None # Default value
enum: tuple[str, ...] | None = None # Allowed values

Result from an MCP tool invocation.
@dataclass(frozen=True, slots=True)
class MCPToolResult:
content: tuple[MCPContentItem, ...] = () # Content items
is_error: bool = False # Was there an error
meta: dict[str, Any] = {} # Metadata

| Property | Type | Description |
|---|---|---|
| text_content | str | Concatenated text from all text items |
A single content item in an MCP response.
@dataclass(frozen=True, slots=True)
class MCPContentItem:
type: ContentType # Content type
text: str | None = None # Text content
data: str | None = None # Binary data (base64)
mime_type: str | None = None # MIME type
uri: str | None = None # Resource URI

Definition of an MCP resource.
@dataclass(frozen=True, slots=True)
class MCPResourceDefinition:
uri: str # Resource URI
name: str # Human-readable name
description: str = "" # Description
mime_type: str = "text/plain" # MIME type

Content of an MCP resource.
@dataclass(frozen=True, slots=True)
class MCPResourceContent:
uri: str # Resource URI
text: str | None = None # Text content
blob: str | None = None # Binary content (base64)
mime_type: str = "text/plain" # MIME type

Capabilities of an MCP server.
@dataclass(frozen=True, slots=True)
class MCPCapabilities:
tools: bool = False
resources: bool = False
prompts: bool = False
logging: bool = False

Information about an MCP server.
@dataclass(frozen=True, slots=True)
class MCPServerInfo:
name: str
version: str = "1.0.0"
capabilities: MCPCapabilities
tools: tuple[MCPToolDefinition, ...]
resources: tuple[MCPResourceDefinition, ...]
prompts: tuple[MCPPromptDefinition, ...]

Ouroboros exposes workflow tools as ordinary MCP handlers. The MCP server does
not parse ooo syntax and does not receive channel-specific identifiers for
skill routing.
ouroboros_start_auto starts detached auto work as non-terminal tracked
background work. A successful start response returns a job_id immediately;
that handle identifies tracked background work, not a completed workflow result.
The job remains observable through its lifecycle status until it reaches a
terminal state such as completed, failed, or cancelled. Expired retention
is reported by ouroboros_job_result when the stored terminal result is no
longer available.
MCP clients wait for and retrieve detached auto results with the standard job
tools:
ouroboros_job_status(job_id="JOB_ID")
ouroboros_job_wait(job_id="JOB_ID")
ouroboros_job_result(job_id="JOB_ID")
Treat running or other non-terminal status output as progress only. A
running lifecycle status is non-terminal tracked background work, not a final
auto result. Wait with ouroboros_job_wait, then fetch the completed result
with ouroboros_job_result after the job reaches a terminal lifecycle status.
When MCP status reports completed, ouroboros_job_result(job_id="JOB_ID")
retrieves the stable completed auto result for that job handle. Unknown,
expired, or otherwise unavailable handles return an MCP error response.
When MCP status cannot resolve the supplied handle, treat the detached work as
invalid or unavailable rather than as running or completed. The stable
observable status is the MCP error response for that handle, not a detached
auto result. Next steps are to check the copied job_id, inspect any
surfaced auto session, execution, or lineage handle, then restart the detached
auto flow when no valid handle can be recovered.
The minimal stable invalid-handle error contract marks the handle as terminally unavailable and resultless:
ouroboros_job_result(job_id="missing_detached_auto")
error.message = "Job handle not found: missing_detached_auto. Result unavailable."
error.error_code = "job_handle_not_found"
error.details.job_id = "missing_detached_auto"
error.details.lifecycle_status = "invalid"
error.details.is_terminal = true
error.details.result_available = false
error.details.reason = "not_found"
A completed detached auto result may include text content, metadata, and an
artifact resource reference. The minimal stable retrieval contract is the job
handle, terminal lifecycle status, terminal marker, and returned content:
ouroboros_job_result(job_id="job_auto_docs_done")
content[0].text = "detached auto result artifact: seed.yaml"
content[1].uri = "file:///tmp/detached-auto-result.json"
meta.job_id = "job_auto_docs_done"
meta.status = "completed"
meta.is_terminal = true
When MCP status reports failed, the job is terminal and still observable;
ouroboros_job_result(job_id="JOB_ID") returns the stable failure output or
error details for that job handle with is_error=true, not a successful auto
result. Next steps are to inspect ouroboros_job_status(job_id="JOB_ID") and
ouroboros_job_result(job_id="JOB_ID"), then resume or retry from the surfaced
auto session, execution, or lineage handle when one is present.
ouroboros_job_result(job_id="job_auto_docs_failed")
is_error = true
content[0].text = "detached auto failed: seed repair budget exhausted"
meta.job_id = "job_auto_docs_failed"
meta.status = "failed"
meta.is_terminal = true
meta.error = "seed repair budget exhausted"
When MCP status reports cancelled, the job is terminal and still observable;
ouroboros_job_result(job_id="JOB_ID") returns stable cancellation output or
error details for that job handle with is_error=true and the cancellation
reason when one is available, not a successful auto result. Next steps are to
inspect ouroboros_job_status(job_id="JOB_ID") and
ouroboros_job_result(job_id="JOB_ID"), then restart the detached auto flow or
resume from the surfaced auto session, execution, or lineage handle when one is
present.
ouroboros_job_result(job_id="job_auto_docs_cancelled")
is_error = true
content[0].text = "detached auto cancelled: user requested cancellation"
meta.job_id = "job_auto_docs_cancelled"
meta.status = "cancelled"
meta.lifecycle_status = "cancelled"
meta.is_terminal = true
meta.error = "user requested cancellation"
When ouroboros_job_result(job_id="JOB_ID") reports expired, the job is
terminal tracked background work whose retained result is no longer available
through that job handle. ouroboros_job_status still reports the stored
terminal lifecycle status, while result retrieval returns stable expiration
error details rather than a detached auto result. Next steps are to inspect
any surfaced auto session, execution, or lineage handle, then resume from that
handle or restart the detached auto flow when no recoverable handle is present.
ouroboros_job_result(job_id="job_auto_docs_expired")
error.message = "Job handle expired: job_auto_docs_expired. Result unavailable."
error.error_code = "job_handle_expired"
error.details.job_id = "job_auto_docs_expired"
error.details.lifecycle_status = "expired"
error.details.is_terminal = true
error.details.result_available = false
error.details.reason = "expired"
The brownfield MCP tool registers existing codebases for PM/interview context.
For {"action": "scan"}, scan_root controls only the filesystem walk for
seed repositories and worktrees. When scan_root is omitted, the walk starts at
the current user's home directory. Dot-prefixed directories and known noisy
directories such as node_modules are not walked as seed locations.
Linked worktree expansion is asymmetric. When the walk finds a normal repo root
with a .git directory, Ouroboros runs git worktree list --porcelain from
that repo and may register Git-reported linked worktrees even when those paths
are outside scan_root. When the walk finds a linked worktree root with a
.git file, Ouroboros registers that worktree itself but does not use it to
register the main worktree or sibling worktrees outside scan_root. This keeps
narrow scans scoped when a user intentionally passes one worktree as AI context.
Local repos, repos without remotes, and repos whose remotes are not named
origin are all eligible. Stale linked worktree paths are skipped when the path
no longer exists or is no longer a valid Git working tree.
Runtime integrations that support ooo <skill> or /ouroboros:<skill> use the
shared stateless router in ouroboros.router before invoking MCP. The router
loads packaged SKILL.md frontmatter, validates mcp_tool and mcp_args,
substitutes $1 / $CWD templates, and returns runtime-neutral dispatch
metadata. The runtime then performs its own structured logging, AgentMessage
assembly, and MCP handler invocation.
For setup and command authoring details, see the
Shared ooo Skill Dispatch Router
guide.
All MCP-specific exceptions inherit from MCPError, which inherits from OuroborosError.
OuroborosError
+-- MCPError (MCP base)
    +-- MCPClientError - Client-side failures
    |   +-- MCPConnectionError - Connection failures
    |   +-- MCPTimeoutError - Request timeout
    |   +-- MCPProtocolError - Protocol errors
    +-- MCPServerError - Server-side failures
        +-- MCPAuthError - Authentication failures
        +-- MCPResourceNotFoundError - Resource not found
        +-- MCPToolError - Tool execution failures
Base exception for all MCP-related errors.
class MCPError(OuroborosError):
def __init__(
self,
message: str,
*,
server_name: str | None = None,
is_retriable: bool = False,
details: dict[str, Any] | None = None,
) -> None: ...

| Attribute | Type | Description |
|---|---|---|
| server_name | `str \| None` | Name of the MCP server, if any |
| is_retriable | bool | Whether the operation can be retried |
Failed to connect to an MCP server. Typically retriable.
class MCPConnectionError(MCPClientError):
transport: str | None # Transport type

MCP request timed out. Typically retriable with backoff.
class MCPTimeoutError(MCPClientError):
timeout_seconds: float | None # Timeout value
operation: str | None # Operation that timed out

Error during tool execution.
class MCPToolError(MCPServerError):
tool_name: str | None # Tool that failed
error_code: str | None # Tool-specific error code

Concrete implementation of MCPClient protocol using the MCP SDK.
class MCPClientAdapter:
def __init__(
self,
*,
max_retries: int = 3,
retry_wait_initial: float = 1.0,
retry_wait_max: float = 10.0,
) -> None: ...

| Property | Type | Description |
|---|---|---|
| is_connected | bool | True if currently connected |
| server_info | `MCPServerInfo \| None` | Information about the MCP server, if available |
Connect to an MCP server.
async with MCPClientAdapter() as client:
result = await client.connect(config)
if result.is_ok:
print(f"Connected to {result.value.name}")

Disconnect from the current MCP server.
List available tools from the connected server.
async call_tool(name: str, arguments: dict[str, Any] | None = None) -> Result[MCPToolResult, MCPClientError]
Call a tool on the connected server.
result = await client.call_tool(
"search_files",
{"pattern": "*.py", "path": "/src"},
)
if result.is_ok:
print(result.value.text_content)

List available resources from the connected server.
Read a resource from the connected server.
List available prompts from the connected server.
Get a filled prompt from the connected server.
Manager for multiple MCP server connections with connection pooling and health checks.
class MCPClientManager:
def __init__(
self,
*,
max_retries: int = 3,
health_check_interval: float = 60.0,
default_timeout: float = 30.0,
) -> None: ...

| Property | Type | Description |
|---|---|---|
| servers | Sequence[str] | List of server names |
async add_server(config: MCPServerConfig, *, connect: bool = False) -> Result[MCPServerInfo | None, MCPClientError]
Add a server configuration.
Remove a server and disconnect if connected.
Connect to a specific server.
Connect to all registered servers.
Disconnect from all servers.
List all tools from all connected servers.
Find which server provides a given tool.
async call_tool(server_name: str, tool_name: str, arguments: dict[str, Any] | None = None, *, timeout: float | None = None) -> Result[MCPToolResult, MCPClientError]
Call a tool on a specific server.
async call_tool_auto(tool_name: str, arguments: dict[str, Any] | None = None, *, timeout: float | None = None) -> Result[MCPToolResult, MCPClientError]
Call a tool, automatically finding the server that provides it.
Start periodic health checks for all connections.
manager = MCPClientManager()
# Add multiple servers
await manager.add_server(MCPServerConfig(
name="filesystem",
transport=TransportType.STDIO,
command="npx",
args=("-y", "@modelcontextprotocol/server-filesystem"),
))
await manager.add_server(MCPServerConfig(
name="github",
transport=TransportType.STDIO,
command="npx",
args=("-y", "@modelcontextprotocol/server-github"),
env={"GITHUB_TOKEN": os.environ["GITHUB_TOKEN"]},
))
# Connect to all
results = await manager.connect_all()
# Use tools from any server
all_tools = await manager.list_all_tools()
# Call tool with auto-discovery
result = await manager.call_tool_auto("read_file", {"path": "/etc/hosts"})
# Cleanup
await manager.disconnect_all()

Concrete implementation of MCPServer protocol using FastMCP.
class MCPServerAdapter:
def __init__(
self,
*,
name: str = "ouroboros-mcp",
version: str = "1.0.0",
auth_config: AuthConfig | None = None,
rate_limit_config: RateLimitConfig | None = None,
) -> None: ...

| Property | Type | Description |
|---|---|---|
| info | MCPServerInfo | Server information |
Register a tool handler.
Register a resource handler.
Register a prompt handler.
List all registered tools.
async call_tool(name: str, arguments: dict[str, Any], credentials: dict[str, str] | None = None) -> Result[MCPToolResult, MCPServerError]
Call a registered tool.
Read a registered resource.
Start serving MCP requests. This method blocks until the server is stopped.
Shutdown the server gracefully.
from ouroboros.mcp.server import MCPServerAdapter
server = MCPServerAdapter(
name="my-ouroboros-server",
version="1.0.0",
)
# Register custom handlers
server.register_tool(MyToolHandler())
server.register_resource(MyResourceHandler())
# Start serving
await server.serve()

Registry for managing MCP tool handlers.
class ToolRegistry:
def __init__(self) -> None: ...

| Property | Type | Description |
|---|---|---|
| tool_count | int | Number of registered tools |
Register a tool handler.
Register multiple tool handlers.
Unregister a tool handler. Returns True if found.
Get a tool handler by name.
List all registered tools, optionally filtered by category.
List all tool categories.
Call a registered tool.
Check if a tool is registered.
Clear all registered tools.
from ouroboros.mcp.tools import ToolRegistry
registry = ToolRegistry()
# Register tools by category
registry.register(ExecuteSeedHandler(), category="execution")
registry.register(SessionStatusHandler(), category="status")
# List tools
all_tools = registry.list_tools()
execution_tools = registry.list_tools(category="execution")
# Call a tool
result = await registry.call("execute_seed", {"seed_id": "123"})

A global registry instance is available for convenience:
from ouroboros.mcp.tools import get_global_registry, register_tool
# Get global registry
registry = get_global_registry()
# Register to global registry
register_tool(MyHandler(), category="custom")

Context manager for creating and connecting an MCP client.
from ouroboros.mcp.client.adapter import create_mcp_client
async with create_mcp_client(config) as client:
tools = await client.list_tools()
# client is automatically connected and will disconnect on exit

Factory function for creating an Ouroboros MCP server with default configuration.
from ouroboros.mcp.server import create_ouroboros_server
server = create_ouroboros_server(
name="my-server",
version="1.0.0",
)
# Register additional handlers as needed
await server.serve()

Provider for MCP tools to integrate with OrchestratorRunner.
from ouroboros.orchestrator import MCPToolProvider
class MCPToolProvider:
def __init__(
self,
mcp_manager: MCPClientManager,
*,
default_timeout: float = 30.0,
tool_prefix: str = "",
) -> None: ...

| Property | Type | Description |
|---|---|---|
| tool_prefix | str | Prefix added to tool names |
| conflicts | Sequence[ToolConflict] | Tool conflicts detected |
Discover tools from all connected MCP servers.
provider = MCPToolProvider(manager, tool_prefix="mcp_")
tools = await provider.get_tools(builtin_tools=["Read", "Write", "Edit"])
# Returns MCPToolInfo for each non-conflicting tool

Get list of available tool names (with prefix).
Check if a tool is available.
Get info for a specific tool.
async call_tool(name: str, arguments: dict[str, Any] | None = None, *, timeout: float | None = None) -> Result[MCPToolResult, MCPToolError]
Call an MCP tool with retry logic and graceful error handling.
result = await provider.call_tool("mcp_read_file", {"path": "/tmp/test"})
if result.is_ok:
print(result.value.text_content)
else:
print(f"Error: {result.error}") # Never raises, returns error

Information about an available MCP tool.
@dataclass(frozen=True, slots=True)
class MCPToolInfo:
name: str # Tool name (possibly prefixed)
original_name: str # Original tool name from server
server_name: str # Server providing this tool
description: str # Tool description
input_schema: dict[str, Any] # JSON Schema for parameters

Information about a tool name conflict.
@dataclass(frozen=True, slots=True)
class ToolConflict:
tool_name: str # Conflicting tool name
source: str # Server name or "built-in"
shadowed_by: str # What shadowed this tool
resolution: str # How conflict was resolved

Load MCP client configuration from a YAML file.
from ouroboros.orchestrator import load_mcp_config
result = load_mcp_config(Path("mcp.yaml"))
if result.is_ok:
config = result.value
# config.servers - list of MCPServerConfig
# config.connection - MCPConnectionConfig
# config.tool_prefix - optional prefix

Complete MCP client configuration.
@dataclass(frozen=True, slots=True)
class MCPClientConfig:
servers: tuple[MCPServerConfig, ...]
connection: MCPConnectionConfig
tool_prefix: str = ""

Connection settings for MCP servers.
@dataclass(frozen=True, slots=True)
class MCPConnectionConfig:
timeout_seconds: float = 30.0
retry_attempts: int = 3
health_check_interval: float = 60.0

from ouroboros.orchestrator import (
ClaudeAgentAdapter,
OrchestratorRunner,
load_mcp_config,
)
from ouroboros.mcp.client.manager import MCPClientManager
from ouroboros.persistence.event_store import EventStore
# Load MCP config
config_result = load_mcp_config(Path("mcp.yaml"))
config = config_result.value
# Create and connect MCP manager
manager = MCPClientManager(
max_retries=config.connection.retry_attempts,
default_timeout=config.connection.timeout_seconds,
)
for server_config in config.servers:
await manager.add_server(server_config)
await manager.connect_all()
# Create runner with MCP integration
event_store = EventStore("sqlite+aiosqlite:///~/.ouroboros/ouroboros.db")
await event_store.initialize()
adapter = ClaudeAgentAdapter()
runner = OrchestratorRunner(
adapter,
event_store,
mcp_manager=manager,
mcp_tool_prefix=config.tool_prefix,
)
# Execute seed - MCP tools will be available to the agent
result = await runner.execute_seed(seed)
# Cleanup
await manager.disconnect_all()