11 — WebSocket endpoints
ws({...}) mocks a WebSocket route on the same port as your HTTP mocks. Stream frames out, validate frames in, and broadcast from any HTTP handler via ctx.endpoint(url).
Run this chapter in 30 seconds
- Open in StackBlitz → — full Node sandbox in your browser, no install.
- Wait for npm install to finish, then in the Terminal tab run: npx tsx examples/11-ai-agent-ws/server.ts
- In a second terminal: npx tsx examples/11-ai-agent-ws/client.ts (drives a streaming reply, mid-stream cancel, and a broadcast from the HTTP side).
Concept
A WS endpoint is just another row in endpoints. Instead of data / dataFile / handler, attach ws: ws({...}). The factory takes lifecycle hooks (onConnect, onMessage, onClose) plus optional message / query / params zod schemas.
Each connection gets its own state (typed via initialState). Inside a hook, send(frame) pushes to the current client; cross-endpoint code can grab a WsHandle and broadcast to every connected client.
The WsEndpoint<Out, In> sentinel in your Endpoints map types both directions:
- Out: frames the server sends to clients
- In: frames the server receives
Code
import { mockr, ws, handler, type WsEndpoint } from '@yoyo-org/mockr';
type ClientEvent = { type: 'message'; content: string } | { type: 'cancel' };
type ServerEvent =
  | { type: 'hello'; conversationId: string }
  | { type: 'content_delta'; text: string }
  | { type: 'message_stop'; stopReason: 'end_turn' | 'cancelled' }
  | { type: 'broadcast'; text: string };
type Endpoints = {
  '/ws/agent': WsEndpoint<ServerEvent, ClientEvent>;
};
interface ConnState { cancelled: boolean }
mockr<Endpoints>({
  port: 3011,
  endpoints: [
    {
      url: '/ws/agent',
      ws: ws<ServerEvent, ClientEvent, ConnState>({
        initialState: () => ({ cancelled: false }),
        onConnect: ({ send, query }) => {
          const cid = typeof query.conversationId === 'string' ? query.conversationId : 'demo';
          send({ type: 'hello', conversationId: cid });
        },
        onMessage: async ({ data, send, state, signal }) => {
          if (data.type === 'cancel') { state.cancelled = true; return; }
          state.cancelled = false;
          for (const tok of data.content.split(/(\s+)/)) {
            if (signal.aborted || state.cancelled) {
              send({ type: 'message_stop', stopReason: 'cancelled' });
              return;
            }
            await new Promise((r) => setTimeout(r, 15));
            send({ type: 'content_delta', text: tok });
          }
          send({ type: 'message_stop', stopReason: 'end_turn' });
        },
      }),
    },
    // Cross-endpoint: HTTP request → WS broadcast.
    {
      url: '/api/broadcast',
      method: 'POST',
      handler: handler({
        fn: (req, ctx) => {
          const text = (req.body as { text?: string })?.text ?? 'ping';
          ctx.endpoint('/ws/agent').broadcast({ type: 'broadcast', text });
          return { body: { delivered: ctx.endpoint('/ws/agent').count() } };
        },
      }),
    },
  ],
});
Hooks
| Hook | Fires | Receives |
|---|---|---|
| onConnect | once after the upgrade succeeds | { send, state, query, params, headers, id, subprotocol } |
| onMessage | for every inbound frame (after schema validation) | { data, send, state, query, params, headers, id, signal } |
| onClose | once after the socket closes (peer or server) | { state, code, reason, id } |
signal aborts when the socket closes — wire it through long async work to bail out cleanly.
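One way to wire an abort signal through long async work is a sleep that rejects as soon as the signal fires, so a streaming loop stops mid-delay instead of finishing its timer first. The following is a minimal sketch that does not use mockr: `sleep` and `streamTokens` are hypothetical helper names, and the only assumption about `signal` is that it is a standard `AbortSignal`.

```typescript
// Hypothetical helper (not part of mockr): resolves after `ms`,
// or rejects immediately if `signal` is already aborted or aborts while waiting.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('aborted'));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(new Error('aborted'));
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Hypothetical helper: pushes tokens through `send` one by one and
// reports whether the stream finished or was cut short by the signal.
async function streamTokens(
  tokens: string[],
  send: (text: string) => void,
  signal: AbortSignal,
  delayMs = 15,
): Promise<'end_turn' | 'cancelled'> {
  for (const tok of tokens) {
    try {
      await sleep(delayMs, signal);
    } catch {
      return 'cancelled';
    }
    send(tok);
  }
  return 'end_turn';
}
```

Inside an onMessage hook, the returned value could be forwarded as the stopReason of a message_stop frame. Whether sending on an already closed socket is a no-op is not stated here, so treat that part as something to verify.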
Validation
Frames are JSON-decoded before reaching onMessage. Pass a message schema to validate. Failed validation emits { type: '__mockr_error', code, message } to the client and skips the hook.
import { z } from 'zod';
ws({
  message: z.discriminatedUnion('type', [
    z.object({ type: z.literal('message'), content: z.string() }),
    z.object({ type: z.literal('cancel') }),
  ]),
  onMessage: ({ data }) => { /* `data` is typed from the schema */ },
});
query and params schemas reject the upgrade with 400 before the socket opens, so onConnect can trust them.
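On the client side, a rejected frame arrives as an ordinary message whose type is `__mockr_error`, so it helps to separate those from regular frames before handling them. The sketch below is client code, not part of mockr: `isMockrError` and `routeFrame` are hypothetical names, and because the types of `code` and `message` are not specified above, they are left as `unknown`.

```typescript
// Shape taken from the validation description; field types are an assumption.
type MockrErrorFrame = { type: '__mockr_error'; code: unknown; message: unknown };

// Type guard: true only for objects tagged as a mockr validation error.
function isMockrError(frame: unknown): frame is MockrErrorFrame {
  return (
    typeof frame === 'object' &&
    frame !== null &&
    (frame as { type?: unknown }).type === '__mockr_error'
  );
}

// Parses a raw text frame and routes it to exactly one of the two callbacks.
function routeFrame<Out>(
  raw: string,
  onFrame: (frame: Out) => void,
  onError: (err: MockrErrorFrame) => void,
): void {
  const parsed: unknown = JSON.parse(raw);
  if (isMockrError(parsed)) {
    onError(parsed);
  } else {
    onFrame(parsed as Out); // trusted cast: the server only sends Out frames
  }
}
```

A browser client could call `routeFrame(e.data, handleFrame, console.warn)` from its onmessage handler, where `handleFrame` is whatever function processes regular frames.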
Cross-endpoint: WsHandle
ctx.endpoint('/ws/...') (inside any handler) and server.endpoint('/ws/...') (outside) both return a WsHandle<Out>:
| Method | Description |
|---|---|
| broadcast(frame, filter?) | Send to every connected client (optional predicate) |
| send(clientId, frame) | Send to one client by id |
| close(clientId?, code?, reason?) | Close one client or all |
| clients() | Snapshot of WsClient[] (id, query, params, headers, state, connectedAt) |
| count() | Number of open connections |
Use it to push server-initiated events (e.g., a webhook arrives, fan it out to everyone listening) without leaking ws internals into your handler code.
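To make the fan-out semantics concrete, here is a toy in-memory model of a handle. It is not mockr's implementation and does not import the library: `ToyClient`, `ToyHandle`, `add`, and `deliver` are hypothetical names, and the assumption that the broadcast predicate receives the client record is a guess based on the table above.

```typescript
// Toy stand-in for illustration only; NOT mockr's WsHandle.
type ToyClient<Out> = {
  id: string;
  query: Record<string, string>;
  deliver: (frame: Out) => void; // hypothetical per-client sink
};

class ToyHandle<Out> {
  private readonly clientsById = new Map<string, ToyClient<Out>>();

  // Registers a connection (the real server would do this on upgrade).
  add(client: ToyClient<Out>): void {
    this.clientsById.set(client.id, client);
  }

  // Sends to every client, or only to those the predicate accepts.
  // Assumption: the predicate is called with the client record.
  broadcast(frame: Out, filter?: (client: ToyClient<Out>) => boolean): void {
    for (const client of this.clientsById.values()) {
      if (filter && !filter(client)) continue;
      client.deliver(frame);
    }
  }

  // Sends to a single client; unknown ids are ignored in this toy model.
  send(clientId: string, frame: Out): void {
    this.clientsById.get(clientId)?.deliver(frame);
  }

  // Number of registered connections.
  count(): number {
    return this.clientsById.size;
  }
}
```

In this model, `handle.broadcast(frame, (c) => c.query.conversationId === 'demo')` reaches only the clients that connected with that query value, while `handle.broadcast(frame)` reaches all of them.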
Try it
Open in StackBlitz → — paste each command into the StackBlitz Terminal once npx tsx examples/11-ai-agent-ws/server.ts is running.
From a browser / node client
const sock = new WebSocket('ws://localhost:3011/ws/agent?conversationId=demo');
sock.onmessage = (e) => console.log(JSON.parse(e.data));
sock.onopen = () => sock.send(JSON.stringify({ type: 'message', content: 'hello' }));
Driver script in the repo
# in a second terminal — connects, drives streaming + cancel + broadcast
npx tsx examples/11-ai-agent-ws/client.ts
Push from HTTP: broadcast to every connected ws client
curl -s -X POST http://localhost:3011/api/broadcast \
-H 'Content-Type: application/json' \
-d '{"text":"live update"}'
# → { "delivered": <number of open ws clients> }
Every connected client receives { type: 'broadcast', text: 'live update' }.
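A client usually wants more than a log of raw frames: it needs to assemble the streamed reply and keep broadcasts separate. One possible approach is a small reducer over the ServerEvent union from this chapter. This is a sketch, not code from the repo; `ReplyState`, `emptyReply`, and `applyFrame` are hypothetical names.

```typescript
// Same frame union as the server example in this chapter.
type ServerEvent =
  | { type: 'hello'; conversationId: string }
  | { type: 'content_delta'; text: string }
  | { type: 'message_stop'; stopReason: 'end_turn' | 'cancelled' }
  | { type: 'broadcast'; text: string };

// Hypothetical client-side view of one streamed reply.
type ReplyState = {
  text: string;
  done: boolean;
  stopReason?: 'end_turn' | 'cancelled';
  broadcasts: string[];
};

const emptyReply = (): ReplyState => ({ text: '', done: false, broadcasts: [] });

// Pure reducer: returns a new state for each incoming frame.
function applyFrame(state: ReplyState, frame: ServerEvent): ReplyState {
  switch (frame.type) {
    case 'hello':
      return state; // nothing to accumulate
    case 'content_delta':
      return { ...state, text: state.text + frame.text };
    case 'message_stop':
      return { ...state, done: true, stopReason: frame.stopReason };
    case 'broadcast':
      return { ...state, broadcasts: [...state.broadcasts, frame.text] };
  }
}

// Frames as the server above would emit them for the message "hello world".
const frames: ServerEvent[] = [
  { type: 'hello', conversationId: 'demo' },
  { type: 'content_delta', text: 'hello' },
  { type: 'content_delta', text: ' ' },
  { type: 'content_delta', text: 'world' },
  { type: 'message_stop', stopReason: 'end_turn' },
];
const finalState = frames.reduce(applyFrame, emptyReply());
console.log(finalState.text); // prints "hello world"
```

Because the reducer is pure, it can be exercised with plain arrays of frames and then wired to a socket with a single assignment inside onmessage.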
What's next
- Reference → WebSocket API for the full type surface.
- Reference → Endpoints for EndpointHandle / MockrServer.