diff --git a/.cspell-repo-terms.txt b/.cspell-repo-terms.txt index d55e2218d..7afc31a9c 100644 --- a/.cspell-repo-terms.txt +++ b/.cspell-repo-terms.txt @@ -124,3 +124,11 @@ urlopen davidequarracino Marp Slidev +classmethod +Docstrings +isort +mypy +Pydantic +pytest +pyupgrade +xfail diff --git a/agent-governance-python/agent-os/AGENTS.md b/agent-governance-python/agent-os/AGENTS.md index 5a4f417d3..7b79b3854 100644 --- a/agent-governance-python/agent-os/AGENTS.md +++ b/agent-governance-python/agent-os/AGENTS.md @@ -62,6 +62,20 @@ ruff format . - Event types: `GovernanceEventType.POLICY_CHECK`, `.POLICY_VIOLATION`, `.TOOL_CALL_BLOCKED`, `.CHECKPOINT_CREATED` - Tests go in `tests/` (unit) or `modules/*/tests/` (module-specific) +## Policy Check Result (additive) + +Policy checks should prefer the additive `*_execute_check` methods when new code needs structured denial data, while preserving the legacy tuple-returning methods for compatibility. Raise the canonical `PolicyViolationError.from_check_result(result)` for new typed flows; hosts should surface `str(e)` and use `e.check_result.category` for dispatch instead of substring-matching internal details. + +```python +from agent_os.policies.decision import ViolationCategory +from agent_os.exceptions import PolicyViolationError + +result = kernel.pre_execute_check(ctx, input_data) +if not result.allowed: + raise PolicyViolationError.from_check_result(result) +# Hosts: surface str(e); switch on e.check_result.category for typed dispatch. +``` + ## Boundaries - **Never modify** `tests/test_mcp_server.py` (known pre-existing failure, excluded from CI) diff --git a/agent-governance-python/agent-os/src/agent_os/exceptions.py b/agent-governance-python/agent-os/src/agent_os/exceptions.py index a223f5d6b..94bc9e438 100644 --- a/agent-governance-python/agent-os/src/agent_os/exceptions.py +++ b/agent-governance-python/agent-os/src/agent_os/exceptions.py @@ -8,7 +8,13 @@ for structured error handling and logging. """ +from __future__ import annotations + from datetime import datetime, timezone +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from agent_os.policies.decision import PolicyCheckResult class AgentOSError(Exception): @@ -43,6 +49,24 @@ class PolicyViolationError(PolicyError): def __init__(self, message, error_code=None, details=None): super().__init__(message, error_code or "POLICY_VIOLATION", details) + self.check_result = None + + @classmethod + def from_check_result(cls, result: PolicyCheckResult) -> PolicyViolationError: + """Create a policy violation error from a structured check result.""" + + details = { + "category": result.category.value if result.category else None, + "matched_rule": result.matched_rule, + "detail": result.detail, + "scope": result.scope, + "operation": result.operation, + "tool_name": result.tool_name, + **result.audit_entry, + } + e = cls(result.public_message, "POLICY_VIOLATION", details) + e.check_result = result + return e class PolicyDeniedError(PolicyError): diff --git a/agent-governance-python/agent-os/src/agent_os/integrations/base.py b/agent-governance-python/agent-os/src/agent_os/integrations/base.py index f5fde7eaf..4338b4efd 100644 --- a/agent-governance-python/agent-os/src/agent_os/integrations/base.py +++ b/agent-governance-python/agent-os/src/agent_os/integrations/base.py @@ -19,7 +19,10 @@ from dataclasses import dataclass, field from datetime import datetime from enum import Enum -from typing import Any, Callable, Protocol +from typing import TYPE_CHECKING, Any, Callable, Protocol + +if TYPE_CHECKING: + from agent_os.policies.decision import PolicyCheckResult logger = logging.getLogger(__name__) @@ -678,34 +681,45 @@ def __init__(self, policy: GovernancePolicy, context: ExecutionContext | None = self.context = context def intercept(self, request: ToolCallRequest) -> ToolCallResult: + from agent_os.policies.decision_factory import ( + deny_blocked_pattern_tool, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + ) + # Check human approval requirement if self.policy.require_human_approval: + result = deny_human_approval(request.tool_name) return ToolCallResult( allowed=False, - reason=f"Tool '{request.tool_name}' requires human approval per governance policy", + reason=result.reason, ) # Check allowed tools if self.policy.allowed_tools and request.tool_name not in self.policy.allowed_tools: + result = deny_not_allowed_tool(request.tool_name, self.policy.allowed_tools) return ToolCallResult( allowed=False, - reason=f"Tool '{request.tool_name}' not in allowed list: {self.policy.allowed_tools}", + reason=result.reason, ) # Check blocked patterns args_str = str(request.arguments) matched = self.policy.matches_pattern(args_str) if matched: + result = deny_blocked_pattern_tool(matched[0]) return ToolCallResult( allowed=False, - reason=f"Blocked pattern '{matched[0]}' detected in tool arguments", + reason=result.reason, ) # Check call count if self.context and self.context.call_count >= self.policy.max_tool_calls: + result = deny_max_tool_calls(self.policy.max_tool_calls, self.context.call_count) return ToolCallResult( allowed=False, - reason=f"Max tool calls exceeded ({self.policy.max_tool_calls})", + reason=result.reason, ) return ToolCallResult(allowed=True) @@ -956,7 +970,7 @@ def from_cedar( policy_content: str | None = None, entities: list[dict[str, Any]] | None = None, **kwargs: Any, - ) -> "BaseIntegration": + ) -> BaseIntegration: """Create an integration with Cedar policy evaluation. Convenience factory that configures a ``PolicyEvaluator`` with a @@ -1036,16 +1050,26 @@ def emit(self, event_type: GovernanceEventType, data: dict[str, Any]) -> None: event_type, exc, exc_info=True, ) - def pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str | None]: - """ - Pre-execution policy check. + def pre_execute_check(self, ctx: ExecutionContext, input_data: Any) -> PolicyCheckResult: + """Run pre-execution policy checks and return a structured result. - When a ``PolicyEvaluator`` is configured, it is consulted **before** - the standard ``GovernancePolicy`` checks so that enterprise Cedar/OPA - policies always take precedence. + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. - Returns (allowed, reason) tuple. + Returns: + A structured policy-check result. """ + from agent_os.policies.decision import PolicyCheckResult + from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_confidence_threshold, + deny_human_approval, + deny_max_tool_calls, + deny_policy_error, + deny_timeout, + ) + event_base = {"agent_id": ctx.agent_id, "timestamp": datetime.now().isoformat()} self.emit(GovernanceEventType.POLICY_CHECK, {**event_base, "phase": "pre_execute"}) @@ -1071,67 +1095,90 @@ def pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str ) allowed, reason = self._evaluate_policy(cedar_ctx) if not allowed: + result = deny_policy_error(reason) self.emit( GovernanceEventType.TOOL_CALL_BLOCKED, - {**event_base, "reason": reason, "source": "cedar"}, + {**event_base, "reason": result.reason, "source": "cedar"}, ) - return False, reason + return result # Check call count if ctx.call_count >= self.policy.max_tool_calls: - reason = f"Max tool calls exceeded ({self.policy.max_tool_calls})" - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + result = deny_max_tool_calls(self.policy.max_tool_calls, ctx.call_count) + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, + ) + return result # Check timeout elapsed = (datetime.now() - ctx.start_time).total_seconds() if elapsed > self.policy.timeout_seconds: - reason = f"Timeout exceeded ({self.policy.timeout_seconds}s)" - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + result = deny_timeout(self.policy.timeout_seconds, elapsed) + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, + ) + return result # Check blocked patterns input_str = str(input_data) matched = self.policy.matches_pattern(input_str) if matched: - reason = f"Blocked pattern detected: {matched[0]}" - self.emit(GovernanceEventType.TOOL_CALL_BLOCKED, {**event_base, "reason": reason, "pattern": matched[0]}) - return False, reason + result = deny_blocked_pattern_input(matched[0], input_str) + self.emit( + GovernanceEventType.TOOL_CALL_BLOCKED, + {**event_base, "reason": result.reason, "pattern": matched[0]}, + ) + return result # Check human approval requirement if self.policy.require_human_approval: - reason = "Execution requires human approval per governance policy" - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + result = deny_human_approval() + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, + ) + return result # Check confidence threshold if self.policy.confidence_threshold > 0.0: - confidence = getattr(input_data, 'confidence', None) + confidence = getattr(input_data, "confidence", None) if isinstance(confidence, (int, float)) and confidence < self.policy.confidence_threshold: - reason = ( - f"Confidence {confidence:.2f} below threshold " - f"{self.policy.confidence_threshold:.2f}" + result = deny_confidence_threshold(self.policy.confidence_threshold, confidence) + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, ) - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + return result - return True, None + return PolicyCheckResult() - def post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, str | None]: + def pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str | None]: + """Run pre-execution policy checks and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. + + Returns: + The legacy ``(allowed, reason)`` tuple. """ - Post-execution validation including drift detection. - Computes a similarity score between the serialized output and the - baseline (first output) using ``SequenceMatcher``. The drift score - is ``1.0 - similarity`` (0.0 = identical, 1.0 = completely different). + return self.pre_execute_check(ctx, input_data).to_legacy_tuple() - When the score exceeds ``policy.drift_threshold`` a - ``DRIFT_DETECTED`` governance event is emitted and a warning is - logged. Callers can register listeners for this event to enforce - blocking behaviour if desired. + def post_execute_check(self, ctx: ExecutionContext, output_data: Any) -> PolicyCheckResult: + """Run post-execution validation and return a structured result. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. - Returns (valid, reason) tuple. + Returns: + A structured policy-check result. """ + from agent_os.policies.decision import PolicyCheckResult + ctx.call_count += 1 # Drift detection: compare output against baseline @@ -1178,16 +1225,29 @@ def post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, s "call_count": ctx.call_count, }) - return True, None + return PolicyCheckResult() + + def post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, str | None]: + """Run post-execution validation and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. + + Returns: + The legacy ``(valid, reason)`` tuple. + """ + + return self.post_execute_check(ctx, output_data).to_legacy_tuple() @staticmethod def compute_drift(ctx: ExecutionContext, output_data: Any) -> DriftResult | None: """Compute drift between *output_data* and the baseline stored in *ctx*. On the first call the output is recorded as the baseline and ``None`` - is returned (no comparison possible). Subsequent calls use + is returned (no comparison possible). Subsequent calls use ``SequenceMatcher`` to compute a similarity ratio between the - serialised baseline and the current output. The drift score is + serialised baseline and the current output. The drift score is ``1.0 - similarity`` (0.0 = identical, 1.0 = completely different). """ current_text = str(output_data) @@ -1212,23 +1272,75 @@ def compute_drift(ctx: ExecutionContext, output_data: Any) -> DriftResult | None current_hash=current_hash, ) - async def async_pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str | None]: + async def async_pre_execute_check( + self, + ctx: ExecutionContext, + input_data: Any, + ) -> PolicyCheckResult: + """Run async pre-execution policy checks and return a structured result. + + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. + + Returns: + A structured policy-check result. """ - Async pre-execution policy check. - Defaults to calling the sync version. Override in subclasses - to add async-specific logic (e.g., async database lookups). + return self.pre_execute_check(ctx, input_data) + + async def async_pre_execute( + self, + ctx: ExecutionContext, + input_data: Any, + ) -> tuple[bool, str | None]: + """Run async pre-execution policy checks and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. + + Returns: + The legacy ``(allowed, reason)`` tuple. """ - return self.pre_execute(ctx, input_data) - async def async_post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, str | None]: + result = await self.async_pre_execute_check(ctx, input_data) + return result.to_legacy_tuple() + + async def async_post_execute_check( + self, + ctx: ExecutionContext, + output_data: Any, + ) -> PolicyCheckResult: + """Run async post-execution validation and return a structured result. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. + + Returns: + A structured policy-check result. """ - Async post-execution validation. - Defaults to calling the sync version. Override in subclasses - to add async-specific logic. + return self.post_execute_check(ctx, output_data) + + async def async_post_execute( + self, + ctx: ExecutionContext, + output_data: Any, + ) -> tuple[bool, str | None]: + """Run async post-execution validation and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. + + Returns: + The legacy ``(valid, reason)`` tuple. """ - return self.post_execute(ctx, output_data) + + result = await self.async_post_execute_check(ctx, output_data) + return result.to_legacy_tuple() def on_signal(self, signal: str, handler: Callable[..., Any]) -> None: """Register a signal handler.""" @@ -1261,17 +1373,17 @@ def context(self) -> ExecutionContext: async def __call__(self, *args: Any, **kwargs: Any) -> Any: async with self._lock: # Pre-execution check - allowed, reason = await self._integration.async_pre_execute(self._ctx, (args, kwargs)) - if not allowed: - raise PolicyViolationError(reason or "Policy check failed") + pre_result = await self._integration.async_pre_execute_check(self._ctx, (args, kwargs)) + if not pre_result.allowed: + raise PolicyViolationError(pre_result.reason or "Policy check failed") # Execute the wrapped callable result = await self._fn(*args, **kwargs) # Post-execution validation - valid, reason = await self._integration.async_post_execute(self._ctx, result) - if not valid: - raise PolicyViolationError(reason or "Post-execution validation failed") + post_result = await self._integration.async_post_execute_check(self._ctx, result) + if not post_result.allowed: + raise PolicyViolationError(post_result.reason or "Post-execution validation failed") return result diff --git a/agent-governance-python/agent-os/src/agent_os/policies/__init__.py b/agent-governance-python/agent-os/src/agent_os/policies/__init__.py index cb67a3225..f27ca6cde 100644 --- a/agent-governance-python/agent-os/src/agent_os/policies/__init__.py +++ b/agent-governance-python/agent-os/src/agent_os/policies/__init__.py @@ -22,6 +22,7 @@ ExternalPolicyBackend, OPABackend, ) +from .decision import PolicyCheckResult, ViolationCategory from .evaluator import PolicyDecision, PolicyEvaluator from .rate_limiting import RateLimitConfig, RateLimitExceeded, TokenBucket from .schema import ( @@ -53,6 +54,7 @@ "ExternalPolicyBackend", "OPABackend", "PolicyAction", + "PolicyCheckResult", "PolicyCondition", "PolicyConflictResolver", "PolicyDecision", @@ -66,6 +68,7 @@ "RateLimitExceeded", "ResolutionResult", "TokenBucket", + "ViolationCategory", "SharedPolicyDecision", "SharedPolicyEvaluator", "SharedPolicyRule", diff --git a/agent-governance-python/agent-os/src/agent_os/policies/decision.py b/agent-governance-python/agent-os/src/agent_os/policies/decision.py new file mode 100644 index 000000000..42c6ef049 --- /dev/null +++ b/agent-governance-python/agent-os/src/agent_os/policies/decision.py @@ -0,0 +1,79 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Structured policy check decisions for integration-layer governance.""" + +from __future__ import annotations + +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + + +class ViolationCategory(str, Enum): + """Categories for integration-layer policy violations.""" + + BLOCKED_TOOL = "blocked_tool" + NOT_ALLOWED_TOOL = "not_allowed_tool" + BLOCKED_PATTERN_INPUT = "blocked_pattern_input" + BLOCKED_PATTERN_TOOL = "blocked_pattern_tool" + BLOCKED_PATTERN_OUTPUT = "blocked_pattern_output" + BLOCKED_PATTERN_MEMORY = "blocked_pattern_memory" + MAX_TOOL_CALLS = "max_tool_calls" + TIMEOUT = "timeout" + HUMAN_APPROVAL = "human_approval_required" + CONFIDENCE_THRESHOLD = "confidence_threshold" + DRIFT = "drift" + POLICY_ERROR = "policy_error" + + +class PolicyCheckResult(BaseModel): + """Structured result of an integration-layer policy check. + + Attributes: + allowed: Whether the policy check allows the operation. + action: Policy action associated with the result. + category: Optional structured violation category. + matched_rule: Optional declarative or legacy rule identifier. + public_message: Sanitized message safe for end users. + detail: Restricted details for logs and audits. + reason: Legacy free-form reason returned by tuple wrappers. + matched_pattern: Policy pattern that matched, when applicable. + matched_text: User text span that matched; omitted by default. + scope: Scope checked by the policy decision. + operation: Adapter-defined operation name. + tool_name: Tool name associated with the result. + index: Batch index associated with the result. + audit_entry: Additional structured audit metadata. + """ + + allowed: bool = True + action: str = "allow" + category: ViolationCategory | None = None + matched_rule: str | None = None + public_message: str = "" + detail: str = "" + reason: str = "" + matched_pattern: str | None = None + matched_text: str | None = None + scope: str | None = None + operation: str | None = None + tool_name: str | None = None + index: int | None = None + audit_entry: dict[str, Any] = Field(default_factory=dict) + + def to_legacy_tuple(self) -> tuple[bool, str | None]: + """Convert this result to the legacy ``(allowed, reason)`` tuple.""" + + return self.allowed, (None if self.allowed else (self.reason or self.detail)) + + def to_public_dict(self) -> dict[str, Any]: + """Return a sanitized public representation of this result.""" + + return { + "allowed": self.allowed, + "action": self.action, + "category": self.category.value if self.category else None, + "matched_rule": self.matched_rule, + "public_message": self.public_message, + } diff --git a/agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py b/agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py new file mode 100644 index 000000000..37d55cb32 --- /dev/null +++ b/agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py @@ -0,0 +1,267 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Factories for structured policy-denial decisions.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from .decision import PolicyCheckResult, ViolationCategory + +_PUBLIC_MESSAGES: dict[ViolationCategory, str] = { + ViolationCategory.BLOCKED_PATTERN_INPUT: "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_TOOL: "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_OUTPUT: "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_MEMORY: "Content blocked by governance policy.", + ViolationCategory.NOT_ALLOWED_TOOL: "Tool not permitted by governance policy.", + ViolationCategory.BLOCKED_TOOL: "Tool blocked by governance policy.", + ViolationCategory.MAX_TOOL_CALLS: "Tool-call limit exceeded.", + ViolationCategory.TIMEOUT: "Execution timeout exceeded.", + ViolationCategory.HUMAN_APPROVAL: "Human approval required.", + ViolationCategory.CONFIDENCE_THRESHOLD: "Confidence below required threshold.", + ViolationCategory.POLICY_ERROR: "Policy evaluation error.", + ViolationCategory.DRIFT: "Behavioral drift detected.", +} + + +def _deny_result( + *, + category: ViolationCategory, + detail: str, + matched_rule: str | None = None, + matched_pattern: str | None = None, + matched_text: str | None = None, + scope: str | None = None, + operation: str | None = None, + tool_name: str | None = None, + index: int | None = None, + audit_entry: dict[str, Any] | None = None, +) -> PolicyCheckResult: + """Build a common denied policy-check result.""" + + return PolicyCheckResult( + allowed=False, + action="deny", + category=category, + matched_rule=matched_rule, + public_message=_PUBLIC_MESSAGES[category], + detail=detail, + reason=detail, + matched_pattern=matched_pattern, + matched_text=matched_text, + scope=scope, + operation=operation, + tool_name=tool_name, + index=index, + audit_entry=audit_entry or {}, + ) + + +def deny_blocked_pattern_input( + matched_pattern: str, + matched_text: str | None = None, + *, + rule_name: str | None = None, + redact_user_text: bool = True, +) -> PolicyCheckResult: + """Return a denial for an input blocked-pattern match.""" + + detail = f"Blocked pattern detected: {matched_pattern}" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_INPUT, + detail=detail, + matched_rule=rule_name, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope="input", + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_pattern_tool( + matched_pattern: str, + *, + tool_name: str | None = None, + scope_label: str = "tool", + redact_user_text: bool = True, + matched_text: str | None = None, +) -> PolicyCheckResult: + """Return a denial for a tool-argument blocked-pattern match.""" + + if tool_name: + detail = f"Blocked pattern '{matched_pattern}' detected in tool '{tool_name}' arguments" + else: + detail = f"Blocked pattern '{matched_pattern}' detected in tool arguments" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_TOOL, + detail=detail, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope=scope_label, + tool_name=tool_name, + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_pattern_output( + matched_pattern: str, + *, + redact_user_text: bool = True, + matched_text: str | None = None, +) -> PolicyCheckResult: + """Return a denial for an output blocked-pattern match.""" + + detail = f"Blocked pattern detected in output: {matched_pattern}" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_OUTPUT, + detail=detail, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope="output", + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_pattern_memory( + matched_pattern: str, + *, + redact_user_text: bool = True, + matched_text: str | None = None, +) -> PolicyCheckResult: + """Return a denial for a memory-write blocked-pattern match.""" + + detail = f"Memory write blocked: blocked pattern '{matched_pattern}' detected" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_MEMORY, + detail=detail, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope="memory", + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_tool(tool_name: str) -> PolicyCheckResult: + """Return a denial for a tool that matches a blocked tool pattern.""" + + detail = f"Tool '{tool_name}' matches blocked pattern" + return _deny_result( + category=ViolationCategory.BLOCKED_TOOL, + detail=detail, + scope="tool", + tool_name=tool_name, + audit_entry={"tool_name": tool_name}, + ) + + +def deny_not_allowed_tool(tool_name: str, allowed: Sequence[str]) -> PolicyCheckResult: + """Return a denial for a tool outside the configured allow-list.""" + + detail = f"Tool '{tool_name}' not in allowed list: {allowed}" + return _deny_result( + category=ViolationCategory.NOT_ALLOWED_TOOL, + detail=detail, + scope="tool", + tool_name=tool_name, + audit_entry={"allowed_tools": list(allowed), "tool_name": tool_name}, + ) + + +def deny_max_tool_calls(limit: int, current: int | None = None) -> PolicyCheckResult: + """Return a denial for exceeding the configured tool-call limit.""" + + detail = f"Max tool calls exceeded ({limit})" + audit_entry: dict[str, Any] = {"limit": limit} + if current is not None: + audit_entry["current"] = current + return _deny_result( + category=ViolationCategory.MAX_TOOL_CALLS, + detail=detail, + scope="tool", + audit_entry=audit_entry, + ) + + +def deny_timeout(limit_s: int | float, elapsed_s: int | float | None = None) -> PolicyCheckResult: + """Return a denial for exceeding the configured execution timeout.""" + + detail = f"Timeout exceeded ({limit_s}s)" + audit_entry: dict[str, Any] = {"limit_s": limit_s} + if elapsed_s is not None: + audit_entry["elapsed_s"] = elapsed_s + return _deny_result( + category=ViolationCategory.TIMEOUT, + detail=detail, + audit_entry=audit_entry, + ) + + +def deny_human_approval(tool_name: str | None = None) -> PolicyCheckResult: + """Return a denial for a human-approval requirement.""" + + if tool_name: + detail = f"Tool '{tool_name}' requires human approval per governance policy" + else: + detail = "Execution requires human approval per governance policy" + return _deny_result( + category=ViolationCategory.HUMAN_APPROVAL, + detail=detail, + scope="tool" if tool_name else None, + tool_name=tool_name, + audit_entry={"tool_name": tool_name} if tool_name else {}, + ) + + +def deny_confidence_threshold( + threshold: float, + observed: float | None = None, +) -> PolicyCheckResult: + """Return a denial for a confidence score below the configured threshold.""" + + if observed is None: + detail = f"Confidence below threshold of {threshold}" + else: + detail = f"Confidence {observed:.2f} below threshold {threshold:.2f}" + audit_entry: dict[str, Any] = {"threshold": threshold} + if observed is not None: + audit_entry["observed"] = observed + return _deny_result( + category=ViolationCategory.CONFIDENCE_THRESHOLD, + detail=detail, + audit_entry=audit_entry, + ) + + +def deny_policy_error(detail: str, *, matched_rule: str | None = None) -> PolicyCheckResult: + """Return a denial for a policy evaluation error or fail-closed decision.""" + + return _deny_result( + category=ViolationCategory.POLICY_ERROR, + detail=detail, + matched_rule=matched_rule, + audit_entry={"matched_rule": matched_rule} if matched_rule else {}, + ) + + +def deny_drift( + score: float, + threshold: float, + *, + baseline_hash: str | None = None, + current_hash: str | None = None, +) -> PolicyCheckResult: + """Return a denial for behavioral drift above the configured threshold.""" + + detail = f"Drift score {score:.2f} exceeds threshold {threshold:.2f}" + audit_entry: dict[str, Any] = {"drift_score": score, "threshold": threshold} + if baseline_hash is not None: + audit_entry["baseline_hash"] = baseline_hash + if current_hash is not None: + audit_entry["current_hash"] = current_hash + return _deny_result( + category=ViolationCategory.DRIFT, + detail=detail, + scope="output", + audit_entry=audit_entry, + ) diff --git a/agent-governance-python/agent-os/tests/test_legacy_constructors.py b/agent-governance-python/agent-os/tests/test_legacy_constructors.py new file mode 100644 index 000000000..2c77a1feb --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_legacy_constructors.py @@ -0,0 +1,49 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for legacy exception constructor compatibility.""" + +from __future__ import annotations + +import pytest + +from agent_os.exceptions import PolicyViolationError + +CONSTRUCTOR_CASES = [ + ("message_only", ("msg",), {}, "POLICY_VIOLATION", {}), + ("message_and_code", ("msg", "CUSTOM_CODE"), {}, "CUSTOM_CODE", {}), + ("message_code_details", ("msg", "CUSTOM", {"k": "v"}), {}, "CUSTOM", {"k": "v"}), + ( + "keyword_arguments", + (), + {"message": "msg", "error_code": "X", "details": {"y": 1}}, + "X", + {"y": 1}, + ), +] + + +class TestLegacyConstructors: + """Verify legacy PolicyViolationError construction remains supported.""" + + @pytest.mark.parametrize( + ("case_name", "args", "kwargs", "expected_code", "expected_details"), + CONSTRUCTOR_CASES, + ids=[case[0] for case in CONSTRUCTOR_CASES], + ) + def test_policy_violation_error_legacy_forms( + self, + case_name: str, + args: tuple[object, ...], + kwargs: dict[str, object], + expected_code: str, + expected_details: dict[str, object], + ) -> None: + error = PolicyViolationError(*args, **kwargs) + data = error.to_dict() + + assert case_name + assert error.check_result is None + assert error.error_code == expected_code + assert error.details == expected_details + assert data["error"] == expected_code + assert data["message"] == "msg" diff --git a/agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py b/agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py new file mode 100644 index 000000000..e99e6a4fb --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py @@ -0,0 +1,224 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for sanitized policy-check decisions.""" + +from __future__ import annotations + +from collections.abc import Callable + +import pytest + +from agent_os.policies.decision import PolicyCheckResult, ViolationCategory +from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_blocked_pattern_memory, + deny_blocked_pattern_output, + deny_blocked_pattern_tool, + deny_blocked_tool, + deny_confidence_threshold, + deny_drift, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + deny_policy_error, + deny_timeout, +) + +FactoryBuilder = Callable[[], PolicyCheckResult] + +PUBLIC_KEYS = {"allowed", "action", "category", "matched_rule", "public_message"} + +FACTORY_CASES: list[tuple[str, FactoryBuilder, str, ViolationCategory, tuple[str, ...]]] = [ + ( + "blocked_pattern_input", + lambda: deny_blocked_pattern_input("secret-input-pattern", "secret input text"), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_INPUT, + ("secret-input-pattern",), + ), + ( + "blocked_pattern_tool", + lambda: deny_blocked_pattern_tool( + "secret-tool-pattern", tool_name="danger_tool", matched_text="secret tool text" + ), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_TOOL, + ("secret-tool-pattern", "danger_tool"), + ), + ( + "blocked_pattern_output", + lambda: deny_blocked_pattern_output("secret-output-pattern", matched_text="secret output"), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_OUTPUT, + ("secret-output-pattern",), + ), + ( + "blocked_pattern_memory", + lambda: deny_blocked_pattern_memory("secret-memory-pattern", matched_text="secret memory"), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_MEMORY, + ("secret-memory-pattern",), + ), + ( + "blocked_tool", + lambda: deny_blocked_tool("danger_tool"), + "Tool blocked by governance policy.", + ViolationCategory.BLOCKED_TOOL, + ("danger_tool",), + ), + ( + "not_allowed_tool", + lambda: deny_not_allowed_tool("danger_tool", ["read_file", "write_file"]), + "Tool not permitted by governance policy.", + ViolationCategory.NOT_ALLOWED_TOOL, + ("danger_tool", "read_file", "write_file"), + ), + ( + "max_tool_calls", + lambda: deny_max_tool_calls(5, current=6), + "Tool-call limit exceeded.", + ViolationCategory.MAX_TOOL_CALLS, + ("5", "6"), + ), + ( + "timeout", + lambda: deny_timeout(30, elapsed_s=31), + "Execution timeout exceeded.", + ViolationCategory.TIMEOUT, + ("30", "31"), + ), + ( + "human_approval", + lambda: deny_human_approval("approval_tool"), + "Human approval required.", + ViolationCategory.HUMAN_APPROVAL, + ("approval_tool",), + ), + ( + "confidence_threshold", + lambda: deny_confidence_threshold(0.9, observed=0.42), + "Confidence below required threshold.", + ViolationCategory.CONFIDENCE_THRESHOLD, + ("0.9", "0.42"), + ), + ( + "policy_error", + lambda: deny_policy_error("policy detail includes policy-secret", matched_rule="rule-17"), + "Policy evaluation error.", + ViolationCategory.POLICY_ERROR, + ("policy-secret", "17"), + ), + ( + "drift", + lambda: deny_drift( + 0.91, + 0.7, + baseline_hash="baseline-secret-hash", + current_hash="current-secret-hash", + ), + "Behavioral drift detected.", + ViolationCategory.DRIFT, + ("0.91", "0.7", "baseline-secret-hash", "current-secret-hash"), + ), +] + +REDACTABLE_PATTERN_CASES: list[tuple[str, Callable[..., PolicyCheckResult]]] = [ + ("blocked_pattern_input", deny_blocked_pattern_input), + ("blocked_pattern_tool", deny_blocked_pattern_tool), + ("blocked_pattern_output", deny_blocked_pattern_output), + ("blocked_pattern_memory", deny_blocked_pattern_memory), +] + + +class TestPolicyCheckResultNoLeak: + """Verify policy decisions separate public text from audit detail.""" + + @pytest.mark.parametrize( + ("case_name", "builder", "public_message", "expected_category", "sensitive_values"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_factory_public_message_is_sanitized( + self, + case_name: str, + builder: FactoryBuilder, + public_message: str, + expected_category: ViolationCategory, + sensitive_values: tuple[str, ...], + ) -> None: + result = builder() + + assert case_name + assert result.public_message == public_message + assert result.action == "deny" + assert result.allowed is False + assert result.category is expected_category + for sensitive in sensitive_values: + assert sensitive not in result.public_message + assert sensitive in f"{result.detail} {result.matched_pattern} {result.audit_entry}" + assert result.matched_text is None + + @pytest.mark.parametrize( + ("case_name", "builder", "public_message", "expected_category", "sensitive_values"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_public_dict_omits_restricted_fields( + self, + case_name: str, + builder: FactoryBuilder, + public_message: str, + expected_category: ViolationCategory, + sensitive_values: tuple[str, ...], + ) -> None: + result = builder() + public_dict = result.to_public_dict() + + assert case_name + assert public_message + assert expected_category + assert sensitive_values + assert set(public_dict) == PUBLIC_KEYS + assert "detail" not in public_dict + assert "matched_pattern" not in public_dict + assert "matched_text" not in public_dict + assert "audit_entry" not in public_dict + + @pytest.mark.parametrize( + ("case_name", "builder", "public_message", "expected_category", "sensitive_values"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_model_dump_round_trips( + self, + case_name: str, + builder: FactoryBuilder, + public_message: str, + expected_category: ViolationCategory, + sensitive_values: tuple[str, ...], + ) -> None: + result = builder() + roundtripped = PolicyCheckResult.model_validate(result.model_dump()) + + assert case_name + assert public_message + assert expected_category + assert sensitive_values + assert roundtripped == result + + @pytest.mark.parametrize( + ("case_name", "factory"), + REDACTABLE_PATTERN_CASES, + ids=[case[0] for case in REDACTABLE_PATTERN_CASES], + ) + def test_redactable_factories_preserve_text_only_when_requested( + self, + case_name: str, + factory: Callable[..., PolicyCheckResult], + ) -> None: + text = f"matched text for {case_name}" + redacted = factory("secret-pattern", matched_text=text) + unredacted = factory("secret-pattern", matched_text=text, redact_user_text=False) + + assert redacted.matched_text is None + assert unredacted.matched_text == text diff --git a/agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py b/agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py new file mode 100644 index 000000000..679bfad66 --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py @@ -0,0 +1,151 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for safe policy-violation exception messages.""" + +from __future__ import annotations + +import pickle +from collections.abc import Callable + +import pytest + +from agent_os.exceptions import PolicyViolationError +from agent_os.integrations.base import PatternType +from agent_os.policies.decision import PolicyCheckResult +from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_blocked_pattern_memory, + deny_blocked_pattern_output, + deny_blocked_pattern_tool, + deny_blocked_tool, + deny_confidence_threshold, + deny_drift, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + deny_policy_error, + deny_timeout, +) + +FactoryBuilder = Callable[[], PolicyCheckResult] + +FACTORY_CASES: list[tuple[str, FactoryBuilder, str]] = [ + ( + "blocked_pattern_input", + lambda: deny_blocked_pattern_input("secret-input-pattern", "secret input text"), + "secret-input-pattern", + ), + ( + "blocked_pattern_tool", + lambda: deny_blocked_pattern_tool("secret-tool-pattern", tool_name="danger_tool"), + "secret-tool-pattern", + ), + ( + "blocked_pattern_output", + lambda: deny_blocked_pattern_output("secret-output-pattern"), + "secret-output-pattern", + ), + ( + "blocked_pattern_memory", + lambda: deny_blocked_pattern_memory("secret-memory-pattern"), + "secret-memory-pattern", + ), + ("blocked_tool", lambda: deny_blocked_tool("danger_tool"), "danger_tool"), + ( + "not_allowed_tool", + lambda: deny_not_allowed_tool("danger_tool", ["read_file", "write_file"]), + "read_file", + ), + ("max_tool_calls", lambda: deny_max_tool_calls(5, current=6), "5"), + ("timeout", lambda: deny_timeout(30, elapsed_s=31), "30"), + ("human_approval", lambda: deny_human_approval("approval_tool"), "approval_tool"), + ("confidence_threshold", lambda: deny_confidence_threshold(0.9, observed=0.42), "0.90"), + ( + "policy_error", + lambda: deny_policy_error("policy detail includes policy-secret"), + "policy-secret", + ), + ("drift", lambda: deny_drift(0.91, 0.7), "0.91"), +] + +PROPERTY_CASES: list[tuple[str, FactoryBuilder, tuple[str, ...]]] = [ + ( + "allowed_tools", + lambda: deny_not_allowed_tool("delete_file", ["read_file", "write_file"]), + ("delete_file", "read_file", "write_file"), + ), + ( + "blocked_patterns", + lambda: deny_blocked_pattern_input(r"\b\d{3}-\d{2}-\d{4}\b"), + (r"\b\d{3}-\d{2}-\d{4}\b", PatternType.REGEX.value), + ), + ("blocked_tool", lambda: deny_blocked_tool("shell_exec"), ("shell_exec",)), + ("max_tool_calls", lambda: deny_max_tool_calls(5), ("5",)), + ("timeout_seconds", lambda: deny_timeout(30), ("30",)), +] + + +class TestPolicyViolationErrorSafety: + """Verify exceptions expose sanitized messages and retain audit details.""" + + @pytest.mark.parametrize( + ("case_name", "builder", "detail_value"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_from_check_result_public_message_is_safe( + self, + case_name: str, + builder: FactoryBuilder, + detail_value: str, + ) -> None: + result = builder() + + with pytest.raises(PolicyViolationError) as exc_info: + raise PolicyViolationError.from_check_result(result) + + error = exc_info.value + assert case_name + assert str(error) == result.public_message + assert detail_value not in str(error) + assert detail_value not in repr(error) + assert detail_value in error.details["detail"] + assert error.check_result is result + + @pytest.mark.parametrize( + ("case_name", "builder", "detail_value"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_pickle_round_trip_preserves_public_message_safety( + self, + case_name: str, + builder: FactoryBuilder, + detail_value: str, + ) -> None: + error = PolicyViolationError.from_check_result(builder()) + roundtripped = pickle.loads(pickle.dumps(error)) + + # The attached check_result is diagnostic state; public-message safety is the contract. + assert case_name + assert str(roundtripped) == str(error) + assert detail_value not in str(roundtripped) + assert detail_value not in repr(roundtripped) + + @pytest.mark.parametrize( + ("case_name", "builder", "forbidden_values"), + PROPERTY_CASES, + ids=[case[0] for case in PROPERTY_CASES], + ) + def test_synthetic_policy_values_do_not_leak_to_exception_message( + self, + case_name: str, + builder: FactoryBuilder, + forbidden_values: tuple[str, ...], + ) -> None: + error = PolicyViolationError.from_check_result(builder()) + + assert case_name + for forbidden in forbidden_values: + assert forbidden not in str(error) + assert forbidden not in repr(error) diff --git a/agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py b/agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py new file mode 100644 index 000000000..d86515c5a --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py @@ -0,0 +1,199 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for structured integration policy-check wrapper contracts.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any + +import pytest + +from agent_os.integrations.base import ( + BaseIntegration, + ExecutionContext, + GovernancePolicy, +) +from agent_os.policies.decision import PolicyCheckResult, ViolationCategory +from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + deny_timeout, +) + + +class _TestKernel(BaseIntegration): + """Minimal concrete integration for policy contract tests.""" + + def wrap(self, agent: Any) -> Any: + return agent + + def unwrap(self, governed_agent: Any) -> Any: + return governed_agent + + +@dataclass +class _LowConfidenceInput: + """Input object carrying confidence metadata.""" + + value: str + confidence: float + + +@dataclass(frozen=True) +class _PreCase: + """Fixture data for a pre-execution denial.""" + + name: str + policy_factory: Callable[[], GovernancePolicy] + context_mutator: Callable[[ExecutionContext], None] + input_factory: Callable[[], Any] + expected_category: ViolationCategory + + +PRE_DENIAL_CASES = [ + _PreCase( + name="max_tool_calls", + policy_factory=lambda: GovernancePolicy(max_tool_calls=5), + context_mutator=lambda ctx: setattr(ctx, "call_count", 5), + input_factory=lambda: "safe input", + expected_category=ViolationCategory.MAX_TOOL_CALLS, + ), + _PreCase( + name="timeout", + policy_factory=lambda: GovernancePolicy(timeout_seconds=1), + context_mutator=lambda ctx: setattr(ctx, "start_time", datetime.now() - timedelta(seconds=2)), + input_factory=lambda: "safe input", + expected_category=ViolationCategory.TIMEOUT, + ), + _PreCase( + name="blocked_pattern", + policy_factory=lambda: GovernancePolicy(blocked_patterns=["bar"]), + context_mutator=lambda ctx: None, + input_factory=lambda: "foo bar baz", + expected_category=ViolationCategory.BLOCKED_PATTERN_INPUT, + ), + _PreCase( + name="human_approval", + policy_factory=lambda: GovernancePolicy(require_human_approval=True), + context_mutator=lambda ctx: None, + input_factory=lambda: "safe input", + expected_category=ViolationCategory.HUMAN_APPROVAL, + ), + _PreCase( + name="confidence_threshold", + policy_factory=lambda: GovernancePolicy(confidence_threshold=0.9), + context_mutator=lambda ctx: None, + input_factory=lambda: _LowConfidenceInput("risky action", 0.2), + expected_category=ViolationCategory.CONFIDENCE_THRESHOLD, + ), +] + +LEGACY_REASON_CASES = [ + ("max_tool_calls", lambda: deny_max_tool_calls(5).reason, "Max tool calls exceeded (5)"), + ("timeout", lambda: deny_timeout(0.1).reason, "Timeout exceeded (0.1s)"), + ( + "blocked_pattern_input", + lambda: deny_blocked_pattern_input("bar", "foo bar baz").reason, + "Blocked pattern detected: bar", + ), + ( + "human_approval", + lambda: deny_human_approval().reason, + "Execution requires human approval per governance policy", + ), + ( + "not_allowed_tool", + lambda: deny_not_allowed_tool("c", ["a", "b"]).reason, + "Tool 'c' not in allowed list: ['a', 'b']", + ), +] + + +def _kernel_and_context(case: _PreCase) -> tuple[_TestKernel, ExecutionContext, Any]: + kernel = _TestKernel(policy=case.policy_factory()) + ctx = kernel.create_context(f"agent-{case.name}") + case.context_mutator(ctx) + return kernel, ctx, case.input_factory() + + +class TestPreExecuteCheckContract: + """Verify structured pre-checks match legacy pre-execute wrappers.""" + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + def test_pre_execute_check_returns_structured_denial(self, case: _PreCase) -> None: + kernel, ctx, input_data = _kernel_and_context(case) + + result = kernel.pre_execute_check(ctx, input_data) + + assert isinstance(result, PolicyCheckResult) + assert result.allowed is False + assert result.category is case.expected_category + assert result.to_legacy_tuple() == kernel.pre_execute(ctx, input_data) + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + async def test_async_pre_execute_check_returns_structured_denial(self, case: _PreCase) -> None: + kernel, ctx, input_data = _kernel_and_context(case) + + result = await kernel.async_pre_execute_check(ctx, input_data) + + assert isinstance(result, PolicyCheckResult) + assert result.allowed is False + assert result.category is case.expected_category + assert result.to_legacy_tuple() == await kernel.async_pre_execute(ctx, input_data) + + +class TestPostExecuteCheckContract: + """Verify structured post-checks match legacy post-execute wrappers.""" + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + def test_post_execute_check_matches_legacy_tuple_for_same_policies( + self, + case: _PreCase, + ) -> None: + check_kernel, check_ctx, _ = _kernel_and_context(case) + legacy_kernel, legacy_ctx, _ = _kernel_and_context(case) + + result = check_kernel.post_execute_check(check_ctx, "safe output") + legacy_tuple = legacy_kernel.post_execute(legacy_ctx, "safe output") + + assert isinstance(result, PolicyCheckResult) + assert result.to_legacy_tuple() == legacy_tuple + assert check_ctx.call_count == legacy_ctx.call_count + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + async def test_async_post_execute_check_matches_legacy_tuple_for_same_policies( + self, + case: _PreCase, + ) -> None: + check_kernel, check_ctx, _ = _kernel_and_context(case) + legacy_kernel, legacy_ctx, _ = _kernel_and_context(case) + + result = await check_kernel.async_post_execute_check(check_ctx, "safe output") + legacy_tuple = await legacy_kernel.async_post_execute(legacy_ctx, "safe output") + + assert isinstance(result, PolicyCheckResult) + assert result.to_legacy_tuple() == legacy_tuple + assert check_ctx.call_count == legacy_ctx.call_count + + +class TestLegacyReasonSnapshots: + """Pin byte-identical legacy denial reason strings.""" + + @pytest.mark.parametrize( + ("case_name", "reason_factory", "expected_reason"), + LEGACY_REASON_CASES, + ids=[case[0] for case in LEGACY_REASON_CASES], + ) + def test_factory_reason_matches_legacy_snapshot( + self, + case_name: str, + reason_factory: Callable[[], str], + expected_reason: str, + ) -> None: + assert case_name + assert reason_factory() == expected_reason diff --git a/agent-governance-python/agent-os/tests/test_public_api_surface.py b/agent-governance-python/agent-os/tests/test_public_api_surface.py new file mode 100644 index 000000000..b5069bd6a --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_public_api_surface.py @@ -0,0 +1,104 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests that pin public API signatures touched by policy decisions.""" + +from __future__ import annotations + +import inspect +from collections.abc import Callable +from typing import Any + +import pytest + +from agent_os import exceptions +from agent_os.integrations.base import BaseIntegration +from agent_os.policies import decision, decision_factory + +SIGNATURE_CASES: list[tuple[str, Callable[..., Any], str]] = [ + ( + "PolicyViolationError.__init__", + exceptions.PolicyViolationError.__init__, + "(self, message, error_code=None, details=None)", + ), + ( + "PolicyError.__init__", + exceptions.PolicyError.__init__, + "(self, message, error_code=None, details=None)", + ), + ( + "AgentOSError.__init__", + exceptions.AgentOSError.__init__, + "(self, message, error_code=None, details=None)", + ), + ( + "BaseIntegration.pre_execute", + BaseIntegration.pre_execute, + "(self, ctx: 'ExecutionContext', input_data: 'Any') -> 'tuple[bool, str | None]'", + ), + ( + "BaseIntegration.post_execute", + BaseIntegration.post_execute, + "(self, ctx: 'ExecutionContext', output_data: 'Any') -> 'tuple[bool, str | None]'", + ), + ( + "BaseIntegration.async_pre_execute", + BaseIntegration.async_pre_execute, + "(self, ctx: 'ExecutionContext', input_data: 'Any') -> 'tuple[bool, str | None]'", + ), + ( + "BaseIntegration.async_post_execute", + BaseIntegration.async_post_execute, + "(self, ctx: 'ExecutionContext', output_data: 'Any') -> 'tuple[bool, str | None]'", + ), +] + +ASYNC_METHODS = [ + BaseIntegration.async_pre_execute, + BaseIntegration.async_post_execute, +] + +NEW_FACTORY_NAMES = [ + "deny_blocked_pattern_input", + "deny_blocked_pattern_tool", + "deny_blocked_pattern_output", + "deny_blocked_pattern_memory", + "deny_blocked_tool", + "deny_not_allowed_tool", + "deny_max_tool_calls", + "deny_timeout", + "deny_human_approval", + "deny_confidence_threshold", + "deny_policy_error", + "deny_drift", +] + + +class TestPublicApiSurface: + """Pin legacy signatures and additive policy-decision symbols.""" + + @pytest.mark.parametrize( + ("symbol_name", "symbol", "expected_signature"), + SIGNATURE_CASES, + ids=[case[0] for case in SIGNATURE_CASES], + ) + def test_public_signatures_are_pinned( + self, + symbol_name: str, + symbol: Callable[..., Any], + expected_signature: str, + ) -> None: + assert symbol_name + assert str(inspect.signature(symbol)) == expected_signature + + @pytest.mark.parametrize("symbol", ASYNC_METHODS) + def test_async_wrappers_remain_coroutines(self, symbol: Callable[..., Any]) -> None: + assert inspect.iscoroutinefunction(symbol) + + @pytest.mark.parametrize("name", NEW_FACTORY_NAMES) + def test_new_decision_factories_exist(self, name: str) -> None: + assert callable(getattr(decision_factory, name)) + + def test_new_decision_symbols_exist(self) -> None: + assert inspect.isclass(decision.ViolationCategory) + assert inspect.isclass(decision.PolicyCheckResult) + assert callable(exceptions.PolicyViolationError.from_check_result) diff --git a/docs/adr/0011-additive-policy-check-contract.md b/docs/adr/0011-additive-policy-check-contract.md new file mode 100644 index 000000000..a6f0afeeb --- /dev/null +++ b/docs/adr/0011-additive-policy-check-contract.md @@ -0,0 +1,186 @@ +# ADR 0011: Additive Policy Check Contract + +- Status: Proposed +- Date: 2026-04-29 + +## Context + +AGT currently exposes policy outcomes through several independent contracts. The +same denial can appear as a declarative `PolicyDecision`, a legacy +`BaseIntegration` `(allowed, reason)` tuple, or an adapter-raised +`PolicyViolationError`. Those paths have different message shapes, different +exception identities, and different levels of detail. Declarative policy rules +can be safe when authors provide a user-facing message, but integration and +adapter paths often interpolate internal policy values such as regex patterns, +allow-lists, limits, or timeouts into strings that hosts may surface to users. + +The verified reproductions showed this split clearly. `PolicyEvaluator.evaluate` +can return a safe authored reason for an SSN-blocking rule, while +`BaseIntegration.pre_execute` returns a reason that includes the raw SSN regex. +Adapter-level tool checks raise yet another error string that also includes the +matched pattern and adds tool context. The result is both a disclosure issue and +a migration problem: hosts cannot reliably catch or classify governance denials +without substring-matching free-form English, and adapters duplicate their own +exception classes instead of using the canonical one. + +This ADR does not supersede ADR 0009. ADR 0009 covers RFC 9334 RATS architecture +alignment and remains independent of policy error sanitization. + +## Decision + +Add a structured, additive policy-check contract for integration-layer policy +outcomes while preserving every existing public shape. + +Introduce a new `agent_os.policies.decision` module containing +`ViolationCategory` and `PolicyCheckResult`. `ViolationCategory` is the stable, +typed denial taxonomy for hosts and adapters. `PolicyCheckResult` carries the +legacy allow/deny outcome plus separate fields for safe public text, restricted +detail, matched policy internals, optional matched user text, scope, operation, +tool name, and audit metadata. It also provides serializers for the two intended +views: `to_legacy_tuple()` for existing callers and `to_public_dict()` for host +responses. + +Introduce `agent_os.policies.decision_factory` as the only place integration +policy denial results are constructed. The factory set includes +`deny_blocked_pattern_input`, `deny_blocked_pattern_tool`, +`deny_blocked_pattern_output`, `deny_blocked_pattern_memory`, +`deny_blocked_tool`, `deny_not_allowed_tool`, `deny_max_tool_calls`, +`deny_timeout`, `deny_human_approval`, `deny_confidence_threshold`, and generic +policy-error builders. Each factory sets a fixed `public_message` keyed by +category and places sensitive values only in restricted fields such as `detail`, +`matched_pattern`, and `audit_entry`. + +Extend the canonical `agent_os.exceptions.PolicyViolationError` with an +additive `from_check_result` classmethod. The existing constructor forms remain +valid, `str(e)` remains based on `args[0]`, and legacy instances have +`e.check_result is None`. Exceptions created from a `PolicyCheckResult` surface +only the sanitized `public_message` through `str(e)` while retaining structured +details for server-side audit. + +Add opt-in `BaseIntegration` check methods beside the existing tuple methods: +`pre_execute_check`, `post_execute_check`, `async_pre_execute_check`, and +`async_post_execute_check`. These return `PolicyCheckResult`. Existing +`pre_execute`, `post_execute`, `async_pre_execute`, and `async_post_execute` +signatures and return types are preserved and are implemented as wrappers that +return `result.to_legacy_tuple()`. + +The change is purely additive. No existing signature, return type, import path, +exception constructor, declarative `PolicyDecision` field, or event payload is +removed or renamed. Legacy reason strings remain byte-identical for callers of +the tuple APIs, so existing tests and substring-based compatibility behavior +continue to work during the migration window. + +Adapter conversion is intentionally sequenced after the foundation. Each +adapter or surface should move in its own PR, replacing direct string-built +denials with `decision_factory` results and +`PolicyViolationError.from_check_result(...)`. Adapter-local +`PolicyViolationError` symbols become re-exports of the canonical class rather +than deleted symbols. A parametrized parity harness tracks every surface from +the discovery inventory, starts with expected failures, and removes each xfail +as one surface is converted. + +### Host Migration Guide + +Hosts should treat `str(e)` as the only end-user-safe message for +`PolicyViolationError`. For typed behavior, dispatch on +`e.check_result.category` when it is present. Hosts must not surface +`e.details["detail"]` to end users because it intentionally carries restricted +audit information, including the matched policy pattern, allow-list, limit, or +other internal value that caused the denial. + +Mixed-version hosts must tolerate older AGT versions and legacy exception +construction. If `e.check_result` is `None`, fall back to +`e.details.get("category")` when available, and then to the host's existing +substring matcher until all deployed adapters have moved to the structured +contract. + +Clients that still substring-match during the migration window can map old +message fragments to the new categories as follows: + +| Legacy message fragment | New `ViolationCategory` | +|---|---| +| `Blocked pattern detected` in input checks | `BLOCKED_PATTERN_INPUT` | +| `Blocked pattern` and `tool` / `arguments` | `BLOCKED_PATTERN_TOOL` | +| `matches blocked pattern` for tool names | `BLOCKED_TOOL` | +| `not in allowed list` | `NOT_ALLOWED_TOOL` | +| `Max tool calls exceeded` | `MAX_TOOL_CALLS` | +| `Timeout exceeded` | `TIMEOUT` | +| `requires human approval` | `HUMAN_APPROVAL` | +| `Memory write blocked` | `BLOCKED_PATTERN_MEMORY` | +| `Confidence below threshold` | `CONFIDENCE_THRESHOLD` | +| Any unclassified policy denial | `POLICY_ERROR` | + +Before, a host sanitized by substring-matching free-form text: + +```python +except Exception as e: + msg = str(e) + if "blocked by policy" in msg or "Blocked pattern" in msg: + return "This action was blocked by governance policy." +``` + +After, it can dispatch on the structured category and surface the already-safe +exception text: + +```python +from agent_os.exceptions import PolicyViolationError +from agent_os.policies.decision import ViolationCategory + +try: + ... +except PolicyViolationError as e: + result = e.check_result + category = result.category if result else None + message = str(e) + if category == ViolationCategory.BLOCKED_PATTERN_TOOL: + route = "tool_policy_denial" + else: + route = "policy_denial" + sse_emit("tool_error", {"message": message, "category": category.value if category else None}) +``` + +Existing defense-in-depth sanitizers may remain during the migration. They +should become no-ops for converted adapters because the exception message is +already sanitized at the source. + +## Consequences + +Backward compatibility is preserved for legacy callers. Tuple-returning +integration methods keep their signatures and byte-identical reason strings, +legacy `PolicyViolationError(...)` construction keeps working, and declarative +policy evaluation remains unchanged. + +Converted surfaces eliminate policy-internal leaks by construction because +public messages are fixed by category and sensitive values live only in +restricted structured fields. Audit fidelity is preserved through `detail`, +`matched_pattern`, optional `matched_text`, and `audit_entry`, but those fields +are explicitly server-side data. + +Defense in depth still applies. Hosts can retain existing sanitizers while they +roll out typed dispatch, and server-side logging controls must still protect +restricted details. + +## Alternatives considered + +### Pure string sanitization + +Rejected. Sanitizing exception strings at the edge addresses the symptom but not +the cause. It leaves each adapter free to build unsafe strings and requires +consumers to keep guessing which substrings are policy internals. + +### Breaking-API unification + +Rejected. Replacing the tuple APIs, renaming existing `PolicyDecision` types, or +changing exception constructors would be cleaner in isolation but too costly for +existing hosts, adapters, and tests. The additive contract gets the safety +benefit without a flag day. + +### Host-side filters only + +Rejected. Requiring every consumer to maintain its own deny-message sanitizer +duplicates effort and produces inconsistent behavior. AGT should provide a safe +contract at the source while hosts keep filters only as defense in depth. + +## References + +- `agent-governance-python/agent-os/AGENTS.md` — opt-in snippet for adapter authors diff --git a/docs/adr/index.md b/docs/adr/index.md index 70206dc05..db0b802b6 100644 --- a/docs/adr/index.md +++ b/docs/adr/index.md @@ -13,3 +13,4 @@ Key architectural decisions and their rationale. | [ADR-0007](0007-external-jwks-federation-for-cross-org-identity.md) | External JWKS federation for cross-org identity | | [ADR-0008](0008-cross-org-policy-federation.md) | Cross-org policy federation above identity | | [ADR-0009](0009-rfc-9334-rats-architecture-alignment.md) | RFC 9334 (RATS) architecture alignment | +| [ADR-0011](0011-additive-policy-check-contract.md) | Additive policy check contract | diff --git a/docs/security-audits/2026-04-29-additive-policy-check-contract.md b/docs/security-audits/2026-04-29-additive-policy-check-contract.md new file mode 100644 index 000000000..e266cbc74 --- /dev/null +++ b/docs/security-audits/2026-04-29-additive-policy-check-contract.md @@ -0,0 +1,110 @@ +# 2026-04-29 — Additive Policy-Check Contract + +PR: [microsoft/agent-governance-toolkit#1594](https://github.com/microsoft/agent-governance-toolkit/pull/1594) +ADR: [docs/adr/0011-additive-policy-check-contract.md](../adr/0011-additive-policy-check-contract.md) + +## What changed and why + +This PR introduces an **additive** structured policy-check contract under +`agent_os.policies.*`: + +- `agent_os.policies.decision` — `ViolationCategory` enum and `PolicyCheckResult` + Pydantic model with `to_legacy_tuple()` and `to_public_dict()` serializers. +- `agent_os.policies.decision_factory` — single source of truth for denial-result + construction; sanitized public-message templates keyed by category. +- `PolicyViolationError.from_check_result(result)` — additive classmethod on the + canonical exception. The legacy `(message, error_code, details)` constructor is + preserved verbatim. `str(e)` is the sanitized public message; audit fidelity + lives in `e.details["detail"]` and `e.check_result`. +- `BaseIntegration.{pre,post}_execute_check` (sync + async) — return + `PolicyCheckResult`. The legacy tuple-returning methods are reimplemented as + thin wrappers over the new methods; reason strings are byte-identical. +- Internal migration of `AsyncGovernedWrapper` and `PolicyInterceptor` to the + `*_check` variants. External adapter API is unchanged. + +**Why now:** adapter denial sites currently interpolate raw regex tokens, +allow-list contents, and limit numbers into user-visible error text. Hosts +have no programmatic way to dispatch on violation category without +substring-matching free-form English. This contract gives the kernel a single +canonical way to represent a policy decision so that every denial path can +deliver a sanitized public message at the source while preserving full audit +fidelity in restricted, server-side fields. + +## Threat model impact + +This change **reduces** the kernel's attack surface; it does not add any new +capability or power. + +| Dimension | Direction | +|---|---| +| Information leakage in user-visible error text | **Reduced.** Public messages are fixed by `ViolationCategory` and never include matched user input, raw regex patterns, allow-list contents, or limit numbers. Sensitive values live only in restricted structured fields. | +| Policy bypass surface | **Unchanged.** No existing check is removed, weakened, or made conditional. All previously denying paths still deny. Legacy callers continue to receive byte-identical reason strings via `to_legacy_tuple()`. | +| Authentication / identity | **Unchanged.** No identity, signing, or trust-handshake code is modified. | +| Privilege boundaries | **Unchanged.** Execution rings, kill switch, and approval gates are untouched. | +| New trust assumptions | **None.** The new types are pure data carriers; they do not perform privileged actions. | +| External input handling | **Tightened.** `decision_factory` factories default to `redact_user_text=True` for input/tool/output/memory denials, so user-supplied text cannot reach `PolicyCheckResult.matched_text`. | +| Audit fidelity | **Preserved.** The audit channel (`details["detail"]`, `check_result.audit_entry`, `matched_pattern`) retains every value the legacy path logged. | +| Backward compatibility | **Preserved.** Public exception constructor signature is unchanged; legacy tuple methods produce byte-identical strings; no breaking API change. | + +### Specific mitigations applied + +- **Public message templates are static.** `decision_factory._PUBLIC_MESSAGES` + is a fixed dict keyed by `ViolationCategory`. There is no string-interpolation + path from policy data into the public message. +- **`redact_user_text=True` is the default for user-input categories.** + Factories for `BLOCKED_PATTERN_INPUT`, `BLOCKED_PATTERN_TOOL`, + `BLOCKED_PATTERN_OUTPUT`, and `BLOCKED_PATTERN_MEMORY` never persist the + matched user text in the result. +- **Cedar / `PolicyEvaluator` gate is preserved.** `pre_execute_check` runs the + declarative policy evaluator first and routes its denials through + `deny_policy_error()` so existing Cedar deny text is unchanged. +- **Lazy imports inside `BaseIntegration` methods** keep the + `policies` package free of import cycles, so the new contract cannot be + trivially short-circuited by import-order tricks. + +### Surfaces not yet converted (out of scope for this PR) + +The 14 adapter modules and the `bridge.py`, `lite.py`, `trust_root.py`, and +finance-soc2 example denial sites that still interpolate policy internals are +explicitly **out of scope** for this PR. They are tracked in the parity harness +PR ([#1598](https://github.com/microsoft/agent-governance-toolkit/pull/1598)) +and converted one-by-one in subsequent per-adapter PRs (d / eᵢ). Each +conversion removes one `xfail` entry from the harness, so the remediation is +both visible and gated. + +## Test coverage + +All new tests live in `agent-governance-python/agent-os/tests/`: + +| File | Purpose | Tests | +|---|---|---| +| `test_policy_check_result_no_leak.py` | Verifies `to_public_dict()` and `decision_factory` outputs never carry matched user text, raw regex tokens, allow-list contents, or limit numbers. | 27 | +| `test_policy_violation_error_safety.py` | Verifies `str(e)` is the sanitized public message; `e.details["detail"]` and `e.check_result.audit_entry` retain audit fidelity; `from_check_result` round-trips. | 24 | +| `test_legacy_constructors.py` | Verifies the pre-existing `PolicyViolationError(message, error_code, details)` constructor still produces byte-identical instances. | 6 | +| `test_pre_execute_check_contract.py` | Asserts `*_check(...).to_legacy_tuple()` is byte-identical to the legacy `*_execute(...)` tuple for every denial category, sync and async. | 32 | +| `test_public_api_surface.py` | `inspect.signature` snapshot of every previously-public symbol the foundation touches; catches any future regression of the additive guarantee. | 31 | + +Total: **120 new tests, all passing.** Combined run on touched packages +(`pytest tests/ --ignore=tests/test_cmd_sign.py`) is **3308 baseline + 120 new += 3428 passed**, 52 skipped, 0 failed. Full Docker test suite green across all +packages. + +A local manual verification script (run by the author, not committed) exercises +the legacy-tuple parity, structured `PolicyCheckResult`, sanitized exception, +legacy constructor preservation, and `redact_user_text` honoring in five +end-to-end checks. + +## Reviewer focus + +Concentrate review on: + +1. **Sanitization invariants in `decision_factory.py`.** Every factory for a + user-input category must set `redact_user_text=True` by default and must + route raw values only into `matched_pattern` or `audit_entry`, never into + `public_message` or `matched_text`. +2. **Reason-string identity for legacy callers.** Inspect the `*_check` → + `to_legacy_tuple()` chain in `BaseIntegration`; the snapshot tests in + `test_pre_execute_check_contract.py` should fail loudly if any reason text + diverges by even one character. +3. **`PolicyViolationError.__init__` is still byte-compatible.** Any change to + the constructor signature is out of scope and would break legacy callers.