Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 137 additions & 2 deletions frontend/src/core/threads/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type SendMessageOptions = {
additionalKwargs?: Record<string, unknown>;
};

/**
 * Per-message predicate accepted by `mergeMessages` as its
 * `preservePartialAiOverlap` argument. It is called with each live thread
 * message and that message's index in the thread list; returning `true` lets
 * the message be considered for stitching onto its persisted history
 * counterpart, returning `false` leaves the live message untouched.
 */
type PartialAiOverlapPredicate = (message: Message, index: number) => boolean;

/**
 * Type guard that accepts only strings containing at least one character.
 *
 * @param value - Candidate value; may be `undefined`.
 * @returns `true` when `value` is a string whose length is greater than zero.
 */
function isNonEmptyString(value: string | undefined): value is string {
  if (typeof value !== "string") {
    return false;
  }
  return value.length > 0;
}
Expand Down Expand Up @@ -106,6 +108,102 @@ function dedupeMessagesByIdentity(messages: Message[]): Message[] {
});
}

/**
 * Extracts the text carried by a message.
 *
 * @param message - Message whose `content` is inspected.
 * @returns The content itself when it is a plain string; the `text` of the
 *   first part whose `type` is `"text"` when the content is an array;
 *   `undefined` when the content is neither, or the array has no text part.
 */
function messageTextContent(message: Message): string | undefined {
  const { content } = message;
  if (typeof content === "string") {
    return content;
  }
  if (!Array.isArray(content)) {
    return undefined;
  }
  // Only the first text part counts; later text parts are ignored.
  for (const part of content) {
    if (part.type === "text") {
      return part.text;
    }
  }
  return undefined;
}

/**
 * Produces a copy of `message` whose text is replaced by `text`.
 *
 * - String content: the whole content becomes `text`.
 * - Array content: only the first part with `type === "text"` receives the new
 *   text; every other part is carried over as-is. If no text part exists the
 *   copy has the same parts as the input.
 * - Any other content shape: the original message object is returned as-is.
 *
 * The input message is never mutated.
 *
 * @param message - Message to copy.
 * @param text - Replacement text.
 * @returns The updated copy, or `message` itself for unsupported content.
 */
function withMessageTextContent(message: Message, text: string): Message {
  const { content } = message;
  if (typeof content === "string") {
    return { ...message, content: text };
  }
  if (!Array.isArray(content)) {
    return message;
  }

  let textReplaced = false;
  const nextContent = content.map((part) => {
    // Leave non-text parts, and any text part after the first, untouched.
    if (textReplaced || part.type !== "text") {
      return part;
    }
    textReplaced = true;
    return { ...part, text };
  });
  return { ...message, content: nextContent };
}

/**
 * Measures how much the end of `left` overlaps the start of `right`.
 *
 * @param left - Text whose tail is compared.
 * @param right - Text whose head is compared.
 * @returns The length (in string `length` units) of the longest suffix of
 *   `left` that is also a prefix of `right`, or `0` when there is none.
 */
function longestTextOverlap(left: string, right: string): number {
  // Try the largest possible overlap first and shrink until one matches.
  let candidate = Math.min(left.length, right.length);
  while (candidate > 0) {
    const leftTail = left.slice(left.length - candidate);
    if (right.startsWith(leftTail)) {
      return candidate;
    }
    candidate -= 1;
  }
  return 0;
}

/**
 * Decides whether live thread text should be appended to persisted history
 * text rather than replacing it.
 *
 * @param historyText - Text of the persisted message.
 * @param threadText - Text of the live message.
 * @param allowDisjointText - When `true`, texts that share no overlap are
 *   still joined.
 * @returns `false` when either text is empty or the live text already starts
 *   with the history text. Otherwise `true` when the history ends with the
 *   live text, the two overlap by at least one character, disjoint joins are
 *   allowed, or the seam falls on whitespace (history ends with, or live text
 *   starts with, a whitespace character).
 */
function shouldJoinPartialAiText(
  historyText: string,
  threadText: string,
  allowDisjointText: boolean,
) {
  if (!(historyText && threadText)) {
    return false;
  }
  // The live text already contains everything persisted; nothing to stitch.
  if (threadText.startsWith(historyText)) {
    return false;
  }
  return (
    historyText.endsWith(threadText) ||
    longestTextOverlap(historyText, threadText) > 0 ||
    allowDisjointText ||
    /\s$/.test(historyText) ||
    /^\s/.test(threadText)
  );
}

/**
 * Stitches a live AI message onto its persisted counterpart when the live
 * message only carries the continuation of the persisted text.
 *
 * The live message is returned unchanged when either message is not of type
 * `"ai"`, either is hidden from the UI, either has no text, the live text
 * already starts with the history text, or `shouldJoinPartialAiText` rejects
 * the pair. Otherwise the result is a copy of the live message whose text is
 * the history text followed by the part of the live text that does not
 * overlap the history tail.
 *
 * @param historyMessage - Persisted message sharing the live message's identity.
 * @param threadMessage - Live message from the stream.
 * @param allowDisjointText - Forwarded to `shouldJoinPartialAiText`.
 * @returns `threadMessage` itself, or a stitched copy of it.
 */
function mergeOverlappingMessage(
  historyMessage: Message,
  threadMessage: Message,
  allowDisjointText: boolean,
): Message {
  const bothVisibleAi =
    historyMessage.type === "ai" &&
    threadMessage.type === "ai" &&
    !isHiddenFromUIMessage(historyMessage) &&
    !isHiddenFromUIMessage(threadMessage);
  if (!bothVisibleAi) {
    return threadMessage;
  }

  const historyText = messageTextContent(historyMessage);
  const threadText = messageTextContent(threadMessage);
  if (!historyText || !threadText) {
    return threadMessage;
  }

  const liveAlreadyHasHistory = threadText.startsWith(historyText);
  if (
    liveAlreadyHasHistory ||
    !shouldJoinPartialAiText(historyText, threadText, allowDisjointText)
  ) {
    return threadMessage;
  }

  // Drop the repeated head of the live text so the shared span appears once.
  const sharedLength = longestTextOverlap(historyText, threadText);
  const stitchedText = historyText + threadText.slice(sharedLength);
  return withMessageTextContent(threadMessage, stitchedText);
}

function findLatestUnloadedRunIndex(
runs: Run[],
loadedRunIds: ReadonlySet<string>,
Expand All @@ -123,13 +221,46 @@ export function mergeMessages(
historyMessages: Message[],
threadMessages: Message[],
optimisticMessages: Message[],
preservePartialAiOverlap: boolean | PartialAiOverlapPredicate = false,
): Message[] {
const historyByIdentity = new Map<string, Message>();
for (const message of historyMessages) {
const identity = messageIdentity(message);
if (identity) {
historyByIdentity.set(identity, message);
}
}
const shouldPreservePartialAiOverlap =
typeof preservePartialAiOverlap === "function"
? preservePartialAiOverlap
: preservePartialAiOverlap
? () => true
: null;
const mergedThreadMessages = shouldPreservePartialAiOverlap
? threadMessages.map((message, index) => {
if (!shouldPreservePartialAiOverlap(message, index)) {
return message;
}
const identity = messageIdentity(message);
const historyMessage = identity
? historyByIdentity.get(identity)
: null;
return historyMessage
? mergeOverlappingMessage(
historyMessage,
message,
typeof preservePartialAiOverlap === "function",
)
: message;
})
: threadMessages;

// Only visible live messages should trim overlapping history. Hidden messages
// are UI control messages in this path, not observability records; any hidden
// message that must survive as task/tracing data should use custom events or a
// separate state channel instead of participating in this overlap heuristic.
const threadMessageIds = new Set(
threadMessages
mergedThreadMessages
.filter((message) => !isHiddenFromUIMessage(message))
.map(messageIdentity)
.filter(isNonEmptyString),
Expand All @@ -154,7 +285,7 @@ export function mergeMessages(

return dedupeMessagesByIdentity([
...historyMessages.slice(0, cutoff),
...threadMessages,
...mergedThreadMessages,
...optimisticMessages,
]);
}
Expand Down Expand Up @@ -766,6 +897,10 @@ export function useThreadStream({
history,
thread.messages,
visibleOptimisticMessages,
thread.isLoading
? (message, index) =>
Boolean(thread.getMessagesMetadata(message, index)?.streamMetadata)
: false,
);
const pendingUsageMessages = thread.isLoading
? getMessagesAfterBaseline(
Expand Down
148 changes: 148 additions & 0 deletions frontend/tests/unit/core/threads/message-merge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,154 @@ test("mergeMessages lets live thread messages replace overlapping history", () =
]);
});

test("mergeMessages keeps persisted AI prefix when reconnect stream resumes with only new text", () => {
  // History: the prompt plus the already-persisted start of the AI reply.
  const userPrompt = {
    id: "human-1",
    type: "human",
    content: "write a report",
  } as Message;
  const savedPrefix = {
    id: "ai-1",
    type: "ai",
    content: "The first section is already persisted. ",
  } as Message;
  // Live stream: same AI message id, but only the newly generated text.
  const resumedDelta = {
    id: "ai-1",
    type: "ai",
    content: "The second section arrives after navigation.",
  } as Message;

  const merged = mergeMessages(
    [userPrompt, savedPrefix],
    [resumedDelta],
    [],
    true,
  );

  expect(merged).toEqual([
    userPrompt,
    {
      ...resumedDelta,
      content:
        "The first section is already persisted. The second section arrives after navigation.",
    },
  ]);
});

test("mergeMessages keeps persisted AI prefix for stream-metadata deltas without text overlap", () => {
  const savedPrefix = {
    id: "ai-1",
    type: "ai",
    content: "已经输出的一百个字",
  } as Message;
  const resumedDelta = {
    id: "ai-1",
    type: "ai",
    content: "继续输出后续内容",
  } as Message;
  // Passing a predicate (rather than `true`) allows joining text that shares
  // neither an overlap nor a whitespace seam.
  const alwaysPreserve = () => true;

  const merged = mergeMessages(
    [savedPrefix],
    [resumedDelta],
    [],
    alwaysPreserve,
  );

  expect(merged).toEqual([
    {
      ...resumedDelta,
      content: "已经输出的一百个字继续输出后续内容",
    },
  ]);
});

test("mergeMessages does not duplicate overlapping AI text when reconnect stream repeats the tail", () => {
  const savedPrefix = {
    id: "ai-1",
    type: "ai",
    content: "The answer starts here and ",
  } as Message;
  // The live text begins with "and ", which repeats the end of the saved text.
  const resumedDelta = {
    id: "ai-1",
    type: "ai",
    content: "and continues here.",
  } as Message;

  const merged = mergeMessages([savedPrefix], [resumedDelta], [], true);

  expect(merged).toEqual([
    {
      ...resumedDelta,
      content: "The answer starts here and continues here.",
    },
  ]);
});

test("mergeMessages leaves full live AI text unchanged when it already contains history", () => {
  const savedPrefix = {
    id: "ai-1",
    type: "ai",
    content: "partial",
  } as Message;
  // The live text starts with the saved text, so nothing needs stitching.
  const fullLiveReply = {
    id: "ai-1",
    type: "ai",
    content: "partial response",
  } as Message;

  const merged = mergeMessages([savedPrefix], [fullLiveReply], [], true);

  expect(merged).toEqual([fullLiveReply]);
});

test("mergeMessages does not stitch unrelated live AI replacement text while streaming", () => {
  const savedReply = {
    id: "ai-1",
    type: "ai",
    content: "old answer",
  } as Message;
  // No overlap and no whitespace at the seam: the live text is a replacement.
  const replacementReply = {
    id: "ai-1",
    type: "ai",
    content: "new answer",
    response_metadata: { model: "test-model" },
  } as Message;

  const merged = mergeMessages([savedReply], [replacementReply], [], true);

  expect(merged).toEqual([replacementReply]);
});

test("mergeMessages does not stitch same-id live AI text without stream metadata", () => {
  const savedReply = {
    id: "ai-1",
    type: "ai",
    content: "已经保存的旧文本",
  } as Message;
  const replacementReply = {
    id: "ai-1",
    type: "ai",
    content: "新的完整替换文本",
  } as Message;
  // A predicate that rejects every message disables stitching entirely.
  const neverPreserve = () => false;

  const merged = mergeMessages(
    [savedReply],
    [replacementReply],
    [],
    neverPreserve,
  );

  expect(merged).toEqual([replacementReply]);
});

test("mergeMessages keeps live AI metadata when reconnect stream repeats the persisted suffix", () => {
  const savedReply = {
    id: "ai-1",
    type: "ai",
    content: "The answer starts here.",
  } as Message;
  // The live text is entirely a suffix of the saved text, but carries extra
  // metadata that must survive the merge.
  const resumedSuffix = {
    id: "ai-1",
    type: "ai",
    content: "starts here.",
    response_metadata: { model: "test-model" },
  } as Message;

  const merged = mergeMessages([savedReply], [resumedSuffix], [], true);

  expect(merged).toEqual([
    {
      ...resumedSuffix,
      content: "The answer starts here.",
    },
  ]);
});

test("mergeMessages deduplicates tool messages by tool_call_id", () => {
const oldTool = {
id: "tool-message-old",
Expand Down
Loading