diff --git a/agent-governance-python/agent-os/src/agent_os/integrations/__init__.py b/agent-governance-python/agent-os/src/agent_os/integrations/__init__.py
index 92b4b1926..ed11e40d0 100644
--- a/agent-governance-python/agent-os/src/agent_os/integrations/__init__.py
+++ b/agent-governance-python/agent-os/src/agent_os/integrations/__init__.py
@@ -62,6 +62,7 @@
 from agent_os.integrations.anthropic_adapter import AnthropicKernel, GovernedAnthropicClient
 from agent_os.integrations.autogen_adapter import AutoGenKernel
 from agent_os.integrations.crewai_adapter import CrewAIKernel
+from agent_os.integrations.crewai_adapter import GovernanceHooks as CrewAIGovernanceHooks
 from agent_os.integrations.gemini_adapter import GeminiKernel, GovernedGeminiModel
 from agent_os.integrations.google_adk_adapter import (
     ADKExecutionContext,
diff --git a/agent-governance-python/agent-os/src/agent_os/integrations/crewai_adapter.py b/agent-governance-python/agent-os/src/agent_os/integrations/crewai_adapter.py
index 05e4ec9c2..11fb9b37b 100644
--- a/agent-governance-python/agent-os/src/agent_os/integrations/crewai_adapter.py
+++ b/agent-governance-python/agent-os/src/agent_os/integrations/crewai_adapter.py
@@ -3,15 +3,25 @@
 """
 CrewAI Integration
 
-Wraps CrewAI crews and agents with Agent OS governance.
+Provides governance for CrewAI crews and agents via **native execution hooks**
+(``@before_tool_call``, ``@after_tool_call``, ``@before_llm_call``,
+``@after_llm_call``) introduced in CrewAI 0.80+.
 
-Usage:
-    from agent_os.integrations import CrewAIKernel
+Recommended usage (native hooks)::
 
-    kernel = CrewAIKernel()
-    governed_crew = kernel.wrap(my_crew)
+    from agent_os.integrations.crewai_adapter import CrewAIKernel, GovernancePolicy
+
+    kernel = CrewAIKernel(policy=GovernancePolicy(
+        blocked_patterns=["DROP TABLE"],
+        allowed_tools=["search", "calculator"],
+    ))
+    hooks = kernel.as_hooks()  # registers governance hooks globally
+    result = my_crew.kickoff()  # hooks intercept every tool & LLM call
+    hooks.unregister()  # clean up when done
+
+Legacy usage (deprecated)::
 
-    # Now all crew executions go through Agent OS
+    governed_crew = kernel.wrap(my_crew)
     result = governed_crew.kickoff()
 """
 
@@ -31,7 +41,23 @@
     ToolCallRequest,
 )
 
 logger = logging.getLogger(__name__)
+
+# ── Graceful import of CrewAI native hooks ────────────────────────
+# CrewAI 0.80+ provides decorator-based execution hooks. When the
+# hooks module is unavailable (older CrewAI or CrewAI not installed),
+# only the legacy proxy approach (``wrap()``) can be used.
+
+try:
+    from crewai.hooks import (
+        before_tool_call as _before_tool_call,
+        after_tool_call as _after_tool_call,
+        before_llm_call as _before_llm_call,
+        after_llm_call as _after_llm_call,
+    )
+    _HOOKS_AVAILABLE = True
+except ImportError:
+    _HOOKS_AVAILABLE = False
 
 # Patterns used to detect potential PII / secrets in memory writes
 _PII_PATTERNS = [
@@ -41,31 +67,543 @@
 ]
 
 
-class CrewAIKernel(BaseIntegration):
+# ═══════════════════════════════════════════════════════════════════
+# GovernanceHooks – native CrewAI execution hooks
+# ═══════════════════════════════════════════════════════════════════
+
+class GovernanceHooks:
+    """Native CrewAI governance hooks for Agent OS.
+
+    Registers four global execution hooks that intercept every tool call
+    and LLM call across all agents in a crew:
+
+    * ``before_tool_call`` – allowlist / blocklist, blocked-pattern scan,
+      Cedar/OPA ``pre_execute`` gate.
+    * ``after_tool_call`` – blocked-pattern scan on tool output, drift
+      detection via ``post_execute``.
+    * ``before_llm_call`` – content filter on input messages.
+    * ``after_llm_call`` – blocked-pattern scan on LLM response.
+
+    Parameters
+    ----------
+    kernel : CrewAIKernel
+        The governing kernel whose policy is enforced.
+    name : str, optional
+        Human-readable name for logging (default ``"governance"``).
+
+    Notes
+    -----
+    CrewAI hooks are **global** – they apply to every crew in the
+    current process. Only one ``GovernanceHooks`` instance should be
+    active at a time. Call :meth:`unregister` to deactivate.
+
+    Examples
+    --------
+    >>> kernel = CrewAIKernel(policy=GovernancePolicy(allowed_tools=["search"]))
+    >>> hooks = kernel.as_hooks()
+    >>> result = my_crew.kickoff()
+    >>> hooks.unregister()
     """
-    CrewAI adapter for Agent OS.
-
-    Supports:
-    - Crew (kickoff, kickoff_async)
-    - Individual agents within crews
-    - Task execution monitoring
-    - Individual tool call interception (allowed_tools, blocked_patterns)
-    - Deep hooks: step-by-step task execution, memory interception,
-      and sub-agent delegation detection (when ``deep_hooks_enabled`` is True).
+
+    def __init__(self, kernel: "CrewAIKernel", name: str = "governance"):
+        self._kernel = kernel
+        self._name = name
+        self._ctx = kernel.create_context(f"crewai-hooks-{name}")
+        self._registered = False
+        self._hook_fns: list[Any] = []
+        logger.debug(
+            "GovernanceHooks created: name=%s, hooks_available=%s",
+            name,
+            _HOOKS_AVAILABLE,
+        )
+
+    # ── Registration ──────────────────────────────────────────────
+
+    def register(self) -> "GovernanceHooks":
+        """Register the four governance hooks with CrewAI.
+
+        Returns
+        -------
+        GovernanceHooks
+            Self, for chaining.
+
+        Raises
+        ------
+        RuntimeError
+            If ``crewai.hooks`` is not available.
+        """
+        if not _HOOKS_AVAILABLE:
+            raise RuntimeError(
+                "crewai.hooks is not available. "
+                "Upgrade to CrewAI 0.80+ or use the legacy wrap() method."
+            )
+        if self._registered:
+            logger.debug("GovernanceHooks already registered, skipping")
+            return self
+
+        if not self._hook_fns:
+            # First registration. Each hook goes through _guard() so that
+            # unregister() can switch it off inside CrewAI's global registry.
+            bt = _before_tool_call(self._guard(self._make_before_tool_call()))
+            at = _after_tool_call(self._guard(self._make_after_tool_call()))
+            bl = _before_llm_call(self._guard(self._make_before_llm_call()))
+            al = _after_llm_call(self._guard(self._make_after_llm_call()))
+            self._hook_fns = [bt, at, bl, al]
+
+        self._registered = True
+        logger.info("[%s] Governance hooks registered with CrewAI", self._name)
+        return self
+
+    def unregister(self) -> None:
+        """Deactivate governance hooks.
+
+        .. note::
+            The hook functions are not removed from CrewAI's global
+            registry. Each one is wrapped by :meth:`_guard`, so after
+            this call it returns ``None`` (allow) without enforcing
+            policy. A later :meth:`register` re-activates the same
+            functions instead of registering duplicates.
+        """
+        self._registered = False
+        logger.info("[%s] Governance hooks unregistered", self._name)
+
+    def _guard(self, hook_fn):
+        """Wrap *hook_fn* so it is a no-op while hooks are unregistered.
+
+        Parameters
+        ----------
+        hook_fn : callable
+            A governance hook taking the CrewAI hook context.
+
+        Returns
+        -------
+        callable
+            A function that returns ``None`` when this instance is not
+            registered and otherwise delegates to ``hook_fn``.
+        """
+        def guarded_hook(context):
+            if not self._registered:
+                return None
+            return hook_fn(context)
+
+        guarded_hook.__name__ = getattr(hook_fn, "__name__", "guarded_hook")
+        return guarded_hook
+
+    # ── Hook Factories ────────────────────────────────────────────
+
+    def _make_before_tool_call(self):
+        """Return the ``before_tool_call`` governance function.
+
+        Returns
+        -------
+        callable
+            A function conforming to CrewAI's ``ToolCallHookContext``
+            protocol that returns ``False`` to block or ``None`` to allow.
+        """
+        kernel = self._kernel
+        ctx = self._ctx
+        name = self._name
+
+        def governance_before_tool(context) -> "bool | None":
+            """Governance gate executed before every tool call.
+
+            Checks tool allowlist/blocklist, scans arguments for blocked
+            patterns, and runs Cedar/OPA ``pre_execute`` evaluation.
+
+            Parameters
+            ----------
+            context : ToolCallHookContext
+                CrewAI hook context with ``tool_name``, ``tool_input``,
+                ``agent``, ``task``, and ``crew`` attributes.
+
+            Returns
+            -------
+            bool | None
+                ``False`` to block the tool call, ``None`` to allow.
+            """
+            tool_name = getattr(context, "tool_name", "unknown")
+            tool_input = getattr(context, "tool_input", {})
+            agent_name = getattr(
+                getattr(context, "agent", None), "role",
+                getattr(getattr(context, "agent", None), "name", "unknown"),
+            )
+
+            logger.debug(
+                "[%s] before_tool_call: tool=%s agent=%s",
+                name, tool_name, agent_name,
+            )
+
+            # ─── 1. Tool allowlist check ───────────────────────
+            if kernel.policy.allowed_tools:
+                if tool_name not in kernel.policy.allowed_tools:
+                    logger.info(
+                        "[%s] Policy DENY: tool '%s' not in allowed_tools",
+                        name, tool_name,
+                    )
+                    return False
+
+            # ─── 2. Blocked-pattern scan on arguments ─────────────
+            args_str = str(tool_input)
+            matched = kernel.policy.matches_pattern(args_str)
+            if matched:
+                logger.info(
+                    "[%s] Policy DENY: blocked pattern '%s' in tool args",
+                    name, matched[0],
+                )
+                return False
+
+            # ─── 3. Blocked-pattern scan on tool name ─────────────
+            name_matched = kernel.policy.matches_pattern(tool_name)
+            if name_matched:
+                logger.info(
+                    "[%s] Policy DENY: blocked pattern '%s' in tool name",
+                    name, name_matched[0],
+                )
+                return False
+
+            # ─── 4. Cedar/OPA pre_execute gate ────────────────────
+            allowed, reason = kernel.pre_execute(
+                ctx, {"tool_name": tool_name, "tool_args": tool_input},
+            )
+            if not allowed:
+                logger.info(
+                    "[%s] Policy DENY (pre_execute): %s", name, reason,
+                )
+                return False
+
+            # ─── 5. Increment call count / max check ──────────────
+            ctx.call_count += 1
+            if kernel.policy.max_tool_calls and ctx.call_count > kernel.policy.max_tool_calls:
+                logger.info(
+                    "[%s] Policy DENY: max_tool_calls (%d) exceeded",
+                    name, kernel.policy.max_tool_calls,
+                )
+                return False
+
+            logger.debug(
+                "[%s] Tool ALLOW: tool=%s count=%d",
+                name, tool_name, ctx.call_count,
+            )
+            return None  # allow
+
+        return governance_before_tool
+
+    def _make_after_tool_call(self):
+        """Return the ``after_tool_call`` governance function.
+
+        Returns
+        -------
+        callable
+            A function that checks tool output for blocked patterns
+            and runs ``post_execute`` drift detection.
+        """
+        kernel = self._kernel
+        ctx = self._ctx
+        name = self._name
+
+        def governance_after_tool(context) -> None:
+            """Governance gate executed after every tool call.
+
+            Scans the tool result for blocked patterns and runs
+            drift detection via ``post_execute``.
+
+            Parameters
+            ----------
+            context : ToolCallHookContext
+                CrewAI hook context with ``tool_result`` available.
+
+            Returns
+            -------
+            None
+                Always returns ``None``. Violations are raised as
+                ``PolicyViolationError``.
+
+            Raises
+            ------
+            PolicyViolationError
+                If the tool output contains a blocked pattern.
+            """
+            tool_name = getattr(context, "tool_name", "unknown")
+            tool_result = getattr(context, "tool_result", None)
+
+            if tool_result and isinstance(tool_result, str):
+                # Blocked-pattern check on output
+                matched = kernel.policy.matches_pattern(tool_result)
+                if matched:
+                    logger.info(
+                        "[%s] Policy DENY: blocked pattern '%s' in tool output",
+                        name, matched[0],
+                    )
+                    raise PolicyViolationError(
+                        f"Blocked pattern '{matched[0]}' detected in tool output"
+                    )
+
+                # Drift detection / checkpointing via base post_execute
+                valid, reason = kernel.post_execute(ctx, tool_result)
+                if not valid:
+                    logger.info(
+                        "[%s] Policy DENY (post_execute) on tool output: %s",
+                        name, reason,
+                    )
+                    raise PolicyViolationError(reason)
+
+            logger.debug("[%s] after_tool_call OK: tool=%s", name, tool_name)
+            return None
+
+        return governance_after_tool
+
+    def _make_before_llm_call(self):
+        """Return the ``before_llm_call`` governance function.
+
+        Returns
+        -------
+        callable
+            A function that scans LLM input messages for blocked
+            patterns and runs ``pre_execute`` checks.
+        """
+        kernel = self._kernel
+        ctx = self._ctx
+        name = self._name
+
+        def governance_before_llm(context) -> "bool | None":
+            """Governance gate executed before every LLM call.
+
+            Scans the message list for blocked patterns and runs
+            Cedar/OPA ``pre_execute`` checks.
+
+            Parameters
+            ----------
+            context : LLMCallHookContext
+                CrewAI context with ``messages``, ``agent``, ``task``,
+                ``iterations`` attributes.
+
+            Returns
+            -------
+            bool | None
+                ``False`` to block the LLM call, ``None`` to allow.
+            """
+            messages = getattr(context, "messages", None) or []
+
+            # ─── 1. Content filter on input messages ──────────────
+            for msg in messages:
+                content = None
+                if isinstance(msg, dict):
+                    content = msg.get("content", "")
+                elif isinstance(msg, str):
+                    content = msg
+                else:
+                    content = getattr(msg, "content", str(msg))
+
+                if content and isinstance(content, str):
+                    matched = kernel.policy.matches_pattern(content)
+                    if matched:
+                        logger.info(
+                            "[%s] Policy DENY: blocked pattern '%s' in LLM input",
+                            name, matched[0],
+                        )
+                        return False
+
+            # ─── 2. Cedar/OPA pre_execute gate ────────────────────
+            combined_input = " ".join(
+                str(m.get("content", m) if isinstance(m, dict) else m)
+                for m in messages
+            ) if messages else ""
+
+            if combined_input.strip():
+                allowed, reason = kernel.pre_execute(ctx, combined_input)
+                if not allowed:
+                    logger.info(
+                        "[%s] Policy DENY (pre_execute) on LLM input: %s",
+                        name, reason,
+                    )
+                    return False
+
+            return None  # allow
+
+        return governance_before_llm
+
+    def _make_after_llm_call(self):
+        """Return the ``after_llm_call`` governance function.
+
+        Returns
+        -------
+        callable
+            A function that scans LLM output for blocked patterns.
+        """
+        kernel = self._kernel
+        ctx = self._ctx
+        name = self._name
+
+        def governance_after_llm(context) -> "str | None":
+            """Governance gate executed after every LLM call.
+
+            Scans the LLM response for blocked patterns and runs
+            ``post_execute`` drift detection.
+
+            Parameters
+            ----------
+            context : LLMCallHookContext
+                CrewAI context with ``response`` available.
+
+            Returns
+            -------
+            str | None
+                ``None`` to keep original response. Violations are
+                raised as ``PolicyViolationError``.
+
+            Raises
+            ------
+            PolicyViolationError
+                If the LLM output contains a blocked pattern.
+            """
+            response = getattr(context, "response", None)
+
+            if response and isinstance(response, str) and response.strip():
+                # Blocked-pattern check on LLM output
+                matched = kernel.policy.matches_pattern(response)
+                if matched:
+                    logger.info(
+                        "[%s] Policy DENY: blocked pattern '%s' in LLM output",
+                        name, matched[0],
+                    )
+                    raise PolicyViolationError(
+                        f"Blocked pattern '{matched[0]}' detected in LLM output"
+                    )
+
+                # Drift detection / checkpointing
+                valid, reason = kernel.post_execute(ctx, response.strip())
+                if not valid:
+                    logger.info(
+                        "[%s] Policy DENY (post_execute) on LLM output: %s",
+                        name, reason,
+                    )
+                    raise PolicyViolationError(reason)
+
+            return None  # keep original response
+
+        return governance_after_llm
+
+    # ── Convenience properties ────────────────────────────────────
+
+    @property
+    def kernel(self) -> "CrewAIKernel":
+        """Return the governing kernel."""
+        return self._kernel
+
+    @property
+    def context(self):
+        """Return the execution context."""
+        return self._ctx
+
+    @property
+    def is_registered(self) -> bool:
+        """Return whether hooks are currently registered."""
+        return self._registered
+
+    def __repr__(self) -> str:
+        return (
+            f"GovernanceHooks(name={self._name!r}, "
+            f"registered={self._registered})"
+        )
+
+
+# ═══════════════════════════════════════════════════════════════════
+# CrewAIKernel – main adapter
+# ═══════════════════════════════════════════════════════════════════
+
+class CrewAIKernel(BaseIntegration):
+    """CrewAI adapter for Agent OS.
+
+    Provides governance for CrewAI crews via two mechanisms:
+
+    **Recommended (native hooks)**:
+        Use :meth:`as_hooks` to register global execution hooks that
+        intercept every tool and LLM call across all agents.
+
+    **Legacy (deprecated)**:
+        Use :meth:`wrap` to create a proxy crew object.
+
+    Parameters
+    ----------
+    policy : GovernancePolicy, optional
+        The governance policy to enforce.
+    deep_hooks_enabled : bool
+        When ``True`` (default), the legacy :meth:`wrap` method also
+        applies step-level, memory, and delegation interception.
+    evaluator : Any, optional
+        Cedar/OPA policy evaluator for fine-grained access control.
+
+    Examples
+    --------
+    >>> kernel = CrewAIKernel(policy=GovernancePolicy(allowed_tools=["search"]))
+    >>> hooks = kernel.as_hooks()
+    >>> # All crew executions now go through governance
+    >>> result = my_crew.kickoff({"topic": "AI governance"})
+    >>> hooks.unregister()
     """
 
-    def __init__(self, policy: Optional[GovernancePolicy] = None, deep_hooks_enabled: bool = True, evaluator: Any = None):
+    def __init__(
+        self,
+        policy: Optional[GovernancePolicy] = None,
+        deep_hooks_enabled: bool = True,
+        evaluator: Any = None,
+    ):
         super().__init__(policy, evaluator=evaluator)
         self.deep_hooks_enabled = deep_hooks_enabled
         self._wrapped_crews: dict[int, Any] = {}
         self._step_log: list[dict[str, Any]] = []
         self._memory_audit_log: list[dict[str, Any]] = []
         self._delegation_log: list[dict[str, Any]] = []
-        logger.debug("CrewAIKernel initialized with policy=%s deep_hooks_enabled=%s", policy, deep_hooks_enabled)
+        logger.debug(
+            "CrewAIKernel initialized with policy=%s deep_hooks_enabled=%s",
+            policy, deep_hooks_enabled,
+        )
+
+    # ── Native hooks (recommended) ────────────────────────────────
+
+    def as_hooks(self, name: str = "governance") -> GovernanceHooks:
+        """Create and register native CrewAI governance hooks.
+
+        This is the **recommended** integration path. The returned
+        :class:`GovernanceHooks` instance registers four global hooks
+        (``before_tool_call``, ``after_tool_call``, ``before_llm_call``,
+        ``after_llm_call``) that enforce governance on every tool and
+        LLM call across all agents in any crew.
+
+        Parameters
+        ----------
+        name : str
+            Human-readable name for the hooks instance (used in logs).
+
+        Returns
+        -------
+        GovernanceHooks
+            The registered hooks instance.
+
+        Raises
+        ------
+        RuntimeError
+            If ``crewai.hooks`` module is not available.
+
+        Examples
+        --------
+        >>> hooks = kernel.as_hooks("prod-governance")
+        >>> result = my_crew.kickoff()
+        >>> hooks.unregister()
+        """
+        hooks = GovernanceHooks(self, name=name)
+        hooks.register()
+        return hooks
+
+    # ── Legacy proxy (deprecated) ─────────────────────────────────
 
     def wrap(self, crew: Any) -> Any:
-        """
-        Wrap a CrewAI crew with governance.
+        """Wrap a CrewAI crew with governance.
+
+        .. deprecated::
+            Use :meth:`as_hooks` instead. The proxy-based approach
+            mutates tool, memory, and agent objects. ``wrap()`` will
+            be removed in v1.0.
 
         Intercepts:
         - kickoff() / kickoff_async()
@@ -73,6 +611,15 @@ def wrap(self, crew: Any) -> Any:
         - Individual tool calls within agents
         - Task completions
         """
+        import warnings
+        warnings.warn(
+            "CrewAIKernel.wrap() is deprecated. Use kernel.as_hooks() instead, "
+            "which leverages CrewAI's native execution hooks. "
+            "wrap() will be removed in v1.0.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
         crew_id = getattr(crew, 'id', None) or f"crew-{id(crew)}"
         crew_name = getattr(crew, 'name', crew_id)
         ctx = self.create_context(crew_id)
@@ -84,7 +631,7 @@ def wrap(self, crew: Any) -> Any:
         kernel = self
 
         class GovernedCrewAICrew:
-            """CrewAI crew wrapped with Agent OS governance"""
+            """CrewAI crew wrapped with Agent OS governance."""
 
             def __init__(self):
                 self._original = original
@@ -93,7 +640,7 @@ def __init__(self):
                 self._crew_name = crew_name
 
             def kickoff(self, inputs: dict = None) -> Any:
-                """Governed kickoff"""
+                """Governed kickoff."""
                 logger.info("Crew execution started: crew_name=%s", self._crew_name)
                 allowed, reason = self._kernel.pre_execute(self._ctx, inputs)
                 if not allowed:
@@ -116,7 +663,7 @@ def kickoff(self, inputs: dict = None) -> Any:
                 return result
 
             async def kickoff_async(self, inputs: dict = None) -> Any:
-                """Governed async kickoff"""
+                """Governed async kickoff."""
                 logger.info("Async crew execution started: crew_name=%s", self._crew_name)
                 allowed, reason = self._kernel.pre_execute(self._ctx, inputs)
                 if not allowed:
@@ -256,11 +803,11 @@ def __getattr__(self, name):
         return GovernedCrewAICrew()
 
     def unwrap(self, governed_crew: Any) -> Any:
-        """Get original crew from wrapped version"""
+        """Get original crew from wrapped version."""
         logger.debug("Unwrapping governed crew")
         return governed_crew._original
 
-    # ── Deep Integration Hooks ────────────────────────────────────
+    # ── Deep Integration Hooks (legacy) ───────────────────────────
 
     def _intercept_task_steps(
         self, agent: Any, agent_name: str, crew_name: str
@@ -460,8 +1007,20 @@ def governed_delegate(*args: Any, **kwargs: Any) -> Any:
         agent.delegate_work = governed_delegate
 
 
-# Convenience function
+# ── Convenience function (deprecated) ─────────────────────────────
+
 def wrap(crew: Any, policy: Optional[GovernancePolicy] = None) -> Any:
-    """Quick wrapper for CrewAI crews"""
+    """Quick wrapper for CrewAI crews.
+
+    .. deprecated::
+        Use ``CrewAIKernel(policy).as_hooks()`` instead.
+    """
+    import warnings
+    warnings.warn(
+        "crewai_adapter.wrap() is deprecated. "
+        "Use CrewAIKernel(policy).as_hooks() instead.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
     logger.debug("Using convenience wrap function for crew")
     return CrewAIKernel(policy).wrap(crew)
diff --git a/agent-governance-python/agent-os/tests/test_crewai_hooks.py b/agent-governance-python/agent-os/tests/test_crewai_hooks.py
new file mode 100644
index 000000000..84326096b
--- /dev/null
+++ b/agent-governance-python/agent-os/tests/test_crewai_hooks.py
@@ -0,0 +1,588 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+"""Tests for native CrewAI GovernanceHooks integration.
+
+Covers:
+- GovernanceHooks init, registration, and properties
+- before_tool_call governance (allowlist, blocklist, patterns, Cedar)
+- after_tool_call governance (output patterns, post_execute)
+- before_llm_call governance (input content filter)
+- after_llm_call governance (output content filter)
+- as_hooks() factory
+- Deprecation warnings for wrap()
+- Backward compatibility with existing wrap() API
+"""
+
+import sys
+import types
+import warnings
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+# ── Stub crewai.hooks before importing the adapter ────────────────
+# CrewAI is not installed in the test environment, so we create
+# minimal stubs that capture the registered functions.
+ +_registered_hooks: dict[str, list] = { + "before_tool_call": [], + "after_tool_call": [], + "before_llm_call": [], + "after_llm_call": [], +} + + +def _make_hook_decorator(hook_type: str): + """Create a fake CrewAI hook decorator that captures the function.""" + def decorator(fn): + _registered_hooks[hook_type].append(fn) + return fn + return decorator + + +# Install the crewai.hooks stub module +_hooks_module = types.ModuleType("crewai.hooks") +_hooks_module.before_tool_call = _make_hook_decorator("before_tool_call") +_hooks_module.after_tool_call = _make_hook_decorator("after_tool_call") +_hooks_module.before_llm_call = _make_hook_decorator("before_llm_call") +_hooks_module.after_llm_call = _make_hook_decorator("after_llm_call") + +_crewai_module = types.ModuleType("crewai") +sys.modules["crewai"] = _crewai_module +sys.modules["crewai.hooks"] = _hooks_module + +from agent_os.integrations.crewai_adapter import ( + CrewAIKernel, + GovernanceHooks, + GovernancePolicy, + PolicyViolationError, +) + + +# ── Fixtures ────────────────────────────────────────────────────── + +@pytest.fixture(autouse=True) +def _clear_hooks(): + """Clear the global hook registry between tests.""" + for key in _registered_hooks: + _registered_hooks[key].clear() + yield + for key in _registered_hooks: + _registered_hooks[key].clear() + + +def _make_tool_context( + tool_name="search", + tool_input=None, + agent_name="researcher", + tool_result=None, +): + """Create a mock ToolCallHookContext.""" + ctx = MagicMock() + ctx.tool_name = tool_name + ctx.tool_input = tool_input or {"query": "hello"} + agent = MagicMock() + agent.role = agent_name + agent.name = agent_name + ctx.agent = agent + ctx.task = MagicMock() + ctx.crew = MagicMock() + ctx.tool_result = tool_result + return ctx + + +def _make_llm_context(messages=None, response=None, iterations=1): + """Create a mock LLMCallHookContext.""" + ctx = MagicMock() + ctx.messages = messages or [{"role": "user", "content": "Hello"}] + ctx.agent 
= MagicMock() + ctx.agent.role = "researcher" + ctx.task = MagicMock() + ctx.crew = MagicMock() + ctx.llm = MagicMock() + ctx.iterations = iterations + ctx.response = response + return ctx + + +# ═══════════════════════════════════════════════════════════════════ +# Test GovernanceHooks Init +# ═══════════════════════════════════════════════════════════════════ + +class TestGovernanceHooksInit: + """Tests for GovernanceHooks initialization and properties.""" + + def test_as_hooks_returns_hooks_instance(self): + """as_hooks() returns a GovernanceHooks instance.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks() + assert isinstance(hooks, GovernanceHooks) + + def test_as_hooks_custom_name(self): + """as_hooks() accepts a custom name.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks(name="prod-guard") + assert "prod-guard" in repr(hooks) + + def test_kernel_property(self): + """GovernanceHooks.kernel returns the parent kernel.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks() + assert hooks.kernel is kernel + + def test_context_property(self): + """GovernanceHooks.context returns an ExecutionContext.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks() + assert hooks.context is not None + + def test_is_registered(self): + """GovernanceHooks.is_registered is True after registration.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks() + assert hooks.is_registered is True + + def test_repr(self): + """repr shows name and registration status.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks(name="test") + assert "name='test'" in repr(hooks) + assert "registered=True" in repr(hooks) + + def test_unregister(self): + """unregister() clears the registered state.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks() + hooks.unregister() + assert hooks.is_registered is False + + def 
test_context_has_correct_policy(self): + """The context inherits the kernel's policy settings.""" + policy = GovernancePolicy(blocked_patterns=["secret"]) + kernel = CrewAIKernel(policy) + hooks = kernel.as_hooks() + assert hooks.context.policy.blocked_patterns == ["secret"] + + def test_hooks_registered_with_crewai(self): + """All four hook types are registered with CrewAI.""" + kernel = CrewAIKernel(GovernancePolicy()) + kernel.as_hooks() + assert len(_registered_hooks["before_tool_call"]) == 1 + assert len(_registered_hooks["after_tool_call"]) == 1 + assert len(_registered_hooks["before_llm_call"]) == 1 + assert len(_registered_hooks["after_llm_call"]) == 1 + + +# ═══════════════════════════════════════════════════════════════════ +# Test before_tool_call +# ═══════════════════════════════════════════════════════════════════ + +class TestBeforeToolCall: + """Tests for before_tool_call governance hook.""" + + def test_allowed_tool_passes(self): + """Tool in allowed_tools list passes governance.""" + kernel = CrewAIKernel(GovernancePolicy(allowed_tools=["search"])) + kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context(tool_name="search") + result = hook_fn(ctx) + assert result is None # None = allow + + def test_tool_not_in_allowed_list_blocked(self): + """Tool NOT in allowed_tools list is blocked.""" + kernel = CrewAIKernel(GovernancePolicy(allowed_tools=["search"])) + kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context(tool_name="delete_database") + result = hook_fn(ctx) + assert result is False + + def test_blocked_tool_name_via_pattern(self): + """Tool matching a blocked pattern is blocked.""" + kernel = CrewAIKernel(GovernancePolicy(blocked_patterns=["rm_rf"])) + kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context(tool_name="rm_rf") + result = hook_fn(ctx) + assert result is False + + def test_blocked_pattern_in_args(self): 
+ """Blocked pattern in tool args blocks the call.""" + kernel = CrewAIKernel(GovernancePolicy(blocked_patterns=["DROP TABLE"])) + kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context(tool_input={"query": "DROP TABLE users"}) + result = hook_fn(ctx) + assert result is False + + def test_blocked_pattern_in_tool_name(self): + """Blocked pattern matching tool name blocks the call.""" + kernel = CrewAIKernel(GovernancePolicy(blocked_patterns=["hack"])) + kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context(tool_name="hack_system") + result = hook_fn(ctx) + assert result is False + + def test_call_count_incremented(self): + """Each allowed tool call increments the call count.""" + kernel = CrewAIKernel(GovernancePolicy()) + hooks = kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context() + hook_fn(ctx) + assert hooks.context.call_count == 1 + hook_fn(ctx) + assert hooks.context.call_count == 2 + + def test_max_tool_calls_blocks(self): + """Exceeding max_tool_calls blocks further calls.""" + kernel = CrewAIKernel(GovernancePolicy(max_tool_calls=2)) + hooks = kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context() + + assert hook_fn(ctx) is None # call 1 OK + assert hook_fn(ctx) is None # call 2 OK + assert hook_fn(ctx) is False # call 3 blocked + + def test_cedar_deny_blocks_tool(self): + """Cedar evaluator deny blocks tool call.""" + evaluator = MagicMock() + evaluator.evaluate.return_value = MagicMock(allowed=False, reason="Cedar denied") + kernel = CrewAIKernel(GovernancePolicy(), evaluator=evaluator) + kernel.as_hooks() + hook_fn = _registered_hooks["before_tool_call"][0] + ctx = _make_tool_context() + result = hook_fn(ctx) + assert result is False + + def test_no_policy_restrictions_allows_all(self): + """Default policy allows all tools.""" + kernel = CrewAIKernel(GovernancePolicy()) + 
# NOTE(review): the five statements below are the tail of a test method whose
# ``def`` line precedes this chunk; they are kept verbatim and must be
# re-indented under that method when the preceding lines are reflowed.
kernel.as_hooks()
hook_fn = _registered_hooks["before_tool_call"][0]
ctx = _make_tool_context(tool_name="anything")
result = hook_fn(ctx)
assert result is None


# ═══════════════════════════════════════════════════════════════════
# Test after_tool_call
# ═══════════════════════════════════════════════════════════════════

class TestAfterToolCall:
    """Checks on the first ``after_tool_call`` hook registered by ``as_hooks()``."""

    def test_clean_output_passes(self):
        """A tool result without a blocked pattern makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["SECRET"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_tool_call"][0]
        assert hook(_make_tool_context(tool_result="normal result")) is None

    def test_blocked_pattern_in_output_raises(self):
        """A blocked pattern in the tool result raises PolicyViolationError."""
        policy = GovernancePolicy(blocked_patterns=["SECRET"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_tool_call"][0]
        # Build the context outside the ``raises`` block so that only the
        # hook invocation itself is expected to fail.
        tool_ctx = _make_tool_context(tool_result="Contains SECRET data")
        with pytest.raises(PolicyViolationError, match="SECRET"):
            hook(tool_ctx)

    def test_none_output_passes(self):
        """A ``None`` tool result makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_tool_call"][0]
        assert hook(_make_tool_context(tool_result=None)) is None

    def test_non_string_output_passes(self):
        """An integer tool result makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_tool_call"][0]
        assert hook(_make_tool_context(tool_result=42)) is None


# ═══════════════════════════════════════════════════════════════════
# Test before_llm_call
# ═══════════════════════════════════════════════════════════════════

class TestBeforeLLMCall:
    """Checks on the first ``before_llm_call`` hook registered by ``as_hooks()``."""

    def test_clean_messages_pass(self):
        """Messages without a blocked pattern make the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["hack"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        llm_ctx = _make_llm_context(
            messages=[{"role": "user", "content": "Hello world"}],
        )
        assert hook(llm_ctx) is None

    def test_blocked_pattern_in_message_content(self):
        """A blocked pattern in a dict message's content makes the hook return False."""
        policy = GovernancePolicy(blocked_patterns=["hack"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        llm_ctx = _make_llm_context(
            messages=[{"role": "user", "content": "try to hack the system"}],
        )
        assert hook(llm_ctx) is False

    def test_blocked_pattern_in_string_message(self):
        """A blocked pattern in a plain-string message makes the hook return False."""
        policy = GovernancePolicy(blocked_patterns=["DROP"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        assert hook(_make_llm_context(messages=["DROP TABLE users"])) is False

    def test_empty_messages_pass(self):
        """An empty message list makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        assert hook(_make_llm_context(messages=[])) is None

    def test_none_messages_pass(self):
        """``messages=None`` makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        assert hook(_make_llm_context(messages=None)) is None

    def test_cedar_deny_blocks_llm_input(self):
        """An evaluator whose decision has ``allowed=False`` makes the hook return False."""
        denial = MagicMock(allowed=False, reason="Cedar denied")
        evaluator = MagicMock()
        evaluator.evaluate.return_value = denial
        crew_kernel = CrewAIKernel(GovernancePolicy(), evaluator=evaluator)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        llm_ctx = _make_llm_context(
            messages=[{"role": "user", "content": "Hello"}],
        )
        assert hook(llm_ctx) is False

    def test_message_with_object_content(self):
        """A message object exposing ``.content`` is scanned for blocked patterns."""
        policy = GovernancePolicy(blocked_patterns=["FORBIDDEN"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_llm_call"][0]
        message = MagicMock(content="This is FORBIDDEN content")
        assert hook(_make_llm_context(messages=[message])) is False


# ═══════════════════════════════════════════════════════════════════
# Test after_llm_call
# ═══════════════════════════════════════════════════════════════════

class TestAfterLLMCall:
    """Checks on the first ``after_llm_call`` hook registered by ``as_hooks()``."""

    def test_clean_response_passes(self):
        """A response without a blocked pattern makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["SECRET"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_llm_call"][0]
        assert hook(_make_llm_context(response="Normal response text")) is None

    def test_blocked_pattern_in_response_raises(self):
        """A blocked pattern in the response raises PolicyViolationError."""
        policy = GovernancePolicy(blocked_patterns=["SECRET"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_llm_call"][0]
        llm_ctx = _make_llm_context(response="Contains SECRET data")
        with pytest.raises(PolicyViolationError, match="SECRET"):
            hook(llm_ctx)

    def test_none_response_passes(self):
        """A ``None`` response makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_llm_call"][0]
        assert hook(_make_llm_context(response=None)) is None

    def test_empty_response_passes(self):
        """A whitespace-only response makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_llm_call"][0]
        assert hook(_make_llm_context(response=" ")) is None

    def test_non_string_response_passes(self):
        """An integer response makes the hook return None."""
        policy = GovernancePolicy(blocked_patterns=["bad"])
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()
        hook = _registered_hooks["after_llm_call"][0]
        assert hook(_make_llm_context(response=42)) is None


# ═══════════════════════════════════════════════════════════════════
# Test as_hooks() Integration
# ═══════════════════════════════════════════════════════════════════

class TestAsHooksIntegration:
    """End-to-end checks on what ``as_hooks()`` registers."""

    def test_tool_then_llm_flow(self):
        """An allowed tool call followed by a clean LLM call passes all four hooks."""
        policy = GovernancePolicy(
            blocked_patterns=["DANGER"],
            allowed_tools=["search"],
        )
        crew_kernel = CrewAIKernel(policy)
        crew_kernel.as_hooks()

        # Fetch the first registered hook of each kind, in call order.
        before_tool, after_tool, before_llm, after_llm = (
            _registered_hooks[kind][0]
            for kind in (
                "before_tool_call",
                "after_tool_call",
                "before_llm_call",
                "after_llm_call",
            )
        )

        # Tool phase: the same context is reused once the result is attached.
        tool_ctx = _make_tool_context(tool_name="search")
        assert before_tool(tool_ctx) is None
        tool_ctx.tool_result = "safe result"
        assert after_tool(tool_ctx) is None

        # LLM phase: one context carries both the request and the response.
        llm_ctx = _make_llm_context(
            messages=[{"role": "user", "content": "summarize results"}],
            response="Here is the summary",
        )
        assert before_llm(llm_ctx) is None
        assert after_llm(llm_ctx) is None

    def test_cedar_evaluator_passed_through(self):
        """With an evaluator whose decision is ``allowed=True`` the tool hook returns None."""
        approval = MagicMock(allowed=True, reason="")
        evaluator = MagicMock()
        evaluator.evaluate.return_value = approval
        crew_kernel = CrewAIKernel(GovernancePolicy(), evaluator=evaluator)
        crew_kernel.as_hooks()
        hook = _registered_hooks["before_tool_call"][0]
        assert hook(_make_tool_context()) is None  # permitted

    def test_multiple_hooks_independent(self):
        """Calling ``as_hooks()`` on two kernels leaves two ``before_tool_call`` hooks."""
        first = CrewAIKernel(GovernancePolicy(allowed_tools=["a"]))
        second = CrewAIKernel(GovernancePolicy(allowed_tools=["b"]))
        first.as_hooks(name="h1")
        second.as_hooks(name="h2")
        # One registration per kernel is expected.
        assert len(_registered_hooks["before_tool_call"]) == 2

    def test_shared_kernel_state(self):
        """One hook invocation leaves ``call_count`` at 1 on the hooks' context."""
        crew_kernel = CrewAIKernel(GovernancePolicy())
        hooks = crew_kernel.as_hooks(name="h1")
        hook = _registered_hooks["before_tool_call"][0]
        hook(_make_tool_context())
        assert hooks.context.call_count == 1


# ═══════════════════════════════════════════════════════════════════
# Test Deprecation Warnings
# ═══════════════════════════════════════════════════════════════════

class TestDeprecationWarnings:
    """Tests that wrap() emits a DeprecationWarning."""

    def test_wrap_emits_deprecation_warning(self):
        """``CrewAIKernel.wrap()`` warns and the message mentions ``as_hooks()``."""
        crew_kernel = CrewAIKernel(GovernancePolicy())
        crew = MagicMock(id="test-crew", agents=[])
        with warnings.catch_warnings(record=True) as captured:
            # "always" stops an earlier identical warning from hiding this one.
            warnings.simplefilter("always")
            crew_kernel.wrap(crew)
        deprecations = [
            record
            for record in captured
            if issubclass(record.category, DeprecationWarning)
        ]
        assert deprecations
        assert "as_hooks()" in str(deprecations[0].message)

    def test_module_wrap_emits_deprecation_warning(self):
        """The module-level ``wrap()`` warns and the message mentions ``as_hooks()``."""
        from agent_os.integrations.crewai_adapter import wrap

        crew = MagicMock(id="test-crew", agents=[])
        with warnings.catch_warnings(record=True) as captured:
            warnings.simplefilter("always")
            wrap(crew)
        deprecations = [
            record
            for record in captured
            if issubclass(record.category, DeprecationWarning)
        ]
        assert deprecations
        assert "as_hooks()" in str(deprecations[0].message)


# ═══════════════════════════════════════════════════════════════════
# Test Backward Compatibility
# ═══════════════════════════════════════════════════════════════════

class TestBackwardCompatibility:
    """Tests that the legacy wrap() API still works."""

    # NOTE(review): the original indentation was lost in this chunk; the calls
    # that follow wrap() are kept inside the catch_warnings() block so that
    # DeprecationWarning stays suppressed for them — confirm against the
    # original layout.

    def test_wrap_kickoff_still_works(self):
        """``kickoff()`` on a wrapped crew returns the underlying crew's result."""
        crew_kernel = CrewAIKernel(GovernancePolicy())
        crew = MagicMock(id="crew-42", agents=[])
        crew.kickoff.return_value = "crew-result"

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            governed = crew_kernel.wrap(crew)
            assert governed.kickoff({"topic": "AI"}) == "crew-result"

    def test_wrap_blocks_on_policy_violation(self):
        """``kickoff()`` on a wrapped crew raises when the input has a blocked pattern."""
        crew_kernel = CrewAIKernel(GovernancePolicy(blocked_patterns=["hack"]))
        crew = MagicMock(id="crew-42", agents=[])

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            governed = crew_kernel.wrap(crew)
            with pytest.raises(PolicyViolationError):
                governed.kickoff({"input": "hack the system"})

    def test_unwrap_still_works(self):
        """``unwrap()`` hands back the very crew object that was wrapped."""
        crew_kernel = CrewAIKernel(GovernancePolicy())
        crew = MagicMock(id="crew-42", agents=[])

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            governed = crew_kernel.wrap(crew)
            assert crew_kernel.unwrap(governed) is crew