Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ Blocking-IO runtime gate (`tests/blocking_io/`):
`tests/support/detectors/blocking_io_runtime.py`). Any sync blocking IO
call whose stack passes through DeerFlow business code while running on
the asyncio event loop raises `BlockingError` and fails the test.
- Two regression anchors live there: `test_skills_load.py` (locks the
- Regression anchors live there: `test_skills_load.py` (locks the
`asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix
for #1917) and `test_sqlite_lifespan.py` (locks the offload around
SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912).
for #1917); `test_sqlite_lifespan.py` (locks the offload around
SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912);
and `test_uploads_middleware.py` (locks `UploadsMiddleware.abefore_agent`
offloading the uploads-directory scan off the event loop).
- `test_gate_smoke.py` is a meta-test asserting the gate actually catches
unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io`
opt-out works.
Expand Down
5 changes: 4 additions & 1 deletion backend/docs/BLOCKING_IO_DETECTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,14 @@ that production actually executes.

## Current runtime coverage

The initial runtime anchors protect confirmed blocking-IO bug shapes:
The runtime anchors protect confirmed blocking-IO bug shapes:

- SQLite checkpointer setup, including path resolution and parent-directory
creation.
- Subagent skill metadata loading through `SubagentExecutor._load_skills()`.
- `UploadsMiddleware.before_agent` uploads-directory scan: a sync-only middleware
hook runs on the event loop under async graph execution, so the scan is
offloaded via `abefore_agent` + `run_in_executor`.
- Gate health checks: Blockbuster catches unoffloaded calls, opt-out works, and
patches are restored after exceptions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langchain_core.runnables import run_in_executor
from langgraph.runtime import Runtime

from deerflow.config.paths import Paths, get_paths
Expand Down Expand Up @@ -293,3 +294,16 @@ def before_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict
"uploaded_files": new_files,
"messages": messages,
}

@override
async def abefore_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict | None:
"""Async hook that offloads the synchronous uploads scan off the event loop.

``before_agent`` performs blocking filesystem IO (directory enumeration,
``stat``, reading sibling ``.md`` outlines). When the graph runs async,
langgraph would otherwise execute the sync hook directly on the event
loop, so it is dispatched to a worker thread via ``run_in_executor``.
``run_in_executor`` copies the current context, so the ``user_id``
contextvar read by ``get_effective_user_id()`` is preserved.
"""
return await run_in_executor(None, self.before_agent, state, runtime)
56 changes: 56 additions & 0 deletions backend/tests/blocking_io/test_uploads_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Regression anchor: UploadsMiddleware must not block the event loop.

``before_agent`` scans the thread uploads directory (``exists`` / ``iterdir`` /
``stat`` plus reading sibling ``.md`` outlines). LangChain wires a sync-only
``before_agent`` as ``RunnableCallable(before_agent, None)``; langgraph's
``ainvoke`` runs it directly on the event loop when ``afunc is None``. So the
filesystem scan must be offloaded (the middleware provides ``abefore_agent``).

This anchor drives the real ``create_agent`` graph via ``ainvoke`` under the
strict Blockbuster gate. If the scan regresses back onto the event loop,
Blockbuster raises ``BlockingError`` and this test fails.

The graph/middleware construction is offloaded with ``asyncio.to_thread`` only
because ``Paths.__init__`` resolves paths synchronously; the surface under test
(``before_agent``'s directory scan) is exercised on the event loop, not
bypassed.
"""

from __future__ import annotations

import asyncio
from pathlib import Path

import pytest
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage, HumanMessage

pytestmark = pytest.mark.asyncio


class _FakeModel(FakeMessagesListChatModel):
"""FakeMessagesListChatModel with a no-op ``bind_tools`` for create_agent."""

def bind_tools(self, tools, **kwargs): # type: ignore[override]
return self


async def test_before_agent_uploads_scan_does_not_block_event_loop(tmp_path: Path) -> None:
from langchain.agents import create_agent

from deerflow.agents.middlewares.uploads_middleware import UploadsMiddleware
from deerflow.runtime.user_context import get_effective_user_id

mw = await asyncio.to_thread(UploadsMiddleware, str(tmp_path))
uploads_dir = await asyncio.to_thread(mw._paths.sandbox_uploads_dir, "t1", user_id=get_effective_user_id())
uploads_dir.mkdir(parents=True, exist_ok=True) # test-side seeding (not in scanned_modules)
(uploads_dir / "existing.txt").write_text("hello", encoding="utf-8")

agent = await asyncio.to_thread(lambda: create_agent(model=_FakeModel(responses=[AIMessage(content="ok")]), tools=[], middleware=[mw]))

result = await agent.ainvoke(
{"messages": [HumanMessage(content="hi")]},
{"configurable": {"thread_id": "t1"}},
)

assert result["messages"]
Loading