diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index a4fd93d77f..e830d51bcf 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -51,6 +51,8 @@ type SendMessageOptions = { additionalKwargs?: Record; }; +type PartialAiOverlapPredicate = (message: Message, index: number) => boolean; + function isNonEmptyString(value: string | undefined): value is string { return typeof value === "string" && value.length > 0; } @@ -106,6 +108,102 @@ function dedupeMessagesByIdentity(messages: Message[]): Message[] { }); } +function messageTextContent(message: Message): string | undefined { + if (typeof message.content === "string") { + return message.content; + } + if (!Array.isArray(message.content)) { + return undefined; + } + const textPart = message.content.find((part) => part.type === "text"); + return textPart?.text; +} + +function withMessageTextContent(message: Message, text: string): Message { + if (typeof message.content === "string") { + return { ...message, content: text }; + } + if (!Array.isArray(message.content)) { + return message; + } + let replaced = false; + const content = message.content.map((part) => { + if (!replaced && part.type === "text") { + replaced = true; + return { ...part, text }; + } + return part; + }); + return { ...message, content }; +} + +function longestTextOverlap(left: string, right: string): number { + const maxLength = Math.min(left.length, right.length); + for (let length = maxLength; length > 0; length--) { + if (left.endsWith(right.slice(0, length))) { + return length; + } + } + return 0; +} + +function shouldJoinPartialAiText( + historyText: string, + threadText: string, + allowDisjointText: boolean, +) { + if (!historyText || !threadText) { + return false; + } + if (threadText.startsWith(historyText)) { + return false; + } + if (historyText.endsWith(threadText)) { + return true; + } + const overlap = longestTextOverlap(historyText, threadText); + if (overlap > 0) { + return true; + } + if (allowDisjointText) { + return true; + } + return /\s$/.test(historyText) || /^\s/.test(threadText); +} + +function mergeOverlappingMessage( + historyMessage: Message, + threadMessage: Message, + allowDisjointText: boolean, +): Message { + if ( + historyMessage.type !== "ai" || + threadMessage.type !== "ai" || + isHiddenFromUIMessage(historyMessage) || + isHiddenFromUIMessage(threadMessage) + ) { + return threadMessage; + } + + const historyText = messageTextContent(historyMessage); + const threadText = messageTextContent(threadMessage); + if (!historyText || !threadText) { + return threadMessage; + } + if (threadText.startsWith(historyText)) { + return threadMessage; + } + if (!shouldJoinPartialAiText(historyText, threadText, allowDisjointText)) { + return threadMessage; + } + + const overlap = longestTextOverlap(historyText, threadText); + return withMessageTextContent( + threadMessage, + `${historyText}${threadText.slice(overlap)}`, + ); +} + function findLatestUnloadedRunIndex( runs: Run[], loadedRunIds: ReadonlySet, @@ -123,13 +221,46 @@ export function mergeMessages( historyMessages: Message[], threadMessages: Message[], optimisticMessages: Message[], + preservePartialAiOverlap: boolean | PartialAiOverlapPredicate = false, ): Message[] { + const historyByIdentity = new Map(); + for (const message of historyMessages) { + const identity = messageIdentity(message); + if (identity) { + historyByIdentity.set(identity, message); + } + } + const shouldPreservePartialAiOverlap = + typeof preservePartialAiOverlap === "function" + ? preservePartialAiOverlap + : preservePartialAiOverlap + ? () => true + : null; + const mergedThreadMessages = shouldPreservePartialAiOverlap + ? threadMessages.map((message, index) => { + if (!shouldPreservePartialAiOverlap(message, index)) { + return message; + } + const identity = messageIdentity(message); + const historyMessage = identity + ? historyByIdentity.get(identity) + : null; + return historyMessage + ? mergeOverlappingMessage( + historyMessage, + message, + typeof preservePartialAiOverlap === "function", + ) + : message; + }) + : threadMessages; + // Only visible live messages should trim overlapping history. Hidden messages // are UI control messages in this path, not observability records; any hidden // message that must survive as task/tracing data should use custom events or a // separate state channel instead of participating in this overlap heuristic. const threadMessageIds = new Set( - threadMessages + mergedThreadMessages .filter((message) => !isHiddenFromUIMessage(message)) .map(messageIdentity) .filter(isNonEmptyString), @@ -154,7 +285,7 @@ export function mergeMessages( return dedupeMessagesByIdentity([ ...historyMessages.slice(0, cutoff), - ...threadMessages, + ...mergedThreadMessages, ...optimisticMessages, ]); } @@ -766,6 +897,10 @@ export function useThreadStream({ history, thread.messages, visibleOptimisticMessages, + thread.isLoading + ? (message, index) => + Boolean(thread.getMessagesMetadata(message, index)?.streamMetadata) + : false, ); const pendingUsageMessages = thread.isLoading ? getMessagesAfterBaseline( diff --git a/frontend/tests/unit/core/threads/message-merge.test.ts b/frontend/tests/unit/core/threads/message-merge.test.ts index a6e612bfdf..979dfdff06 100644 --- a/frontend/tests/unit/core/threads/message-merge.test.ts +++ b/frontend/tests/unit/core/threads/message-merge.test.ts @@ -50,6 +50,154 @@ test("mergeMessages lets live thread messages replace overlapping history", () = ]); }); +test("mergeMessages keeps persisted AI prefix when reconnect stream resumes with only new text", () => { + const human = { + id: "human-1", + type: "human", + content: "write a report", + } as Message; + const persistedAiPrefix = { + id: "ai-1", + type: "ai", + content: "The first section is already persisted. ", + } as Message; + const reconnectedAiDelta = { + id: "ai-1", + type: "ai", + content: "The second section arrives after navigation.", + } as Message; + + expect( + mergeMessages([human, persistedAiPrefix], [reconnectedAiDelta], [], true), + ).toEqual([ + human, + { + ...reconnectedAiDelta, + content: + "The first section is already persisted. The second section arrives after navigation.", + }, + ]); +}); + +test("mergeMessages keeps persisted AI prefix for stream-metadata deltas without text overlap", () => { + const persistedAiPrefix = { + id: "ai-1", + type: "ai", + content: "已经输出的一百个字", + } as Message; + const reconnectedAiDelta = { + id: "ai-1", + type: "ai", + content: "继续输出后续内容", + } as Message; + + expect( + mergeMessages([persistedAiPrefix], [reconnectedAiDelta], [], () => true), + ).toEqual([ + { + ...reconnectedAiDelta, + content: "已经输出的一百个字继续输出后续内容", + }, + ]); +}); + +test("mergeMessages does not duplicate overlapping AI text when reconnect stream repeats the tail", () => { + const persistedAiPrefix = { + id: "ai-1", + type: "ai", + content: "The answer starts here and ", + } as Message; + const reconnectedAiDelta = { + id: "ai-1", + type: "ai", + content: "and continues here.", + } as Message; + + expect( + mergeMessages([persistedAiPrefix], [reconnectedAiDelta], [], true), + ).toEqual([ + { + ...reconnectedAiDelta, + content: "The answer starts here and continues here.", + }, + ]); +}); + +test("mergeMessages leaves full live AI text unchanged when it already contains history", () => { + const persistedAiPrefix = { + id: "ai-1", + type: "ai", + content: "partial", + } as Message; + const liveFullAi = { + id: "ai-1", + type: "ai", + content: "partial response", + } as Message; + + expect(mergeMessages([persistedAiPrefix], [liveFullAi], [], true)).toEqual([ + liveFullAi, + ]); +}); + +test("mergeMessages does not stitch unrelated live AI replacement text while streaming", () => { + const persistedAi = { + id: "ai-1", + type: "ai", + content: "old answer", + } as Message; + const liveReplacementAi = { + id: "ai-1", + type: "ai", + content: "new answer", + response_metadata: { model: "test-model" }, + } as Message; + + expect(mergeMessages([persistedAi], [liveReplacementAi], [], true)).toEqual([ + liveReplacementAi, + ]); +}); + +test("mergeMessages does not stitch same-id live AI text without stream metadata", () => { + const persistedAi = { + id: "ai-1", + type: "ai", + content: "已经保存的旧文本", + } as Message; + const liveReplacementAi = { + id: "ai-1", + type: "ai", + content: "新的完整替换文本", + } as Message; + + expect( + mergeMessages([persistedAi], [liveReplacementAi], [], () => false), + ).toEqual([liveReplacementAi]); +}); + +test("mergeMessages keeps live AI metadata when reconnect stream repeats the persisted suffix", () => { + const persistedAiPrefix = { + id: "ai-1", + type: "ai", + content: "The answer starts here.", + } as Message; + const reconnectedAiSuffix = { + id: "ai-1", + type: "ai", + content: "starts here.", + response_metadata: { model: "test-model" }, + } as Message; + + expect( + mergeMessages([persistedAiPrefix], [reconnectedAiSuffix], [], true), + ).toEqual([ + { + ...reconnectedAiSuffix, + content: "The answer starts here.", + }, + ]); +}); + test("mergeMessages deduplicates tool messages by tool_call_id", () => { const oldTool = { id: "tool-message-old",