diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 6ab5b69e7c..c3cbf23905 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -126,8 +126,10 @@ Blocking-IO runtime gate (`tests/blocking_io/`): `asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix for #1917); `test_sqlite_lifespan.py` (locks the offload around SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912); - and `test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async - API offloading its file IO via `asyncio.to_thread`, fix #3084). + `test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async + API offloading its file IO via `asyncio.to_thread`, fix #3084); and + `test_uploads_middleware.py` (locks `UploadsMiddleware.abefore_agent` + offloading the uploads-directory scan off the event loop). - `test_gate_smoke.py` is a meta-test asserting the gate actually catches unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io` opt-out works. diff --git a/backend/docs/BLOCKING_IO_DETECTION.md b/backend/docs/BLOCKING_IO_DETECTION.md index b874020fa4..65fc43ce52 100644 --- a/backend/docs/BLOCKING_IO_DETECTION.md +++ b/backend/docs/BLOCKING_IO_DETECTION.md @@ -144,6 +144,9 @@ The runtime anchors protect confirmed blocking-IO bug shapes: (fix #3084); this anchor drives the real async API under the gate so any blocking IO reintroduced on the loop fails, not only removal of one `to_thread` call. +- `UploadsMiddleware.before_agent` uploads-directory scan: a sync-only middleware + hook runs on the event loop under async graph execution, so the scan is + offloaded via `abefore_agent` + `run_in_executor`. - Gate health checks: Blockbuster catches unoffloaded calls, opt-out works, and patches are restored after exceptions. diff --git a/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py index 5a9ee8301e..61e8220782 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py @@ -7,6 +7,7 @@ from langchain.agents import AgentState from langchain.agents.middleware import AgentMiddleware from langchain_core.messages import HumanMessage +from langchain_core.runnables import run_in_executor from langgraph.runtime import Runtime from deerflow.config.paths import Paths, get_paths @@ -293,3 +294,16 @@ def before_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict "uploaded_files": new_files, "messages": messages, } + + @override + async def abefore_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict | None: + """Async hook that offloads the synchronous uploads scan off the event loop. + + ``before_agent`` performs blocking filesystem IO (directory enumeration, + ``stat``, reading sibling ``.md`` outlines). When the graph runs async, + langgraph would otherwise execute the sync hook directly on the event + loop, so it is dispatched to a worker thread via ``run_in_executor``. + ``run_in_executor`` copies the current context, so the ``user_id`` + contextvar read by ``get_effective_user_id()`` is preserved. + """ + return await run_in_executor(None, self.before_agent, state, runtime) diff --git a/backend/tests/blocking_io/test_uploads_middleware.py b/backend/tests/blocking_io/test_uploads_middleware.py new file mode 100644 index 0000000000..35d3ec0098 --- /dev/null +++ b/backend/tests/blocking_io/test_uploads_middleware.py @@ -0,0 +1,56 @@ +"""Regression anchor: UploadsMiddleware must not block the event loop. + +``before_agent`` scans the thread uploads directory (``exists`` / ``iterdir`` / +``stat`` plus reading sibling ``.md`` outlines). LangChain wires a sync-only +``before_agent`` as ``RunnableCallable(before_agent, None)``; langgraph's +``ainvoke`` runs it directly on the event loop when ``afunc is None``. So the +filesystem scan must be offloaded (the middleware provides ``abefore_agent``). + +This anchor drives the real ``create_agent`` graph via ``ainvoke`` under the +strict Blockbuster gate. If the scan regresses back onto the event loop, +Blockbuster raises ``BlockingError`` and this test fails. + +The graph/middleware construction is offloaded with ``asyncio.to_thread`` only +because ``Paths.__init__`` resolves paths synchronously; the surface under test +(``before_agent``'s directory scan) is exercised on the event loop, not +bypassed. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +import pytest +from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel +from langchain_core.messages import AIMessage, HumanMessage + +pytestmark = pytest.mark.asyncio + + +class _FakeModel(FakeMessagesListChatModel): + """FakeMessagesListChatModel with a no-op ``bind_tools`` for create_agent.""" + + def bind_tools(self, tools, **kwargs): # type: ignore[override] + return self + + +async def test_before_agent_uploads_scan_does_not_block_event_loop(tmp_path: Path) -> None: + from langchain.agents import create_agent + + from deerflow.agents.middlewares.uploads_middleware import UploadsMiddleware + from deerflow.runtime.user_context import get_effective_user_id + + mw = await asyncio.to_thread(UploadsMiddleware, str(tmp_path)) + uploads_dir = await asyncio.to_thread(mw._paths.sandbox_uploads_dir, "t1", user_id=get_effective_user_id()) + uploads_dir.mkdir(parents=True, exist_ok=True) # test-side seeding (not in scanned_modules) + (uploads_dir / "existing.txt").write_text("hello", encoding="utf-8") + + agent = await asyncio.to_thread(lambda: create_agent(model=_FakeModel(responses=[AIMessage(content="ok")]), tools=[], middleware=[mw])) + + result = await agent.ainvoke( + {"messages": [HumanMessage(content="hi")]}, + {"configurable": {"thread_id": "t1"}}, + ) + + assert result["messages"]