Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cspell-repo-terms.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ inspectable
modelcontextprotocol
wfile
rfile
itertools
stacklevel
autouse
Ravande
relativedelta
Rootfs
Expand Down
47 changes: 38 additions & 9 deletions agent-governance-python/agent-os/src/agent_os/cli/mcp_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

import argparse
import hashlib
import itertools
import json
import os
import queue
import re
import ssl
import subprocess
import sys
import tempfile
Expand All @@ -25,6 +27,7 @@
import urllib.error
import urllib.parse
import urllib.request
import warnings
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -40,6 +43,21 @@
console = Console() if Console else None

MCP_PROTOCOL_VERSION = "2025-11-25"
_SUPPORTED_PROTOCOL_VERSIONS = {"2024-11-05", "2025-03-26", "2025-06-18", MCP_PROTOCOL_VERSION}

# Module-level SSL context; set via _configure_tls(verify=False).
_SSL_CONTEXT: ssl.SSLContext | None = None


def _configure_tls(*, verify: bool = True) -> None:
"""Configure TLS verification for remote MCP endpoints."""
global _SSL_CONTEXT # noqa: PLW0603
if verify:
_SSL_CONTEXT = None
else:
_SSL_CONTEXT = ssl.create_default_context()
_SSL_CONTEXT.check_hostname = False
_SSL_CONTEXT.verify_mode = ssl.CERT_NONE


@dataclass
Expand Down Expand Up @@ -711,9 +729,15 @@ def _validate_initialize_result(result: Any) -> dict[str, Any]:
if not isinstance(result, dict):
raise RuntimeError("initialize result was not an object")
protocol_version = result.get("protocolVersion")
if protocol_version != MCP_PROTOCOL_VERSION:
if protocol_version not in _SUPPORTED_PROTOCOL_VERSIONS:
raise RuntimeError(
f"unsupported MCP protocol version: {protocol_version!r}; expected {MCP_PROTOCOL_VERSION}"
f"unsupported MCP protocol version: {protocol_version!r}; "
f"supported: {sorted(_SUPPORTED_PROTOCOL_VERSIONS)}"
)
if protocol_version != MCP_PROTOCOL_VERSION:
warnings.warn(
f"Server uses older MCP protocol {protocol_version!r}; latest is {MCP_PROTOCOL_VERSION}",
stacklevel=2,
)
capabilities = result.get("capabilities")
if not isinstance(capabilities, Mapping):
Expand Down Expand Up @@ -827,11 +851,11 @@ def inspect_stdio_server(



_REQUEST_ID_COUNTER = itertools.count(1)


def _next_request_id() -> int:
if not hasattr(_next_request_id, "value"):
_next_request_id.value = 0 # type: ignore[attr-defined]
_next_request_id.value += 1 # type: ignore[attr-defined]
return _next_request_id.value # type: ignore[attr-defined]
return next(_REQUEST_ID_COUNTER)


def _jsonrpc_request(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
Expand Down Expand Up @@ -875,7 +899,7 @@ def _http_request(
req.add_header(key, value)
if body is not None:
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=timeout) as response: # noqa: S310 - URL is user-supplied MCP config by design.
with urllib.request.urlopen(req, timeout=timeout, context=_SSL_CONTEXT) as response: # noqa: S310 - URL is user-supplied MCP config by design.
return response.status, dict(response.headers.items()), response.read()


Expand Down Expand Up @@ -961,7 +985,7 @@ def _streamable_http_request(
for key, value in headers.items():
req.add_header(key, value)
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=timeout) as response: # noqa: S310 - URL is user-supplied MCP config by design.
with urllib.request.urlopen(req, timeout=timeout, context=_SSL_CONTEXT) as response: # noqa: S310 - URL is user-supplied MCP config by design.
response_headers = dict(response.headers.items())
if response.status == 202:
return response.status, response_headers, None
Expand Down Expand Up @@ -1122,7 +1146,7 @@ def _read_sse_message(queue_: queue.Queue[dict[str, Any]], request_id: int, time

def _open_legacy_sse(url: str, headers: Mapping[str, str], timeout: float) -> tuple[urllib.response.addinfourl, str, queue.Queue[dict[str, Any]]]:
req = urllib.request.Request(url, method="GET", headers={**headers, "Accept": "text/event-stream"})
response = urllib.request.urlopen(req, timeout=timeout) # noqa: S310 - user MCP endpoint by design.
response = urllib.request.urlopen(req, timeout=timeout, context=_SSL_CONTEXT) # noqa: S310 - user MCP endpoint by design.
message_queue: queue.Queue[dict[str, Any]] = queue.Queue()
endpoint_holder = {"endpoint": ""}

Expand Down Expand Up @@ -1857,6 +1881,7 @@ def build_parser() -> argparse.ArgumentParser:
help="Do not launch or connect to MCP servers; scan only inline tool definitions and config metadata",
)
scan_parser.add_argument("--json", action="store_true", help="Output in JSON format")
scan_parser.add_argument("--no-verify-tls", action="store_true", help="Skip TLS certificate verification for remote endpoints")

fp_parser = subparsers.add_parser("fingerprint", help="Register/compare tool fingerprints")
fp_parser.add_argument("config", help="Path to MCP config file (JSON/YAML)")
Expand All @@ -1865,13 +1890,15 @@ def build_parser() -> argparse.ArgumentParser:
fp_parser.add_argument("--timeout", type=float, default=10.0, help="Per-request timeout")
fp_parser.add_argument("--static-only", action="store_true", help="Fingerprint inline tools only")
fp_parser.add_argument("--json", action="store_true", help="Output in JSON format")
fp_parser.add_argument("--no-verify-tls", action="store_true", help="Skip TLS certificate verification for remote endpoints")

report_parser = subparsers.add_parser("report", help="Generate a full security report")
report_parser.add_argument("config", help="Path to MCP config file (JSON/YAML)")
report_parser.add_argument("--format", choices=["markdown", "json"], default="markdown")
report_parser.add_argument("--timeout", type=float, default=10.0, help="Per-request timeout")
report_parser.add_argument("--static-only", action="store_true", help="Report on inline tools only")
report_parser.add_argument("--json", action="store_true", help="Output in JSON format")
report_parser.add_argument("--no-verify-tls", action="store_true", help="Skip TLS certificate verification for remote endpoints")

return parser

Expand All @@ -1883,6 +1910,8 @@ def main(argv: list[str] | None = None) -> int:
if not args.command:
parser.print_help()
return 0
if getattr(args, "no_verify_tls", False):
_configure_tls(verify=False)
if args.command == "scan":
return cmd_scan(args)
if args.command == "fingerprint":
Expand Down
11 changes: 10 additions & 1 deletion agent-governance-python/agent-os/tests/test_mcp_scan_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ def tools_only_config_file(tmp_path: Path) -> Path:
return p


@pytest.fixture(autouse=True)
def _reset_handler_state():
"""Reset mutable class-level lists on test handlers to prevent cross-test bleed."""
yield
_StreamableHTTPMCPHandler.requests = []
_LegacySSEMCPHandler.requests = []
_LegacySSEMCPHandler.responses = []


# ============================================================================
# Test load_config
# ============================================================================
Expand Down Expand Up @@ -1035,7 +1044,7 @@ def test_parse_remote_mcp_servers_rejects_falsy_invalid_headers(tmp_path: Path):
def test_inspect_streamable_http_server_rejects_protocol_mismatch():
_StreamableHTTPMCPHandler.requests = []
_StreamableHTTPMCPHandler.response_mode = "json"
_StreamableHTTPMCPHandler.protocol_response = "2024-11-05"
_StreamableHTTPMCPHandler.protocol_response = "2020-01-01"
_StreamableHTTPMCPHandler.capabilities_response = {"tools": {}}
server = _serve(_StreamableHTTPMCPHandler)
try:
Expand Down
Loading