Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions backend/app/gateway/routers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge
from app.gateway.routers.thread_runs import RunCreateRequest
from app.gateway.routers.thread_runs import RunCreateRequest, trim_run_message_page
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated
from app.gateway.services import sse_consumer, start_run
from deerflow.runtime import serialize_channel_values

Expand Down Expand Up @@ -129,8 +129,7 @@ async def run_messages(
before_seq=before_seq,
after_seq=after_seq,
)
has_more = len(rows) > limit
data = rows[:limit] if has_more else rows
data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq)
return {"data": data, "has_more": has_more}


Expand Down
15 changes: 13 additions & 2 deletions backend/app/gateway/routers/thread_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
# ---------------------------------------------------------------------------


def trim_run_message_page(rows: list[dict], *, limit: int, after_seq: int | None) -> tuple[list[dict], bool]:
"""Trim a limit+1 run message page without dropping the visible boundary row."""
has_more = len(rows) > limit
if not has_more:
return rows, False

if after_seq is not None:
return rows[:limit], True

return rows[-limit:], True
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated


class RunCreateRequest(BaseModel):
assistant_id: str | None = Field(default=None, description="Agent / assistant to use")
input: dict[str, Any] | None = Field(default=None, description="Graph input (e.g. {messages: [...]})")
Expand Down Expand Up @@ -396,8 +408,7 @@ async def list_run_messages(
before_seq=before_seq,
after_seq=after_seq,
)
has_more = len(rows) > limit
data = rows[:limit] if has_more else rows
data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq)
return {"data": data, "has_more": has_more}


Expand Down
49 changes: 49 additions & 0 deletions backend/tests/test_runs_api_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,55 @@ def test_run_messages_has_more_true_when_extra_row_returned():
body = response.json()
assert body["has_more"] is True
assert len(body["data"]) == 50 # trimmed to limit
assert [m["seq"] for m in body["data"]] == list(range(2, 52))


def test_run_messages_default_page_keeps_newest_messages_when_extra_row_returned():
Comment thread
LittleChenLiya marked this conversation as resolved.
"""Default latest-page trimming drops the older sentinel row, not the newest message."""
rows = [_make_message(i) for i in range(16, 67)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
response = client.get("/api/runs/run-2/messages")
assert response.status_code == 200
body = response.json()
assert body["has_more"] is True
assert [m["seq"] for m in body["data"]] == list(range(17, 67))


def test_run_messages_before_seq_page_keeps_newest_side_when_extra_row_returned():
Comment thread
LittleChenLiya marked this conversation as resolved.
"""Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message."""
rows = [_make_message(i) for i in range(1, 18)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
response = client.get("/api/runs/run-2/messages?before_seq=18&limit=16")
assert response.status_code == 200
body = response.json()
assert body["has_more"] is True
assert [m["seq"] for m in body["data"]] == list(range(2, 18))


def test_run_messages_after_seq_page_keeps_oldest_side_when_extra_row_returned():
Comment thread
LittleChenLiya marked this conversation as resolved.
"""Forward pagination still trims the newer sentinel row."""
rows = [_make_message(i) for i in range(11, 62)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
response = client.get("/api/runs/run-2/messages?after_seq=10")
assert response.status_code == 200
body = response.json()
assert body["has_more"] is True
assert [m["seq"] for m in body["data"]] == list(range(11, 61))


def test_run_messages_passes_after_seq_to_event_store():
Expand Down
37 changes: 37 additions & 0 deletions backend/tests/test_thread_run_messages_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,43 @@ def test_has_more_true_when_extra_row_returned():
body = response.json()
assert body["has_more"] is True
assert len(body["data"]) == 50 # trimmed to limit
assert [m["seq"] for m in body["data"]] == list(range(2, 52))


def test_default_page_keeps_newest_messages_when_extra_row_returned():
"""Default latest-page trimming drops the older sentinel row, not the newest message."""
rows = [_make_message(i) for i in range(16, 67)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
response = client.get("/api/threads/thread-2/runs/run-2/messages")
assert response.status_code == 200
body = response.json()
assert body["has_more"] is True
assert [m["seq"] for m in body["data"]] == list(range(17, 67))


def test_before_seq_page_keeps_newest_side_when_extra_row_returned():
"""Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message."""
rows = [_make_message(i) for i in range(1, 18)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
response = client.get("/api/threads/thread-2/runs/run-2/messages?before_seq=18&limit=16")
assert response.status_code == 200
body = response.json()
assert body["has_more"] is True
assert [m["seq"] for m in body["data"]] == list(range(2, 18))


def test_after_seq_page_keeps_oldest_side_when_extra_row_returned():
"""Forward pagination still trims the newer sentinel row."""
rows = [_make_message(i) for i in range(11, 62)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
response = client.get("/api/threads/thread-2/runs/run-2/messages?after_seq=10")
assert response.status_code == 200
body = response.json()
assert body["has_more"] is True
assert [m["seq"] for m in body["data"]] == list(range(11, 61))


def test_after_seq_forwarded_to_event_store():
Expand Down
Loading