fix interruption for pydantic-ai chatbot#9620
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Bundle ReportChanges will increase total bundle size by 86.54kB (0.34%) ⬆️. This is within the configured threshold ✅ Detailed changes
Affected Assets, Files, and Routes:view changes for bundle: marimo-esmAssets Changed:
Files in
Files in
Files in
|
There was a problem hiding this comment.
1 issue found across 6 files
Architecture diagram
sequenceDiagram
participant UI as Chatbot (Frontend)
participant RS as ReadableStream Controller
participant API as Marimo Kernel (Backend)
participant Task as Async Prompt Task
participant AI as AI Model / Generator
Note over UI,AI: NEW: Prompt Execution with Request ID Tracking
UI->>UI: NEW: generateUUID() as request_id
UI->>API: send_prompt(request_id, messages)
API->>Task: NEW: Create task keyed by request_id
Task->>AI: Invoke model generator
loop Streaming
AI-->>Task: yield delta
Task->>Task: NEW: await asyncio.sleep(0) (interruption point)
Task-->>UI: WS: stream_chunk(message_id=request_id, content)
UI->>UI: CHANGED: routeIncomingChatChunk()
opt NEW: message_id == activeRequestId
UI->>RS: enqueue(content)
end
end
Note over UI,AI: NEW: Interruption Flow (User clicks "Stop")
UI->>RS: close()
UI->>UI: activeRequestIdRef.current = null
UI->>API: NEW: cancel_prompt(request_id)
API->>Task: task.cancel()
alt Task Interrupted
Task->>AI: Raise CancelledError (sync/async)
AI->>Task: Cleanup (try/finally)
Task->>Task: NEW: serializer.close_open_blocks()
Task-->>UI: WS: NEW: Send AbortChunk + is_final: true
end
Note over UI,AI: Stale Chunk Handling (Race Condition)
UI->>UI: User starts NEW prompt (request_id=B)
Task-->>UI: WS: Late chunk from OLD prompt (request_id=A)
UI->>UI: NEW: Check message_id (A) != activeRequestId (B)
UI->>UI: NEW: Drop stale chunk (prevents SDK parser error)
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR fixes a streaming interruption bug in mo.ui.chat when using pydantic-ai (Vercel AI SDK protocol), where late chunks from an aborted run could corrupt the next run’s chunk stream/state.
Changes:
- Frontend: generate a per-prompt
request_id, route incoming chunks through a stale-chunk filter keyed by the active request, and call a newcancel_promptRPC on abort. - Backend: tag all streamed chunks with the request/message id, track in-flight prompt tasks by
request_id, addcancel_promptRPC, and emit protocol-consistent end/abort chunks on cancellation. - Tests: add backend cancellation/serializer tests and frontend stale-chunk routing tests.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
tests/_plugins/ui/_impl/chat/test_chat.py |
Adds coverage for request_id tagging, cancellation behavior, and serializer block-drain on cancel. |
marimo/_plugins/ui/_impl/chat/chat.py |
Implements request-scoped streaming, cooperative cancellation, and cancellation chunk emission/draining. |
frontend/src/plugins/impl/chat/types.ts |
Adds request_id to send requests and introduces CancelPromptRequest type. |
frontend/src/plugins/impl/chat/ChatPlugin.tsx |
Extends RPC surface with cancel_prompt and validates request_id on send_prompt. |
frontend/src/plugins/impl/chat/chat-ui.tsx |
Generates request ids, drops stale chunks, and triggers backend cancel on Stop/abort. |
frontend/src/plugins/impl/chat/__tests__/chat-ui.test.ts |
Adds unit tests ensuring stale chunks are dropped and active streams are not closed by stale finals. |
Comments suppressed due to low confidence (1)
frontend/src/plugins/impl/chat/chat-ui.tsx:258
ReadableStreamunderlying sourcestart(controller)return value is ignored by the platform, so the cleanup callback that removes the abort listener will never run. This leaves anabortevent listener attached tosignallonger than intended (and the closure retainscontroller/props). Usesignal.addEventListener('abort', abortHandler, { once: true }), or storeabortHandler/signaland remove the listener incancel()and/or when the stream is closed.
start(controller) {
frontendStreamControllerRef.current = controller;
activeRequestIdRef.current = requestId;
const abortHandler = () => {
// Close the local controller first so the chat status flips to
// "ready" immediately and any racing chunks are dropped; then
// fire-and-forget the backend cancel so the kernel stops the
// model and we don't waste tokens / leak chunks to the next
// run.
try {
controller.close();
} catch (error) {
Logger.debug("Controller may already be closed", { error });
}
frontendStreamControllerRef.current = null;
activeRequestIdRef.current = null;
void props
.cancel_prompt({ request_id: requestId })
.catch((error: Error) => {
Logger.debug("cancel_prompt failed", { error });
});
};
signal?.addEventListener("abort", abortHandler);
return () => {
signal?.removeEventListener("abort", abortHandler);
};
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
## 📝 Summary <!-- If this PR closes any issues, list them here by number (e.g., Closes #123). Detail the specific changes made in this pull request. Explain the problem addressed and how it was resolved. If applicable, provide before and after comparisons, screenshots, or any relevant details to help reviewers understand the changes easily. --> Supports more AI sdk parts, refactors logic to be more maintainable. - ChatMessage no longer loses unmodeled SDK fields. Approvals (and e.g. `callProviderMetadata`, `providerExecuted`, `preliminary`) live on AI SDK parts that marimo's typed dataclasses don't model. The message now snapshots the raw wire payload per-part in _raw_parts, so those fields survive every round-trip - `pydantic_ai._build_ui_messages` uses the raw payload. It now calls `message.raw_or_dumped_parts()` so the approval/tool state the frontend just sent us makes it into the agent run unmodified. The old `asdict` + `_remove_none_values` path was lossy. - `sanitize_part` strips keys the AI SDK's { ...part, state, ... } spread can leak from prior tool states (e.g. a stale output clinging to an approval-requested part). - hasPendingToolCalls (frontend) rewritten. The old predicate treated "every tool ready & no trailing text" silently looped whenever an assistant message ended in a non-text part (file, source-url, data-*, reasoning) after a completed tool call. The fix uses some native AI sdk logic and some testing. <img width="921" height="787" alt="image" src="https://github.com/user-attachments/assets/53f4a146-9554-4135-b9e9-459317a823dc" /> ## 📋 Pre-Review Checklist <!-- These checks need to be completed before a PR is reviewed --> - [x] For large changes, or changes that affect the public API: this change was discussed or approved through an issue, on [Discord](https://marimo.io/discord?ref=pr), or the community [discussions](https://github.com/marimo-team/marimo/discussions) (Please provide a link if applicable). - [x] Any AI generated code has been reviewed line-by-line by the human PR author, who stands by it. - [x] Video or media evidence is provided for any visual changes (optional). <!-- PR is more likely to be merged if evidence is provided for changes made --> ## ✅ Merge Checklist - [x] I have read the [contributor guidelines](https://github.com/marimo-team/marimo/blob/main/CONTRIBUTING.md). - [ ] Documentation has been updated where applicable, including docstrings for API changes. - [x] Tests have been added for the changes made. --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
📝 Summary
Fixes #9562.
Received reasoning-delta for missing reasoning part with ID "..."after Stop → resend inmo.ui.chat.Root cause: marimo routes all chat chunks through one long-lived
ReadableStream(kernel → WebSocket), so stale chunks from a stopped run can leak into a new run's stream — the SDK parser then sees areasoning-deltafor an id it never saw areasoning-startfor and throws.Changes
Frontend (
frontend/src/plugins/impl/chat/)request_idper prompt; track it inactiveRequestIdRef.routeIncomingChatChunkand drop chunks whosemessage_iddoesn't match the active run.fetchabort, fire acancel_promptRPC to the backend.Backend (
marimo/_plugins/ui/_impl/chat/chat.py)cancel_promptRPC; in-flight prompts run asasyncio.Tasks keyed byrequest_idand are cancelled cooperatively.await asyncio.sleep(0)in the sync-generator loop soCancelledErrorcan be raised at yield points.AbortChunkand synthesize*-endfor any opentext/reasoningblocks so the UI parser transitions parts out ofstate: "streaming".ChunkSerializer: single drain helper shared byon_endandclose_open_blocks; cancel-path errors are logged at DEBUG instead of silently swallowed.mo.ui.chatdescribing how users can handled aborted chats / tool calls.📋 Pre-Review Checklist
✅ Merge Checklist