diff --git a/backend/app/gateway/pagination.py b/backend/app/gateway/pagination.py new file mode 100644 index 0000000000..8dd7270396 --- /dev/null +++ b/backend/app/gateway/pagination.py @@ -0,0 +1,15 @@ +"""Shared pagination helpers for gateway routers.""" + +from __future__ import annotations + + +def trim_run_message_page(rows: list[dict], *, limit: int, after_seq: int | None) -> tuple[list[dict], bool]: + """Trim a ``limit + 1`` run-message page while preserving page boundaries.""" + has_more = len(rows) > limit + if not has_more: + return rows, False + + if after_seq is not None: + return rows[:limit], True + + return rows[-limit:], True diff --git a/backend/app/gateway/routers/runs.py b/backend/app/gateway/routers/runs.py index 1e61ffd25d..a0aae03cd2 100644 --- a/backend/app/gateway/routers/runs.py +++ b/backend/app/gateway/routers/runs.py @@ -15,6 +15,7 @@ from app.gateway.authz import require_permission from app.gateway.deps import get_checkpointer, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge +from app.gateway.pagination import trim_run_message_page from app.gateway.routers.thread_runs import RunCreateRequest from app.gateway.services import sse_consumer, start_run, wait_for_run_completion from deerflow.runtime import serialize_channel_values @@ -129,8 +130,7 @@ async def run_messages( before_seq=before_seq, after_seq=after_seq, ) - has_more = len(rows) > limit - data = rows[:limit] if has_more else rows + data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq) return {"data": data, "has_more": has_more} diff --git a/backend/app/gateway/routers/thread_runs.py b/backend/app/gateway/routers/thread_runs.py index 9fc4dfa683..65bf0698f1 100644 --- a/backend/app/gateway/routers/thread_runs.py +++ b/backend/app/gateway/routers/thread_runs.py @@ -21,6 +21,7 @@ from app.gateway.authz import require_permission from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge +from app.gateway.pagination import trim_run_message_page from app.gateway.services import sse_consumer, start_run, wait_for_run_completion from deerflow.runtime import RunRecord, RunStatus, serialize_channel_values @@ -402,8 +403,7 @@ async def list_run_messages( before_seq=before_seq, after_seq=after_seq, ) - has_more = len(rows) > limit - data = rows[:limit] if has_more else rows + data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq) return {"data": data, "has_more": has_more} diff --git a/backend/tests/_run_message_pagination_helpers.py b/backend/tests/_run_message_pagination_helpers.py new file mode 100644 index 0000000000..264a80a519 --- /dev/null +++ b/backend/tests/_run_message_pagination_helpers.py @@ -0,0 +1,16 @@ +from fastapi.testclient import TestClient + + +def assert_run_message_page( + client: TestClient, + url: str, + *, + expected_seq: list[int], + has_more: bool = True, +) -> None: + response = client.get(url) + + assert response.status_code == 200 + body = response.json() + assert body["has_more"] is has_more + assert [m["seq"] for m in body["data"]] == expected_seq diff --git a/backend/tests/test_runs_api_endpoints.py b/backend/tests/test_runs_api_endpoints.py index 1826e4d8ed..dfa5869c88 100644 --- a/backend/tests/test_runs_api_endpoints.py +++ b/backend/tests/test_runs_api_endpoints.py @@ -5,6 +5,7 @@ from unittest.mock import AsyncMock, MagicMock from _router_auth_helpers import make_authed_test_app +from _run_message_pagination_helpers import assert_run_message_page from fastapi.testclient import TestClient from app.gateway.routers import runs @@ -97,6 +98,51 @@ def test_run_messages_has_more_true_when_extra_row_returned(): body = response.json() assert body["has_more"] is True assert len(body["data"]) == 50 # trimmed to limit + assert [m["seq"] for m in body["data"]] == list(range(2, 52)) + + +def test_run_messages_default_page_keeps_newest_messages_when_extra_row_returned(): + """Default latest-page trimming drops the older sentinel row, not the newest message.""" + rows = [_make_message(i) for i in range(16, 67)] + run_record = {"run_id": "run-2", "thread_id": "thread-2"} + app = _make_app( + run_store=_make_run_store(run_record), + event_store=_make_event_store(rows), + ) + with TestClient(app) as client: + assert_run_message_page(client, "/api/runs/run-2/messages", expected_seq=list(range(17, 67))) + + +def test_run_messages_before_seq_page_keeps_newest_side_when_extra_row_returned(): + """Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message.""" + rows = [_make_message(i) for i in range(1, 18)] + run_record = {"run_id": "run-2", "thread_id": "thread-2"} + app = _make_app( + run_store=_make_run_store(run_record), + event_store=_make_event_store(rows), + ) + with TestClient(app) as client: + assert_run_message_page( + client, + "/api/runs/run-2/messages?before_seq=18&limit=16", + expected_seq=list(range(2, 18)), + ) + + +def test_run_messages_after_seq_page_keeps_oldest_side_when_extra_row_returned(): + """Forward pagination still trims the newer sentinel row.""" + rows = [_make_message(i) for i in range(11, 62)] + run_record = {"run_id": "run-2", "thread_id": "thread-2"} + app = _make_app( + run_store=_make_run_store(run_record), + event_store=_make_event_store(rows), + ) + with TestClient(app) as client: + assert_run_message_page( + client, + "/api/runs/run-2/messages?after_seq=10", + expected_seq=list(range(11, 61)), + ) def test_run_messages_passes_after_seq_to_event_store(): diff --git a/backend/tests/test_thread_run_messages_pagination.py b/backend/tests/test_thread_run_messages_pagination.py index 9098e2b73f..6d36001ae9 100644 --- a/backend/tests/test_thread_run_messages_pagination.py +++ b/backend/tests/test_thread_run_messages_pagination.py @@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, MagicMock from _router_auth_helpers import make_authed_test_app +from _run_message_pagination_helpers import assert_run_message_page from fastapi.testclient import TestClient from app.gateway.routers import thread_runs @@ -88,6 +89,43 @@ def test_has_more_true_when_extra_row_returned(): body = response.json() assert body["has_more"] is True assert len(body["data"]) == 50 # trimmed to limit + assert [m["seq"] for m in body["data"]] == list(range(2, 52)) + + +def test_default_page_keeps_newest_messages_when_extra_row_returned(): + """Default latest-page trimming drops the older sentinel row, not the newest message.""" + rows = [_make_message(i) for i in range(16, 67)] + app = _make_app(event_store=_make_event_store(rows)) + with TestClient(app) as client: + assert_run_message_page( + client, + "/api/threads/thread-2/runs/run-2/messages", + expected_seq=list(range(17, 67)), + ) + + +def test_before_seq_page_keeps_newest_side_when_extra_row_returned(): + """Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message.""" + rows = [_make_message(i) for i in range(1, 18)] + app = _make_app(event_store=_make_event_store(rows)) + with TestClient(app) as client: + assert_run_message_page( + client, + "/api/threads/thread-2/runs/run-2/messages?before_seq=18&limit=16", + expected_seq=list(range(2, 18)), + ) + + +def test_after_seq_page_keeps_oldest_side_when_extra_row_returned(): + """Forward pagination still trims the newer sentinel row.""" + rows = [_make_message(i) for i in range(11, 62)] + app = _make_app(event_store=_make_event_store(rows)) + with TestClient(app) as client: + assert_run_message_page( + client, + "/api/threads/thread-2/runs/run-2/messages?after_seq=10", + expected_seq=list(range(11, 61)), + ) def test_after_seq_forwarded_to_event_store(): diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index a4fd93d77f..6b489ec440 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -119,6 +119,55 @@ function findLatestUnloadedRunIndex( return -1; } +type RunMessagesPageResponse = { + data: RunMessage[]; + has_more?: boolean; + hasMore?: boolean; +}; + +export function runMessagesPageHasMore(result: RunMessagesPageResponse) { + return result.has_more ?? result.hasMore ?? false; +} + +export function getOldestRunMessageSeq(messages: RunMessage[]) { + let oldestSeq: number | null = null; + for (const message of messages) { + if (typeof message.seq !== "number") { + continue; + } + oldestSeq = + oldestSeq === null ? message.seq : Math.min(oldestSeq, message.seq); + } + return oldestSeq; +} + +export function getNextRunMessagesBeforeSeq( + result: RunMessagesPageResponse, +): number | null | undefined { + if (!runMessagesPageHasMore(result)) { + return null; + } + return getOldestRunMessageSeq(result.data) ?? undefined; +} + +export function buildRunMessagesUrl( + baseUrl: string, + threadId: string, + runId: string, + beforeSeq?: number, +) { + const normalizedBaseUrl = baseUrl.replace(/\/$/, ""); + const path = `/api/threads/${encodeURIComponent(threadId)}/runs/${encodeURIComponent(runId)}/messages`; + const url = new URL( + `${normalizedBaseUrl}${path}`, + typeof window !== "undefined" ? window.location.origin : "http://localhost", + ); + if (beforeSeq !== undefined) { + url.searchParams.set("before_seq", String(beforeSeq)); + } + return normalizedBaseUrl ? url.toString() : `${url.pathname}${url.search}`; +} + export function mergeMessages( historyMessages: Message[], threadMessages: Message[], @@ -801,6 +850,7 @@ export function useThreadHistory(threadId: string) { const pendingLoadRef = useRef(false); const loadingRunIdRef = useRef(null); const loadedRunIdsRef = useRef>(new Set()); + const runBeforeSeqRef = useRef>(new Map()); const [loading, setLoading] = useState(false); const [messages, setMessages] = useState([]); @@ -841,16 +891,20 @@ export function useThreadHistory(threadId: string) { const requestThreadId = threadIdRef.current; loadingRunIdRef.current = run.run_id; - const result: { data: RunMessage[]; hasMore: boolean } = await fetch( - `${getBackendBaseURL()}/api/threads/${encodeURIComponent(requestThreadId)}/runs/${encodeURIComponent(run.run_id)}/messages`, - { - method: "GET", - headers: { - "Content-Type": "application/json", - }, - credentials: "include", + const beforeSeq = runBeforeSeqRef.current.get(run.run_id); + const url = buildRunMessagesUrl( + getBackendBaseURL(), + requestThreadId, + run.run_id, + beforeSeq, + ); + const result: RunMessagesPageResponse = await fetch(url, { + method: "GET", + headers: { + "Content-Type": "application/json", }, - ).then((res) => { + credentials: "include", + }).then((res) => { return res.json(); }); const _messages = result.data @@ -862,7 +916,18 @@ export function useThreadHistory(threadId: string) { setMessages((prev) => dedupeMessagesByIdentity([..._messages, ...prev]), ); - loadedRunIdsRef.current.add(run.run_id); + const nextBeforeSeq = getNextRunMessagesBeforeSeq(result); + if (typeof nextBeforeSeq === "number") { + runBeforeSeqRef.current.set(run.run_id, nextBeforeSeq); + pendingLoadRef.current = true; + } else if (nextBeforeSeq === undefined) { + console.warn( + `Run ${run.run_id} returned has_more without message seq values; leaving it pending for retry.`, + ); + } else { + runBeforeSeqRef.current.delete(run.run_id); + loadedRunIdsRef.current.add(run.run_id); + } indexRef.current = findLatestUnloadedRunIndex( runsRef.current, loadedRunIdsRef.current, @@ -886,6 +951,7 @@ export function useThreadHistory(threadId: string) { pendingLoadRef.current = false; loadingRunIdRef.current = null; loadedRunIdsRef.current = new Set(); + runBeforeSeqRef.current = new Map(); loadingRef.current = false; setLoading(false); setMessages([]); diff --git a/frontend/src/core/threads/types.ts b/frontend/src/core/threads/types.ts index dafb073494..a1d51f22ce 100644 --- a/frontend/src/core/threads/types.ts +++ b/frontend/src/core/threads/types.ts @@ -25,6 +25,7 @@ export interface AgentThread extends Thread { export interface RunMessage { run_id: string; + seq?: number; content: Message; metadata: { caller: string; diff --git a/frontend/tests/unit/core/threads/message-merge.test.ts b/frontend/tests/unit/core/threads/message-merge.test.ts index a6e612bfdf..f73fa09412 100644 --- a/frontend/tests/unit/core/threads/message-merge.test.ts +++ b/frontend/tests/unit/core/threads/message-merge.test.ts @@ -2,10 +2,25 @@ import type { Message } from "@langchain/langgraph-sdk"; import { expect, test } from "vitest"; import { + buildRunMessagesUrl, + getNextRunMessagesBeforeSeq, + getOldestRunMessageSeq, getSummarizationMiddlewareMessages, getVisibleOptimisticMessages, mergeMessages, + runMessagesPageHasMore, } from "@/core/threads/hooks"; +import type { RunMessage } from "@/core/threads/types"; + +function runMessage(seq?: number): RunMessage { + return { + run_id: "run-1", + ...(seq === undefined ? {} : { seq }), + content: {} as Message, + metadata: { caller: "" }, + created_at: "2026-05-22T00:00:00Z", + }; +} test("mergeMessages removes duplicate messages already present in history", () => { const human = { @@ -254,3 +269,59 @@ test("getVisibleOptimisticMessages hides optimistic user input after later serve optimisticHuman, ]); }); + +test("runMessagesPageHasMore reads backend snake_case pagination field", () => { + expect(runMessagesPageHasMore({ data: [], has_more: true })).toBe(true); + expect(runMessagesPageHasMore({ data: [], has_more: false })).toBe(false); +}); + +test("runMessagesPageHasMore keeps compatibility with camelCase pagination field", () => { + expect(runMessagesPageHasMore({ data: [], hasMore: true })).toBe(true); +}); + +test("getOldestRunMessageSeq returns the cursor for the next older run page", () => { + expect( + getOldestRunMessageSeq([runMessage(8), runMessage(9), runMessage(10)]), + ).toBe(8); +}); + +test("getOldestRunMessageSeq ignores rows without seq", () => { + expect(getOldestRunMessageSeq([runMessage()])).toBeNull(); +}); + +test("getNextRunMessagesBeforeSeq keeps runs pending when has_more lacks seq", () => { + expect( + getNextRunMessagesBeforeSeq({ data: [runMessage()], has_more: true }), + ).toBeUndefined(); +}); + +test("getNextRunMessagesBeforeSeq marks runs loaded when no more pages exist", () => { + expect( + getNextRunMessagesBeforeSeq({ data: [runMessage()], has_more: false }), + ).toBeNull(); +}); + +test("buildRunMessagesUrl encodes path segments and optional before_seq", () => { + expect( + buildRunMessagesUrl( + "https://api.example.test/", + "thread/with space", + "run?one", + 18, + ), + ).toBe( + "https://api.example.test/api/threads/thread%2Fwith%20space/runs/run%3Fone/messages?before_seq=18", + ); +}); + +test("buildRunMessagesUrl omits before_seq when loading the latest page", () => { + expect( + buildRunMessagesUrl("https://api.example.test", "thread-1", "run-1"), + ).toBe("https://api.example.test/api/threads/thread-1/runs/run-1/messages"); +}); + +test("buildRunMessagesUrl returns a relative URL when using the nginx proxy", () => { + expect(buildRunMessagesUrl("", "thread-1", "run-1", 42)).toBe( + "/api/threads/thread-1/runs/run-1/messages?before_seq=42", + ); +});