From 3bd6c77602f5a7269094ff6a31c83a2a2af529f6 Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 13:58:19 -0700 Subject: [PATCH 1/7] feat(policies): additive PolicyCheckResult + decision factories MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an additive structured policy-check contract for integration-layer governance, addressing the policy-internals leak surfaces enumerated in .plans/agt-unify-policy-decisions.md (PR (a)+(b) foundation). * New module `agent_os.policies.decision` with `ViolationCategory` enum and `PolicyCheckResult` Pydantic model (`to_legacy_tuple`/`to_public_dict` serializers). * New module `agent_os.policies.decision_factory` — single source of truth for denial result construction; sanitized public-message templates keyed by category. * `PolicyViolationError.from_check_result` classmethod (additive); legacy `(message, error_code, details)` constructor preserved verbatim. `str(e)` is the sanitized public_message; `e.details['detail']` retains audit fidelity. * `BaseIntegration` gains `pre_execute_check` / `post_execute_check` (sync+async) returning `PolicyCheckResult`. Legacy tuple methods reimplemented as thin wrappers; reason strings byte-identical (AC-5). * `AsyncGovernedWrapper` and `PolicyInterceptor` migrated internally to `*_check` variants. External adapter API unchanged. * ADR docs/adr/0011-additive-policy-check-contract.md with Host Migration Guide (AC-15, AC-16). 0009 RATS unaffected. * AGENTS.md updated with one-paragraph opt-in snippet (T13). Tests: 120 new (test_policy_check_result_no_leak, test_policy_violation_error_safety, test_legacy_constructors, test_pre_execute_check_contract, test_public_api_surface). Baseline 257 still green. Zero new ruff or mypy findings on touched files. Refs #1574 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- agent-governance-python/agent-os/AGENTS.md | 14 + .../agent-os/src/agent_os/exceptions.py | 24 ++ .../src/agent_os/integrations/base.py | 238 +++++++++++----- .../src/agent_os/policies/__init__.py | 15 +- .../src/agent_os/policies/decision.py | 79 ++++++ .../src/agent_os/policies/decision_factory.py | 267 ++++++++++++++++++ .../tests/test_legacy_constructors.py | 49 ++++ .../tests/test_policy_check_result_no_leak.py | 224 +++++++++++++++ .../test_policy_violation_error_safety.py | 151 ++++++++++ .../tests/test_pre_execute_check_contract.py | 199 +++++++++++++ .../agent-os/tests/test_public_api_surface.py | 104 +++++++ .../0011-additive-policy-check-contract.md | 187 ++++++++++++ docs/adr/index.md | 1 + 13 files changed, 1483 insertions(+), 69 deletions(-) create mode 100644 agent-governance-python/agent-os/src/agent_os/policies/decision.py create mode 100644 agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py create mode 100644 agent-governance-python/agent-os/tests/test_legacy_constructors.py create mode 100644 agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py create mode 100644 agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py create mode 100644 agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py create mode 100644 agent-governance-python/agent-os/tests/test_public_api_surface.py create mode 100644 docs/adr/0011-additive-policy-check-contract.md diff --git a/agent-governance-python/agent-os/AGENTS.md b/agent-governance-python/agent-os/AGENTS.md index 5a4f417d3..7b79b3854 100644 --- a/agent-governance-python/agent-os/AGENTS.md +++ b/agent-governance-python/agent-os/AGENTS.md @@ -62,6 +62,20 @@ ruff format . - Event types: `GovernanceEventType.POLICY_CHECK`, `.POLICY_VIOLATION`, `.TOOL_CALL_BLOCKED`, `.CHECKPOINT_CREATED` - Tests go in `tests/` (unit) or `modules/*/tests/` (module-specific) +## Policy Check Result (additive) + +Policy checks should prefer the additive `*_execute_check` methods when new code needs structured denial data, while preserving the legacy tuple-returning methods for compatibility. Raise the canonical `PolicyViolationError.from_check_result(result)` for new typed flows; hosts should surface `str(e)` and use `e.check_result.category` for dispatch instead of substring-matching internal details. + +```python +from agent_os.policies.decision import ViolationCategory +from agent_os.exceptions import PolicyViolationError + +result = kernel.pre_execute_check(ctx, input_data) +if not result.allowed: + raise PolicyViolationError.from_check_result(result) +# Hosts: surface str(e); switch on e.check_result.category for typed dispatch. +``` + ## Boundaries - **Never modify** `tests/test_mcp_server.py` (known pre-existing failure, excluded from CI) diff --git a/agent-governance-python/agent-os/src/agent_os/exceptions.py b/agent-governance-python/agent-os/src/agent_os/exceptions.py index a223f5d6b..94bc9e438 100644 --- a/agent-governance-python/agent-os/src/agent_os/exceptions.py +++ b/agent-governance-python/agent-os/src/agent_os/exceptions.py @@ -8,7 +8,13 @@ for structured error handling and logging. """ +from __future__ import annotations + from datetime import datetime, timezone +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from agent_os.policies.decision import PolicyCheckResult class AgentOSError(Exception): @@ -43,6 +49,24 @@ class PolicyViolationError(PolicyError): def __init__(self, message, error_code=None, details=None): super().__init__(message, error_code or "POLICY_VIOLATION", details) + self.check_result = None + + @classmethod + def from_check_result(cls, result: PolicyCheckResult) -> PolicyViolationError: + """Create a policy violation error from a structured check result.""" + + details = { + "category": result.category.value if result.category else None, + "matched_rule": result.matched_rule, + "detail": result.detail, + "scope": result.scope, + "operation": result.operation, + "tool_name": result.tool_name, + **result.audit_entry, + } + e = cls(result.public_message, "POLICY_VIOLATION", details) + e.check_result = result + return e class PolicyDeniedError(PolicyError): diff --git a/agent-governance-python/agent-os/src/agent_os/integrations/base.py b/agent-governance-python/agent-os/src/agent_os/integrations/base.py index f5fde7eaf..4338b4efd 100644 --- a/agent-governance-python/agent-os/src/agent_os/integrations/base.py +++ b/agent-governance-python/agent-os/src/agent_os/integrations/base.py @@ -19,7 +19,10 @@ from dataclasses import dataclass, field from datetime import datetime from enum import Enum -from typing import Any, Callable, Protocol +from typing import TYPE_CHECKING, Any, Callable, Protocol + +if TYPE_CHECKING: + from agent_os.policies.decision import PolicyCheckResult logger = logging.getLogger(__name__) @@ -678,34 +681,45 @@ def __init__(self, policy: GovernancePolicy, context: ExecutionContext | None = self.context = context def intercept(self, request: ToolCallRequest) -> ToolCallResult: + from agent_os.policies.decision_factory import ( + deny_blocked_pattern_tool, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + ) + # Check human approval requirement if self.policy.require_human_approval: + result = deny_human_approval(request.tool_name) return ToolCallResult( allowed=False, - reason=f"Tool '{request.tool_name}' requires human approval per governance policy", + reason=result.reason, ) # Check allowed tools if self.policy.allowed_tools and request.tool_name not in self.policy.allowed_tools: + result = deny_not_allowed_tool(request.tool_name, self.policy.allowed_tools) return ToolCallResult( allowed=False, - reason=f"Tool '{request.tool_name}' not in allowed list: {self.policy.allowed_tools}", + reason=result.reason, ) # Check blocked patterns args_str = str(request.arguments) matched = self.policy.matches_pattern(args_str) if matched: + result = deny_blocked_pattern_tool(matched[0]) return ToolCallResult( allowed=False, - reason=f"Blocked pattern '{matched[0]}' detected in tool arguments", + reason=result.reason, ) # Check call count if self.context and self.context.call_count >= self.policy.max_tool_calls: + result = deny_max_tool_calls(self.policy.max_tool_calls, self.context.call_count) return ToolCallResult( allowed=False, - reason=f"Max tool calls exceeded ({self.policy.max_tool_calls})", + reason=result.reason, ) return ToolCallResult(allowed=True) @@ -956,7 +970,7 @@ def from_cedar( policy_content: str | None = None, entities: list[dict[str, Any]] | None = None, **kwargs: Any, - ) -> "BaseIntegration": + ) -> BaseIntegration: """Create an integration with Cedar policy evaluation. Convenience factory that configures a ``PolicyEvaluator`` with a @@ -1036,16 +1050,26 @@ def emit(self, event_type: GovernanceEventType, data: dict[str, Any]) -> None: event_type, exc, exc_info=True, ) - def pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str | None]: - """ - Pre-execution policy check. + def pre_execute_check(self, ctx: ExecutionContext, input_data: Any) -> PolicyCheckResult: + """Run pre-execution policy checks and return a structured result. - When a ``PolicyEvaluator`` is configured, it is consulted **before** - the standard ``GovernancePolicy`` checks so that enterprise Cedar/OPA - policies always take precedence. + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. - Returns (allowed, reason) tuple. + Returns: + A structured policy-check result. """ + from agent_os.policies.decision import PolicyCheckResult + from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_confidence_threshold, + deny_human_approval, + deny_max_tool_calls, + deny_policy_error, + deny_timeout, + ) + event_base = {"agent_id": ctx.agent_id, "timestamp": datetime.now().isoformat()} self.emit(GovernanceEventType.POLICY_CHECK, {**event_base, "phase": "pre_execute"}) @@ -1071,67 +1095,90 @@ def pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str ) allowed, reason = self._evaluate_policy(cedar_ctx) if not allowed: + result = deny_policy_error(reason) self.emit( GovernanceEventType.TOOL_CALL_BLOCKED, - {**event_base, "reason": reason, "source": "cedar"}, + {**event_base, "reason": result.reason, "source": "cedar"}, ) - return False, reason + return result # Check call count if ctx.call_count >= self.policy.max_tool_calls: - reason = f"Max tool calls exceeded ({self.policy.max_tool_calls})" - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + result = deny_max_tool_calls(self.policy.max_tool_calls, ctx.call_count) + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, + ) + return result # Check timeout elapsed = (datetime.now() - ctx.start_time).total_seconds() if elapsed > self.policy.timeout_seconds: - reason = f"Timeout exceeded ({self.policy.timeout_seconds}s)" - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + result = deny_timeout(self.policy.timeout_seconds, elapsed) + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, + ) + return result # Check blocked patterns input_str = str(input_data) matched = self.policy.matches_pattern(input_str) if matched: - reason = f"Blocked pattern detected: {matched[0]}" - self.emit(GovernanceEventType.TOOL_CALL_BLOCKED, {**event_base, "reason": reason, "pattern": matched[0]}) - return False, reason + result = deny_blocked_pattern_input(matched[0], input_str) + self.emit( + GovernanceEventType.TOOL_CALL_BLOCKED, + {**event_base, "reason": result.reason, "pattern": matched[0]}, + ) + return result # Check human approval requirement if self.policy.require_human_approval: - reason = "Execution requires human approval per governance policy" - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + result = deny_human_approval() + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, + ) + return result # Check confidence threshold if self.policy.confidence_threshold > 0.0: - confidence = getattr(input_data, 'confidence', None) + confidence = getattr(input_data, "confidence", None) if isinstance(confidence, (int, float)) and confidence < self.policy.confidence_threshold: - reason = ( - f"Confidence {confidence:.2f} below threshold " - f"{self.policy.confidence_threshold:.2f}" + result = deny_confidence_threshold(self.policy.confidence_threshold, confidence) + self.emit( + GovernanceEventType.POLICY_VIOLATION, + {**event_base, "reason": result.reason}, ) - self.emit(GovernanceEventType.POLICY_VIOLATION, {**event_base, "reason": reason}) - return False, reason + return result - return True, None + return PolicyCheckResult() - def post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, str | None]: + def pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str | None]: + """Run pre-execution policy checks and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. + + Returns: + The legacy ``(allowed, reason)`` tuple. """ - Post-execution validation including drift detection. - Computes a similarity score between the serialized output and the - baseline (first output) using ``SequenceMatcher``. The drift score - is ``1.0 - similarity`` (0.0 = identical, 1.0 = completely different). + return self.pre_execute_check(ctx, input_data).to_legacy_tuple() - When the score exceeds ``policy.drift_threshold`` a - ``DRIFT_DETECTED`` governance event is emitted and a warning is - logged. Callers can register listeners for this event to enforce - blocking behaviour if desired. + def post_execute_check(self, ctx: ExecutionContext, output_data: Any) -> PolicyCheckResult: + """Run post-execution validation and return a structured result. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. - Returns (valid, reason) tuple. + Returns: + A structured policy-check result. """ + from agent_os.policies.decision import PolicyCheckResult + ctx.call_count += 1 # Drift detection: compare output against baseline @@ -1178,16 +1225,29 @@ def post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, s "call_count": ctx.call_count, }) - return True, None + return PolicyCheckResult() + + def post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, str | None]: + """Run post-execution validation and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. + + Returns: + The legacy ``(valid, reason)`` tuple. + """ + + return self.post_execute_check(ctx, output_data).to_legacy_tuple() @staticmethod def compute_drift(ctx: ExecutionContext, output_data: Any) -> DriftResult | None: """Compute drift between *output_data* and the baseline stored in *ctx*. On the first call the output is recorded as the baseline and ``None`` - is returned (no comparison possible). Subsequent calls use + is returned (no comparison possible). Subsequent calls use ``SequenceMatcher`` to compute a similarity ratio between the - serialised baseline and the current output. The drift score is + serialised baseline and the current output. The drift score is ``1.0 - similarity`` (0.0 = identical, 1.0 = completely different). """ current_text = str(output_data) @@ -1212,23 +1272,75 @@ def compute_drift(ctx: ExecutionContext, output_data: Any) -> DriftResult | None current_hash=current_hash, ) - async def async_pre_execute(self, ctx: ExecutionContext, input_data: Any) -> tuple[bool, str | None]: + async def async_pre_execute_check( + self, + ctx: ExecutionContext, + input_data: Any, + ) -> PolicyCheckResult: + """Run async pre-execution policy checks and return a structured result. + + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. + + Returns: + A structured policy-check result. """ - Async pre-execution policy check. - Defaults to calling the sync version. Override in subclasses - to add async-specific logic (e.g., async database lookups). + return self.pre_execute_check(ctx, input_data) + + async def async_pre_execute( + self, + ctx: ExecutionContext, + input_data: Any, + ) -> tuple[bool, str | None]: + """Run async pre-execution policy checks and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + input_data: Input payload to validate before execution. + + Returns: + The legacy ``(allowed, reason)`` tuple. """ - return self.pre_execute(ctx, input_data) - async def async_post_execute(self, ctx: ExecutionContext, output_data: Any) -> tuple[bool, str | None]: + result = await self.async_pre_execute_check(ctx, input_data) + return result.to_legacy_tuple() + + async def async_post_execute_check( + self, + ctx: ExecutionContext, + output_data: Any, + ) -> PolicyCheckResult: + """Run async post-execution validation and return a structured result. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. + + Returns: + A structured policy-check result. """ - Async post-execution validation. - Defaults to calling the sync version. Override in subclasses - to add async-specific logic. + return self.post_execute_check(ctx, output_data) + + async def async_post_execute( + self, + ctx: ExecutionContext, + output_data: Any, + ) -> tuple[bool, str | None]: + """Run async post-execution validation and return the legacy tuple. + + Args: + ctx: Execution context for the governed operation. + output_data: Output payload to validate after execution. + + Returns: + The legacy ``(valid, reason)`` tuple. """ - return self.post_execute(ctx, output_data) + + result = await self.async_post_execute_check(ctx, output_data) + return result.to_legacy_tuple() def on_signal(self, signal: str, handler: Callable[..., Any]) -> None: """Register a signal handler.""" @@ -1261,17 +1373,17 @@ def context(self) -> ExecutionContext: async def __call__(self, *args: Any, **kwargs: Any) -> Any: async with self._lock: # Pre-execution check - allowed, reason = await self._integration.async_pre_execute(self._ctx, (args, kwargs)) - if not allowed: - raise PolicyViolationError(reason or "Policy check failed") + pre_result = await self._integration.async_pre_execute_check(self._ctx, (args, kwargs)) + if not pre_result.allowed: + raise PolicyViolationError(pre_result.reason or "Policy check failed") # Execute the wrapped callable result = await self._fn(*args, **kwargs) # Post-execution validation - valid, reason = await self._integration.async_post_execute(self._ctx, result) - if not valid: - raise PolicyViolationError(reason or "Post-execution validation failed") + post_result = await self._integration.async_post_execute_check(self._ctx, result) + if not post_result.allowed: + raise PolicyViolationError(post_result.reason or "Post-execution validation failed") return result diff --git a/agent-governance-python/agent-os/src/agent_os/policies/__init__.py b/agent-governance-python/agent-os/src/agent_os/policies/__init__.py index cb67a3225..f0d212337 100644 --- a/agent-governance-python/agent-os/src/agent_os/policies/__init__.py +++ b/agent-governance-python/agent-os/src/agent_os/policies/__init__.py @@ -8,6 +8,12 @@ """ from .async_evaluator import AsyncPolicyEvaluator, ConcurrencyStats +from .backends import ( + BackendDecision, + CedarBackend, + ExternalPolicyBackend, + OPABackend, +) from .bridge import document_to_governance, governance_to_document from .conflict_resolution import ( CandidateDecision, @@ -16,12 +22,7 @@ PolicyScope, ResolutionResult, ) -from .backends import ( - BackendDecision, - CedarBackend, - ExternalPolicyBackend, - OPABackend, -) +from .decision import PolicyCheckResult, ViolationCategory from .evaluator import PolicyDecision, PolicyEvaluator from .rate_limiting import RateLimitConfig, RateLimitExceeded, TokenBucket from .schema import ( @@ -53,6 +54,7 @@ "ExternalPolicyBackend", "OPABackend", "PolicyAction", + "PolicyCheckResult", "PolicyCondition", "PolicyConflictResolver", "PolicyDecision", @@ -66,6 +68,7 @@ "RateLimitExceeded", "ResolutionResult", "TokenBucket", + "ViolationCategory", "SharedPolicyDecision", "SharedPolicyEvaluator", "SharedPolicyRule", diff --git a/agent-governance-python/agent-os/src/agent_os/policies/decision.py b/agent-governance-python/agent-os/src/agent_os/policies/decision.py new file mode 100644 index 000000000..42c6ef049 --- /dev/null +++ b/agent-governance-python/agent-os/src/agent_os/policies/decision.py @@ -0,0 +1,79 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Structured policy check decisions for integration-layer governance.""" + +from __future__ import annotations + +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + + +class ViolationCategory(str, Enum): + """Categories for integration-layer policy violations.""" + + BLOCKED_TOOL = "blocked_tool" + NOT_ALLOWED_TOOL = "not_allowed_tool" + BLOCKED_PATTERN_INPUT = "blocked_pattern_input" + BLOCKED_PATTERN_TOOL = "blocked_pattern_tool" + BLOCKED_PATTERN_OUTPUT = "blocked_pattern_output" + BLOCKED_PATTERN_MEMORY = "blocked_pattern_memory" + MAX_TOOL_CALLS = "max_tool_calls" + TIMEOUT = "timeout" + HUMAN_APPROVAL = "human_approval_required" + CONFIDENCE_THRESHOLD = "confidence_threshold" + DRIFT = "drift" + POLICY_ERROR = "policy_error" + + +class PolicyCheckResult(BaseModel): + """Structured result of an integration-layer policy check. + + Attributes: + allowed: Whether the policy check allows the operation. + action: Policy action associated with the result. + category: Optional structured violation category. + matched_rule: Optional declarative or legacy rule identifier. + public_message: Sanitized message safe for end users. + detail: Restricted details for logs and audits. + reason: Legacy free-form reason returned by tuple wrappers. + matched_pattern: Policy pattern that matched, when applicable. + matched_text: User text span that matched; omitted by default. + scope: Scope checked by the policy decision. + operation: Adapter-defined operation name. + tool_name: Tool name associated with the result. + index: Batch index associated with the result. + audit_entry: Additional structured audit metadata. + """ + + allowed: bool = True + action: str = "allow" + category: ViolationCategory | None = None + matched_rule: str | None = None + public_message: str = "" + detail: str = "" + reason: str = "" + matched_pattern: str | None = None + matched_text: str | None = None + scope: str | None = None + operation: str | None = None + tool_name: str | None = None + index: int | None = None + audit_entry: dict[str, Any] = Field(default_factory=dict) + + def to_legacy_tuple(self) -> tuple[bool, str | None]: + """Convert this result to the legacy ``(allowed, reason)`` tuple.""" + + return self.allowed, (None if self.allowed else (self.reason or self.detail)) + + def to_public_dict(self) -> dict[str, Any]: + """Return a sanitized public representation of this result.""" + + return { + "allowed": self.allowed, + "action": self.action, + "category": self.category.value if self.category else None, + "matched_rule": self.matched_rule, + "public_message": self.public_message, + } diff --git a/agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py b/agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py new file mode 100644 index 000000000..37d55cb32 --- /dev/null +++ b/agent-governance-python/agent-os/src/agent_os/policies/decision_factory.py @@ -0,0 +1,267 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Factories for structured policy-denial decisions.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from .decision import PolicyCheckResult, ViolationCategory + +_PUBLIC_MESSAGES: dict[ViolationCategory, str] = { + ViolationCategory.BLOCKED_PATTERN_INPUT: "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_TOOL: "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_OUTPUT: "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_MEMORY: "Content blocked by governance policy.", + ViolationCategory.NOT_ALLOWED_TOOL: "Tool not permitted by governance policy.", + ViolationCategory.BLOCKED_TOOL: "Tool blocked by governance policy.", + ViolationCategory.MAX_TOOL_CALLS: "Tool-call limit exceeded.", + ViolationCategory.TIMEOUT: "Execution timeout exceeded.", + ViolationCategory.HUMAN_APPROVAL: "Human approval required.", + ViolationCategory.CONFIDENCE_THRESHOLD: "Confidence below required threshold.", + ViolationCategory.POLICY_ERROR: "Policy evaluation error.", + ViolationCategory.DRIFT: "Behavioral drift detected.", +} + + +def _deny_result( + *, + category: ViolationCategory, + detail: str, + matched_rule: str | None = None, + matched_pattern: str | None = None, + matched_text: str | None = None, + scope: str | None = None, + operation: str | None = None, + tool_name: str | None = None, + index: int | None = None, + audit_entry: dict[str, Any] | None = None, +) -> PolicyCheckResult: + """Build a common denied policy-check result.""" + + return PolicyCheckResult( + allowed=False, + action="deny", + category=category, + matched_rule=matched_rule, + public_message=_PUBLIC_MESSAGES[category], + detail=detail, + reason=detail, + matched_pattern=matched_pattern, + matched_text=matched_text, + scope=scope, + operation=operation, + tool_name=tool_name, + index=index, + audit_entry=audit_entry or {}, + ) + + +def deny_blocked_pattern_input( + matched_pattern: str, + matched_text: str | None = None, + *, + rule_name: str | None = None, + redact_user_text: bool = True, +) -> PolicyCheckResult: + """Return a denial for an input blocked-pattern match.""" + + detail = f"Blocked pattern detected: {matched_pattern}" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_INPUT, + detail=detail, + matched_rule=rule_name, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope="input", + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_pattern_tool( + matched_pattern: str, + *, + tool_name: str | None = None, + scope_label: str = "tool", + redact_user_text: bool = True, + matched_text: str | None = None, +) -> PolicyCheckResult: + """Return a denial for a tool-argument blocked-pattern match.""" + + if tool_name: + detail = f"Blocked pattern '{matched_pattern}' detected in tool '{tool_name}' arguments" + else: + detail = f"Blocked pattern '{matched_pattern}' detected in tool arguments" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_TOOL, + detail=detail, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope=scope_label, + tool_name=tool_name, + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_pattern_output( + matched_pattern: str, + *, + redact_user_text: bool = True, + matched_text: str | None = None, +) -> PolicyCheckResult: + """Return a denial for an output blocked-pattern match.""" + + detail = f"Blocked pattern detected in output: {matched_pattern}" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_OUTPUT, + detail=detail, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope="output", + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_pattern_memory( + matched_pattern: str, + *, + redact_user_text: bool = True, + matched_text: str | None = None, +) -> PolicyCheckResult: + """Return a denial for a memory-write blocked-pattern match.""" + + detail = f"Memory write blocked: blocked pattern '{matched_pattern}' detected" + return _deny_result( + category=ViolationCategory.BLOCKED_PATTERN_MEMORY, + detail=detail, + matched_pattern=matched_pattern, + matched_text=None if redact_user_text else matched_text, + scope="memory", + audit_entry={"matched_pattern": matched_pattern}, + ) + + +def deny_blocked_tool(tool_name: str) -> PolicyCheckResult: + """Return a denial for a tool that matches a blocked tool pattern.""" + + detail = f"Tool '{tool_name}' matches blocked pattern" + return _deny_result( + category=ViolationCategory.BLOCKED_TOOL, + detail=detail, + scope="tool", + tool_name=tool_name, + audit_entry={"tool_name": tool_name}, + ) + + +def deny_not_allowed_tool(tool_name: str, allowed: Sequence[str]) -> PolicyCheckResult: + """Return a denial for a tool outside the configured allow-list.""" + + detail = f"Tool '{tool_name}' not in allowed list: {allowed}" + return _deny_result( + category=ViolationCategory.NOT_ALLOWED_TOOL, + detail=detail, + scope="tool", + tool_name=tool_name, + audit_entry={"allowed_tools": list(allowed), "tool_name": tool_name}, + ) + + +def deny_max_tool_calls(limit: int, current: int | None = None) -> PolicyCheckResult: + """Return a denial for exceeding the configured tool-call limit.""" + + detail = f"Max tool calls exceeded ({limit})" + audit_entry: dict[str, Any] = {"limit": limit} + if current is not None: + audit_entry["current"] = current + return _deny_result( + category=ViolationCategory.MAX_TOOL_CALLS, + detail=detail, + scope="tool", + audit_entry=audit_entry, + ) + + +def deny_timeout(limit_s: int | float, elapsed_s: int | float | None = None) -> PolicyCheckResult: + """Return a denial for exceeding the configured execution timeout.""" + + detail = f"Timeout exceeded ({limit_s}s)" + audit_entry: dict[str, Any] = {"limit_s": limit_s} + if elapsed_s is not None: + audit_entry["elapsed_s"] = elapsed_s + return _deny_result( + category=ViolationCategory.TIMEOUT, + detail=detail, + audit_entry=audit_entry, + ) + + +def deny_human_approval(tool_name: str | None = None) -> PolicyCheckResult: + """Return a denial for a human-approval requirement.""" + + if tool_name: + detail = f"Tool '{tool_name}' requires human approval per governance policy" + else: + detail = "Execution requires human approval per governance policy" + return _deny_result( + category=ViolationCategory.HUMAN_APPROVAL, + detail=detail, + scope="tool" if tool_name else None, + tool_name=tool_name, + audit_entry={"tool_name": tool_name} if tool_name else {}, + ) + + +def deny_confidence_threshold( + threshold: float, + observed: float | None = None, +) -> PolicyCheckResult: + """Return a denial for a confidence score below the configured threshold.""" + + if observed is None: + detail = f"Confidence below threshold of {threshold}" + else: + detail = f"Confidence {observed:.2f} below threshold {threshold:.2f}" + audit_entry: dict[str, Any] = {"threshold": threshold} + if observed is not None: + audit_entry["observed"] = observed + return _deny_result( + category=ViolationCategory.CONFIDENCE_THRESHOLD, + detail=detail, + audit_entry=audit_entry, + ) + + +def deny_policy_error(detail: str, *, matched_rule: str | None = None) -> PolicyCheckResult: + """Return a denial for a policy evaluation error or fail-closed decision.""" + + return _deny_result( + category=ViolationCategory.POLICY_ERROR, + detail=detail, + matched_rule=matched_rule, + audit_entry={"matched_rule": matched_rule} if matched_rule else {}, + ) + + +def deny_drift( + score: float, + threshold: float, + *, + baseline_hash: str | None = None, + current_hash: str | None = None, +) -> PolicyCheckResult: + """Return a denial for behavioral drift above the configured threshold.""" + + detail = f"Drift score {score:.2f} exceeds threshold {threshold:.2f}" + audit_entry: dict[str, Any] = {"drift_score": score, "threshold": threshold} + if baseline_hash is not None: + audit_entry["baseline_hash"] = baseline_hash + if current_hash is not None: + audit_entry["current_hash"] = current_hash + return _deny_result( + category=ViolationCategory.DRIFT, + detail=detail, + scope="output", + audit_entry=audit_entry, + ) diff --git a/agent-governance-python/agent-os/tests/test_legacy_constructors.py b/agent-governance-python/agent-os/tests/test_legacy_constructors.py new file mode 100644 index 000000000..2c77a1feb --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_legacy_constructors.py @@ -0,0 +1,49 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for legacy exception constructor compatibility.""" + +from __future__ import annotations + +import pytest + +from agent_os.exceptions import PolicyViolationError + +CONSTRUCTOR_CASES = [ + ("message_only", ("msg",), {}, "POLICY_VIOLATION", {}), + ("message_and_code", ("msg", "CUSTOM_CODE"), {}, "CUSTOM_CODE", {}), + ("message_code_details", ("msg", "CUSTOM", {"k": "v"}), {}, "CUSTOM", {"k": "v"}), + ( + "keyword_arguments", + (), + {"message": "msg", "error_code": "X", "details": {"y": 1}}, + "X", + {"y": 1}, + ), +] + + +class TestLegacyConstructors: + """Verify legacy PolicyViolationError construction remains supported.""" + + @pytest.mark.parametrize( + ("case_name", "args", "kwargs", "expected_code", "expected_details"), + CONSTRUCTOR_CASES, + ids=[case[0] for case in CONSTRUCTOR_CASES], + ) + def test_policy_violation_error_legacy_forms( + self, + case_name: str, + args: tuple[object, ...], + kwargs: dict[str, object], + expected_code: str, + expected_details: dict[str, object], + ) -> None: + error = PolicyViolationError(*args, **kwargs) + data = error.to_dict() + + assert case_name + assert error.check_result is None + assert error.error_code == expected_code + assert error.details == expected_details + assert data["error"] == expected_code + assert data["message"] == "msg" diff --git a/agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py b/agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py new file mode 100644 index 000000000..e99e6a4fb --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_policy_check_result_no_leak.py @@ -0,0 +1,224 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for sanitized policy-check decisions.""" + +from __future__ import annotations + +from collections.abc import Callable + +import pytest + +from agent_os.policies.decision import PolicyCheckResult, ViolationCategory +from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_blocked_pattern_memory, + deny_blocked_pattern_output, + deny_blocked_pattern_tool, + deny_blocked_tool, + deny_confidence_threshold, + deny_drift, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + deny_policy_error, + deny_timeout, +) + +FactoryBuilder = Callable[[], PolicyCheckResult] + +PUBLIC_KEYS = {"allowed", "action", "category", "matched_rule", "public_message"} + +FACTORY_CASES: list[tuple[str, FactoryBuilder, str, ViolationCategory, tuple[str, ...]]] = [ + ( + "blocked_pattern_input", + lambda: deny_blocked_pattern_input("secret-input-pattern", "secret input text"), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_INPUT, + ("secret-input-pattern",), + ), + ( + "blocked_pattern_tool", + lambda: deny_blocked_pattern_tool( + "secret-tool-pattern", tool_name="danger_tool", matched_text="secret tool text" + ), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_TOOL, + ("secret-tool-pattern", "danger_tool"), + ), + ( + "blocked_pattern_output", + lambda: deny_blocked_pattern_output("secret-output-pattern", matched_text="secret output"), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_OUTPUT, + ("secret-output-pattern",), + ), + ( + "blocked_pattern_memory", + lambda: deny_blocked_pattern_memory("secret-memory-pattern", matched_text="secret memory"), + "Content blocked by governance policy.", + ViolationCategory.BLOCKED_PATTERN_MEMORY, + ("secret-memory-pattern",), + ), + ( + "blocked_tool", + lambda: deny_blocked_tool("danger_tool"), + "Tool blocked by governance policy.", + ViolationCategory.BLOCKED_TOOL, + ("danger_tool",), + ), + ( + "not_allowed_tool", + lambda: deny_not_allowed_tool("danger_tool", ["read_file", "write_file"]), + "Tool not permitted by governance policy.", + ViolationCategory.NOT_ALLOWED_TOOL, + ("danger_tool", "read_file", "write_file"), + ), + ( + "max_tool_calls", + lambda: deny_max_tool_calls(5, current=6), + "Tool-call limit exceeded.", + ViolationCategory.MAX_TOOL_CALLS, + ("5", "6"), + ), + ( + "timeout", + lambda: deny_timeout(30, elapsed_s=31), + "Execution timeout exceeded.", + ViolationCategory.TIMEOUT, + ("30", "31"), + ), + ( + "human_approval", + lambda: deny_human_approval("approval_tool"), + "Human approval required.", + ViolationCategory.HUMAN_APPROVAL, + ("approval_tool",), + ), + ( + "confidence_threshold", + lambda: deny_confidence_threshold(0.9, observed=0.42), + "Confidence below required threshold.", + ViolationCategory.CONFIDENCE_THRESHOLD, + ("0.9", "0.42"), + ), + ( + "policy_error", + lambda: deny_policy_error("policy detail includes policy-secret", matched_rule="rule-17"), + "Policy evaluation error.", + ViolationCategory.POLICY_ERROR, + ("policy-secret", "17"), + ), + ( + "drift", + lambda: deny_drift( + 0.91, + 0.7, + baseline_hash="baseline-secret-hash", + current_hash="current-secret-hash", + ), + "Behavioral drift detected.", + ViolationCategory.DRIFT, + ("0.91", "0.7", "baseline-secret-hash", "current-secret-hash"), + ), +] + +REDACTABLE_PATTERN_CASES: list[tuple[str, Callable[..., PolicyCheckResult]]] = [ + ("blocked_pattern_input", deny_blocked_pattern_input), + ("blocked_pattern_tool", deny_blocked_pattern_tool), + ("blocked_pattern_output", deny_blocked_pattern_output), + ("blocked_pattern_memory", deny_blocked_pattern_memory), +] + + +class TestPolicyCheckResultNoLeak: + """Verify policy decisions separate public text from audit detail.""" + + @pytest.mark.parametrize( + ("case_name", "builder", "public_message", "expected_category", "sensitive_values"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_factory_public_message_is_sanitized( + self, + case_name: str, + builder: FactoryBuilder, + public_message: str, + expected_category: ViolationCategory, + sensitive_values: tuple[str, ...], + ) -> None: + result = builder() + + assert case_name + assert result.public_message == public_message + assert result.action == "deny" + assert result.allowed is False + assert result.category is expected_category + for sensitive in sensitive_values: + assert sensitive not in result.public_message + assert sensitive in f"{result.detail} {result.matched_pattern} {result.audit_entry}" + assert result.matched_text is None + + @pytest.mark.parametrize( + ("case_name", "builder", "public_message", "expected_category", "sensitive_values"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_public_dict_omits_restricted_fields( + self, + case_name: str, + builder: FactoryBuilder, + public_message: str, + expected_category: ViolationCategory, + sensitive_values: tuple[str, ...], + ) -> None: + result = builder() + public_dict = result.to_public_dict() + + assert case_name + assert public_message + assert expected_category + assert sensitive_values + assert set(public_dict) == PUBLIC_KEYS + assert "detail" not in public_dict + assert "matched_pattern" not in public_dict + assert "matched_text" not in public_dict + assert "audit_entry" not in public_dict + + @pytest.mark.parametrize( + ("case_name", "builder", "public_message", "expected_category", "sensitive_values"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_model_dump_round_trips( + self, + case_name: str, + builder: FactoryBuilder, + public_message: str, + expected_category: ViolationCategory, + sensitive_values: tuple[str, ...], + ) -> None: + result = builder() + roundtripped = PolicyCheckResult.model_validate(result.model_dump()) + + assert case_name + assert public_message + assert expected_category + assert sensitive_values + assert roundtripped == result + + @pytest.mark.parametrize( + ("case_name", "factory"), + REDACTABLE_PATTERN_CASES, + ids=[case[0] for case in REDACTABLE_PATTERN_CASES], + ) + def test_redactable_factories_preserve_text_only_when_requested( + self, + case_name: str, + factory: Callable[..., PolicyCheckResult], + ) -> None: + text = f"matched text for {case_name}" + redacted = factory("secret-pattern", matched_text=text) + unredacted = factory("secret-pattern", matched_text=text, redact_user_text=False) + + assert redacted.matched_text is None + assert unredacted.matched_text == text diff --git a/agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py b/agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py new file mode 100644 index 000000000..679bfad66 --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_policy_violation_error_safety.py @@ -0,0 +1,151 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for safe policy-violation exception messages.""" + +from __future__ import annotations + +import pickle +from collections.abc import Callable + +import pytest + +from agent_os.exceptions import PolicyViolationError +from agent_os.integrations.base import PatternType +from agent_os.policies.decision import PolicyCheckResult +from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_blocked_pattern_memory, + deny_blocked_pattern_output, + deny_blocked_pattern_tool, + deny_blocked_tool, + deny_confidence_threshold, + deny_drift, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + deny_policy_error, + deny_timeout, +) + +FactoryBuilder = Callable[[], PolicyCheckResult] + +FACTORY_CASES: list[tuple[str, FactoryBuilder, str]] = [ + ( + "blocked_pattern_input", + lambda: deny_blocked_pattern_input("secret-input-pattern", "secret input text"), + "secret-input-pattern", + ), + ( + "blocked_pattern_tool", + lambda: deny_blocked_pattern_tool("secret-tool-pattern", tool_name="danger_tool"), + "secret-tool-pattern", + ), + ( + "blocked_pattern_output", + lambda: deny_blocked_pattern_output("secret-output-pattern"), + "secret-output-pattern", + ), + ( + "blocked_pattern_memory", + lambda: deny_blocked_pattern_memory("secret-memory-pattern"), + "secret-memory-pattern", + ), + ("blocked_tool", lambda: deny_blocked_tool("danger_tool"), "danger_tool"), + ( + "not_allowed_tool", + lambda: deny_not_allowed_tool("danger_tool", ["read_file", "write_file"]), + "read_file", + ), + ("max_tool_calls", lambda: deny_max_tool_calls(5, current=6), "5"), + ("timeout", lambda: deny_timeout(30, elapsed_s=31), "30"), + ("human_approval", lambda: deny_human_approval("approval_tool"), "approval_tool"), + ("confidence_threshold", lambda: deny_confidence_threshold(0.9, observed=0.42), "0.90"), + ( + "policy_error", + lambda: deny_policy_error("policy detail includes policy-secret"), + "policy-secret", + ), + ("drift", lambda: deny_drift(0.91, 0.7), "0.91"), +] + +PROPERTY_CASES: list[tuple[str, FactoryBuilder, tuple[str, ...]]] = [ + ( + "allowed_tools", + lambda: deny_not_allowed_tool("delete_file", ["read_file", "write_file"]), + ("delete_file", "read_file", "write_file"), + ), + ( + "blocked_patterns", + lambda: deny_blocked_pattern_input(r"\b\d{3}-\d{2}-\d{4}\b"), + (r"\b\d{3}-\d{2}-\d{4}\b", PatternType.REGEX.value), + ), + ("blocked_tool", lambda: deny_blocked_tool("shell_exec"), ("shell_exec",)), + ("max_tool_calls", lambda: deny_max_tool_calls(5), ("5",)), + ("timeout_seconds", lambda: deny_timeout(30), ("30",)), +] + + +class TestPolicyViolationErrorSafety: + """Verify exceptions expose sanitized messages and retain audit details.""" + + @pytest.mark.parametrize( + ("case_name", "builder", "detail_value"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_from_check_result_public_message_is_safe( + self, + case_name: str, + builder: FactoryBuilder, + detail_value: str, + ) -> None: + result = builder() + + with pytest.raises(PolicyViolationError) as exc_info: + raise PolicyViolationError.from_check_result(result) + + error = exc_info.value + assert case_name + assert str(error) == result.public_message + assert detail_value not in str(error) + assert detail_value not in repr(error) + assert detail_value in error.details["detail"] + assert error.check_result is result + + @pytest.mark.parametrize( + ("case_name", "builder", "detail_value"), + FACTORY_CASES, + ids=[case[0] for case in FACTORY_CASES], + ) + def test_pickle_round_trip_preserves_public_message_safety( + self, + case_name: str, + builder: FactoryBuilder, + detail_value: str, + ) -> None: + error = PolicyViolationError.from_check_result(builder()) + roundtripped = pickle.loads(pickle.dumps(error)) + + # The attached check_result is diagnostic state; public-message safety is the contract. + assert case_name + assert str(roundtripped) == str(error) + assert detail_value not in str(roundtripped) + assert detail_value not in repr(roundtripped) + + @pytest.mark.parametrize( + ("case_name", "builder", "forbidden_values"), + PROPERTY_CASES, + ids=[case[0] for case in PROPERTY_CASES], + ) + def test_synthetic_policy_values_do_not_leak_to_exception_message( + self, + case_name: str, + builder: FactoryBuilder, + forbidden_values: tuple[str, ...], + ) -> None: + error = PolicyViolationError.from_check_result(builder()) + + assert case_name + for forbidden in forbidden_values: + assert forbidden not in str(error) + assert forbidden not in repr(error) diff --git a/agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py b/agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py new file mode 100644 index 000000000..d86515c5a --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_pre_execute_check_contract.py @@ -0,0 +1,199 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests for structured integration policy-check wrapper contracts.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any + +import pytest + +from agent_os.integrations.base import ( + BaseIntegration, + ExecutionContext, + GovernancePolicy, +) +from agent_os.policies.decision import PolicyCheckResult, ViolationCategory +from agent_os.policies.decision_factory import ( + deny_blocked_pattern_input, + deny_human_approval, + deny_max_tool_calls, + deny_not_allowed_tool, + deny_timeout, +) + + +class _TestKernel(BaseIntegration): + """Minimal concrete integration for policy contract tests.""" + + def wrap(self, agent: Any) -> Any: + return agent + + def unwrap(self, governed_agent: Any) -> Any: + return governed_agent + + +@dataclass +class _LowConfidenceInput: + """Input object carrying confidence metadata.""" + + value: str + confidence: float + + +@dataclass(frozen=True) +class _PreCase: + """Fixture data for a pre-execution denial.""" + + name: str + policy_factory: Callable[[], GovernancePolicy] + context_mutator: Callable[[ExecutionContext], None] + input_factory: Callable[[], Any] + expected_category: ViolationCategory + + +PRE_DENIAL_CASES = [ + _PreCase( + name="max_tool_calls", + policy_factory=lambda: GovernancePolicy(max_tool_calls=5), + context_mutator=lambda ctx: setattr(ctx, "call_count", 5), + input_factory=lambda: "safe input", + expected_category=ViolationCategory.MAX_TOOL_CALLS, + ), + _PreCase( + name="timeout", + policy_factory=lambda: GovernancePolicy(timeout_seconds=1), + context_mutator=lambda ctx: setattr(ctx, "start_time", datetime.now() - timedelta(seconds=2)), + input_factory=lambda: "safe input", + expected_category=ViolationCategory.TIMEOUT, + ), + _PreCase( + name="blocked_pattern", + policy_factory=lambda: GovernancePolicy(blocked_patterns=["bar"]), + context_mutator=lambda ctx: None, + input_factory=lambda: "foo bar baz", + expected_category=ViolationCategory.BLOCKED_PATTERN_INPUT, + ), + _PreCase( + name="human_approval", + policy_factory=lambda: GovernancePolicy(require_human_approval=True), + context_mutator=lambda ctx: None, + input_factory=lambda: "safe input", + expected_category=ViolationCategory.HUMAN_APPROVAL, + ), + _PreCase( + name="confidence_threshold", + policy_factory=lambda: GovernancePolicy(confidence_threshold=0.9), + context_mutator=lambda ctx: None, + input_factory=lambda: _LowConfidenceInput("risky action", 0.2), + expected_category=ViolationCategory.CONFIDENCE_THRESHOLD, + ), +] + +LEGACY_REASON_CASES = [ + ("max_tool_calls", lambda: deny_max_tool_calls(5).reason, "Max tool calls exceeded (5)"), + ("timeout", lambda: deny_timeout(0.1).reason, "Timeout exceeded (0.1s)"), + ( + "blocked_pattern_input", + lambda: deny_blocked_pattern_input("bar", "foo bar baz").reason, + "Blocked pattern detected: bar", + ), + ( + "human_approval", + lambda: deny_human_approval().reason, + "Execution requires human approval per governance policy", + ), + ( + "not_allowed_tool", + lambda: deny_not_allowed_tool("c", ["a", "b"]).reason, + "Tool 'c' not in allowed list: ['a', 'b']", + ), +] + + +def _kernel_and_context(case: _PreCase) -> tuple[_TestKernel, ExecutionContext, Any]: + kernel = _TestKernel(policy=case.policy_factory()) + ctx = kernel.create_context(f"agent-{case.name}") + case.context_mutator(ctx) + return kernel, ctx, case.input_factory() + + +class TestPreExecuteCheckContract: + """Verify structured pre-checks match legacy pre-execute wrappers.""" + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + def test_pre_execute_check_returns_structured_denial(self, case: _PreCase) -> None: + kernel, ctx, input_data = _kernel_and_context(case) + + result = kernel.pre_execute_check(ctx, input_data) + + assert isinstance(result, PolicyCheckResult) + assert result.allowed is False + assert result.category is case.expected_category + assert result.to_legacy_tuple() == kernel.pre_execute(ctx, input_data) + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + async def test_async_pre_execute_check_returns_structured_denial(self, case: _PreCase) -> None: + kernel, ctx, input_data = _kernel_and_context(case) + + result = await kernel.async_pre_execute_check(ctx, input_data) + + assert isinstance(result, PolicyCheckResult) + assert result.allowed is False + assert result.category is case.expected_category + assert result.to_legacy_tuple() == await kernel.async_pre_execute(ctx, input_data) + + +class TestPostExecuteCheckContract: + """Verify structured post-checks match legacy post-execute wrappers.""" + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + def test_post_execute_check_matches_legacy_tuple_for_same_policies( + self, + case: _PreCase, + ) -> None: + check_kernel, check_ctx, _ = _kernel_and_context(case) + legacy_kernel, legacy_ctx, _ = _kernel_and_context(case) + + result = check_kernel.post_execute_check(check_ctx, "safe output") + legacy_tuple = legacy_kernel.post_execute(legacy_ctx, "safe output") + + assert isinstance(result, PolicyCheckResult) + assert result.to_legacy_tuple() == legacy_tuple + assert check_ctx.call_count == legacy_ctx.call_count + + @pytest.mark.parametrize("case", PRE_DENIAL_CASES, ids=[case.name for case in PRE_DENIAL_CASES]) + async def test_async_post_execute_check_matches_legacy_tuple_for_same_policies( + self, + case: _PreCase, + ) -> None: + check_kernel, check_ctx, _ = _kernel_and_context(case) + legacy_kernel, legacy_ctx, _ = _kernel_and_context(case) + + result = await check_kernel.async_post_execute_check(check_ctx, "safe output") + legacy_tuple = await legacy_kernel.async_post_execute(legacy_ctx, "safe output") + + assert isinstance(result, PolicyCheckResult) + assert result.to_legacy_tuple() == legacy_tuple + assert check_ctx.call_count == legacy_ctx.call_count + + +class TestLegacyReasonSnapshots: + """Pin byte-identical legacy denial reason strings.""" + + @pytest.mark.parametrize( + ("case_name", "reason_factory", "expected_reason"), + LEGACY_REASON_CASES, + ids=[case[0] for case in LEGACY_REASON_CASES], + ) + def test_factory_reason_matches_legacy_snapshot( + self, + case_name: str, + reason_factory: Callable[[], str], + expected_reason: str, + ) -> None: + assert case_name + assert reason_factory() == expected_reason diff --git a/agent-governance-python/agent-os/tests/test_public_api_surface.py b/agent-governance-python/agent-os/tests/test_public_api_surface.py new file mode 100644 index 000000000..b5069bd6a --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_public_api_surface.py @@ -0,0 +1,104 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Tests that pin public API signatures touched by policy decisions.""" + +from __future__ import annotations + +import inspect +from collections.abc import Callable +from typing import Any + +import pytest + +from agent_os import exceptions +from agent_os.integrations.base import BaseIntegration +from agent_os.policies import decision, decision_factory + +SIGNATURE_CASES: list[tuple[str, Callable[..., Any], str]] = [ + ( + "PolicyViolationError.__init__", + exceptions.PolicyViolationError.__init__, + "(self, message, error_code=None, details=None)", + ), + ( + "PolicyError.__init__", + exceptions.PolicyError.__init__, + "(self, message, error_code=None, details=None)", + ), + ( + "AgentOSError.__init__", + exceptions.AgentOSError.__init__, + "(self, message, error_code=None, details=None)", + ), + ( + "BaseIntegration.pre_execute", + BaseIntegration.pre_execute, + "(self, ctx: 'ExecutionContext', input_data: 'Any') -> 'tuple[bool, str | None]'", + ), + ( + "BaseIntegration.post_execute", + BaseIntegration.post_execute, + "(self, ctx: 'ExecutionContext', output_data: 'Any') -> 'tuple[bool, str | None]'", + ), + ( + "BaseIntegration.async_pre_execute", + BaseIntegration.async_pre_execute, + "(self, ctx: 'ExecutionContext', input_data: 'Any') -> 'tuple[bool, str | None]'", + ), + ( + "BaseIntegration.async_post_execute", + BaseIntegration.async_post_execute, + "(self, ctx: 'ExecutionContext', output_data: 'Any') -> 'tuple[bool, str | None]'", + ), +] + +ASYNC_METHODS = [ + BaseIntegration.async_pre_execute, + BaseIntegration.async_post_execute, +] + +NEW_FACTORY_NAMES = [ + "deny_blocked_pattern_input", + "deny_blocked_pattern_tool", + "deny_blocked_pattern_output", + "deny_blocked_pattern_memory", + "deny_blocked_tool", + "deny_not_allowed_tool", + "deny_max_tool_calls", + "deny_timeout", + "deny_human_approval", + "deny_confidence_threshold", + "deny_policy_error", + "deny_drift", +] + + +class TestPublicApiSurface: + """Pin legacy signatures and additive policy-decision symbols.""" + + @pytest.mark.parametrize( + ("symbol_name", "symbol", "expected_signature"), + SIGNATURE_CASES, + ids=[case[0] for case in SIGNATURE_CASES], + ) + def test_public_signatures_are_pinned( + self, + symbol_name: str, + symbol: Callable[..., Any], + expected_signature: str, + ) -> None: + assert symbol_name + assert str(inspect.signature(symbol)) == expected_signature + + @pytest.mark.parametrize("symbol", ASYNC_METHODS) + def test_async_wrappers_remain_coroutines(self, symbol: Callable[..., Any]) -> None: + assert inspect.iscoroutinefunction(symbol) + + @pytest.mark.parametrize("name", NEW_FACTORY_NAMES) + def test_new_decision_factories_exist(self, name: str) -> None: + assert callable(getattr(decision_factory, name)) + + def test_new_decision_symbols_exist(self) -> None: + assert inspect.isclass(decision.ViolationCategory) + assert inspect.isclass(decision.PolicyCheckResult) + assert callable(exceptions.PolicyViolationError.from_check_result) diff --git a/docs/adr/0011-additive-policy-check-contract.md b/docs/adr/0011-additive-policy-check-contract.md new file mode 100644 index 000000000..5e2c59a49 --- /dev/null +++ b/docs/adr/0011-additive-policy-check-contract.md @@ -0,0 +1,187 @@ +# ADR 0011: Additive Policy Check Contract + +- Status: Proposed +- Date: 2026-04-29 + +## Context + +AGT currently exposes policy outcomes through several independent contracts. The +same denial can appear as a declarative `PolicyDecision`, a legacy +`BaseIntegration` `(allowed, reason)` tuple, or an adapter-raised +`PolicyViolationError`. Those paths have different message shapes, different +exception identities, and different levels of detail. Declarative policy rules +can be safe when authors provide a user-facing message, but integration and +adapter paths often interpolate internal policy values such as regex patterns, +allow-lists, limits, or timeouts into strings that hosts may surface to users. + +The verified reproductions showed this split clearly. `PolicyEvaluator.evaluate` +can return a safe authored reason for an SSN-blocking rule, while +`BaseIntegration.pre_execute` returns a reason that includes the raw SSN regex. +Adapter-level tool checks raise yet another error string that also includes the +matched pattern and adds tool context. The result is both a disclosure issue and +a migration problem: hosts cannot reliably catch or classify governance denials +without substring-matching free-form English, and adapters duplicate their own +exception classes instead of using the canonical one. + +This ADR does not supersede ADR 0009. ADR 0009 covers RFC 9334 RATS architecture +alignment and remains independent of policy error sanitization. + +## Decision + +Add a structured, additive policy-check contract for integration-layer policy +outcomes while preserving every existing public shape. + +Introduce a new `agent_os.policies.decision` module containing +`ViolationCategory` and `PolicyCheckResult`. `ViolationCategory` is the stable, +typed denial taxonomy for hosts and adapters. `PolicyCheckResult` carries the +legacy allow/deny outcome plus separate fields for safe public text, restricted +detail, matched policy internals, optional matched user text, scope, operation, +tool name, and audit metadata. It also provides serializers for the two intended +views: `to_legacy_tuple()` for existing callers and `to_public_dict()` for host +responses. + +Introduce `agent_os.policies.decision_factory` as the only place integration +policy denial results are constructed. The factory set includes +`deny_blocked_pattern_input`, `deny_blocked_pattern_tool`, +`deny_blocked_pattern_output`, `deny_blocked_pattern_memory`, +`deny_blocked_tool`, `deny_not_allowed_tool`, `deny_max_tool_calls`, +`deny_timeout`, `deny_human_approval`, `deny_confidence_threshold`, and generic +policy-error builders. Each factory sets a fixed `public_message` keyed by +category and places sensitive values only in restricted fields such as `detail`, +`matched_pattern`, and `audit_entry`. + +Extend the canonical `agent_os.exceptions.PolicyViolationError` with an +additive `from_check_result` classmethod. The existing constructor forms remain +valid, `str(e)` remains based on `args[0]`, and legacy instances have +`e.check_result is None`. Exceptions created from a `PolicyCheckResult` surface +only the sanitized `public_message` through `str(e)` while retaining structured +details for server-side audit. + +Add opt-in `BaseIntegration` check methods beside the existing tuple methods: +`pre_execute_check`, `post_execute_check`, `async_pre_execute_check`, and +`async_post_execute_check`. These return `PolicyCheckResult`. Existing +`pre_execute`, `post_execute`, `async_pre_execute`, and `async_post_execute` +signatures and return types are preserved and are implemented as wrappers that +return `result.to_legacy_tuple()`. + +The change is purely additive. No existing signature, return type, import path, +exception constructor, declarative `PolicyDecision` field, or event payload is +removed or renamed. Legacy reason strings remain byte-identical for callers of +the tuple APIs, so existing tests and substring-based compatibility behavior +continue to work during the migration window. + +Adapter conversion is intentionally sequenced after the foundation. Each +adapter or surface should move in its own PR, replacing direct string-built +denials with `decision_factory` results and +`PolicyViolationError.from_check_result(...)`. Adapter-local +`PolicyViolationError` symbols become re-exports of the canonical class rather +than deleted symbols. A parametrized parity harness tracks every surface from +the discovery inventory, starts with expected failures, and removes each xfail +as one surface is converted. + +### Host Migration Guide + +Hosts should treat `str(e)` as the only end-user-safe message for +`PolicyViolationError`. For typed behavior, dispatch on +`e.check_result.category` when it is present. Hosts must not surface +`e.details["detail"]` to end users because it intentionally carries restricted +audit information, including the matched policy pattern, allow-list, limit, or +other internal value that caused the denial. + +Mixed-version hosts must tolerate older AGT versions and legacy exception +construction. If `e.check_result` is `None`, fall back to +`e.details.get("category")` when available, and then to the host's existing +substring matcher until all deployed adapters have moved to the structured +contract. + +Clients that still substring-match during the migration window can map old +message fragments to the new categories as follows: + +| Legacy message fragment | New `ViolationCategory` | +|---|---| +| `Blocked pattern detected` in input checks | `BLOCKED_PATTERN_INPUT` | +| `Blocked pattern` and `tool` / `arguments` | `BLOCKED_PATTERN_TOOL` | +| `matches blocked pattern` for tool names | `BLOCKED_TOOL` | +| `not in allowed list` | `NOT_ALLOWED_TOOL` | +| `Max tool calls exceeded` | `MAX_TOOL_CALLS` | +| `Timeout exceeded` | `TIMEOUT` | +| `requires human approval` | `HUMAN_APPROVAL` | +| `Memory write blocked` | `BLOCKED_PATTERN_MEMORY` | +| `Confidence below threshold` | `CONFIDENCE_THRESHOLD` | +| Any unclassified policy denial | `POLICY_ERROR` | + +Before, a host such as Zava sanitized by substring-matching free-form text: + +```python +except Exception as e: + msg = str(e) + if "blocked by policy" in msg or "Blocked pattern" in msg: + return "This action was blocked by governance policy." +``` + +After, it can dispatch on the structured category and surface the already-safe +exception text: + +```python +from agent_os.exceptions import PolicyViolationError +from agent_os.policies.decision import ViolationCategory + +try: + ... +except PolicyViolationError as e: + result = e.check_result + category = result.category if result else None + message = str(e) + if category == ViolationCategory.BLOCKED_PATTERN_TOOL: + route = "tool_policy_denial" + else: + route = "policy_denial" + sse_emit("tool_error", {"message": message, "category": category.value if category else None}) +``` + +Existing defense-in-depth sanitizers may remain during the migration. They +should become no-ops for converted adapters because the exception message is +already sanitized at the source. + +## Consequences + +Backward compatibility is preserved for legacy callers. Tuple-returning +integration methods keep their signatures and byte-identical reason strings, +legacy `PolicyViolationError(...)` construction keeps working, and declarative +policy evaluation remains unchanged. + +Converted surfaces eliminate policy-internal leaks by construction because +public messages are fixed by category and sensitive values live only in +restricted structured fields. Audit fidelity is preserved through `detail`, +`matched_pattern`, optional `matched_text`, and `audit_entry`, but those fields +are explicitly server-side data. + +Defense in depth still applies. Hosts can retain existing sanitizers while they +roll out typed dispatch, and server-side logging controls must still protect +restricted details. + +## Alternatives considered + +### Pure string sanitization + +Rejected. Sanitizing exception strings at the edge addresses the symptom but not +the cause. It leaves each adapter free to build unsafe strings and requires +consumers to keep guessing which substrings are policy internals. + +### Breaking-API unification + +Rejected. Replacing the tuple APIs, renaming existing `PolicyDecision` types, or +changing exception constructors would be cleaner in isolation but too costly for +existing hosts, adapters, and tests. The additive contract gets the safety +benefit without a flag day. + +### Host-side filters only + +Rejected. Requiring every consumer to maintain its own deny-message sanitizer +duplicates effort and produces inconsistent behavior. AGT should provide a safe +contract at the source while hosts keep filters only as defense in depth. + +## References + +- [Unify Policy Decisions plan](file:///C:/Users/eltonc/OneDrive%20-%20Microsoft/C%26E/AGT/.plans/agt-unify-policy-decisions.md) +- [Policy leak surface inventory](file:///C:/Users/eltonc/OneDrive%20-%20Microsoft/C%26E/AGT/.plans/agt-policy-leak-surface-inventory.md) diff --git a/docs/adr/index.md b/docs/adr/index.md index 70206dc05..db0b802b6 100644 --- a/docs/adr/index.md +++ b/docs/adr/index.md @@ -13,3 +13,4 @@ Key architectural decisions and their rationale. | [ADR-0007](0007-external-jwks-federation-for-cross-org-identity.md) | External JWKS federation for cross-org identity | | [ADR-0008](0008-cross-org-policy-federation.md) | Cross-org policy federation above identity | | [ADR-0009](0009-rfc-9334-rats-architecture-alignment.md) | RFC 9334 (RATS) architecture alignment | +| [ADR-0011](0011-additive-policy-check-contract.md) | Additive policy check contract | From 52f674caea4850030e8bfc7b7874c40b6aa873fb Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 15:10:36 -0700 Subject: [PATCH 2/7] chore(spell): add foundation PR terms to cspell dictionary Adds pytest, mypy, isort, pyupgrade, Pydantic, Docstrings, classmethod, xfail, Zava. --- .cspell-repo-terms.txt | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.cspell-repo-terms.txt b/.cspell-repo-terms.txt index bed6e4340..842d4e0da 100644 --- a/.cspell-repo-terms.txt +++ b/.cspell-repo-terms.txt @@ -84,3 +84,12 @@ sybil davidequarracino Marp Slidev +pytest +mypy +isort +pyupgrade +Pydantic +Docstrings +classmethod +xfail +Zava From 7b3f81806e2a1e69774c9f73556985ae65fcd39b Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 15:12:56 -0700 Subject: [PATCH 3/7] docs(adr): remove personal/internal refs from ADR 0011 Removes a host name from the migration example (uses generic phrasing) and drops local OneDrive file:/// links from the references section. Also removes the now-unused dictionary entry. --- .cspell-repo-terms.txt | 1 - docs/adr/0011-additive-policy-check-contract.md | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.cspell-repo-terms.txt b/.cspell-repo-terms.txt index 842d4e0da..b5efd5137 100644 --- a/.cspell-repo-terms.txt +++ b/.cspell-repo-terms.txt @@ -92,4 +92,3 @@ Pydantic Docstrings classmethod xfail -Zava diff --git a/docs/adr/0011-additive-policy-check-contract.md b/docs/adr/0011-additive-policy-check-contract.md index 5e2c59a49..a6f0afeeb 100644 --- a/docs/adr/0011-additive-policy-check-contract.md +++ b/docs/adr/0011-additive-policy-check-contract.md @@ -110,7 +110,7 @@ message fragments to the new categories as follows: | `Confidence below threshold` | `CONFIDENCE_THRESHOLD` | | Any unclassified policy denial | `POLICY_ERROR` | -Before, a host such as Zava sanitized by substring-matching free-form text: +Before, a host sanitized by substring-matching free-form text: ```python except Exception as e: @@ -183,5 +183,4 @@ contract at the source while hosts keep filters only as defense in depth. ## References -- [Unify Policy Decisions plan](file:///C:/Users/eltonc/OneDrive%20-%20Microsoft/C%26E/AGT/.plans/agt-unify-policy-decisions.md) -- [Policy leak surface inventory](file:///C:/Users/eltonc/OneDrive%20-%20Microsoft/C%26E/AGT/.plans/agt-policy-leak-surface-inventory.md) +- `agent-governance-python/agent-os/AGENTS.md` — opt-in snippet for adapter authors From 48a4ea54f4c5e018e0c5713ace65f7a0535b1575 Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 15:30:22 -0700 Subject: [PATCH 4/7] chore(policies): minimize __init__.py churn Restore the original ordering of pre-existing imports; the additive PR only inserts the single .decision line and two __all__ entries. The earlier reorder was an isort auto-fix not required by CI lint rules (which use --select E,F,W only). --- .../agent-os/src/agent_os/policies/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agent-governance-python/agent-os/src/agent_os/policies/__init__.py b/agent-governance-python/agent-os/src/agent_os/policies/__init__.py index f0d212337..f27ca6cde 100644 --- a/agent-governance-python/agent-os/src/agent_os/policies/__init__.py +++ b/agent-governance-python/agent-os/src/agent_os/policies/__init__.py @@ -8,12 +8,6 @@ """ from .async_evaluator import AsyncPolicyEvaluator, ConcurrencyStats -from .backends import ( - BackendDecision, - CedarBackend, - ExternalPolicyBackend, - OPABackend, -) from .bridge import document_to_governance, governance_to_document from .conflict_resolution import ( CandidateDecision, @@ -22,6 +16,12 @@ PolicyScope, ResolutionResult, ) +from .backends import ( + BackendDecision, + CedarBackend, + ExternalPolicyBackend, + OPABackend, +) from .decision import PolicyCheckResult, ViolationCategory from .evaluator import PolicyDecision, PolicyEvaluator from .rate_limiting import RateLimitConfig, RateLimitExceeded, TokenBucket From e1a76e537fd6ab0d28bba8d2248a89d3e063a776 Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 15:44:54 -0700 Subject: [PATCH 5/7] chore(spell): scope spell-check fix to PR-introduced terms Reverts the global .cspell-repo-terms.txt additions. Reword classmethod and xfail in ADR 0011 (PR-introduced terms). Add an in-file cspell:ignore directive to AGENTS.md for pre-existing technical terms (pytest, mypy, isort, pyupgrade, Pydantic, Docstrings) that were inherited debt surfaced by editing the file. --- .cspell-repo-terms.txt | 8 -------- agent-governance-python/agent-os/AGENTS.md | 2 ++ docs/adr/0011-additive-policy-check-contract.md | 4 ++-- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/.cspell-repo-terms.txt b/.cspell-repo-terms.txt index b5efd5137..bed6e4340 100644 --- a/.cspell-repo-terms.txt +++ b/.cspell-repo-terms.txt @@ -84,11 +84,3 @@ sybil davidequarracino Marp Slidev -pytest -mypy -isort -pyupgrade -Pydantic -Docstrings -classmethod -xfail diff --git a/agent-governance-python/agent-os/AGENTS.md b/agent-governance-python/agent-os/AGENTS.md index 7b79b3854..7b5f9c1dc 100644 --- a/agent-governance-python/agent-os/AGENTS.md +++ b/agent-governance-python/agent-os/AGENTS.md @@ -1,5 +1,7 @@ # Agent-OS — Coding Agent Instructions + + ## Project Overview Agent-OS is a **governance-first kernel for AI agents** — a Python framework providing policy enforcement, semantic intent classification, identity management, and execution control for autonomous AI agents. diff --git a/docs/adr/0011-additive-policy-check-contract.md b/docs/adr/0011-additive-policy-check-contract.md index a6f0afeeb..608fd7ecb 100644 --- a/docs/adr/0011-additive-policy-check-contract.md +++ b/docs/adr/0011-additive-policy-check-contract.md @@ -51,7 +51,7 @@ category and places sensitive values only in restricted fields such as `detail`, `matched_pattern`, and `audit_entry`. Extend the canonical `agent_os.exceptions.PolicyViolationError` with an -additive `from_check_result` classmethod. The existing constructor forms remain +additive `from_check_result` class method. The existing constructor forms remain valid, `str(e)` remains based on `args[0]`, and legacy instances have `e.check_result is None`. Exceptions created from a `PolicyCheckResult` surface only the sanitized `public_message` through `str(e)` while retaining structured @@ -76,7 +76,7 @@ denials with `decision_factory` results and `PolicyViolationError.from_check_result(...)`. Adapter-local `PolicyViolationError` symbols become re-exports of the canonical class rather than deleted symbols. A parametrized parity harness tracks every surface from -the discovery inventory, starts with expected failures, and removes each xfail +the discovery inventory, starts with expected failures, and removes each entry as one surface is converted. ### Host Migration Guide From ce5993ab3a613fa536d9b2f138213af6cff2c4b5 Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 15:47:42 -0700 Subject: [PATCH 6/7] chore(spell): account for terms in repo dictionary Restore classmethod and xfail in ADR 0011 (their precise meaning matters). Drop the cspell:ignore directive from AGENTS.md and instead track all 8 surfaced terms in .cspell-repo-terms.txt: classmethod, Docstrings, isort, mypy, Pydantic, pytest, pyupgrade, xfail. --- .cspell-repo-terms.txt | 8 ++++++++ agent-governance-python/agent-os/AGENTS.md | 2 -- docs/adr/0011-additive-policy-check-contract.md | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/.cspell-repo-terms.txt b/.cspell-repo-terms.txt index bed6e4340..92c57cfd4 100644 --- a/.cspell-repo-terms.txt +++ b/.cspell-repo-terms.txt @@ -84,3 +84,11 @@ sybil davidequarracino Marp Slidev +classmethod +Docstrings +isort +mypy +Pydantic +pytest +pyupgrade +xfail diff --git a/agent-governance-python/agent-os/AGENTS.md b/agent-governance-python/agent-os/AGENTS.md index 7b5f9c1dc..7b79b3854 100644 --- a/agent-governance-python/agent-os/AGENTS.md +++ b/agent-governance-python/agent-os/AGENTS.md @@ -1,7 +1,5 @@ # Agent-OS — Coding Agent Instructions - - ## Project Overview Agent-OS is a **governance-first kernel for AI agents** — a Python framework providing policy enforcement, semantic intent classification, identity management, and execution control for autonomous AI agents. diff --git a/docs/adr/0011-additive-policy-check-contract.md b/docs/adr/0011-additive-policy-check-contract.md index 608fd7ecb..a6f0afeeb 100644 --- a/docs/adr/0011-additive-policy-check-contract.md +++ b/docs/adr/0011-additive-policy-check-contract.md @@ -51,7 +51,7 @@ category and places sensitive values only in restricted fields such as `detail`, `matched_pattern`, and `audit_entry`. Extend the canonical `agent_os.exceptions.PolicyViolationError` with an -additive `from_check_result` class method. The existing constructor forms remain +additive `from_check_result` classmethod. The existing constructor forms remain valid, `str(e)` remains based on `args[0]`, and legacy instances have `e.check_result is None`. Exceptions created from a `PolicyCheckResult` surface only the sanitized `public_message` through `str(e)` while retaining structured @@ -76,7 +76,7 @@ denials with `decision_factory` results and `PolicyViolationError.from_check_result(...)`. Adapter-local `PolicyViolationError` symbols become re-exports of the canonical class rather than deleted symbols. A parametrized parity harness tracks every surface from -the discovery inventory, starts with expected failures, and removes each entry +the discovery inventory, starts with expected failures, and removes each xfail as one surface is converted. ### Host Migration Guide From d7a4375412deb3f2d88e420b8000208cb386e60c Mon Sep 17 00:00:00 2001 From: "Elton Carr, II" Date: Wed, 29 Apr 2026 16:06:07 -0700 Subject: [PATCH 7/7] test(policies): add adapter parity harness with xfail allowlist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements PR (c) / T8 for the unify-policy-decisions plan as a purely additive parity harness. The tests enumerate the policy leak surface inventory and mark current adapter/surface gaps with strict xfail entries so follow-on adapter conversion PRs can remove one allowlist entry at a time. Each subsequent PR (d/eᵢ) should convert the corresponding adapter or surface to the canonical PolicyViolationError contract, then remove its matching xfail row from these harnesses. The skip rows document inventory items that need an injection seam before an in-process denial can be asserted. Inventory source: agt-policy-leak-surface-inventory.md. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/test_adapter_exception_identity.py | 74 +++ .../tests/test_adapter_str_no_leak.py | 528 ++++++++++++++++++ 2 files changed, 602 insertions(+) create mode 100644 agent-governance-python/agent-os/tests/test_adapter_exception_identity.py create mode 100644 agent-governance-python/agent-os/tests/test_adapter_str_no_leak.py diff --git a/agent-governance-python/agent-os/tests/test_adapter_exception_identity.py b/agent-governance-python/agent-os/tests/test_adapter_exception_identity.py new file mode 100644 index 000000000..506b9ed26 --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_adapter_exception_identity.py @@ -0,0 +1,74 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Parity harness for adapter ``PolicyViolationError`` identity.""" + +from __future__ import annotations + +import importlib + +import pytest + +from agent_os.exceptions import PolicyViolationError as canonical_PolicyViolationError + + +_PENDING_CONVERSION = pytest.mark.xfail( + reason="Pending PR (d/eᵢ) conversion", + strict=True, +) + + +@pytest.mark.parametrize( + "module_path", + [ + pytest.param( + "agent_os.integrations.langchain_adapter", + marks=_PENDING_CONVERSION, + id="langchain_adapter", + ), + pytest.param( + "agent_os.integrations.anthropic_adapter", + marks=_PENDING_CONVERSION, + id="anthropic_adapter", + ), + pytest.param( + "agent_os.integrations.gemini_adapter", + marks=_PENDING_CONVERSION, + id="gemini_adapter", + ), + pytest.param( + "agent_os.integrations.google_adk_adapter", + marks=_PENDING_CONVERSION, + id="google_adk_adapter", + ), + pytest.param( + "agent_os.integrations.mistral_adapter", + marks=_PENDING_CONVERSION, + id="mistral_adapter", + ), + pytest.param( + "agent_os.integrations.openai_adapter", + marks=_PENDING_CONVERSION, + id="openai_adapter", + ), + pytest.param( + "agent_os.integrations.openai_agents_sdk", + marks=_PENDING_CONVERSION, + id="openai_agents_sdk", + ), + pytest.param( + "agent_os.integrations.semantic_kernel_adapter", + marks=_PENDING_CONVERSION, + id="semantic_kernel_adapter", + ), + pytest.param( + "agent_os.integrations.smolagents_adapter", + marks=_PENDING_CONVERSION, + id="smolagents_adapter", + ), + ], +) +def test_policy_violation_error_is_canonical(module_path: str) -> None: + """Adapter must re-export the canonical PolicyViolationError.""" + mod = importlib.import_module(module_path) + + assert mod.PolicyViolationError is canonical_PolicyViolationError diff --git a/agent-governance-python/agent-os/tests/test_adapter_str_no_leak.py b/agent-governance-python/agent-os/tests/test_adapter_str_no_leak.py new file mode 100644 index 000000000..eee1530d6 --- /dev/null +++ b/agent-governance-python/agent-os/tests/test_adapter_str_no_leak.py @@ -0,0 +1,528 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +"""Xfail parity harness for adapter denial-string sanitization.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any + +import pytest + +from agent_os.exceptions import PolicyViolationError +from agent_os.integrations.base import ( + BaseIntegration, + ExecutionContext, + GovernancePolicy, + PolicyInterceptor, + ToolCallRequest, +) +from agent_os.policies.bridge import governance_to_document + + +@dataclass(frozen=True) +class _DenialSnapshot: + """Observed public and audit strings for one denial path.""" + + public_message: str + audit_text: str + + +@dataclass(frozen=True) +class _LeakCase: + """Inventory entry for one leaky denial surface.""" + + name: str + trigger: Callable[[], _DenialSnapshot] + forbidden_fragments: tuple[str, ...] + audit_fragment: str + xfail_reason: str + + +class _TestKernel(BaseIntegration): + """Minimal concrete integration for in-process denial checks.""" + + def wrap(self, agent: Any) -> Any: + return agent + + def unwrap(self, governed_agent: Any) -> Any: + return governed_agent + + +def _from_exception(exc: BaseException) -> _DenialSnapshot: + details = getattr(exc, "details", {}) or {} + detail = str(details.get("detail", "")) + check_result = getattr(exc, "check_result", None) + audit_entry = getattr(check_result, "audit_entry", {}) or {} + audit_text = " ".join([detail, str(audit_entry)]) + return _DenialSnapshot(public_message=str(exc), audit_text=audit_text) + + +def _from_legacy_reason(reason: str) -> _DenialSnapshot: + try: + raise PolicyViolationError(reason) + except PolicyViolationError as exc: + return _from_exception(exc) + + +def _from_tool_result(reason: str | None) -> _DenialSnapshot: + assert reason is not None + return _from_legacy_reason(reason) + + +def _base_context(policy: GovernancePolicy, *, call_count: int = 0) -> tuple[_TestKernel, ExecutionContext]: + kernel = _TestKernel(policy=policy) + ctx = kernel.create_context("agent-parity") + ctx.call_count = call_count + return kernel, ctx + + +def _policy_interceptor_allowed_tool() -> _DenialSnapshot: + result = PolicyInterceptor(GovernancePolicy(allowed_tools=["safe_tool"])).intercept( + ToolCallRequest(tool_name="danger_tool", arguments={}) + ) + return _from_tool_result(result.reason) + + +def _policy_interceptor_blocked_pattern() -> _DenialSnapshot: + result = PolicyInterceptor(GovernancePolicy(blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"])).intercept( + ToolCallRequest(tool_name="lookup", arguments={"value": "123-45-6789"}) + ) + return _from_tool_result(result.reason) + + +def _policy_interceptor_max_tool_calls() -> _DenialSnapshot: + policy = GovernancePolicy(max_tool_calls=3) + _, ctx = _base_context(policy, call_count=3) + result = PolicyInterceptor(policy, ctx).intercept(ToolCallRequest(tool_name="lookup", arguments={})) + return _from_tool_result(result.reason) + + +def _base_pre_execute_max_tool_calls() -> _DenialSnapshot: + kernel, ctx = _base_context(GovernancePolicy(max_tool_calls=5), call_count=5) + allowed, reason = kernel.pre_execute(ctx, "safe input") + assert allowed is False + return _from_tool_result(reason) + + +def _base_pre_execute_timeout() -> _DenialSnapshot: + kernel, ctx = _base_context(GovernancePolicy(timeout_seconds=1)) + ctx.start_time = datetime.now() - timedelta(seconds=2) + allowed, reason = kernel.pre_execute(ctx, "safe input") + assert allowed is False + return _from_tool_result(reason) + + +def _base_pre_execute_blocked_pattern() -> _DenialSnapshot: + kernel, ctx = _base_context(GovernancePolicy(blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"])) + allowed, reason = kernel.pre_execute(ctx, "customer ssn 123-45-6789") + assert allowed is False + return _from_tool_result(reason) + + +def _bridge_message(rule_name: str, policy: GovernancePolicy) -> _DenialSnapshot: + document = governance_to_document(policy) + rule = next(rule for rule in document.rules if rule.name == rule_name) + return _from_legacy_reason(rule.message or "") + + +def _langchain_allow_list() -> _DenialSnapshot: + from agent_os.integrations import langchain_adapter as adapter + + kernel = adapter.LangChainKernel(policy=GovernancePolicy(allowed_tools=["safe_tool"])) + try: + kernel._check_tool_policy("danger_tool", (), {}, None) + except adapter.PolicyViolationError as exc: + return _from_exception(exc) + raise AssertionError("expected LangChain policy violation") + + +def _langchain_blocked_pattern_args() -> _DenialSnapshot: + from agent_os.integrations import langchain_adapter as adapter + + kernel = adapter.LangChainKernel(policy=GovernancePolicy(blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"])) + try: + kernel._check_tool_policy("lookup", ("customer ssn 123-45-6789",), {}, None) + except adapter.PolicyViolationError as exc: + return _from_exception(exc) + raise AssertionError("expected LangChain policy violation") + + +def _langchain_blocked_pattern_tool_name() -> _DenialSnapshot: + from agent_os.integrations import langchain_adapter as adapter + + kernel = adapter.LangChainKernel(policy=GovernancePolicy(blocked_patterns=["delete"])) + try: + kernel._check_tool_policy("delete_records", (), {}, None) + except adapter.PolicyViolationError as exc: + return _from_exception(exc) + raise AssertionError("expected LangChain policy violation") + + +def _openai_agents_allow_list() -> _DenialSnapshot: + from agent_os.integrations.openai_agents_sdk import GovernancePolicy as OaiPolicy + from agent_os.integrations.openai_agents_sdk import OpenAIAgentsKernel + + kernel = OpenAIAgentsKernel(policy=OaiPolicy(allowed_tools=["safe_tool"])) + allowed, reason = kernel._check_tool_allowed("danger_tool") + assert allowed is False + return _from_tool_result(reason) + + +def _openai_agents_blocked_pattern() -> _DenialSnapshot: + from agent_os.integrations.openai_agents_sdk import GovernancePolicy as OaiPolicy + from agent_os.integrations.openai_agents_sdk import OpenAIAgentsKernel + + kernel = OpenAIAgentsKernel(policy=OaiPolicy(blocked_patterns=["secret_code"])) + allowed, reason = kernel._check_content("contains secret_code") + assert allowed is False + return _from_tool_result(reason) + + +def _google_adk_allow_list() -> _DenialSnapshot: + from agent_os.integrations.google_adk_adapter import GoogleADKKernel + + kernel = GoogleADKKernel(allowed_tools=["safe_tool"]) + allowed, reason = kernel._check_tool_allowed("danger_tool") + assert allowed is False + return _from_tool_result(reason) + + +def _google_adk_blocked_pattern() -> _DenialSnapshot: + from agent_os.integrations.google_adk_adapter import GoogleADKKernel + + kernel = GoogleADKKernel(blocked_patterns=["secret_code"]) + allowed, reason = kernel._check_content("contains secret_code") + assert allowed is False + return _from_tool_result(reason) + + +def _smolagents_allow_list() -> _DenialSnapshot: + from agent_os.integrations.smolagents_adapter import SmolagentsKernel + + kernel = SmolagentsKernel(allowed_tools=["safe_tool"]) + allowed, reason = kernel._check_tool_allowed("danger_tool") + assert allowed is False + return _from_tool_result(reason) + + +def _smolagents_blocked_pattern() -> _DenialSnapshot: + from agent_os.integrations.smolagents_adapter import SmolagentsKernel + + kernel = SmolagentsKernel(blocked_patterns=["secret_code"]) + allowed, reason = kernel._check_content("contains secret_code") + assert allowed is False + return _from_tool_result(reason) + + +def _a2a_blocked_pattern() -> _DenialSnapshot: + from agent_os.integrations.a2a_adapter import A2AGovernanceAdapter + + adapter = A2AGovernanceAdapter(blocked_patterns=["secret_code"]) + allowed, reason = adapter._check_content(["contains secret_code"]) + assert allowed is False + return _from_tool_result(reason) + + +def _a2a_allowed_skill() -> _DenialSnapshot: + from agent_os.integrations.a2a_adapter import A2AGovernanceAdapter + + adapter = A2AGovernanceAdapter(allowed_skills=["safe_skill"]) + evaluation = adapter.evaluate_task({"skill_id": "danger_skill", "messages": []}) + assert evaluation.allowed is False + return _from_tool_result(evaluation.reason) + + +def _guardrails_regex_validator() -> _DenialSnapshot: + from agent_os.integrations.guardrails_adapter import RegexValidator + + outcome = RegexValidator([r"\b\d{3}-\d{2}-\d{4}\b"]).validate("customer ssn 123-45-6789") + assert outcome.passed is False + return _from_legacy_reason(outcome.error_message or "") + + +def _pydantic_ai_human_approval() -> _DenialSnapshot: + from agent_os.integrations.pydantic_ai_adapter import HumanApprovalRequired + + try: + raise HumanApprovalRequired("delete_records", {"id": "123"}) + except PolicyViolationError as exc: + return _from_exception(exc) + + +def _trust_root_allowed_tool() -> _DenialSnapshot: + from agent_os.trust_root import TrustRoot + + decision = TrustRoot([GovernancePolicy(allowed_tools=["safe_tool"])]).validate_action( + {"tool": "danger_tool", "arguments": {}} + ) + assert decision.allowed is False + return _from_tool_result(decision.reason) + + +def _trust_root_blocked_pattern() -> _DenialSnapshot: + from agent_os.trust_root import TrustRoot + + decision = TrustRoot([GovernancePolicy(blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"])]).validate_action( + {"tool": "lookup", "arguments": {"value": "123-45-6789"}} + ) + assert decision.allowed is False + return _from_tool_result(decision.reason) + + +def _xfail(case: _LeakCase) -> Any: + return pytest.param(case, marks=pytest.mark.xfail(reason=case.xfail_reason, strict=True), id=case.name) + + +def _skip(name: str, reason: str) -> Any: + case = _LeakCase(name, lambda: _DenialSnapshot("", ""), (), "", reason) + return pytest.param(case, marks=pytest.mark.skip(reason=reason), id=name) + + +LEAK_CASES = [ + _xfail( + _LeakCase( + "base-policy-interceptor-allow-list", + _policy_interceptor_allowed_tool, + ("safe_tool", "danger_tool"), + "safe_tool", + "Pending PR (eᵢ) for base PolicyInterceptor", + ) + ), + _xfail( + _LeakCase( + "base-policy-interceptor-blocked-pattern", + _policy_interceptor_blocked_pattern, + (r"\b", r"\d", "{3}", "123-45-6789"), + r"\d{3}", + "Pending PR (eᵢ) for base PolicyInterceptor", + ) + ), + _xfail( + _LeakCase( + "base-policy-interceptor-max-tool-calls", + _policy_interceptor_max_tool_calls, + ("3",), + "3", + "Pending PR (eᵢ) for base PolicyInterceptor", + ) + ), + _xfail( + _LeakCase( + "base-pre-execute-max-tool-calls", + _base_pre_execute_max_tool_calls, + ("5",), + "5", + "Pending PR (eᵢ) for base pre_execute conversion", + ) + ), + _xfail( + _LeakCase( + "base-pre-execute-timeout", + _base_pre_execute_timeout, + ("1s", "1"), + "1", + "Pending PR (eᵢ) for base pre_execute conversion", + ) + ), + _xfail( + _LeakCase( + "base-pre-execute-blocked-pattern", + _base_pre_execute_blocked_pattern, + (r"\b", r"\d", "{3}", "123-45-6789"), + r"\d{3}", + "Pending PR (eᵢ) for base pre_execute conversion", + ) + ), + _xfail( + _LeakCase( + "bridge-max-tokens", + lambda: _bridge_message("max_tokens", GovernancePolicy(max_tokens=4096)), + ("4096",), + "4096", + "Pending PR (eᵢ) for policy bridge", + ) + ), + _xfail( + _LeakCase( + "bridge-max-tool-calls", + lambda: _bridge_message("max_tool_calls", GovernancePolicy(max_tool_calls=7)), + ("7",), + "7", + "Pending PR (eᵢ) for policy bridge", + ) + ), + _xfail( + _LeakCase( + "bridge-blocked-pattern", + lambda: _bridge_message("blocked_pattern_0", GovernancePolicy(blocked_patterns=["secret_code"])), + ("secret_code",), + "secret_code", + "Pending PR (eᵢ) for policy bridge", + ) + ), + _xfail( + _LeakCase( + "bridge-confidence-threshold", + lambda: _bridge_message("confidence_threshold", GovernancePolicy(confidence_threshold=0.82)), + ("0.82",), + "0.82", + "Pending PR (eᵢ) for policy bridge", + ) + ), + _xfail( + _LeakCase( + "langchain-allow-list", + _langchain_allow_list, + ("safe_tool", "danger_tool"), + "safe_tool", + "Pending PR (d) for langchain", + ) + ), + _xfail( + _LeakCase( + "langchain-blocked-pattern-args", + _langchain_blocked_pattern_args, + (r"\b", r"\d", "{3}", "123-45-6789"), + r"\d{3}", + "Pending PR (d) for langchain", + ) + ), + _xfail( + _LeakCase( + "langchain-blocked-pattern-tool-name", + _langchain_blocked_pattern_tool_name, + ("delete", "delete_records"), + "delete", + "Pending PR (d) for langchain", + ) + ), + _xfail( + _LeakCase( + "openai-agents-allow-list", + _openai_agents_allow_list, + ("danger_tool",), + "danger_tool", + "Pending PR (eᵢ) for openai_agents_sdk", + ) + ), + _xfail( + _LeakCase( + "openai-agents-blocked-pattern", + _openai_agents_blocked_pattern, + ("secret_code",), + "secret_code", + "Pending PR (eᵢ) for openai_agents_sdk", + ) + ), + _xfail( + _LeakCase( + "google-adk-allow-list", + _google_adk_allow_list, + ("danger_tool",), + "danger_tool", + "Pending PR (eᵢ) for google_adk_adapter", + ) + ), + _xfail( + _LeakCase( + "google-adk-blocked-pattern", + _google_adk_blocked_pattern, + ("secret_code",), + "secret_code", + "Pending PR (eᵢ) for google_adk_adapter", + ) + ), + _xfail( + _LeakCase( + "smolagents-allow-list", + _smolagents_allow_list, + ("danger_tool",), + "danger_tool", + "Pending PR (eᵢ) for smolagents_adapter", + ) + ), + _xfail( + _LeakCase( + "smolagents-blocked-pattern", + _smolagents_blocked_pattern, + ("secret_code",), + "secret_code", + "Pending PR (eᵢ) for smolagents_adapter", + ) + ), + _xfail( + _LeakCase( + "a2a-blocked-pattern", + _a2a_blocked_pattern, + ("secret_code",), + "secret_code", + "Pending PR (eᵢ) for a2a_adapter", + ) + ), + _xfail( + _LeakCase( + "a2a-allowed-skill", + _a2a_allowed_skill, + ("danger_skill",), + "danger_skill", + "Pending PR (eᵢ) for a2a_adapter", + ) + ), + _xfail( + _LeakCase( + "guardrails-regex-validator", + _guardrails_regex_validator, + (r"\b", r"\d", "{3}", "123-45-6789"), + r"\d{3}", + "Pending PR (eᵢ) for guardrails_adapter", + ) + ), + _xfail( + _LeakCase( + "pydantic-ai-human-approval", + _pydantic_ai_human_approval, + ("delete_records",), + "delete_records", + "Pending PR (eᵢ) for pydantic_ai_adapter", + ) + ), + _xfail( + _LeakCase( + "trust-root-allow-list", + _trust_root_allowed_tool, + ("safe_tool", "danger_tool"), + "safe_tool", + "Pending PR (eᵢ) for trust_root", + ) + ), + _xfail( + _LeakCase( + "trust-root-blocked-pattern", + _trust_root_blocked_pattern, + (r"\b", r"\d", "{3}", "123-45-6789"), + r"\d{3}", + "Pending PR (eᵢ) for trust_root", + ) + ), + _skip("openai-adapter-human-approval", "requires PR (eᵢ) refactor to inject OpenAI client seam"), + _skip("anthropic-adapter-human-approval", "requires PR (eᵢ) refactor to inject Anthropic response seam"), + _skip("gemini-adapter-human-approval", "requires PR (eᵢ) refactor to inject Gemini response seam"), + _skip("mistral-adapter-tool-policy", "requires PR (eᵢ) refactor to inject Mistral response seam"), + _skip("semantic-kernel-memory-save", "requires PR (eᵢ) refactor to inject async Semantic Kernel memory seam"), + _skip("crewai-adapter-memory-and-task", "requires PR (eᵢ) refactor to inject CrewAI task/memory seams"), + _skip("autogen-adapter-function-hook", "requires PR (eᵢ) refactor to inject AutoGen function-call seam"), + _skip("finance-soc2-example", "example denial uses app-local ToolCallResult; convert in PR (eᵢ) before asserting exception details"), +] + + +@pytest.mark.parametrize("case", LEAK_CASES) +def test_adapter_denial_string_does_not_leak_policy_internals(case: _LeakCase) -> None: + """Denial public text stays safe while audit details retain raw evidence.""" + snapshot = case.trigger() + + for forbidden in case.forbidden_fragments: + assert forbidden not in snapshot.public_message + assert case.audit_fragment in snapshot.audit_text