Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ Blocking-IO runtime gate (`tests/blocking_io/`):
`asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix
for #1917); `test_sqlite_lifespan.py` (locks the offload around
SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912);
and `test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async
API offloading its file IO via `asyncio.to_thread`, fix #3084).
`test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async
API offloading its file IO via `asyncio.to_thread`, fix #3084); and
`test_uploads_middleware.py` (locks `UploadsMiddleware.abefore_agent`
offloading the uploads-directory scan off the event loop).
- `test_gate_smoke.py` is a meta-test asserting the gate actually catches
unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io`
opt-out works.
Expand Down
3 changes: 3 additions & 0 deletions backend/docs/BLOCKING_IO_DETECTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ The runtime anchors protect confirmed blocking-IO bug shapes:
(fix #3084); this anchor drives the real async API under the gate so any
blocking IO reintroduced on the loop fails, not only removal of one
`to_thread` call.
- `UploadsMiddleware.before_agent` uploads-directory scan: a sync-only middleware
hook runs on the event loop under async graph execution, so the scan is
offloaded via `abefore_agent` + `run_in_executor`.
- Gate health checks: Blockbuster catches unoffloaded calls, opt-out works, and
patches are restored after exceptions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langchain_core.runnables import run_in_executor
from langgraph.runtime import Runtime

from deerflow.config.paths import Paths, get_paths
Expand Down Expand Up @@ -293,3 +294,16 @@ def before_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict
"uploaded_files": new_files,
"messages": messages,
}

@override
async def abefore_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict | None:
"""Async hook that offloads the synchronous uploads scan off the event loop.

``before_agent`` performs blocking filesystem IO (directory enumeration,
``stat``, reading sibling ``.md`` outlines). When the graph runs async,
langgraph would otherwise execute the sync hook directly on the event
loop, so it is dispatched to a worker thread via ``run_in_executor``.
``run_in_executor`` copies the current context, so the ``user_id``
contextvar read by ``get_effective_user_id()`` is preserved.
"""
return await run_in_executor(None, self.before_agent, state, runtime)
56 changes: 56 additions & 0 deletions backend/tests/blocking_io/test_uploads_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Regression anchor: UploadsMiddleware must not block the event loop.

``before_agent`` scans the thread uploads directory (``exists`` / ``iterdir`` /
``stat`` plus reading sibling ``.md`` outlines). LangChain wires a sync-only
``before_agent`` as ``RunnableCallable(before_agent, None)``; langgraph's
``ainvoke`` runs it directly on the event loop when ``afunc is None``. So the
filesystem scan must be offloaded (the middleware provides ``abefore_agent``).

This anchor drives the real ``create_agent`` graph via ``ainvoke`` under the
strict Blockbuster gate. If the scan regresses back onto the event loop,
Blockbuster raises ``BlockingError`` and this test fails.

The graph/middleware construction is offloaded with ``asyncio.to_thread`` only
because ``Paths.__init__`` resolves paths synchronously; the surface under test
(``before_agent``'s directory scan) is exercised on the event loop, not
bypassed.
"""

from __future__ import annotations

import asyncio
from pathlib import Path

import pytest
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage, HumanMessage

pytestmark = pytest.mark.asyncio


class _FakeModel(FakeMessagesListChatModel):
"""FakeMessagesListChatModel with a no-op ``bind_tools`` for create_agent."""

def bind_tools(self, tools, **kwargs): # type: ignore[override]
return self


async def test_before_agent_uploads_scan_does_not_block_event_loop(tmp_path: Path) -> None:
from langchain.agents import create_agent

from deerflow.agents.middlewares.uploads_middleware import UploadsMiddleware
from deerflow.runtime.user_context import get_effective_user_id

mw = await asyncio.to_thread(UploadsMiddleware, str(tmp_path))
uploads_dir = await asyncio.to_thread(mw._paths.sandbox_uploads_dir, "t1", user_id=get_effective_user_id())
uploads_dir.mkdir(parents=True, exist_ok=True) # test-side seeding (not in scanned_modules)
(uploads_dir / "existing.txt").write_text("hello", encoding="utf-8")

agent = await asyncio.to_thread(lambda: create_agent(model=_FakeModel(responses=[AIMessage(content="ok")]), tools=[], middleware=[mw]))

result = await agent.ainvoke(
{"messages": [HumanMessage(content="hi")]},
{"configurable": {"thread_id": "t1"}},
)

assert result["messages"]
Loading