Skip to content

11 — WebSocket endpoints

ws({...}) mocks a WebSocket route on the same port as your HTTP mocks. Stream frames out, validate frames in, and broadcast from any HTTP handler via ctx.endpoint(url).

Run this chapter in 30 seconds

  1. Open in StackBlitz → — full Node sandbox in your browser, no install.
  2. Wait for npm install to finish, then in the Terminal tab run:
    npx tsx examples/11-ai-agent-ws/server.ts
  3. In a second terminal: npx tsx examples/11-ai-agent-ws/client.ts (drives a streaming reply, mid-stream cancel, and a broadcast from the HTTP side).

Concept

A WS endpoint is just another row in endpoints. Instead of data / dataFile / handler, attach ws: ws({...}). The factory takes lifecycle hooks (onConnect, onMessage, onClose) plus optional message / query / params zod schemas.

Each connection gets its own state (typed via initialState). Inside a hook, send(frame) pushes to the current client; cross-endpoint code can grab a WsHandle and broadcast to all of them.

The WsEndpoint<Out, In> sentinel in your Endpoints map types both directions:

  • Out — frames the server sends to clients
  • In — frames the server receives

Code

ts
import { mockr, ws, handler, type WsEndpoint } from '@yoyo-org/mockr';

type ClientEvent = { type: 'message'; content: string } | { type: 'cancel' };
type ServerEvent =
  | { type: 'hello'; conversationId: string }
  | { type: 'content_delta'; text: string }
  | { type: 'message_stop'; stopReason: 'end_turn' | 'cancelled' }
  | { type: 'broadcast'; text: string };

type Endpoints = {
  '/ws/agent': WsEndpoint<ServerEvent, ClientEvent>;
};

interface ConnState { cancelled: boolean }

mockr<Endpoints>({
  port: 3011,
  endpoints: [
    {
      url: '/ws/agent',
      ws: ws<ServerEvent, ClientEvent, ConnState>({
        initialState: () => ({ cancelled: false }),
        onConnect: ({ send, query }) => {
          const cid = typeof query.conversationId === 'string' ? query.conversationId : 'demo';
          send({ type: 'hello', conversationId: cid });
        },
        onMessage: async ({ data, send, state, signal }) => {
          if (data.type === 'cancel') { state.cancelled = true; return; }
          state.cancelled = false;

          for (const tok of data.content.split(/(\s+)/)) {
            if (signal.aborted || state.cancelled) {
              send({ type: 'message_stop', stopReason: 'cancelled' });
              return;
            }
            await new Promise((r) => setTimeout(r, 15));
            send({ type: 'content_delta', text: tok });
          }
          send({ type: 'message_stop', stopReason: 'end_turn' });
        },
      }),
    },

    // Cross-endpoint: HTTP request → WS broadcast.
    {
      url: '/api/broadcast',
      method: 'POST',
      handler: handler({
        fn: (req, ctx) => {
          const text = (req.body as { text?: string })?.text ?? 'ping';
          ctx.endpoint('/ws/agent').broadcast({ type: 'broadcast', text });
          return { body: { delivered: ctx.endpoint('/ws/agent').count() } };
        },
      }),
    },
  ],
});

Hooks

HookFiresReceives
onConnectonce after the upgrade succeeds{ send, state, query, params, headers, id, subprotocol }
onMessagefor every inbound frame (after schema validation){ data, send, state, query, params, headers, id, signal }
onCloseonce after the socket closes (peer or server){ state, code, reason, id }

signal aborts when the socket closes — wire it through long async work to bail out cleanly.

Validation

Frames are JSON-decoded before reaching onMessage. Pass a message schema to validate. Failed validation emits { type: '__mockr_error', code, message } to the client and skips the hook.

ts
import { z } from 'zod';

ws({
  message: z.discriminatedUnion('type', [
    z.object({ type: z.literal('message'), content: z.string() }),
    z.object({ type: z.literal('cancel') }),
  ]),
  onMessage: ({ data }) => { /* `data` is typed from the schema */ },
});

query and params schemas reject the upgrade with 400 before the socket opens — so onConnect can trust them.

Cross-endpoint: WsHandle

ctx.endpoint('/ws/...') (inside any handler) and server.endpoint('/ws/...') (outside) both return a WsHandle<Out>:

MethodDescription
broadcast(frame, filter?)Send to every connected client (optional predicate)
send(clientId, frame)Send to one client by id
close(clientId?, code?, reason?)Close one client or all
clients()Snapshot of WsClient[] (id, query, params, headers, state, connectedAt)
count()Number of open connections

Use it to push server-initiated events (e.g., a webhook arrives, fan it out to everyone listening) without leaking ws internals into your handler code.

Try it

Open in StackBlitz → — paste each command into the StackBlitz Terminal once npx tsx examples/11-ai-agent-ws/server.ts is running.

From a browser / node client

ts
const sock = new WebSocket('ws://localhost:3011/ws/agent?conversationId=demo');
sock.onmessage = (e) => console.log(JSON.parse(e.data));
sock.onopen = () => sock.send(JSON.stringify({ type: 'message', content: 'hello' }));

Driver script in the repo

bash
# in a second terminal — connects, drives streaming + cancel + broadcast
npx tsx examples/11-ai-agent-ws/client.ts

Push from HTTP — broadcast to every connected ws client

bash
curl -s -X POST http://localhost:3011/api/broadcast \
  -H 'Content-Type: application/json' \
  -d '{"text":"live update"}'
# → { "delivered": <number of open ws clients> }

Every connected client receives { type: 'broadcast', text: 'live update' }.

What's next

MIT License