Skip to content

Commit a6fba86

Browse files
committed
feat(wip): support bypassing subscriptions
1 parent a71feed commit a6fba86

2 files changed

Lines changed: 167 additions & 5 deletions

File tree

src/core/handlers/GraphQLSubscriptionHandler.ts

Lines changed: 119 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
import { invariant } from 'outvariant'
12
import { OperationTypeNode } from 'graphql'
2-
import type { WebSocketConnectionData } from '@mswjs/interceptors/WebSocket'
3+
import type {
4+
WebSocketConnectionData,
5+
WebSocketServerConnection,
6+
} from '@mswjs/interceptors/WebSocket'
37
import {
48
GraphQLHandler,
59
type GraphQLHandlerInfo,
@@ -62,6 +66,10 @@ type GraphQLWebSocketNextMessage = {
6266
payload: unknown
6367
}
6468

69+
type GraphQLSubscriptionIncomingMessage =
70+
| GraphQLWebSocketAcknowledgeMessage
71+
| GraphQLWebSocketNextMessage
72+
6573
interface GraphQLWebSocketSubscribeMessagePayload<
6674
Variables extends GraphQLVariables = GraphQLVariables,
6775
> {
@@ -73,6 +81,8 @@ interface GraphQLWebSocketSubscribeMessagePayload<
7381
export class GraphQLInternalPubsub {
7482
public pubsub: GraphQLPubsub
7583
public webSocketLink: WebSocketLink
84+
public server?: WebSocketServerConnection
85+
7686
private subscriptions: Map<string, GraphQLWebSocketSubscribeMessage>
7787

7888
constructor(public readonly url: Path) {
@@ -89,7 +99,9 @@ export class GraphQLInternalPubsub {
8999

90100
const webSocketHandler = this.webSocketLink.addEventListener(
91101
'connection',
92-
({ client }) => {
102+
({ client, server }) => {
103+
this.server = server
104+
93105
client.addEventListener('message', (event) => {
94106
if (typeof event.data !== 'string') {
95107
return
@@ -138,13 +150,13 @@ export class GraphQLInternalPubsub {
138150
}
139151
}
140152

141-
private createAcknowledgeMessage() {
153+
protected createAcknowledgeMessage() {
142154
return JSON.stringify({
143155
type: 'connection_ack',
144156
} satisfies GraphQLWebSocketAcknowledgeMessage)
145157
}
146158

147-
private createSubscribeMessage(args: { id: string; payload: unknown }) {
159+
protected createSubscribeMessage(args: { id: string; payload: unknown }) {
148160
return JSON.stringify({
149161
id: args.id,
150162
type: 'next',
@@ -178,7 +190,7 @@ export class GraphQLSubscription<
178190
}
179191

180192
/**
181-
* Publishes data to this subscription.
193+
* Publishes data to all subscribed GraphQL clients.
182194
*/
183195
public publish(payload: { data?: Query }): void {
184196
this.args.internalPubsub.pubsub.publish(payload, ({ subscription }) => {
@@ -197,6 +209,31 @@ export class GraphQLSubscription<
197209
} satisfies GraphQLWebSocketCompleteMessage),
198210
)
199211
}
212+
213+
/**
214+
* Establishes this GraphQL subscription to the actual server.
215+
*/
216+
public subscribe() {
217+
const { server } = this.args.internalPubsub
218+
219+
/**
220+
* @note One can only call `subscription.subscribe()` inside the
221+
* GraphQL subscription handler. By that point, the WebSocket connection
222+
* has been established and intercepted so the `server` reference
223+
* is guaranteed.
224+
*/
225+
invariant(
226+
server,
227+
'Failed to establish GraphQL subscription ("%s") to the actual server ("%s"): `subscription.subscribe()` was called outside the subscription handler',
228+
this.args.message.payload.query,
229+
this.args.internalPubsub.url,
230+
)
231+
232+
return new GraphQLServerSubscription({
233+
server,
234+
message: this.args.message,
235+
})
236+
}
200237
}
201238

202239
export type GraphQLSubscriptionName<
@@ -334,3 +371,80 @@ export function createGraphQLSubscriptionHandler(
334371
)
335372
}
336373
}
374+
375+
interface GraphQLServerSubscriptionEventMap {
376+
connection_ack: Event
377+
next: MessageEvent<GraphQLWebSocketNextMessage>
378+
}
379+
380+
/**
381+
* Representation of a GraphQL subscription to the actual server.
382+
* You interface with this object from the client's perspective.
383+
*/
384+
class GraphQLServerSubscription {
385+
protected readonly server: WebSocketServerConnection
386+
387+
private eventTarget: EventTarget
388+
389+
constructor(
390+
readonly args: {
391+
server: WebSocketServerConnection
392+
message: GraphQLWebSocketSubscribeMessage
393+
},
394+
) {
395+
this.eventTarget = new EventTarget()
396+
this.server = args.server
397+
398+
this.server.connect()
399+
this.server.addEventListener('open', () => {
400+
// Once the server connetion has been established, send the
401+
// subscription intent message. This is the same message as
402+
// was sent from the GraphQL client.
403+
this.server.send(JSON.stringify(this.args.message))
404+
})
405+
406+
this.server.addEventListener('message', (event) => {
407+
if (typeof event.data !== 'string') {
408+
return
409+
}
410+
411+
const message = jsonParse<GraphQLSubscriptionIncomingMessage>(event.data)
412+
if (!message) {
413+
return
414+
}
415+
416+
switch (message.type) {
417+
case 'connection_ack': {
418+
this.eventTarget.dispatchEvent(new Event('connection_ack'))
419+
break
420+
}
421+
422+
case 'next': {
423+
/**
424+
* @fixme This isn't the same `event` from the server
425+
* so calling `event.preventDefault()` won't prevent anything.
426+
*/
427+
this.eventTarget.dispatchEvent(
428+
new MessageEvent('next', {
429+
data: message,
430+
}),
431+
)
432+
break
433+
}
434+
}
435+
})
436+
}
437+
438+
public addEventListener<
439+
EventType extends keyof GraphQLServerSubscriptionEventMap,
440+
>(
441+
event: EventType,
442+
listener: (event: GraphQLServerSubscriptionEventMap[EventType]) => void,
443+
) {
444+
this.eventTarget.addEventListener(event, { handleEvent: listener })
445+
}
446+
447+
public unsubscribe(): void {
448+
this.server.close()
449+
}
450+
}

test/node/graphql-api/graphql-subscription.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,51 @@ subscription OnCommentAdded {
119119
service: 'user-service',
120120
})
121121
})
122+
123+
it('supports bypassing a GraphQL subscription', async () => {
124+
const api = graphql.link('https://api.example.com/graphql')
125+
server.use(
126+
api.subscription('OnCommentAdded', async ({ subscription }) => {
127+
// This creates a NEW subscription for the same event in the actual server.
128+
// - There is no client events to prevent! Subscription intent is sent ONCE.
129+
const commentSubscription = subscription.subscribe()
130+
131+
// Event listeners here are modeled after the subscription protocol:
132+
// - acknowledge, server confirmed the subscription.
133+
// - next, server is sending data to the client.
134+
// - error, error happened (is this event a thing?). Server connection errors!
135+
// - complete, server has completed this subscription.
136+
commentSubscription.addEventListener('next', (event) => {
137+
// You can still prevent messages from the original server.
138+
// By default, they are forwarded to the client, just like with mocking WebSockets.
139+
event.preventDefault()
140+
// The event data is already parsed to drop the implementation details of the subscription.
141+
subscription.publish({
142+
data: event.data.payload,
143+
})
144+
})
145+
146+
// You can unsubscribe from the original server subscription at any time.
147+
commentSubscription.unsubscribe()
148+
}),
149+
)
150+
151+
const client = createClient({
152+
url: 'wss://api.example.com/graphql',
153+
})
154+
const subscription = client.iterate({
155+
query: `
156+
subscription OnCommentAdded {
157+
commentAdded {
158+
text
159+
}
160+
}
161+
`,
162+
})
163+
164+
await subscription.next()
165+
166+
/**
167+
* @fixme Expect the subscription payload from the ACTUAL server.
168+
*/
169+
})

0 commit comments

Comments
 (0)