Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions backend/app/gateway/routers/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,21 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
and removes the thread_meta row from the configured ThreadMetaStore
(sqlite or memory).
"""
from app.gateway.deps import get_thread_store
from app.gateway.deps import get_feedback_repo, get_run_event_store, get_run_store, get_thread_store

user_id = get_effective_user_id()

# Remove metadata before irreversible cleanup so failures do not leave
# clients retrying after local data has already been removed.
try:
thread_store = get_thread_store(request)
await thread_store.delete(thread_id)
except Exception:
logger.exception("Failed to delete thread_meta for %s", sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to delete thread metadata.")

# Clean local filesystem
response = _delete_thread_data(thread_id, user_id=get_effective_user_id())
response = _delete_thread_data(thread_id, user_id=user_id)
Comment thread
LittleChenLiya marked this conversation as resolved.

# Remove checkpoints (best-effort)
checkpointer = getattr(request.app.state, "checkpointer", None)
Expand All @@ -232,13 +243,23 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
except Exception:
logger.debug("Could not delete checkpoints for thread %s (not critical)", sanitize_log_param(thread_id))

# Remove thread_meta row (best-effort) — required for sqlite backend
# so the deleted thread no longer appears in /threads/search.
try:
thread_store = get_thread_store(request)
await thread_store.delete(thread_id)
event_store = get_run_event_store(request)
await event_store.delete_by_thread(thread_id, user_id=user_id)
except Exception:
logger.debug("Could not delete run_events for thread %s (not critical)", sanitize_log_param(thread_id))

try:
feedback_repo = get_feedback_repo(request)
await feedback_repo.delete_by_thread(thread_id, user_id=user_id)
except Exception:
logger.debug("Could not delete feedback for thread %s (not critical)", sanitize_log_param(thread_id))

try:
run_store = get_run_store(request)
await run_store.delete_by_thread(thread_id, user_id=user_id)
except Exception:
logger.debug("Could not delete thread_meta for %s (not critical)", sanitize_log_param(thread_id))
logger.debug("Could not delete runs for thread %s (not critical)", sanitize_log_param(thread_id))
Comment thread
LittleChenLiya marked this conversation as resolved.

return response

Expand Down
21 changes: 20 additions & 1 deletion backend/packages/harness/deerflow/persistence/feedback/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
from datetime import UTC, datetime

from sqlalchemy import case, func, select
from sqlalchemy import case, delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from deerflow.persistence.feedback.model import FeedbackRow
Expand Down Expand Up @@ -187,6 +187,25 @@ async def delete_by_run(
await session.commit()
return True

async def delete_by_thread(
self,
thread_id: str,
*,
user_id: str | None | _AutoSentinel = AUTO,
) -> int:
"""Delete feedback records for a thread. Return the number of deleted records."""
resolved_user_id = resolve_user_id(user_id, method_name="FeedbackRepository.delete_by_thread")
async with self._sf() as session:
conditions = [FeedbackRow.thread_id == thread_id]
if resolved_user_id is not None:
conditions.append(FeedbackRow.user_id == resolved_user_id)
count_stmt = select(func.count()).select_from(FeedbackRow).where(*conditions)
count = await session.scalar(count_stmt) or 0
if count > 0:
await session.execute(delete(FeedbackRow).where(*conditions))
await session.commit()
return count

async def list_by_thread_grouped(
self,
thread_id: str,
Expand Down
17 changes: 16 additions & 1 deletion backend/packages/harness/deerflow/persistence/run/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import UTC, datetime
from typing import Any

from sqlalchemy import func, select, update
from sqlalchemy import delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from deerflow.persistence.run.model import RunRow
Expand Down Expand Up @@ -186,6 +186,21 @@ async def delete(
await session.delete(row)
await session.commit()

async def delete_by_thread(
self,
thread_id,
*,
user_id: str | None | _AutoSentinel = AUTO,
) -> int:
resolved_user_id = resolve_user_id(user_id, method_name="RunRepository.delete_by_thread")
async with self._sf() as session:
conditions = [RunRow.thread_id == thread_id]
if resolved_user_id is not None:
conditions.append(RunRow.user_id == resolved_user_id)
result = await session.execute(delete(RunRow).where(*conditions))
await session.commit()
return result.rowcount or 0

async def list_pending(self, *, before=None):
if before is None:
before_dt = datetime.now(UTC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ async def count_messages(self, thread_id: str) -> int:
"""Count displayable messages (category=message) in a thread."""

@abc.abstractmethod
async def delete_by_thread(self, thread_id: str) -> int:
async def delete_by_thread(self, thread_id: str, *, user_id: str | None = None) -> int:
"""Delete all events for a thread. Return the number of deleted events."""

@abc.abstractmethod
async def delete_by_run(self, thread_id: str, run_id: str) -> int:
async def delete_by_run(self, thread_id: str, run_id: str, *, user_id: str | None = None) -> int:
"""Delete all events for a specific run. Return the number of deleted events."""
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def count_messages(self, thread_id):
all_events = await asyncio.to_thread(self._read_thread_events, thread_id)
return sum(1 for e in all_events if e.get("category") == "message")

async def delete_by_thread(self, thread_id):
async def delete_by_thread(self, thread_id, *, user_id=None):
async with self._get_write_lock(thread_id):
all_events = await asyncio.to_thread(self._read_thread_events, thread_id)
count = len(all_events)
Expand All @@ -210,7 +210,7 @@ async def delete_by_thread(self, thread_id):
self._write_locks.pop(thread_id, None)
return count

async def delete_by_run(self, thread_id, run_id):
async def delete_by_run(self, thread_id, run_id, *, user_id=None):
async with self._get_write_lock(thread_id):
events = await asyncio.to_thread(self._read_run_events, thread_id, run_id)
count = len(events)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ async def count_messages(self, thread_id):
all_events = self._events.get(thread_id, [])
return sum(1 for e in all_events if e["category"] == "message")

async def delete_by_thread(self, thread_id):
async def delete_by_thread(self, thread_id, *, user_id=None):
events = self._events.pop(thread_id, [])
self._seq_counters.pop(thread_id, None)
return len(events)

async def delete_by_run(self, thread_id, run_id):
async def delete_by_run(self, thread_id, run_id, *, user_id=None):
all_events = self._events.get(thread_id, [])
if not all_events:
return 0
Expand Down
5 changes: 5 additions & 0 deletions backend/packages/harness/deerflow/runtime/runs/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ async def update_status(
async def delete(self, run_id: str) -> None:
pass

@abc.abstractmethod
async def delete_by_thread(self, thread_id: str, *, user_id: str | None = None) -> int:
"""Delete all runs for a thread. Return the number of deleted runs."""
pass

@abc.abstractmethod
async def update_model_name(
self,
Expand Down
16 changes: 16 additions & 0 deletions backend/packages/harness/deerflow/runtime/runs/store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ async def update_model_name(self, run_id, model_name):
async def delete(self, run_id):
self._runs.pop(run_id, None)

async def delete_by_thread(self, thread_id, *, user_id=None):
def matches_thread(run):
if run["thread_id"] != thread_id:
return False
if user_id is None:
return True
return run.get("user_id") == user_id

run_ids = []
for run_id, run in self._runs.items():
if matches_thread(run):
run_ids.append(run_id)
for run_id in run_ids:
self._runs.pop(run_id, None)
return len(run_ids)

async def update_run_completion(self, run_id, *, status, **kwargs):
if run_id in self._runs:
self._runs[run_id]["status"] = status
Expand Down
1 change: 1 addition & 0 deletions backend/tests/_router_auth_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def make_authed_test_app(

repo = MagicMock()
repo.check_access = AsyncMock(return_value=owner_check_passes)
repo.delete = AsyncMock(return_value=None)
app.state.thread_store = repo

return app
Expand Down
48 changes: 45 additions & 3 deletions backend/tests/test_feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,33 @@ async def test_delete_by_run_nonexistent(self, tmp_path):
assert deleted is False
await _cleanup()

@pytest.mark.anyio
async def test_delete_by_thread(self, tmp_path):
repo = await _make_feedback_repo(tmp_path)
await repo.create(thread_id="t1", run_id="r1", rating=1, user_id="u1")
await repo.create(thread_id="t1", run_id="r2", rating=-1, user_id="u1")
await repo.create(thread_id="t2", run_id="r3", rating=1, user_id="u1")

deleted = await repo.delete_by_thread("t1", user_id="u1")

assert deleted == 2
assert await repo.list_by_thread("t1", user_id="u1") == []
assert len(await repo.list_by_thread("t2", user_id="u1")) == 1
await _cleanup()

@pytest.mark.anyio
async def test_delete_by_thread_owner_filter(self, tmp_path):
repo = await _make_feedback_repo(tmp_path)
await repo.create(thread_id="t1", run_id="r1", rating=1, user_id="u1")
await repo.create(thread_id="t1", run_id="r2", rating=-1, user_id="u2")

deleted = await repo.delete_by_thread("t1", user_id="u1")

assert deleted == 1
assert await repo.list_by_thread("t1", user_id="u1") == []
assert len(await repo.list_by_thread("t1", user_id="u2")) == 1
await _cleanup()

@pytest.mark.anyio
async def test_list_by_thread_grouped(self, tmp_path):
repo = await _make_feedback_repo(tmp_path)
Expand Down Expand Up @@ -269,8 +296,18 @@ async def test_follow_up_auto_detection_logic(self):
from deerflow.runtime.runs.store.memory import MemoryRunStore

store = MemoryRunStore()
await store.put("r1", thread_id="t1", status="success")
await store.put("r2", thread_id="t1", status="error")
await store.put(
"r1",
thread_id="t1",
status="success",
created_at="2026-01-01T00:00:00+00:00",
)
await store.put(
"r2",
thread_id="t1",
status="error",
created_at="2026-01-01T00:00:01+00:00",
)

# Auto-detect: list_by_thread returns newest first
recent = await store.list_by_thread("t1", limit=1)
Expand All @@ -281,7 +318,12 @@ async def test_follow_up_auto_detection_logic(self):
assert follow_up is None

# Now add a successful run
await store.put("r3", thread_id="t1", status="success")
await store.put(
"r3",
thread_id="t1",
status="success",
created_at="2026-01-01T00:00:02+00:00",
)
recent = await store.list_by_thread("t1", limit=1)
follow_up = None
if recent and recent[0].get("status") == "success":
Expand Down
20 changes: 20 additions & 0 deletions backend/tests/test_persistence_scaffold.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,26 @@ async def test_delete(self, store):
await store.delete("r1")
assert await store.get("r1") is None

@pytest.mark.anyio
async def test_delete_by_thread(self, store):
await store.put("r1", thread_id="t1")
await store.put("r2", thread_id="t1")
await store.put("r3", thread_id="t2")

assert await store.delete_by_thread("t1") == 2
assert await store.get("r1") is None
assert await store.get("r2") is None
assert await store.get("r3") is not None

@pytest.mark.anyio
async def test_delete_by_thread_owner_filter(self, store):
await store.put("r1", thread_id="t1", user_id="alice")
await store.put("r2", thread_id="t1", user_id="bob")

assert await store.delete_by_thread("t1", user_id="alice") == 1
assert await store.get("r1") is None
assert await store.get("r2") is not None

@pytest.mark.anyio
async def test_delete_nonexistent_is_noop(self, store):
await store.delete("nope") # should not raise
Expand Down
27 changes: 27 additions & 0 deletions backend/tests/test_run_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ async def update_status(self, *args, **kwargs):
async def delete(self, *args, **kwargs):
return None

async def delete_by_thread(self, *args, **kwargs):
return 0

async def update_model_name(self, *args, **kwargs):
return None

Expand Down Expand Up @@ -153,6 +156,30 @@ async def test_delete(self, tmp_path):
assert await repo.get("r1") is None
await _cleanup()

@pytest.mark.anyio
async def test_delete_by_thread(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1")
await repo.put("r2", thread_id="t1")
await repo.put("r3", thread_id="t2")
deleted = await repo.delete_by_thread("t1")
assert deleted == 2
assert await repo.get("r1") is None
assert await repo.get("r2") is None
assert await repo.get("r3") is not None
await _cleanup()

@pytest.mark.anyio
async def test_delete_by_thread_owner_filter(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1", user_id="alice")
await repo.put("r2", thread_id="t1", user_id="bob")
deleted = await repo.delete_by_thread("t1", user_id="alice")
assert deleted == 1
assert await repo.get("r1", user_id=None) is None
assert await repo.get("r2", user_id=None) is not None
await _cleanup()

@pytest.mark.anyio
async def test_delete_nonexistent_is_noop(self, tmp_path):
repo = await _make_repo(tmp_path)
Expand Down
3 changes: 3 additions & 0 deletions backend/tests/test_runtime_lifecycle_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class _RunController:

def __init__(self) -> None:
self.started = threading.Event()
self.blocked = threading.Event()
self.checkpoint_written = threading.Event()
self.cancelled = threading.Event()
self.release = threading.Event()
Expand Down Expand Up @@ -110,6 +111,7 @@ async def astream(self, graph_input, config=None, stream_mode=None, subgraphs=Fa
yield _stream_item_for_mode(stream_mode, state)

if self.block_after_first_chunk:
self.controller.blocked.set()
while not self.controller.release.is_set():
await asyncio.sleep(0.05)
except asyncio.CancelledError:
Expand Down Expand Up @@ -565,6 +567,7 @@ def test_cancel_interrupt_stops_running_background_run(isolated_app):
assert created.status_code == 200, created.text
run_id = created.json()["run_id"]
assert controller.started.wait(5), "fake agent never started"
assert controller.blocked.wait(5), "fake agent never reached blocking section"

cancelled = client.post(
f"/api/threads/{thread_id}/runs/{run_id}/cancel?wait=true&action=interrupt",
Expand Down
Loading
Loading