Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/handlers/RequestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ export abstract class RequestHandler<
this.scheduledCleanups.set(requestId, cleanups)
}

private async exhaustCleanups(
protected async exhaustCleanups(
cleanups: Array<() => MaybePromise<void>>,
): Promise<void> {
const errors: Array<Error> = []
Expand Down
135 changes: 87 additions & 48 deletions src/core/sse.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { invariant } from 'outvariant'
import { DeferredPromise } from '@open-draft/deferred-promise'
import { Emitter } from 'strict-event-emitter'
import { Emitter, TypedEvent } from 'rettime'
import type { ResponseResolver } from './handlers/RequestHandler'
import {
HttpHandler,
Expand All @@ -14,6 +14,7 @@ import { getTimestamp } from './utils/logging/getTimestamp'
import { devUtils } from './utils/internal/devUtils'
import { colors } from './ws/utils/attachWebSocketLogger'
import { toPublicUrl } from './utils/request/toPublicUrl'
import type { MaybePromise } from './typeUtils'

type EventMapConstraint = {
message?: unknown
Expand Down Expand Up @@ -140,7 +141,14 @@ class ServerSentEventHandler<
return matches
}

async log(_args: { request: Request; response: Response }): Promise<void> {
async log(args: { request: Request; response: Response }): Promise<void> {
/**
* @note Cancel the response stream because it's not needed for logging.
* Otherwise, this cloned response remains unconsumed and its original
* doesn't propagate stream cancelations at all.
*/
args.response.body?.cancel()

/**
* @note Skip the default `this.log()` logic so that when this handler is logged
* upon handling the request, nothing is printed (we log SSE requests early).
Expand All @@ -155,16 +163,14 @@ class ServerSentEventHandler<
const publicUrl = toPublicUrl(request.url)

/* eslint-disable no-console */
emitter.on('message', (payload) => {
emitter.on('message', ({ data }) => {
console.groupCollapsed(
devUtils.formatMessage(
`${getTimestamp()} SSE %s %c⇣%c ${payload.event}`,
),
devUtils.formatMessage(`${getTimestamp()} SSE %s %c⇣%c ${data.event}`),
publicUrl,
`color:${colors.mocked}`,
'color:inherit',
)
console.log(payload.frames)
console.log(data.frames)
console.groupEnd()
})

Expand All @@ -191,6 +197,18 @@ class ServerSentEventHandler<
})
/* eslint-enable no-console */
}

protected async exhaustCleanups(
cleanups: Array<() => MaybePromise<void>>,
): Promise<void> {
const onClose = () => {
this.#emitter.removeListener('error', onClose)
this.#emitter.removeListener('close', onClose)
void super.exhaustCleanups(cleanups)
}

this.#emitter.once('error', onClose).once('close', onClose)
}
Comment on lines +200 to +211
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm the emitter is per-handler (single instance for all connections of a given sse route)
# and that there is no per-connection scoping when emitting 'close'/'error'.
rg -nP -C3 '#emitter\s*[:=]|kClientEmitter|new\s+Emitter<' --type=ts
rg -nP -C2 'emit\(\s*new\s+TypedEvent\(' --type=ts

Repository: mswjs/msw

Length of output: 3767


🏁 Script executed:

# Find where exhaustCleanups is called and the surrounding context
rg -nP -B5 -A5 'exhaustCleanups' src/core/sse.ts | head -100

# Check the resolver handler and finalize registration
rg -nP -B10 -A5 'runScheduledCleanups|finalize|resolver\(' src/core/sse.ts | head -150

Repository: mswjs/msw

Length of output: 895


🏁 Script executed:

# Find the parent class and runScheduledCleanups
rg -nP 'class HttpHandler|runScheduledCleanups|finalize' src/core/ --type=ts | head -50

# Check the full handler flow
rg -nP -B15 'await resolver' src/core/sse.ts | head -80

Repository: mswjs/msw

Length of output: 1799


🏁 Script executed:

# Get the full runScheduledCleanups implementation
sed -n '535,570p' src/core/handlers/RequestHandler.ts

Repository: mswjs/msw

Length of output: 971


🏁 Script executed:

# Check if there are any tests that demonstrate synchronous close or multiple connections
fd -e test.ts -e spec.ts src/ | xargs rg -l 'sse\|ServerSentEvent' | head -5

# Get the createEventStream function to see if there's any per-connection state
rg -nP -B5 -A20 'function createEventStream|const createEventStream' src/core/sse.ts

Repository: mswjs/msw

Length of output: 1183


🏁 Script executed:

# Look for test files related to sse
find src -type f -name '*.test.ts' -o -name '*.spec.ts' | xargs grep -l 'sse\|ServerSentEvent' 2>/dev/null | head -3

Repository: mswjs/msw

Length of output: 174


🏁 Script executed:

# Search for SSE-specific tests
find src -type f \( -name '*.test.ts' -o -name '*.spec.ts' \) | xargs grep -l 'ServerSentEventHandler\|http\.sse' 2>/dev/null

# Also check for any actual SSE handler usage examples
rg -l 'http\.sse\|sse\(' --type=ts src/ | grep -v node_modules

Repository: mswjs/msw

Length of output: 35


Cleanup wiring on the shared emitter cross-fires across connections and has a startup race.

Two correctness issues stem from registering cleanups on the per-handler #emitter:

  1. Multi-connection cross-fire. #emitter is shared by every connection that this sse(...) handler accepts. When connection A closes, the emitter emits close once and fires every once('close', onClose) listener registered so far — including the onClose captured for connection B's cleanups. B's clearInterval (or other per-connection cleanup from the issue #2630 example) will run while B is still streaming, dropping its background work.

  2. Resolver-synchronous close race. If a resolver synchronously calls client.close() after registering finalize(callback), the event fires before exhaustCleanups attaches listeners. The cleanup will never run because the 'close' event is emitted synchronously (line 336) while the resolver is still executing, before runScheduledCleanupsexhaustCleanups can attach the listener (line 210).

The cleanup binding must be per-connection. One option is to track close state on the client (the #closed deferred already exists) and pass a per-connection emitter or close-promise into the resolver context, running super.exhaustCleanups(cleanups) when that promise resolves — handling both "already closed" (run immediately) and "closes later" (await) cases.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/core/sse.ts` around lines 200 - 211, The current exhaustCleanups
registers listeners on the shared `#emitter` which causes cross-connection firing
and misses synchronous closes; change it to use a per-connection close signal
instead of `#emitter`: create a per-connection promise/emitter (or use the
existing `#closed` deferred tied to this client instance) and, inside
exhaustCleanups, if that per-connection closed is already resolved call
super.exhaustCleanups(cleanups) immediately, otherwise attach a single
per-connection listener (or await the per-connection close promise) to invoke
super.exhaustCleanups(cleanups) when that specific connection closes; remove
usage of the shared `#emitter` for per-connection cleanup wiring to avoid
cross-fire and race conditions when resolvers call client.close() synchronously.

}

type Values<T> = T[keyof T]
Expand All @@ -216,9 +234,9 @@ type ToEventDiscriminatedUnion<T> = Values<{
}>

type ServerSentEventClientEventMap = {
message: [payload: EventStreamMessage]
error: []
close: []
message: TypedEvent<EventStreamMessage>
error: TypedEvent
close: TypedEvent
}

const kClientEmitter = Symbol.for('kClientEmitter')
Expand All @@ -229,11 +247,13 @@ class ServerSentEventClient<
private [kClientEmitter]?: Emitter<ServerSentEventClientEventMap>

#encoder: TextEncoder
#writer: WritableStreamDefaultWriter
#controller: ReadableStreamDefaultController<Uint8Array>
#closed: DeferredPromise<void>

constructor(writable: WritableStream) {
constructor(controller: ReadableStreamDefaultController<Uint8Array>) {
this.#encoder = new TextEncoder()
this.#writer = writable.getWriter()
this.#controller = controller
this.#closed = new DeferredPromise()
}

/**
Expand Down Expand Up @@ -289,37 +309,50 @@ class ServerSentEventClient<
* error.
*/
public error(): void {
this.#writer.abort().catch((error) => {
console.error(error)
devUtils.error(
'Failed to abort server-side EventSource. Please see the original error above.',
)
})
this[kClientEmitter]?.emit('error')
if (this.#closed.state !== 'pending') {
return
}

this.#controller.error()
this.#closed.resolve()
this[kClientEmitter]?.emit(new TypedEvent('error'))
}

/**
* Closes the underlying `EventSource`, closing the connection.
*/
public close(): void {
this.#writer.close().catch((error) => {
if (this.#closed.state !== 'pending') {
return
}

try {
this.#controller.close()
this.#closed.resolve()
} catch {
//
}

this[kClientEmitter]?.emit(new TypedEvent('close'))
}

#enqueue(chunk: Uint8Array): void {
if (this.#closed.state !== 'pending') {
return
}

try {
this.#controller.enqueue(chunk)
} catch (error) {
console.error(error)
devUtils.error(
'Failed to close server-side EventSource. Please see the original error above.',
'Failed to write to server-side EventSource. Please see the original error above.',
)
})
this[kClientEmitter]?.emit('close')
}
}

#sendRetry(retry: number): void {
this.#writer
.write(this.#encoder.encode(`retry:${retry}\n\n`))
.catch((error) => {
console.error(error)
devUtils.error(
'Failed to send a retry packet to server-side EventSource. Please see the original error above.',
)
})
this.#enqueue(this.#encoder.encode(`retry:${retry}\n\n`))
}

#sendMessage(message: {
Expand Down Expand Up @@ -350,21 +383,18 @@ class ServerSentEventClient<

frames.push('', '')

this.#writer
.write(this.#encoder.encode(frames.join('\n')))
.catch((error) => {
console.error(error)
devUtils.error(
'Failed to send a message to server-side EventSource. Please see the original error above.',
)
})
this.#enqueue(this.#encoder.encode(frames.join('\n')))

this[kClientEmitter]?.emit('message', {
id: message.id,
event: message.event?.toString() || 'message',
data: message.data,
frames,
})
this[kClientEmitter]?.emit(
new TypedEvent('message', {
data: {
id: message.id,
event: message.event?.toString() || 'message',
data: message.data,
frames,
},
}),
)
}
}

Expand Down Expand Up @@ -991,9 +1021,18 @@ function createEventStream<EventMap extends EventMapConstraint>(
request.url,
)

const { readable, writable } = new TransformStream()
let controller!: ReadableStreamDefaultController<Uint8Array>

const readable = new ReadableStream<Uint8Array>({
start(defaultController) {
controller = defaultController
},
cancel() {
client.close()
},
})

const client = new ServerSentEventClient<EventMap>(writable)
const client = new ServerSentEventClient<EventMap>(controller)
const server = new ServerSentEventServer({
request,
client,
Expand Down
6 changes: 5 additions & 1 deletion src/mockServiceWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ async function handleRequest(event, requestId, requestInterceptedAt) {
// Send back the response clone for the "response:*" life-cycle events.
// Ensure MSW is active and ready to handle the message, otherwise
// this message will pend indefinitely.
if (client && activeClientIds.has(client.id)) {
if (
client &&
activeClientIds.has(client.id) &&
response.headers.get('content-type') !== 'text/event-stream'
) {
Comment thread
kettanaito marked this conversation as resolved.
const serializedRequest = await serializeRequest(requestCloneForEvents)

// Clone the response so both the client and the library could consume it.
Expand Down
127 changes: 127 additions & 0 deletions test/browser/sse-api/sse.finally.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import type { sse } from 'msw'
import type { setupWorker } from 'msw/browser'
import { DeferredPromise } from '@open-draft/deferred-promise'
import { test, expect } from '../playwright.extend'

declare namespace window {
export const msw: {
setupWorker: typeof setupWorker
sse: typeof sse
}
}

const EXAMPLE_URL = new URL('./sse.mocks.ts', import.meta.url)

test('runs cleanup after the event source is closed by the client', async ({
loadExample,
page,
}) => {
await loadExample(EXAMPLE_URL, {
skipActivation: true,
})

const finalizedAt = new DeferredPromise<number>()
await page.exposeFunction('notifyFinalized', () => {
finalizedAt.resolve(Date.now())
})

await page.evaluate(async () => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ finalize }) => {
finalize(() => window.notifyFinalized())
}),
)
await worker.start()
})

const closedAt = await page.evaluate(() => {
const source = new EventSource('http://localhost/stream')

return new Promise<number>((resolve) => {
source.addEventListener('open', () => {
source.close()
resolve(Date.now())
})
})
})

await expect(finalizedAt).resolves.toBeGreaterThanOrEqual(closedAt)
})

test('runs cleanup after the event source is closed by the handler', async ({
loadExample,
page,
}) => {
await loadExample(EXAMPLE_URL, {
skipActivation: true,
})

const finalizedAt = new DeferredPromise<number>()
await page.exposeFunction('notifyFinalized', () => {
finalizedAt.resolve(Date.now())
})

await page.evaluate(async () => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ client, finalize }) => {
setTimeout(() => client.close(), 250)
finalize(() => window.notifyFinalized())
}),
)
await worker.start()
})

const closedAt = await page.evaluate(() => {
const source = new EventSource('http://localhost/stream')

return new Promise<number>((resolve) => {
source.addEventListener('open', () => {
resolve(Date.now())
})
})
})

await expect(finalizedAt).resolves.toBeGreaterThanOrEqual(closedAt)
})

test('runs cleanup after the event source is errored by the handler', async ({
loadExample,
page,
}) => {
await loadExample(EXAMPLE_URL, {
skipActivation: true,
})

const finalizedAt = new DeferredPromise<number>()
await page.exposeFunction('notifyFinalized', () => {
finalizedAt.resolve(Date.now())
})

await page.evaluate(async () => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ client, finalize }) => {
setTimeout(() => client.error(), 250)
finalize(() => window.notifyFinalized())
}),
)
await worker.start()
})

const closedAt = await page.evaluate(() => {
const source = new EventSource('http://localhost/stream')

return new Promise<number>((resolve) => {
source.addEventListener('open', () => {
resolve(Date.now())
})
})
})

await expect(finalizedAt).resolves.toBeGreaterThanOrEqual(closedAt)
})
Loading