Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions backend/app/gateway/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Shared pagination helpers for gateway routers."""

from __future__ import annotations


def trim_run_message_page(rows: list[dict], *, limit: int, after_seq: int | None) -> tuple[list[dict], bool]:
"""Trim a ``limit + 1`` run-message page while preserving page boundaries."""
has_more = len(rows) > limit
if not has_more:
return rows, False

if after_seq is not None:
return rows[:limit], True

return rows[-limit:], True
4 changes: 2 additions & 2 deletions backend/app/gateway/routers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge
from app.gateway.pagination import trim_run_message_page
from app.gateway.routers.thread_runs import RunCreateRequest
from app.gateway.services import sse_consumer, start_run, wait_for_run_completion
from deerflow.runtime import serialize_channel_values
Expand Down Expand Up @@ -129,8 +130,7 @@ async def run_messages(
before_seq=before_seq,
after_seq=after_seq,
)
has_more = len(rows) > limit
data = rows[:limit] if has_more else rows
data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq)
return {"data": data, "has_more": has_more}


Expand Down
4 changes: 2 additions & 2 deletions backend/app/gateway/routers/thread_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge
from app.gateway.pagination import trim_run_message_page
from app.gateway.services import sse_consumer, start_run, wait_for_run_completion
from deerflow.runtime import RunRecord, RunStatus, serialize_channel_values

Expand Down Expand Up @@ -402,8 +403,7 @@ async def list_run_messages(
before_seq=before_seq,
after_seq=after_seq,
)
has_more = len(rows) > limit
data = rows[:limit] if has_more else rows
data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq)
return {"data": data, "has_more": has_more}


Expand Down
16 changes: 16 additions & 0 deletions backend/tests/_run_message_pagination_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from fastapi.testclient import TestClient


def assert_run_message_page(
client: TestClient,
url: str,
*,
expected_seq: list[int],
has_more: bool = True,
) -> None:
response = client.get(url)

assert response.status_code == 200
body = response.json()
assert body["has_more"] is has_more
assert [m["seq"] for m in body["data"]] == expected_seq
46 changes: 46 additions & 0 deletions backend/tests/test_runs_api_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from unittest.mock import AsyncMock, MagicMock

from _router_auth_helpers import make_authed_test_app
from _run_message_pagination_helpers import assert_run_message_page
from fastapi.testclient import TestClient

from app.gateway.routers import runs
Expand Down Expand Up @@ -97,6 +98,51 @@ def test_run_messages_has_more_true_when_extra_row_returned():
body = response.json()
assert body["has_more"] is True
assert len(body["data"]) == 50 # trimmed to limit
assert [m["seq"] for m in body["data"]] == list(range(2, 52))


def test_run_messages_default_page_keeps_newest_messages_when_extra_row_returned():
"""Default latest-page trimming drops the older sentinel row, not the newest message."""
rows = [_make_message(i) for i in range(16, 67)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
assert_run_message_page(client, "/api/runs/run-2/messages", expected_seq=list(range(17, 67)))


def test_run_messages_before_seq_page_keeps_newest_side_when_extra_row_returned():
"""Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message."""
rows = [_make_message(i) for i in range(1, 18)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/runs/run-2/messages?before_seq=18&limit=16",
expected_seq=list(range(2, 18)),
)


def test_run_messages_after_seq_page_keeps_oldest_side_when_extra_row_returned():
"""Forward pagination still trims the newer sentinel row."""
rows = [_make_message(i) for i in range(11, 62)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/runs/run-2/messages?after_seq=10",
expected_seq=list(range(11, 61)),
)


def test_run_messages_passes_after_seq_to_event_store():
Expand Down
38 changes: 38 additions & 0 deletions backend/tests/test_thread_run_messages_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from unittest.mock import AsyncMock, MagicMock

from _router_auth_helpers import make_authed_test_app
from _run_message_pagination_helpers import assert_run_message_page
from fastapi.testclient import TestClient

from app.gateway.routers import thread_runs
Expand Down Expand Up @@ -88,6 +89,43 @@ def test_has_more_true_when_extra_row_returned():
body = response.json()
assert body["has_more"] is True
assert len(body["data"]) == 50 # trimmed to limit
assert [m["seq"] for m in body["data"]] == list(range(2, 52))


def test_default_page_keeps_newest_messages_when_extra_row_returned():
"""Default latest-page trimming drops the older sentinel row, not the newest message."""
rows = [_make_message(i) for i in range(16, 67)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/threads/thread-2/runs/run-2/messages",
expected_seq=list(range(17, 67)),
)


def test_before_seq_page_keeps_newest_side_when_extra_row_returned():
"""Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message."""
rows = [_make_message(i) for i in range(1, 18)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/threads/thread-2/runs/run-2/messages?before_seq=18&limit=16",
expected_seq=list(range(2, 18)),
)


def test_after_seq_page_keeps_oldest_side_when_extra_row_returned():
"""Forward pagination still trims the newer sentinel row."""
rows = [_make_message(i) for i in range(11, 62)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/threads/thread-2/runs/run-2/messages?after_seq=10",
expected_seq=list(range(11, 61)),
)


def test_after_seq_forwarded_to_event_store():
Expand Down
86 changes: 76 additions & 10 deletions frontend/src/core/threads/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,55 @@ function findLatestUnloadedRunIndex(
return -1;
}

type RunMessagesPageResponse = {
data: RunMessage[];
has_more?: boolean;
hasMore?: boolean;
};

export function runMessagesPageHasMore(result: RunMessagesPageResponse) {
return result.has_more ?? result.hasMore ?? false;
}

export function getOldestRunMessageSeq(messages: RunMessage[]) {
let oldestSeq: number | null = null;
for (const message of messages) {
if (typeof message.seq !== "number") {
continue;
}
oldestSeq =
oldestSeq === null ? message.seq : Math.min(oldestSeq, message.seq);
}
return oldestSeq;
}

export function getNextRunMessagesBeforeSeq(
result: RunMessagesPageResponse,
): number | null | undefined {
if (!runMessagesPageHasMore(result)) {
return null;
}
return getOldestRunMessageSeq(result.data) ?? undefined;
}

export function buildRunMessagesUrl(
baseUrl: string,
threadId: string,
runId: string,
beforeSeq?: number,
) {
const normalizedBaseUrl = baseUrl.replace(/\/$/, "");
const path = `/api/threads/${encodeURIComponent(threadId)}/runs/${encodeURIComponent(runId)}/messages`;
const url = new URL(
`${normalizedBaseUrl}${path}`,
typeof window !== "undefined" ? window.location.origin : "http://localhost",
);
if (beforeSeq !== undefined) {
url.searchParams.set("before_seq", String(beforeSeq));
}
return normalizedBaseUrl ? url.toString() : `${url.pathname}${url.search}`;
}

export function mergeMessages(
historyMessages: Message[],
threadMessages: Message[],
Expand Down Expand Up @@ -747,6 +796,7 @@ export function useThreadHistory(threadId: string) {
const pendingLoadRef = useRef(false);
const loadingRunIdRef = useRef<string | null>(null);
const loadedRunIdsRef = useRef<Set<string>>(new Set());
const runBeforeSeqRef = useRef<Map<string, number>>(new Map());
const [loading, setLoading] = useState(false);
const [messages, setMessages] = useState<Message[]>([]);

Expand Down Expand Up @@ -787,16 +837,20 @@ export function useThreadHistory(threadId: string) {

const requestThreadId = threadIdRef.current;
loadingRunIdRef.current = run.run_id;
const result: { data: RunMessage[]; hasMore: boolean } = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(requestThreadId)}/runs/${encodeURIComponent(run.run_id)}/messages`,
{
method: "GET",
headers: {
"Content-Type": "application/json",
},
credentials: "include",
const beforeSeq = runBeforeSeqRef.current.get(run.run_id);
const url = buildRunMessagesUrl(
getBackendBaseURL(),
requestThreadId,
run.run_id,
beforeSeq,
);
const result: RunMessagesPageResponse = await fetch(url, {
method: "GET",
headers: {
"Content-Type": "application/json",
},
).then((res) => {
credentials: "include",
}).then((res) => {
return res.json();
});
const _messages = result.data
Expand All @@ -808,7 +862,18 @@ export function useThreadHistory(threadId: string) {
setMessages((prev) =>
dedupeMessagesByIdentity([..._messages, ...prev]),
);
loadedRunIdsRef.current.add(run.run_id);
const nextBeforeSeq = getNextRunMessagesBeforeSeq(result);
if (typeof nextBeforeSeq === "number") {
runBeforeSeqRef.current.set(run.run_id, nextBeforeSeq);
pendingLoadRef.current = true;
} else if (nextBeforeSeq === undefined) {
console.warn(
`Run ${run.run_id} returned has_more without message seq values; leaving it pending for retry.`,
);
} else {
runBeforeSeqRef.current.delete(run.run_id);
loadedRunIdsRef.current.add(run.run_id);
}
indexRef.current = findLatestUnloadedRunIndex(
runsRef.current,
loadedRunIdsRef.current,
Expand All @@ -832,6 +897,7 @@ export function useThreadHistory(threadId: string) {
pendingLoadRef.current = false;
loadingRunIdRef.current = null;
loadedRunIdsRef.current = new Set();
runBeforeSeqRef.current = new Map();
loadingRef.current = false;
setLoading(false);
setMessages([]);
Expand Down
1 change: 1 addition & 0 deletions frontend/src/core/threads/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export interface AgentThread extends Thread<AgentThreadState> {

export interface RunMessage {
run_id: string;
seq?: number;
content: Message;
metadata: {
caller: string;
Expand Down
71 changes: 71 additions & 0 deletions frontend/tests/unit/core/threads/message-merge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,24 @@ import type { Message } from "@langchain/langgraph-sdk";
import { expect, test } from "vitest";

import {
buildRunMessagesUrl,
getNextRunMessagesBeforeSeq,
getOldestRunMessageSeq,
getVisibleOptimisticMessages,
mergeMessages,
runMessagesPageHasMore,
} from "@/core/threads/hooks";
import type { RunMessage } from "@/core/threads/types";

function runMessage(seq?: number): RunMessage {
return {
run_id: "run-1",
...(seq === undefined ? {} : { seq }),
content: {} as Message,
metadata: { caller: "" },
created_at: "2026-05-22T00:00:00Z",
};
}

test("mergeMessages removes duplicate messages already present in history", () => {
const human = {
Expand Down Expand Up @@ -155,3 +170,59 @@ test("getVisibleOptimisticMessages hides optimistic user input after later serve
optimisticHuman,
]);
});

test("runMessagesPageHasMore reads backend snake_case pagination field", () => {
expect(runMessagesPageHasMore({ data: [], has_more: true })).toBe(true);
expect(runMessagesPageHasMore({ data: [], has_more: false })).toBe(false);
});

test("runMessagesPageHasMore keeps compatibility with camelCase pagination field", () => {
expect(runMessagesPageHasMore({ data: [], hasMore: true })).toBe(true);
});

test("getOldestRunMessageSeq returns the cursor for the next older run page", () => {
expect(
getOldestRunMessageSeq([runMessage(8), runMessage(9), runMessage(10)]),
).toBe(8);
});

test("getOldestRunMessageSeq ignores rows without seq", () => {
expect(getOldestRunMessageSeq([runMessage()])).toBeNull();
});

test("getNextRunMessagesBeforeSeq keeps runs pending when has_more lacks seq", () => {
expect(
getNextRunMessagesBeforeSeq({ data: [runMessage()], has_more: true }),
).toBeUndefined();
});

test("getNextRunMessagesBeforeSeq marks runs loaded when no more pages exist", () => {
expect(
getNextRunMessagesBeforeSeq({ data: [runMessage()], has_more: false }),
).toBeNull();
});

test("buildRunMessagesUrl encodes path segments and optional before_seq", () => {
expect(
buildRunMessagesUrl(
"https://api.example.test/",
"thread/with space",
"run?one",
18,
),
).toBe(
"https://api.example.test/api/threads/thread%2Fwith%20space/runs/run%3Fone/messages?before_seq=18",
);
});

test("buildRunMessagesUrl omits before_seq when loading the latest page", () => {
expect(
buildRunMessagesUrl("https://api.example.test", "thread-1", "run-1"),
).toBe("https://api.example.test/api/threads/thread-1/runs/run-1/messages");
});

test("buildRunMessagesUrl returns a relative URL when using the nginx proxy", () => {
expect(buildRunMessagesUrl("", "thread-1", "run-1", 42)).toBe(
"/api/threads/thread-1/runs/run-1/messages?before_seq=42",
);
});
Loading