= V extends (...args: infer P) => infer R
153 | ? (...args: UnstubifyAll) => Result>
154 | : Result>;
155 |
156 | // Type for the callable part of a `Provider` if `T` is callable.
157 | // This is intersected with methods/properties.
158 | type MaybeCallableProvider = T extends (...args: any[]) => any
159 | ? MethodOrProperty
160 | : unknown;
161 |
162 | // Base type for all other types providing RPC-like interfaces.
163 | // Rewrites all methods/properties to be `MethodOrProperty`s, while preserving callable types.
164 | export type Provider = MaybeCallableProvider &
165 | (T extends Array
166 | ? {
167 | [key: number]: MethodOrProperty;
168 | } & {
169 | map(callback: (elem: U) => V): Result>;
170 | }
171 | : {
172 | [K in Exclude<
173 | keyof T,
174 | symbol | keyof StubBase
175 | >]: MethodOrProperty;
176 | } & {
177 | map(callback: (value: NonNullable) => V): Result>;
178 | });
179 |
--------------------------------------------------------------------------------
/src/batch.ts:
--------------------------------------------------------------------------------
1 | // Copyright (c) 2025 Cloudflare, Inc.
2 | // Licensed under the MIT license found in the LICENSE.txt file or at:
3 | // https://opensource.org/license/mit
4 |
5 | import { RpcStub } from "./core.js";
6 | import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js";
7 | import type { IncomingMessage, ServerResponse, OutgoingHttpHeader, OutgoingHttpHeaders } from "node:http";
8 |
// Delivers one complete batch of serialized messages to the server and resolves with the
// serialized messages of the server's reply.
type SendBatchFunc = (batch: string[]) => Promise<string[]>;
10 |
/**
 * Client-side transport for an HTTP batch session.
 *
 * Messages passed to `send()` are queued until the batch is dispatched, which happens one
 * `setTimeout(…, 0)` turn after construction. The messages of the reply are then handed out one
 * at a time by `receive()`; once they run out, `receive()` throws.
 */
class BatchClientTransport implements RpcTransport {
  constructor(sendBatch: SendBatchFunc) {
    this.#promise = this.#scheduleBatch(sendBatch);
  }

  // Settles when the batch round trip has finished; rejects if the session was aborted before
  // dispatch or if `sendBatch` failed.
  #promise: Promise<void>;

  // Reason given to abort(), or undefined if abort() has not been called.
  #aborted: any;

  // Outgoing queue. Becomes null once the batch has been handed to `sendBatch`.
  #batchToSend: string[] | null = [];

  // Queue of reply messages. Stays null until the reply has arrived.
  #batchToReceive: string[] | null = null;

  async send(message: string): Promise<void> {
    // If the batch was already sent, we just ignore the message, because throwing may cause the
    // RPC system to abort prematurely. Once the last receive() is done then we'll throw an error
    // that aborts the RPC system at the right time and will propagate to all other requests.
    if (this.#batchToSend !== null) {
      this.#batchToSend.push(message);
    }
  }

  async receive(): Promise<string> {
    if (!this.#batchToReceive) {
      // Rethrows the abort reason or the `sendBatch` failure, if any.
      await this.#promise;
    }

    // Non-null here: #scheduleBatch() assigns the reply before #promise resolves.
    const msg = this.#batchToReceive!.shift();
    if (msg !== undefined) {
      return msg;
    }

    // No more messages. An error thrown here will propagate out of any calls that are still
    // open.
    throw new Error("Batch RPC request ended.");
  }

  abort?(reason: any): void {
    // Only recorded here; #scheduleBatch() checks it right before dispatching.
    this.#aborted = reason;
  }

  // Waits one macrotask, then sends everything queued so far and stores the reply.
  async #scheduleBatch(sendBatch: SendBatchFunc): Promise<void> {
    // Wait for microtask queue to clear before sending a batch.
    //
    // Note that simply waiting for one turn of the microtask queue (await Promise.resolve()) is
    // not good enough here as the application needs a chance to call `.then()` on every RPC
    // promise in order to explicitly indicate they want the results. Unfortunately, `await`ing
    // a thenable does not call `.then()` immediately -- for some reason it waits for a turn of
    // the microtask queue first, *then* calls `.then()`.
    await new Promise(resolve => setTimeout(resolve, 0));

    if (this.#aborted !== undefined) {
      throw this.#aborted;
    }

    const batch = this.#batchToSend!;
    // From here on send() silently drops messages.
    this.#batchToSend = null;
    this.#batchToReceive = await sendBatch(batch);
  }
}
69 |
/**
 * Starts a client-side HTTP batch session.
 *
 * All messages produced by the session are sent in a single `fetch()` POST whose body is the
 * messages joined by newlines; the response body is split on newlines to obtain the reply
 * messages.
 *
 * @param urlOrRequest URL or `Request` passed as the first argument to `fetch()`. The method and
 *     body are always overridden.
 * @param options Optional RPC session options, forwarded to `RpcSession`.
 * @returns The stub returned by the session's `getRemoteMain()`.
 */
export function newHttpBatchRpcSession(
    urlOrRequest: string | Request, options?: RpcSessionOptions): RpcStub {
  const sendBatch: SendBatchFunc = async (batch: string[]) => {
    const response = await fetch(urlOrRequest, {
      method: "POST",
      body: batch.join("\n"),
    });

    if (!response.ok) {
      // We are not going to read the body, so release it.
      response.body?.cancel();
      throw new Error(`RPC request failed: ${response.status} ${response.statusText}`);
    }

    const body = await response.text();
    // "".split("\n") would yield [""], i.e. one bogus empty message, so special-case it.
    return body === "" ? [] : body.split("\n");
  };

  const transport = new BatchClientTransport(sendBatch);
  const rpc = new RpcSession(transport, undefined, options);
  return rpc.getRemoteMain();
}
91 |
/**
 * Server-side transport for an HTTP batch session.
 *
 * It is constructed with the complete list of messages received from the client and collects
 * every message sent in reply, so that they can be returned as one response body.
 */
class BatchServerTransport implements RpcTransport {
  constructor(batch: string[]) {
    this.#batchToReceive = batch;
    // A plain executor is used instead of Promise.withResolvers() so that the class also works
    // on runtimes that do not implement it yet (e.g. Node.js 20).
    this.#allReceived = new Promise<void>((resolve, reject) => {
      this.#resolveAllReceived = resolve;
      this.#rejectAllReceived = reject;
    });
  }

  // Messages queued for the response body.
  #batchToSend: string[] = [];

  // Messages from the request that have not been handed to receive() yet.
  #batchToReceive: string[];

  // Resolves once receive() has run out of messages; rejects if abort() is called first.
  #allReceived: Promise<void>;
  #resolveAllReceived: () => void = () => {};
  #rejectAllReceived: (reason?: unknown) => void = () => {};

  async send(message: string): Promise<void> {
    this.#batchToSend.push(message);
  }

  async receive(): Promise<string> {
    const msg = this.#batchToReceive.shift();
    if (msg !== undefined) {
      return msg;
    }

    // No more messages. Signal completion, then return a promise that never settles so this
    // call simply stays pending.
    this.#resolveAllReceived();
    return new Promise<string>(() => {});
  }

  abort?(reason: any): void {
    this.#rejectAllReceived(reason);
  }

  /** Promise that resolves once every received message has been consumed via `receive()`. */
  whenAllReceived(): Promise<void> {
    return this.#allReceived;
  }

  /** All messages sent so far, joined by newlines. */
  getResponseBody(): string {
    return this.#batchToSend.join("\n");
  }
}
128 |
/**
 * Implements the server end of an HTTP batch session, using standard Fetch API types to represent
 * HTTP requests and responses.
 *
 * @param request The request received from the client initiating the session.
 * @param localMain The main stub or RpcTarget which the server wishes to expose to the client.
 * @param options Optional RPC session options.
 * @returns The HTTP response to return to the client. Note that the returned object has mutable
 *     headers, so you can modify them using e.g. `response.headers.set("Foo", "bar")`. Requests
 *     using a method other than POST are answered with status 405.
 */
export async function newHttpBatchRpcResponse(
    request: Request, localMain: any, options?: RpcSessionOptions): Promise<Response> {
  if (request.method !== "POST") {
    // A 405 response is required to say which methods are allowed.
    return new Response("This endpoint only accepts POST requests.", {
      status: 405,
      headers: { Allow: "POST" },
    });
  }

  const body = await request.text();
  // One message per line; an empty body is an empty batch rather than one empty message.
  const batch = body === "" ? [] : body.split("\n");

  const transport = new BatchServerTransport(batch);
  const rpc = new RpcSession(transport, localMain, options);

  // TODO: Arguably we should arrange so any attempts to pull promise resolutions from the client
  //   will reject rather than just hang. But it IS valid to make server->client calls in order to
  //   then pipeline the result into something returned to the client. We don't want the errors to
  //   prematurely cancel anything that would eventually complete. So for now we just say, it's the
  //   app's responsibility to not wait on any server -> client calls since they will never
  //   complete.

  // Wait until the session has consumed the whole batch and has nothing left to send back.
  await transport.whenAllReceived();
  await rpc.drain();

  // TODO: Ask RpcSession to dispose everything it is still holding on to?

  return new Response(transport.getResponseBody());
}
165 |
/**
 * Implements the server end of an HTTP batch session using traditional Node.js HTTP APIs.
 *
 * Requests using a method other than POST are answered with status 405 and are not processed
 * any further.
 *
 * @param request The request received from the client initiating the session.
 * @param response The response object, to which the response should be written.
 * @param localMain The main stub or RpcTarget which the server wishes to expose to the client.
 * @param options Optional RPC session options. You can also pass headers to set on the response.
 */
export async function nodeHttpBatchRpcResponse(
    request: IncomingMessage, response: ServerResponse,
    localMain: any,
    options?: RpcSessionOptions & {
      headers?: OutgoingHttpHeaders | OutgoingHttpHeader[],
    }): Promise<void> {
  if (request.method !== "POST") {
    // Finish the response and stop here. Without this the function would go on to run the RPC
    // session and call writeHead() a second time on the same response.
    response.writeHead(405, "This endpoint only accepts POST requests.", { Allow: "POST" });
    response.end();
    return;
  }

  // Buffer the whole request body; the batch can only be processed once it is complete.
  const body = await new Promise<string>((resolve, reject) => {
    const chunks: Buffer[] = [];
    request.on("data", chunk => {
      chunks.push(chunk);
    });
    request.on("end", () => {
      resolve(Buffer.concat(chunks).toString());
    });
    request.on("error", reject);
  });
  // One message per line; an empty body is an empty batch rather than one empty message.
  const batch = body === "" ? [] : body.split("\n");

  const transport = new BatchServerTransport(batch);
  const rpc = new RpcSession(transport, localMain, options);

  // Wait until the session has consumed the whole batch and has nothing left to send back.
  await transport.whenAllReceived();
  await rpc.drain();

  response.writeHead(200, options?.headers);
  response.end(transport.getResponseBody());
}
205 |
--------------------------------------------------------------------------------
/src/index.ts:
--------------------------------------------------------------------------------
1 | // Copyright (c) 2025 Cloudflare, Inc.
2 | // Licensed under the MIT license found in the LICENSE.txt file or at:
3 | // https://opensource.org/license/mit
4 |
5 | import { RpcTarget as RpcTargetImpl, RpcStub as RpcStubImpl, RpcPromise as RpcPromiseImpl } from "./core.js";
6 | import { serialize, deserialize } from "./serialize.js";
7 | import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js";
8 | import { RpcTargetBranded, RpcCompatible, Stub, Stubify, __RPC_TARGET_BRAND } from "./types.js";
9 | import { newWebSocketRpcSession as newWebSocketRpcSessionImpl,
10 | newWorkersWebSocketRpcResponse } from "./websocket.js";
11 | import { newHttpBatchRpcSession as newHttpBatchRpcSessionImpl,
12 | newHttpBatchRpcResponse, nodeHttpBatchRpcResponse } from "./batch.js";
13 | import { newMessagePortRpcSession as newMessagePortRpcSessionImpl } from "./messageport.js";
14 | import { forceInitMap } from "./map.js";
15 |
16 | forceInitMap();
17 |
18 | // Re-export public API types.
19 | export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpcResponse,
20 | nodeHttpBatchRpcResponse };
21 | export type { RpcTransport, RpcSessionOptions, RpcCompatible };
22 |
23 | // Hack the type system to make RpcStub's types work nicely!
24 | /**
25 | * Represents a reference to a remote object, on which methods may be remotely invoked via RPC.
26 | *
27 | * `RpcStub` can represent any interface (when using TypeScript, you pass the specific interface
28 | * type as `T`, but this isn't known at runtime). The way this works is, `RpcStub` is actually a
29 | * `Proxy`. It makes itself appear as if every possible method / property name is defined. You can
30 | * invoke any method name, and the invocation will be sent to the server. If it turns out that no
31 | * such method exists on the remote object, an exception is thrown back. But the client does not
32 | * actually know, until that point, what methods exist.
33 | */
34 | export type RpcStub> = Stub;
35 | export const RpcStub: {
36 | new >(value: T): RpcStub;
37 | } = RpcStubImpl;
38 |
39 | /**
40 | * Represents the result of an RPC call.
41 | *
42 | * Also used to represent properties. That is, `stub.foo` evaluates to an `RpcPromise` for the
43 | * value of `foo`.
44 | *
45 | * This isn't actually a JavaScript `Promise`. It does, however, have `then()`, `catch()`, and
46 | * `finally()` methods, like `Promise` does, and because it has a `then()` method, JavaScript will
47 | * allow you to treat it like a promise, e.g. you can `await` it.
48 | *
49 | * An `RpcPromise` is also a proxy, just like `RpcStub`, where calling methods or awaiting
50 | * properties will make a pipelined network request.
51 | *
52 | * Note that an `RpcPromise` is "lazy": the actual final result is not requested from the server
53 | * until you actually `await` the promise (or call `then()`, etc. on it). This is an optimization:
54 | * if you only intend to use the promise for pipelining and you never await it, then there's no
55 | * need to transmit the resolution!
56 | */
57 | export type RpcPromise> = Stub & Promise>;
58 | export const RpcPromise: {
59 | // Note: Cannot construct directly!
60 | } = RpcPromiseImpl;
61 |
62 | /**
63 | * Use to construct an `RpcSession` on top of a custom `RpcTransport`.
64 | *
65 | * Most people won't use this. You only need it if you've implemented your own `RpcTransport`.
66 | */
67 | export interface RpcSession = undefined> {
68 | getRemoteMain(): RpcStub;
69 | getStats(): {imports: number, exports: number};
70 |
71 | // Waits until the peer is not waiting on any more promise resolutions from us. This is useful
72 | // in particular to decide when a batch is complete.
73 | drain(): Promise;
74 | }
75 | export const RpcSession: {
76 | new = undefined>(
77 | transport: RpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession;
78 | } = RpcSessionImpl;
79 |
80 | // RpcTarget needs some hackage too to brand it properly and account for the implementation
81 | // conditionally being imported from "cloudflare:workers".
82 | /**
83 | * Classes which are intended to be passed by reference and called over RPC must extend
84 | * `RpcTarget`. A class which does not extend `RpcTarget` (and which doesn't have built-in support
85 | * from the RPC system) cannot be passed in an RPC message at all; an exception will be thrown.
86 | *
87 | * Note that on Cloudflare Workers, this `RpcTarget` is an alias for the one exported from the
88 | * "cloudflare:workers" module, so they can be used interchangeably.
89 | */
90 | export interface RpcTarget extends RpcTargetBranded {};
91 | export const RpcTarget: {
92 | new(): RpcTarget;
93 | } = RpcTargetImpl;
94 |
95 | /**
96 | * Empty interface used as default type parameter for sessions where the other side doesn't
97 | * necessarily export a main interface.
98 | */
99 | interface Empty {}
100 |
101 | /**
102 | * Start a WebSocket session given either an already-open WebSocket or a URL.
103 | *
104 | * @param webSocket Either the `wss://` URL to connect to, or an already-open WebSocket object to
105 | * use.
106 | * @param localMain The main RPC interface to expose to the peer.
107 | * @returns A stub for the main interface exposed from the peer.
108 | */
109 | export let newWebSocketRpcSession: = Empty>
110 | (webSocket: WebSocket | string, localMain?: any, options?: RpcSessionOptions) => RpcStub =
111 | newWebSocketRpcSessionImpl;
112 |
113 | /**
114 | * Initiate an HTTP batch session from the client side.
115 | *
116 | * The first parameter is the URL or `Request` to send, as you would pass to `fetch()`; the return
117 | * value is an RpcStub. By passing a `Request` you can customize anything about it except for the
118 | * method (it will always be set to POST) and the body (which the RPC system will fill in).
119 | */
120 | export let newHttpBatchRpcSession:>
121 | (urlOrRequest: string | Request, options?: RpcSessionOptions) => RpcStub =
122 | newHttpBatchRpcSessionImpl;
123 |
124 | /**
125 | * Initiate an RPC session over a MessagePort, which is particularly useful for communicating
126 | * between an iframe and its parent frame in a browser context. Each side should call this function
127 | * on its own end of the MessageChannel.
128 | */
129 | export let newMessagePortRpcSession: = Empty>
130 | (port: MessagePort, localMain?: any, options?: RpcSessionOptions) => RpcStub =
131 | newMessagePortRpcSessionImpl;
132 |
/**
 * Implements unified handling of HTTP-batch and WebSocket responses for the Cloudflare Workers
 * Runtime.
 *
 * POST requests are handled as HTTP batch sessions, requests carrying an `Upgrade: websocket`
 * header as WebSocket sessions, and everything else is answered with status 400.
 *
 * SECURITY WARNING: This function accepts cross-origin requests. If you do not want this, you
 * should validate the `Origin` header before calling this, or use `newHttpBatchRpcResponse()` and
 * `newWorkersWebSocketRpcResponse()` directly with appropriate security measures for each type of
 * request. But if your API uses in-band authorization (i.e. it has an RPC method that takes the
 * user's credentials as parameters and returns the authorized API), then cross-origin requests
 * should be safe.
 *
 * @param request The incoming request.
 * @param localMain The main stub or RpcTarget to expose to the client.
 * @returns The response to send back to the client.
 */
export async function newWorkersRpcResponse(request: Request, localMain: any) {
  if (request.method === "POST") {
    const batchResponse = await newHttpBatchRpcResponse(request, localMain);
    // Since we're exposing the same API over WebSocket, too, and WebSocket always allows
    // cross-origin requests, the API necessarily must be safe for cross-origin use (e.g. because
    // it uses in-band authorization, as recommended in the readme). So, we might as well allow
    // batch requests to be made cross-origin as well.
    batchResponse.headers.set("Access-Control-Allow-Origin", "*");
    return batchResponse;
  }

  const upgrade = request.headers.get("Upgrade");
  if (upgrade?.toLowerCase() === "websocket") {
    return newWorkersWebSocketRpcResponse(request, localMain);
  }

  return new Response("This endpoint only accepts POST or WebSocket requests.", { status: 400 });
}
159 |
--------------------------------------------------------------------------------
/__tests__/workerd.test.ts:
--------------------------------------------------------------------------------
1 | // Copyright (c) 2025 Cloudflare, Inc.
2 | // Licensed under the MIT license found in the LICENSE.txt file or at:
3 | // https://opensource.org/license/mit
4 |
5 | ///
6 | import { expect, it, describe } from "vitest";
7 | import { RpcStub as NativeRpcStub, RpcTarget as NativeRpcTarget, env, DurableObject } from "cloudflare:workers";
8 | import { newHttpBatchRpcSession, newWebSocketRpcSession, RpcStub, RpcTarget } from "../src/index-workers.js";
9 | import { Counter, TestTarget } from "./test-util.js";
10 |
// Counter fixture built on the `RpcTarget` exported by this library.
class JsCounter extends RpcTarget {
  private i: number;

  constructor(initial: number = 0) {
    super();
    this.i = initial;
  }

  // Adds `amount` (default 1) to the counter and returns the new total.
  increment(amount: number = 1): number {
    const next = this.i + amount;
    this.i = next;
    return next;
  }

  // Current total, exposed as a property rather than a method.
  get value() {
    return this.i;
  }
}
25 |
// Counter fixture built on the runtime's own `RpcTarget` (imported from "cloudflare:workers" as
// `NativeRpcTarget`), as opposed to `JsCounter`, which extends the library's `RpcTarget`.
class NativeCounter extends NativeRpcTarget {
  constructor(private i: number = 0) {
    super();
  }

  // Adds `amount` (default 1) to the counter and returns the new total.
  increment(amount: number = 1): number {
    this.i += amount;
    return this.i;
  }

  // Current total, exposed as a property rather than a method.
  get value() {
    return this.i;
  }
}
40 |
// Hands out fresh counters wrapped in either kind of stub, bare or nested inside an object.
class CounterFactory extends RpcTarget {
  // A `NativeCounter` behind a native stub.
  getNative() {
    const counter = new NativeCounter();
    return new NativeRpcStub(counter);
  }

  // Same as getNative(), but with the stub placed in a `stub` property.
  getNativeEmbedded() {
    const stub = new NativeRpcStub(new NativeCounter());
    return {stub};
  }

  // A `JsCounter` behind a library stub.
  getJs() {
    const counter = new JsCounter();
    return new RpcStub(counter);
  }

  // Same as getJs(), but with the stub placed in a `stub` property.
  getJsEmbedded() {
    const stub = new RpcStub(new JsCounter());
    return {stub};
  }
}
58 |
59 | describe("workerd compatibility", () => {
60 | it("allows native RpcStubs to be created using userspace RpcTargets", async () => {
61 | let stub = new NativeRpcStub(new JsCounter());
62 | expect(await stub.increment()).toBe(1);
63 | expect(await stub.increment()).toBe(2);
64 |
65 | expect(await stub.value).toBe(2);
66 | })
67 |
68 | it("allows userspace RpcStubs to be created using native RpcTargets", async () => {
69 | let stub = new RpcStub(new NativeCounter());
70 | expect(await stub.increment()).toBe(1);
71 | expect(await stub.increment()).toBe(2);
72 |
73 | expect(await stub.value).toBe(2);
74 | })
75 |
76 | it("can wrap a native stub in a userspace stub", async () => {
77 | let stub = new RpcStub(new NativeRpcStub(new NativeCounter()));
78 | expect(await stub.increment()).toBe(1);
79 | expect(await stub.increment()).toBe(2);
80 |
81 | expect(await stub.value).toBe(2);
82 | })
83 |
84 | it("can return a native stub from a userspace call", async () => {
85 | // Returning a bare stub.
86 | {
87 | let factory = new RpcStub(new CounterFactory());
88 | let stub = await factory.getNative();
89 | expect(await stub.increment()).toBe(1);
90 | expect(await stub.increment()).toBe(2);
91 |
92 | expect(await stub.value).toBe(2);
93 | }
94 |
95 | // Again with a stub wrapped in an object.
96 | {
97 | let factory = new RpcStub(new CounterFactory());
98 | let obj = await factory.getNativeEmbedded();
99 | expect(await obj.stub.increment()).toBe(1);
100 | expect(await obj.stub.increment()).toBe(2);
101 |
102 | expect(await obj.stub.value).toBe(2);
103 | }
104 | })
105 |
106 | it("can wrap a native promise or property in a userspace stub", async () => {
107 | // Wrap a native RpcPromise in a userspace stub.
108 | {
109 | let factory = new NativeRpcStub(new CounterFactory());
110 | let stub = new RpcStub(factory.getNative());
111 | expect(await stub.increment()).toBe(1);
112 | expect(await stub.increment()).toBe(2);
113 |
114 | expect(await stub.value).toBe(2);
115 | }
116 |
117 | // Wrap a native RpcProperty in a userspace stub.
118 | {
119 | let factory = new NativeRpcStub(new CounterFactory());
120 | let stub = new RpcStub(factory.getNativeEmbedded().stub);
121 | expect(await stub.increment()).toBe(1);
122 | expect(await stub.increment()).toBe(2);
123 |
124 | expect(await stub.value).toBe(2);
125 | }
126 | })
127 |
128 | it("can pipeline on a native stub returned from a userspace call", async () => {
129 | {
130 | let factory = new RpcStub(new CounterFactory());
131 | let obj = factory.getNative();
132 | expect(await obj.increment()).toBe(1);
133 | expect(await obj.increment()).toBe(2);
134 |
135 | expect(await obj.value).toBe(2);
136 | }
137 |
138 | {
139 | let factory = new RpcStub(new CounterFactory());
140 | let obj = factory.getNativeEmbedded();
141 | expect(await obj.stub.increment()).toBe(1);
142 | expect(await obj.stub.increment()).toBe(2);
143 |
144 | expect(await obj.stub.value).toBe(2);
145 | }
146 | })
147 |
148 | it("can wrap a userspace stub in a native stub", async () => {
149 | let stub = new NativeRpcStub(new RpcStub(new JsCounter()));
150 | expect(await stub.increment()).toBe(1);
151 | expect(await stub.increment()).toBe(2);
152 |
153 | expect(await stub.value).toBe(2);
154 | })
155 |
156 | it("can return a userspace stub from a native call", async () => {
157 | // Returning a bare stub.
158 | {
159 | let factory = new NativeRpcStub(new CounterFactory());
160 | let stub = await factory.getJs();
161 | expect(await stub.increment()).toBe(1);
162 | expect(await stub.increment()).toBe(2);
163 |
164 | expect(await stub.value).toBe(2);
165 | }
166 |
167 | // Again with a stub wrapped in an object.
168 | {
169 | let factory = new NativeRpcStub(new CounterFactory());
170 | let obj = await factory.getJsEmbedded();
171 | expect(await obj.stub.increment()).toBe(1);
172 | expect(await obj.stub.increment()).toBe(2);
173 |
174 | expect(await obj.stub.value).toBe(2);
175 | }
176 | })
177 |
178 | it("can wrap a userspace promise or property in a native stub", async () => {
179 | // Wrap a userspace RpcPromise in a native stub.
180 | {
181 | let factory = new RpcStub(new CounterFactory());
182 | let stub = new NativeRpcStub(factory.getJs());
183 | expect(await stub.increment()).toBe(1);
184 | expect(await stub.increment()).toBe(2);
185 |
186 | expect(await stub.value).toBe(2);
187 | }
188 |
189 | // Wrap a userspace property (which is actually also an RpcPromise) in a native stub.
190 | {
191 | let factory = new RpcStub(new CounterFactory());
192 | let stub = new NativeRpcStub(factory.getJsEmbedded().stub);
193 | expect(await stub.increment()).toBe(1);
194 | expect(await stub.increment()).toBe(2);
195 |
196 | expect(await stub.value).toBe(2);
197 | }
198 | })
199 |
200 | it("can pipeline on a userspace stub returned from a native call", async () => {
201 | {
202 | let factory = new NativeRpcStub(new CounterFactory());
203 | let obj = factory.getJs();
204 | expect(await obj.increment()).toBe(1);
205 | expect(await obj.increment()).toBe(2);
206 |
207 | expect(await obj.value).toBe(2);
208 | }
209 |
210 | {
211 | let factory = new NativeRpcStub(new CounterFactory());
212 | let obj = factory.getJsEmbedded();
213 | expect(await obj.stub.increment()).toBe(1);
214 | expect(await obj.stub.increment()).toBe(2);
215 |
216 | expect(await obj.stub.value).toBe(2);
217 | }
218 | })
219 |
220 | it("can wrap a ServiceStub in an RpcStub", async () => {
221 | let result = await new RpcStub((env).testServer).greet("World");
222 | expect(result).toBe("Hello, World!");
223 | });
224 | });
225 |
226 | interface Env {
227 | testServer: Fetcher
228 | }
229 |
230 | interface TestDo extends DurableObject {
231 | setValue(val: any): void;
232 | getValue(): any;
233 | }
234 |
235 | interface WorkerdTestTarget extends TestTarget {
236 | getDurableObject(name: string): DurableObjectStub;
237 | }
238 |
239 | describe("workerd RPC server", () => {
240 | it("can accept WebSocket RPC connections", async () => {
241 | let resp = await (env).testServer.fetch("http://foo", {headers: {Upgrade: "websocket"}});
242 | let ws = resp.webSocket;
243 | expect(ws).toBeTruthy();
244 |
245 | ws!.accept();
246 | let cap = newWebSocketRpcSession(ws!);
247 |
248 | expect(await cap.square(5)).toBe(25);
249 |
250 | {
251 | let counter = cap.makeCounter(2);
252 | expect(await counter.increment(3)).toBe(5);
253 | }
254 |
255 | {
256 | let counter = new Counter(4);
257 | expect(await cap.incrementCounter(counter, 9)).toBe(13);
258 | }
259 |
260 | // Test that we can pass a Durable Object stub over RPC.
261 | {
262 | let foo = cap.getDurableObject("foo");
263 | foo.setValue(123);
264 |
265 | let bar = await cap.getDurableObject("bar");
266 | bar.setValue("abc");
267 |
268 | expect(await foo.getValue()).toBe(123);
269 | expect(await bar.getValue()).toBe("abc");
270 | }
271 | })
272 |
273 | it("can accept HTTP batch RPC connections", async () => {
274 | let cap = newHttpBatchRpcSession(
275 | new Request("http://foo", {fetcher: (env).testServer}));
276 |
277 | let promise1 = cap.square(6);
278 |
279 | let counter = cap.makeCounter(2);
280 | let promise2 = counter.increment(3);
281 | let promise3 = cap.incrementCounter(counter, 4);
282 |
283 | expect(await Promise.all([promise1, promise2, promise3]))
284 | .toStrictEqual([36, 5, 9]);
285 | })
286 | });
287 |
--------------------------------------------------------------------------------
/examples/worker-react/web/src/main/App.tsx:
--------------------------------------------------------------------------------
1 | import { useMemo, useState } from 'react'
2 | import { newHttpBatchRpcSession } from 'capnweb'
3 | import type { Api } from '../../../src/worker'
4 |
5 | type Result = {
6 | posts: number
7 | ms: number
8 | user: any
9 | profile: any
10 | notifications: any
11 | trace: Trace
12 | }
13 |
14 | type CallEvent = { label: string, start: number, end: number }
15 | type NetEvent = { label: string, start: number, end: number }
16 | type Trace = { total: number, calls: CallEvent[], network: NetEvent[] }
17 |
18 | export function App() {
19 | const [pipelined, setPipelined] = useState(null)
20 | const [sequential, setSequential] = useState(null)
21 | const [running, setRunning] = useState(false)
22 |
23 | // Network RTT is now simulated on the server (Worker). See wrangler.toml vars.
24 |
25 | // Count RPC POSTs and capture network timing by wrapping fetch while this component is mounted.
26 | const wrapFetch = useMemo(() => {
27 | let posts = 0
28 | let origin = 0
29 | let events: NetEvent[] = []
30 | const orig = globalThis.fetch
31 | function install() {
32 | ;(globalThis as any).fetch = async (input: RequestInfo, init?: RequestInit) => {
33 | const method = (init?.method) || (input instanceof Request ? input.method : 'GET')
34 | const url = input instanceof Request ? input.url : String(input)
35 | if (url.endsWith('/api') && method === 'POST') {
36 | posts++
37 | const start = performance.now() - origin
38 | const resp = await orig(input as any, init)
39 | const end = performance.now() - origin
40 | events.push({ label: 'POST /api', start, end })
41 | return resp
42 | }
43 | return orig(input as any, init)
44 | }
45 | }
46 | function uninstall() { ;(globalThis as any).fetch = orig }
47 | function get() { return posts }
48 | function reset() { posts = 0; events = [] }
49 | function setOrigin(o: number) { origin = o }
50 | function getEvents(): NetEvent[] { return events.slice() }
51 | return { install, uninstall, get, reset, setOrigin, getEvents }
52 | }, [])
53 |
54 | async function runPipelined() {
55 | wrapFetch.reset()
56 | const t0 = performance.now()
57 | wrapFetch.setOrigin(t0)
58 | const calls: CallEvent[] = []
59 | const api = newHttpBatchRpcSession('/api')
60 | const userStart = 0; calls.push({ label: 'authenticate', start: userStart, end: NaN })
61 | const user = api.authenticate('cookie-123')
62 | user.then(() => { calls.find(c => c.label==='authenticate')!.end = performance.now() - t0 })
63 |
64 | const profStart = performance.now() - t0; calls.push({ label: 'getUserProfile', start: profStart, end: NaN })
65 | const profile = api.getUserProfile(user.id)
66 | profile.then(() => { calls.find(c => c.label==='getUserProfile')!.end = performance.now() - t0 })
67 |
68 | const notiStart = performance.now() - t0; calls.push({ label: 'getNotifications', start: notiStart, end: NaN })
69 | const notifications = api.getNotifications(user.id)
70 | notifications.then(() => { calls.find(c => c.label==='getNotifications')!.end = performance.now() - t0 })
71 |
72 | const [u, p, n] = await Promise.all([user, profile, notifications])
73 | const t1 = performance.now()
74 | const net = wrapFetch.getEvents()
75 | const total = t1 - t0
76 | // Ensure any missing ends are set
77 | calls.forEach(c => { if (!Number.isFinite(c.end)) c.end = total })
78 | return { posts: wrapFetch.get(), ms: total, user: u, profile: p, notifications: n,
79 | trace: { total, calls, network: net } }
80 | }
81 |
82 | async function runSequential() {
83 | wrapFetch.reset()
84 | const t0 = performance.now()
85 | wrapFetch.setOrigin(t0)
86 | const calls: CallEvent[] = []
87 | const api1 = newHttpBatchRpcSession('/api')
88 | const aStart = 0; calls.push({ label: 'authenticate', start: aStart, end: NaN })
89 | const uPromise = api1.authenticate('cookie-123')
90 | uPromise.then(() => { calls.find(c => c.label==='authenticate')!.end = performance.now() - t0 })
91 | const u = await uPromise
92 |
93 | const api2 = newHttpBatchRpcSession('/api')
94 | const pStart = performance.now() - t0; calls.push({ label: 'getUserProfile', start: pStart, end: NaN })
95 | const pPromise = api2.getUserProfile(u.id)
96 | pPromise.then(() => { calls.find(c => c.label==='getUserProfile')!.end = performance.now() - t0 })
97 | const p = await pPromise
98 |
99 | const api3 = newHttpBatchRpcSession('/api')
100 | const nStart = performance.now() - t0; calls.push({ label: 'getNotifications', start: nStart, end: NaN })
101 | const nPromise = api3.getNotifications(u.id)
102 | nPromise.then(() => { calls.find(c => c.label==='getNotifications')!.end = performance.now() - t0 })
103 | const n = await nPromise
104 |
105 | const t1 = performance.now()
106 | const net = wrapFetch.getEvents()
107 | const total = t1 - t0
108 | calls.forEach(c => { if (!Number.isFinite(c.end)) c.end = total })
109 | return { posts: wrapFetch.get(), ms: total, user: u, profile: p, notifications: n,
110 | trace: { total, calls, network: net } }
111 | }
112 |
113 | async function runDemo() {
114 | if (running) return
115 | setRunning(true)
116 | wrapFetch.install()
117 | try {
118 | const piped = await runPipelined()
119 | setPipelined(piped)
120 | const seq = await runSequential()
121 | setSequential(seq)
122 | } finally {
123 | wrapFetch.uninstall()
124 | setRunning(false)
125 | }
126 | }
127 |
128 | return (
129 |
130 |
Cap'n Web: Cloudflare Workers + React
131 |
Network RTT is simulated on the server (configurable via SIMULATED_RTT_MS/SIMULATED_RTT_JITTER_MS in wrangler.toml).
132 |
This demo calls the Worker API in two ways:
133 |
134 | - Pipelined batch: dependent calls in one round trip
135 | - Sequential non-batched: three separate round trips
136 |
137 |
140 |
141 | {pipelined && (
142 |
143 | Pipelined (batched)
144 | HTTP POSTs: {pipelined.posts}
145 | Time: {pipelined.ms.toFixed(1)} ms
146 |
147 | {JSON.stringify({
148 | user: pipelined.user,
149 | profile: pipelined.profile,
150 | notifications: pipelined.notifications,
151 | }, null, 2)}
152 |
153 | )}
154 |
155 | {sequential && (
156 |
157 | Sequential (non-batched)
158 | HTTP POSTs: {sequential.posts}
159 | Time: {sequential.ms.toFixed(1)} ms
160 |
161 | {JSON.stringify({
162 | user: sequential.user,
163 | profile: sequential.profile,
164 | notifications: sequential.notifications,
165 | }, null, 2)}
166 |
167 | )}
168 |
169 | {(pipelined && sequential) && (
170 |
171 | Summary
172 | Pipelined: {pipelined.posts} POST, {pipelined.ms.toFixed(1)} ms
173 | Sequential: {sequential.posts} POSTs, {sequential.ms.toFixed(1)} ms
174 |
175 | )}
176 |
177 | )
178 | }
179 |
180 | function TraceView({ trace }: { trace: Trace }) { // Computes layout values for displaying a trace. NOTE(review): the JSX inside the return below is missing from this listing.
181 | const width = 700
182 | const rowHeight = 22
183 | const gap = 8
184 | const rows = [ 'Network', ...trace.calls.map(c => c.label) ] // One row labelled 'Network' plus one row per call.
185 | const totalHeight = rows.length * (rowHeight + gap) + 10
186 | const scale = (t: number) => (t / Math.max(trace.total, 1)) * width // Time offset -> horizontal offset; the divisor is clamped to at least 1 so a zero total cannot divide by zero.
187 |
188 | // Tag each call with its position (idx). Nothing is removed here: entries with repeated labels are kept and differ only by idx.
189 | const renderedCalls = trace.calls.map((c, i) => ({...c, idx: i}))
190 |
191 | return (
192 |
221 | )
222 | }
223 |
224 | function colorFor(i: number): string { // Picks a color for index i by cycling through a fixed five-entry palette.
225 | const palette = ['#3b82f6', '#22c55e', '#f59e0b', '#ef4444', '#8b5cf6']
226 | return palette[i % palette.length] // Wraps around for i >= palette.length; a negative or non-integer i indexes outside the array and yields undefined.
227 | }
228 |
--------------------------------------------------------------------------------
/protocol.md:
--------------------------------------------------------------------------------
1 | # RPC Protocol
2 |
3 | ## Serialization
4 |
5 | The protocol uses JSON as its basic serialization, with a preprocessing step to support non-JSON types.
6 |
7 | Why not a binary format? While the author is a big fan of optimized binary protocols in other contexts, it cannot be denied that in a browser, JSON has big advantages. Being built into the browser gives it a leg up in performance, code size, and developer tooling.
8 |
9 | Non-JSON types are encoded using arrays. The first element of the array contains a string type code, and the remaining elements contain the parameters needed to construct that type. For example, a `Date` might be encoded as:
10 |
11 | ```
12 | ["date", 1749342170815]
13 | ```
14 |
15 | To encode a literal array, the array must be wrapped in an outer one-element array to form an array expression. This distinguishes it from the type-coded arrays described above:
16 |
17 | ```
18 | [["just", "an", "array"]]
19 | ```
20 |
21 | ## Client vs. Server
22 |
23 | The protocol does not have a "client" or a "server"; it is fully bidirectional. Either side can call interfaces exported by the other.
24 |
25 | With that said, for documentation purposes, we often use the words "client" and "server" when describing specific interactions, in order to make the language easier to understand. The word "client" generally refers to the caller of an RPC, or the importer of a stub. The word "server" refers to the callee, or the exporter. This is merely a convention to make explanations more natural.
26 |
27 | ## Imports and Exports
28 |
29 | Each side of an RPC session maintains two tables: imports and exports. One side's exports correspond to the other side's imports. Imports and exports are assigned sequential numeric IDs. However, in some cases an ID needs to be chosen by the importing side, and in some cases by the exporting side. In order to avoid conflicts:
30 |
31 | * When the importing side chooses the ID, it chooses the next positive ID (starting from 1 and going up).
32 | * When the exporting side chooses the ID, it chooses the next negative ID (starting from -1 and going down).
33 | * ID zero is automatically assigned to the "main" interface.
34 |
35 | To be more specific:
36 |
37 | * The importing side chooses the ID when it initiates a call: the ID represents the result of the call.
38 | * The exporting side chooses the ID when it sends a message containing a stub: the ID represents the target of the stub.
39 |
40 | For comparison, in CapTP and Cap'n Proto, there are four tables instead of two: imports, exports, questions, and answers. In this library, we have unified questions with imports, and answers with exports.
41 |
42 | By convention, when describing the meaning of any RPC message, we always take the perspective of the sender. So, if a message contains an "import ID", it is an import from the perspective of the sender, and an export from the perspective of the recipient.
43 |
44 | Note that IDs are never reused. This differs from Cap'n Proto, which always tries to choose the smallest available ID. We assume no session will ever exceed 2^53 IDs, so simply assigning sequentially should be fine.
45 |
46 | ## Push and pull
47 |
48 | An RPC call follows this sequence:
49 |
50 | * The client sends the server a "push" message, containing an expression to evaluate.
51 | * The "push" message is implicitly assigned the next positive ID in the client's import table.
52 | * The expression expresses the call to make.
53 | * Upon receipt, the server evaluates the expression and delivers the call to the application.
54 | * The client subsequently sends the server a "pull" message, specifying the import ID just created by the "push". This expresses that the client is interested in receiving the result of the call as a "resolve" message.
55 | * The client may subsequently refer to the import ID in pipelined requests.
56 | * When the server is done executing the call, it sends a "resolve" message, specifying the export ID of the "push" and an expression for its result.
57 | * Upon receiving the resolution, the client no longer needs the import table entry, so sends a "release" message.
58 | * Upon receipt, the server disposes its copy of the return value, if necessary.
59 |
60 | Some notes:
61 |
62 | * The client does not need to send a "pull" message if it doesn't care to receive the results. In practice, if the application never awaits the promise, then it is never pulled. The promise can still be used in pipelining without pulling.
63 | * Technically, the pushed expression can contain any number of calls, including none. A client could, for example, push a large data structure containing no calls, and then subsequently make multiple calls that use this data structure via "pipelining", to avoid having to send the same data multiple times.
64 | * If the call throws an exception, the server will send a "reject" message instead of "resolve".
65 | * "resolve" and "reject" are the same messages used to resolve exported promises, that is, a promise that was introduced when it was sent as part of some other RPC message. Thus, calls and exported promises work the same. This differs from Cap'n Proto, where returning from a call and resolving an exported promise were entirely different messages (with a lot of duplicated semantics).
66 |
67 | ## Top-level RPC Messages
68 |
69 | The following are the top-level messages that can be sent over the RPC transport.
70 |
71 | `["push", expression]`
72 |
73 | Asks the recipient to evaluate the given expression. The expression is implicitly assigned the next sequential import ID (in the positive direction). The recipient will evaluate the expression, delivering any calls therein to the application. The final result can be pulled, or used in promise pipelining.
74 |
75 | `["pull", importId]`
76 |
77 | Signals that the sender would like to receive a "resolve" message for the resolution of the given import, which must refer to a promise. This is normally only used for imports created by a "push", as exported promises are pulled automatically.
78 |
79 | `["resolve", exportId, expression]`
80 |
81 | Instructs the recipient to evaluate the given expression and then use it as the resolution of the given promise export.
82 |
83 | `["reject", exportId, expression]`
84 |
85 | Instructs the recipient to evaluate the given expression and then use it to reject the given promise export. The expression is not permitted to contain stubs. It typically evaluates to an `Error`, although technically JavaScript does not require that thrown values are `Error`s.
86 |
87 | `["release", importId, refcount]`
88 |
89 | Instructs the recipient to release the entry that the given import ID refers to (an import from the sender's perspective, and therefore an entry in the recipient's export table), disposing whatever it is connected to. If the import is a promise, the recipient is no longer obliged to send a "resolve" message for it, though it is still permitted to do so.
90 |
91 | `refcount` is the total number of times this import ID has been "introduced", i.e. the number of times it has been the subject of an "export" or "promise" expression, plus 1 if it was created by a "push". The refcount must be sent to avoid a race condition if the receiving side has recently exported the same ID again. The exporter remembers how many times they have exported this ID, decrementing it by the refcount of any release messages received, and only actually releases the ID when this count reaches zero.
92 |
93 | `["abort", expression]`
94 |
95 | Indicates that the sender has experienced an error causing it to terminate the session. The expression evaluates to the error which caused the abort. No further messages will be sent nor received.
96 |
97 | ## Expressions
98 |
99 | Expressions are JSON-serializable object trees. All JSON types except arrays are interpreted literally. Arrays are further evaluated into a final value as follows.
100 |
101 | `[[...]]`
102 |
103 | An array expression. The inner array contains expressions (one for each array element), which are individually evaluated to produce the final array value.
104 |
105 | For example, this expression represents an object containing an array:
106 |
107 | ```
108 | {
109 | "key": [[
110 | "abc",
111 | ["date", 1757214689123],
112 | [[0]]
113 | ]]
114 | }
115 | ```
116 |
117 | This is an expression which will evaluate to an object. The expression representing the value of the "key" field is an array expression.
118 | - The 1st item in the array expression is an expression for the string "abc"
119 | - The 2nd item is an expression for a date object
120 | - The 3rd item is another array expression containing an integer expression representing zero.
121 |
122 | This expression will evaluate to the following object:
123 | ```
124 | {
125 | "key": [
126 | "abc",
127 | Date(1757214689123),
128 | [0]
129 | ]
130 | }
131 | ```
132 |
133 | `["undefined"]`
134 |
135 | The literal value `undefined`.
136 |
137 | `["inf"]`, `["-inf"]`, `["nan"]`
138 |
139 | The values Infinity, -Infinity, and NaN.
140 |
141 | `["bytes", base64]`
142 |
143 | A `Uint8Array`, represented as a base64-encoded string.
144 |
145 | `["bigint", decimal]`
146 |
147 | A bigint value, represented as a decimal string.
148 |
149 | `["date", number]`
150 |
151 | A JavaScript `Date` value. The number represents milliseconds since the Unix epoch.
152 |
153 | `["error", type, message, stack?]`
154 |
155 | A JavaScript `Error` value. `type` is the name of the specific well-known `Error` subclass, e.g. "TypeError". `message` is a string containing the error message. `stack` may optionally contain the stack trace, though by default stacks will be redacted for security reasons.
156 |
157 | _TODO: We should extend this to encode own properties that have been added to the error._
158 |
159 | `["import", importId, propertyPath, callArguments]`
160 | `["pipeline", importId, propertyPath, callArguments]`
161 |
162 | References an entry on the import table (from the perspective of the sender), possibly performing actions on it.
163 |
164 | If the type is "import", the expression evaluates to a stub. If it is "pipeline", the expression evaluates to a promise. The difference is important because promises must be replaced with their resolution before delivering the message to the application, whereas stubs will be delivered as stubs without waiting for any resolution.
165 |
166 | `propertyPath` is optional. If specified, it is an array of property names (strings or numbers) leading to a specific property of the import's target. The expression evaluates to that property (unless `callArguments` is also specified).
167 |
168 | `callArguments` is also optional. If specified, then the given property should be called as a function. `callArguments` is an array of expressions; these expressions are evaluated to produce the arguments to the call.
169 |
170 | `["remap", importId, propertyPath, captures, instructions]`
171 |
172 | Implements the `.map()` operation. (We call this "remap" so as not to confuse with the serialization of a `Map` object.)
173 |
174 | `importId` and `propertyPath` are the same as for the `"import"` operation. These identify the particular property which is to be mapped.
175 |
176 | `captures` and `instructions` define the mapper function which is to apply to the target value.
177 |
178 | `captures` defines the set of stubs which the mapper function has captured, in the sense of a lambda capture. The body of the function may call these stubs. The format of `captures` is an array, where each member of the array is either `["import", importId]` or `["export", exportId]`, which refer to an entry on the (sender's) import or export table, respectively.
179 |
180 | `instructions` contains a list of expressions which should be evaluated to execute the mapper function on a particular input value. Each instruction is an expression in the same format described in this doc, but with special handling of imports and exports. For the purpose of the instructions in a mapper, there is no export table. The import table, meanwhile, is defined as follows:
181 | * Negative values refer to the `captures` list, starting from -1. So, -1 is `captures[0]`, -2 is `captures[1]`, and so on.
182 | * Zero refers to the input value of the map function.
183 | * Positive values refer to the results of previous instructions, starting from 1. So, 1 is the result of evaluating `instructions[0]`, 2 is the result of evaluating `instructions[1]`, and so on.
184 |
185 | The instructions are always evaluated in order. Each instruction may only import results of instructions that came before it. The last instruction evaluates to the return value of the map function.
186 |
187 | `["export", exportId]`
188 |
189 | The sender is exporting a new stub (or re-exporting a stub that was exported before). The expression evaluates to a stub.
190 |
191 | `["promise", exportId]`
192 |
193 | Like "export", but the expression evaluates to a promise. Promises must be replaced with their resolution before the message is finally delivered to the application.
194 |
195 | The `exportId` in this case is always a newly-allocated ID. The sender will proactively send a "resolve" (or "reject") message for this ID when the promise resolves (unless it is released first). The recipient does not need to "pull" the promise explicitly; it is assumed that the recipient always wants the resolution.
196 |
--------------------------------------------------------------------------------
/src/map.ts:
--------------------------------------------------------------------------------
1 | // Copyright (c) 2025 Cloudflare, Inc.
2 | // Licensed under the MIT license found in the LICENSE.txt file or at:
3 | // https://opensource.org/license/mit
4 |
5 | import { StubHook, PropertyPath, RpcPayload, RpcStub, RpcPromise, withCallInterceptor, ErrorStubHook, mapImpl, PayloadStubHook, unwrapStubAndPath, unwrapStubNoProperties } from "./core.js";
6 | import { Devaluator, Exporter, Importer, ExportId, ImportId, Evaluator } from "./serialize.js";
7 |
8 | let currentMapBuilder: MapBuilder | undefined; // The builder currently recording, if any: set by the MapBuilder constructor, reset to the parent by unregister().
9 |
10 | // We use this type signature when building the instructions for type checking purposes. It
11 | // describes a subset of the overall RPC protocol.
12 | export type MapInstruction =
13 | | ["pipeline", number, PropertyPath] // Property read (pushed by pushGet): subject index and path.
14 | | ["pipeline", number, PropertyPath, unknown] // Call (pushed by pushCall): additionally carries the devaluated arguments.
15 | | ["remap", number, PropertyPath, ["import", number][], MapInstruction[]] // Nested map (pushed by makeOutput of a child builder): captures plus the child's instructions.
16 |
17 | class MapBuilder implements Exporter { // Records the operations a map() callback performs as a list of instructions (see pushCall, pushGet, makeOutput).
18 | private context: // A top-level builder (parent undefined) holds real hooks; a nested builder holds indexes into its parent's numbering.
19 | | {parent: undefined, captures: StubHook[], subject: StubHook, path: PropertyPath}
20 | | {parent: MapBuilder, captures: number[], subject: number, path: PropertyPath};
21 | private captureMap: Map = new Map(); // Remembers the (negative) capture index already assigned to a hook; see capture().
22 |
23 | private instructions: MapInstruction[] = [];
24 |
25 | constructor(subject: StubHook, path: PropertyPath) { // Nests under the currently active builder, if any, and then becomes the active builder.
26 | if (currentMapBuilder) {
27 | this.context = {
28 | parent: currentMapBuilder,
29 | captures: [],
30 | subject: currentMapBuilder.capture(subject), // A nested builder refers to its subject by the parent's index for it.
31 | path
32 | };
33 | } else {
34 | this.context = {
35 | parent: undefined,
36 | captures: [],
37 | subject,
38 | path
39 | };
40 | }
41 |
42 | currentMapBuilder = this;
43 | }
44 |
45 | unregister() { // Makes the parent builder (or none) the active builder again.
46 | currentMapBuilder = this.context.parent;
47 | }
48 |
49 | makeInput(): MapVariableHook { // Placeholder hook for the callback's input value, which is always index 0.
50 | return new MapVariableHook(this, 0);
51 | }
52 |
53 | makeOutput(result: RpcPayload): StubHook { // Serializes the callback's return value as the final instruction and emits the finished map.
54 | let devalued: unknown;
55 | try {
56 | devalued = Devaluator.devaluate(result.value, undefined, this, result);
57 | } finally {
58 | result.dispose(); // The payload is disposed whether or not devaluation succeeded.
59 | }
60 |
61 | // The result is the final instruction. This doesn't actually fit our MapInstruction type
62 | // signature, so we cheat a bit.
63 | this.instructions.push(devalued);
64 |
65 | if (this.context.parent) {
66 | this.context.parent.instructions.push( // Nested: the whole recording becomes a single "remap" instruction of the parent.
67 | ["remap", this.context.subject, this.context.path,
68 | this.context.captures.map(cap => ["import", cap]),
69 | this.instructions]
70 | );
71 | return new MapVariableHook(this.context.parent, this.context.parent.instructions.length); // Length after the push is the index of the remap instruction's result.
72 | } else {
73 | return this.context.subject.map(this.context.path, this.context.captures, this.instructions); // Top level: hand the recording to the real subject hook.
74 | }
75 | }
76 |
77 | pushCall(hook: StubHook, path: PropertyPath, params: RpcPayload): StubHook { // Records a call on hook at path and returns a placeholder for its result.
78 | let devalued = Devaluator.devaluate(params.value, undefined, this, params);
79 | // HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap.
80 | // TODO: Clean this up somehow.
81 | devalued = (>devalued)[0];
82 |
83 | let subject = this.capture(hook.dup());
84 | this.instructions.push(["pipeline", subject, path, devalued]);
85 | return new MapVariableHook(this, this.instructions.length); // Length after the push is this instruction's result index (results are numbered from 1).
86 | }
87 |
88 | pushGet(hook: StubHook, path: PropertyPath): StubHook { // Records a property read on hook at path and returns a placeholder for its result.
89 | let subject = this.capture(hook.dup());
90 | this.instructions.push(["pipeline", subject, path]);
91 | return new MapVariableHook(this, this.instructions.length);
92 | }
93 |
94 | capture(hook: StubHook): number { // Returns the index by which this builder's instructions refer to hook, registering a new capture if needed.
95 | if (hook instanceof MapVariableHook && hook.mapper === this) {
96 | // Oh, this is already our own hook.
97 | return hook.idx;
98 | }
99 |
100 | // TODO: Well, the hooks passed in are always unique, so they'll never exist in captureMap.
101 | // I suppose this is a problem with RPC as well. We need a way to identify hooks that are
102 | // dupes of the same target.
103 | let result = this.captureMap.get(hook);
104 | if (result === undefined) {
105 | if (this.context.parent) {
106 | let parentIdx = this.context.parent.capture(hook); // Nested: capture in the parent first and store the parent's index for it.
107 | this.context.captures.push(parentIdx);
108 | } else {
109 | this.context.captures.push(hook);
110 | }
111 | result = -this.context.captures.length; // Captures are numbered -1, -2, ... in the order they are added.
112 | this.captureMap.set(hook, result);
113 | }
114 | return result;
115 | }
116 |
117 | // ---------------------------------------------------------------------------
118 | // implements Exporter
119 |
120 | exportStub(hook: StubHook): ExportId { // Always throws: a mapper callback may not introduce new exports.
121 | // It appears someone did something like:
122 | //
123 | // stub.map(x => { return x.doSomething(new MyRpcTarget()); })
124 | //
125 | // That... won't work. They need to do this instead:
126 | //
127 | // using myTargetStub = new RpcStub(new MyRpcTarget());
128 | // stub.map(x => { return x.doSomething(myTargetStub.dup()); })
129 | //
130 | // TODO(someday): Consider carefully if the inline syntax is maybe OK. If so, perhaps the
131 | // serializer could try calling `getImport()` even for known-local hooks.
132 | // TODO(someday): Do we need to support rpc-thenable somehow?
133 | throw new Error(
134 | "Can't construct an RpcTarget or RPC callback inside a mapper function. Try creating a " +
135 | "new RpcStub outside the callback first, then using it inside the callback.");
136 | }
137 | exportPromise(hook: StubHook): ExportId { // Always throws, via exportStub().
138 | return this.exportStub(hook);
139 | }
140 | getImport(hook: StubHook): ImportId | undefined { // Treats every hook as a capture, so this always returns a number, never undefined.
141 | return this.capture(hook);
142 | }
143 |
144 | unexport(ids: Array): void {
145 | // Presumably this MapBuilder is cooked anyway, so we don't really have to release anything.
146 | }
147 |
148 | onSendError(error: Error): Error | void { // No rewriting is done: the body is empty, so this returns undefined.
149 | // TODO(someday): Can we use the error-sender hook from the RPC system somehow?
150 | }
151 | };
152 |
153 | mapImpl.sendMap = (hook: StubHook, path: PropertyPath, func: (promise: RpcPromise) => unknown) => { // Runs func once against a placeholder input to record its operations, then returns a promise for the map's result.
154 | let builder = new MapBuilder(hook, path);
155 | let result: RpcPayload;
156 | try {
157 | result = RpcPayload.fromAppReturn(withCallInterceptor(builder.pushCall.bind(builder), () => { // NOTE(review): presumably routes calls made while func runs to builder.pushCall -- withCallInterceptor is defined in core.js.
158 | return func(new RpcPromise(builder.makeInput(), []));
159 | }));
160 | } finally {
161 | builder.unregister(); // Restore the previously active builder even if func throws.
162 | }
163 |
164 | // Detect misuse: Map callbacks cannot be async.
165 | if (result instanceof Promise) { // NOTE(review): result is declared as RpcPayload; this branch is reachable only if fromAppReturn() can hand back a Promise -- confirm.
166 | // Squelch unhandled rejections from the map function itself -- it'll probably just throw
167 | // something about pulling a MapVariableHook.
168 | result.catch(err => {});
169 |
170 | // Throw an understandable error.
171 | throw new Error("RPC map() callbacks cannot be async.");
172 | }
173 |
174 | return new RpcPromise(builder.makeOutput(result), []);
175 | }
176 |
177 | function throwMapperBuilderUseError(): never { // Always throws; used by MapVariableHook for operations a recording placeholder cannot perform.
178 | throw new Error(
179 | "Attempted to use an abstract placeholder from a mapper function. Please make sure your " +
180 | "map function has no side effects.");
181 | }
182 |
183 | // StubHook which represents a variable in a map function: the input (idx 0) or the result of a recorded instruction (idx >= 1).
184 | class MapVariableHook extends StubHook {
185 | constructor(public mapper: MapBuilder, public idx: number) { // mapper: the builder whose numbering idx belongs to.
186 | super();
187 | }
188 |
189 | // We don't have anything we actually need to dispose, so dup() can just return the same hook.
190 | dup(): StubHook { return this; }
191 | dispose(): void {}
192 |
193 | get(path: PropertyPath): StubHook {
194 | // This can actually be invoked as part of serialization, so we'll need to support it.
195 | if (path.length == 0) {
196 | // Since this hook cannot be pulled anyway, and dispose() is a no-op, we can actually just
197 | // return the same hook again to represent getting the empty path.
198 | return this;
199 | } else if (currentMapBuilder) {
200 | return currentMapBuilder.pushGet(this, path); // Recorded on whichever builder is active now, which may be a nested one rather than this.mapper.
201 | } else {
202 | throwMapperBuilderUseError(); // No builder is active, i.e. the placeholder is being used outside any mapper callback.
203 | }
204 | }
205 |
206 | // Other methods should never be called.
207 | call(path: PropertyPath, args: RpcPayload): StubHook {
208 | // Can't be called; all calls are intercepted.
209 | throwMapperBuilderUseError();
210 | }
211 |
212 | map(path: PropertyPath, captures: StubHook[], instructions: unknown[]): StubHook {
213 | // Can't be called; all map()s are intercepted.
214 | throwMapperBuilderUseError();
215 | }
216 |
217 | pull(): RpcPayload | Promise {
218 | // Map functions cannot await.
219 | throwMapperBuilderUseError();
220 | }
221 |
222 | ignoreUnhandledRejections(): void {
223 | // Probably never called but whatever.
224 | }
225 |
226 | onBroken(callback: (error: any) => void): void { // Not supported on placeholders; always throws.
227 | throwMapperBuilderUseError();
228 | }
229 | }
230 |
231 | // =======================================================================================
232 |
233 | class MapApplicator implements Importer { // Executes recorded map instructions against one input value, acting as the Importer for each Evaluator it creates.
234 | private variables: StubHook[]; // variables[0] is the input; variables[n] is the result of the n-th instruction.
235 |
236 | constructor(private captures: StubHook[], input: StubHook) {
237 | this.variables = [input];
238 | }
239 |
240 | dispose() { // Disposes every variable hook, including the input.
241 | for (let variable of this.variables) {
242 | variable.dispose();
243 | }
244 | }
245 |
246 | apply(instructions: unknown[]): RpcPayload { // Evaluates all but the last instruction into variables, then returns the evaluated last instruction. Throws if the list is empty.
247 | try {
248 | if (instructions.length < 1) {
249 | throw new Error("Invalid empty mapper function.");
250 | }
251 |
252 | for (let instruction of instructions.slice(0, -1)) {
253 | let payload = new Evaluator(this).evaluateCopy(instruction);
254 |
255 | // The payload almost always contains a single stub. As an optimization, unwrap it.
256 | if (payload.value instanceof RpcStub) {
257 | let hook = unwrapStubNoProperties(payload.value);
258 | if (hook) {
259 | this.variables.push(hook);
260 | continue;
261 | }
262 | }
263 |
264 | this.variables.push(new PayloadStubHook(payload)); // General case: wrap the whole payload so later instructions can refer to it.
265 | }
266 |
267 | return new Evaluator(this).evaluateCopy(instructions[instructions.length - 1]);
268 | } finally {
269 | for (let variable of this.variables) { // NOTE(review): variables are disposed here and again by dispose(), which applyMapToElement also calls -- confirm StubHook.dispose() tolerates a second call.
270 | variable.dispose();
271 | }
272 | }
273 | }
274 |
275 | importStub(idx: ImportId): StubHook {
276 | // This implies we saw an "export" appear inside the body of a mapper function. This should be
277 | // impossible because exportStub()/exportPromise() throw exceptions in MapBuilder.
278 | throw new Error("A mapper function cannot refer to exports.");
279 | }
280 | importPromise(idx: ImportId): StubHook { // Always throws, via importStub().
281 | return this.importStub(idx);
282 | }
283 |
284 | getExport(idx: ExportId): StubHook | undefined { // Negative ids index captures (-1 is captures[0]); zero and positive ids index variables.
285 | if (idx < 0) {
286 | return this.captures[-idx - 1];
287 | } else {
288 | return this.variables[idx];
289 | }
290 | }
291 | }
292 |
293 | function applyMapToElement(input: unknown, parent: object | undefined, owner: RpcPayload | null, // Applies the instructions to a single value and returns the resulting payload.
294 | captures: StubHook[], instructions: unknown[]): RpcPayload {
295 | // TODO(perf): I wonder if we could use .fromAppParams() instead of .deepCopyFrom()? It
296 | // maybe wouldn't correctly handle the case of RpcTargets in the input, so we need a variant
297 | // which takes an `owner`, which does add some complexity.
298 | let inputHook = new PayloadStubHook(RpcPayload.deepCopyFrom(input, parent, owner)); // The mapper operates on a deep copy of the input wrapped in a hook.
299 | let mapper = new MapApplicator(captures, inputHook);
300 | try {
301 | return mapper.apply(instructions);
302 | } finally {
303 | mapper.dispose(); // Runs whether apply() returned or threw; captures are not disposed here.
304 | }
305 | }
306 |
307 | mapImpl.applyMap = (input: unknown, parent: object | undefined, owner: RpcPayload | null, // Applies map instructions to input: per element for arrays, pass-through for null/undefined, otherwise once to the value itself.
308 | captures: StubHook[], instructions: unknown[]) => {
309 | try {
310 | let result: RpcPayload;
311 | if (input instanceof RpcPromise) {
312 | // The caller is responsible for making sure the input is not a promise, since we can't
313 | // then know if it would resolve to an array later.
314 | throw new Error("applyMap() can't be called on RpcPromise");
315 | } else if (input instanceof Array) {
316 | let payloads: RpcPayload[] = [];
317 | try {
318 | for (let elem of input) {
319 | payloads.push(applyMapToElement(elem, input, owner, captures, instructions)); // The array itself is passed as each element's parent.
320 | }
321 | } catch (err) {
322 | for (let payload of payloads) { // Dispose results already produced for earlier elements before propagating the failure.
323 | payload.dispose();
324 | }
325 | throw err;
326 | }
327 |
328 | result = RpcPayload.fromArray(payloads);
329 | } else if (input === null || input === undefined) {
330 | result = RpcPayload.fromAppReturn(input); // null/undefined is returned as-is; the instructions are not run.
331 | } else {
332 | result = applyMapToElement(input, parent, owner, captures, instructions);
333 | }
334 |
335 | // TODO(perf): We should probably return a hook that allows pipelining but whose pull() doesn't
336 | // resolve until all promises in the payload have been substituted.
337 | return new PayloadStubHook(result);
338 | } finally {
339 | for (let cap of captures) { // The captures are disposed on every path, success or failure.
340 | cap.dispose();
341 | }
342 | }
343 | }
344 |
345 | export function forceInitMap() {} // Intentionally empty. NOTE(review): presumably exists so other modules can reference this file and thereby run its mapImpl assignments -- confirm against callers.
346 |
--------------------------------------------------------------------------------
/src/serialize.ts:
--------------------------------------------------------------------------------
1 | // Copyright (c) 2025 Cloudflare, Inc.
2 | // Licensed under the MIT license found in the LICENSE.txt file or at:
3 | // https://opensource.org/license/mit
4 |
5 | import { StubHook, RpcPayload, typeForRpc, RpcStub, RpcPromise, LocatedPromise, RpcTarget, PropertyPath, unwrapStubAndPath } from "./core.js";
6 |
7 | export type ImportId = number; // Numeric ID of an import-table entry.
8 | export type ExportId = number; // Numeric ID of an export-table entry.
9 |
10 | // =======================================================================================
11 |
12 | export interface Exporter { // Callbacks the Devaluator uses to handle stubs, promises and errors found while serializing a value.
13 | exportStub(hook: StubHook): ExportId;
14 | exportPromise(hook: StubHook): ExportId;
15 | getImport(hook: StubHook): ImportId | undefined; // Returns an import ID when the hook corresponds to an existing import, otherwise undefined.
16 |
17 | // If a serialization error occurs after having exported some capabilities, this will be called
18 | // to roll back the exports.
19 | unexport(ids: Array): void;
20 |
21 | onSendError(error: Error): Error | void; // Called for each Error being serialized; a returned Error replaces the original in the output.
22 | }
23 |
24 | class NullExporter implements Exporter { // Exporter for serialization without an RPC session: exporting throws, everything else is a no-op.
25 | exportStub(stub: StubHook): never {
26 | throw new Error("Cannot serialize RPC stubs without an RPC session.");
27 | }
28 | exportPromise(stub: StubHook): never {
29 | throw new Error("Cannot serialize RPC stubs without an RPC session.");
30 | }
31 | getImport(hook: StubHook): ImportId | undefined { // No hook is ever recognized as an import.
32 | return undefined;
33 | }
34 | unexport(ids: Array): void {} // Nothing can have been exported, so there is nothing to roll back.
35 |
36 | onSendError(error: Error): Error | void {} // Errors are never rewritten.
37 | }
38 |
39 | const NULL_EXPORTER = new NullExporter(); // Default exporter for Devaluator.devaluate() when the caller does not supply one.
40 |
41 | // Maps error name to error class for deserialization. Only the built-in classes below are listed.
42 | const ERROR_TYPES: Record = {
43 | Error, EvalError, RangeError, ReferenceError, SyntaxError, TypeError, URIError, AggregateError,
44 | // TODO: DOMError? Others?
45 | };
46 |
47 | // Polyfill type for Uint8Array.toBase64(), which has started landing in JS runtimes but is not
48 | // supported everywhere just yet. The method is declared optional so callers must check for it.
49 | interface Uint8Array {
50 | toBase64?(options?: {
51 | alphabet?: "base64" | "base64url",
52 | omitPadding?: boolean
53 | }): string;
54 | };
55 |
56 | interface FromBase64 { // Optional-method type for fromBase64(). NOTE(review): presumably applied to the Uint8Array constructor where this is used -- confirm.
57 | fromBase64?(text: string, options?: {
58 | alphabet?: "base64" | "base64url",
59 | lastChunkHandling?: "loose" | "strict" | "stop-before-partial"
60 | }): Uint8Array;
61 | }
62 |
63 | // Converts fully-hydrated messages into object trees that are JSON-serializable for sending over
64 | // the wire. This is used to implement serialization -- but it doesn't take the last step of
65 | // actually converting to a string. (The name is meant to be the opposite of "Evaluator", which
66 | // implements the opposite direction.)
67 | export class Devaluator {
68 |   private constructor(private exporter: Exporter, private source: RpcPayload | undefined) {}
69 |
70 |   // Devaluate the given value.
71 |   // * value: The value to devaluate.
72 |   // * parent: The value's parent object, which would be used as `this` if the value were called
73 |   //     as a function.
74 |   // * exporter: Callbacks to the RPC session for exporting capabilities found in this message.
75 |   // * source: The RpcPayload which contains the value, and therefore owns stubs within.
76 |   // If devaluation throws, exports made so far are handed to exporter.unexport() first.
77 |   // Returns: The devaluated value, ready to be JSON-serialized.
78 |   public static devaluate(
79 |       value: unknown, parent?: object, exporter: Exporter = NULL_EXPORTER, source?: RpcPayload)
80 |       : unknown {
81 |     let devaluator = new Devaluator(exporter, source);
82 |     try {
83 |       return devaluator.devaluateImpl(value, parent, 0);
84 |     } catch (err) {
85 |       if (devaluator.exports) {
86 |         try {
87 |           exporter.unexport(devaluator.exports);
88 |         } catch (unexportErr) {
89 |           // probably a side effect of the original error, ignore it
90 |         }
91 |       }
92 |       throw err;
93 |     }
94 |   }
95 |   // Export IDs registered by devaluateHook() so far; used for rollback in devaluate().
96 |   private exports?: Array<ExportId>;
97 |   // Recursive worker. `depth` is the current nesting level; 64 or more is rejected.
98 |   private devaluateImpl(value: unknown, parent: object | undefined, depth: number): unknown {
99 |     if (depth >= 64) {
100 |       throw new Error(
101 |           "Serialization exceeded maximum allowed depth. (Does the message contain cycles?)");
102 |     }
103 |
104 |     let kind = typeForRpc(value);
105 |     switch (kind) {
106 |       case "unsupported": {
107 |         let msg;
108 |         try {
109 |           msg = `Cannot serialize value: ${value}`;
110 |         } catch (err) {
111 |           msg = "Cannot serialize value: (couldn't stringify value)";
112 |         }
113 |         throw new TypeError(msg);
114 |       }
115 |
116 |       case "primitive":
117 |         if (typeof value === "number" && !isFinite(value)) {
118 |           if (value === Infinity) {
119 |             return ["inf"];
120 |           } else if (value === -Infinity) {
121 |             return ["-inf"];
122 |           } else {
123 |             return ["nan"];
124 |           }
125 |         } else {
126 |           // Supported directly by JSON.
127 |           return value;
128 |         }
129 |
130 |       case "object": {
131 |         let object = value as Record<string, unknown>;
132 |         let result: Record<string, unknown> = {};
133 |         for (let key in object) {
134 |           result[key] = this.devaluateImpl(object[key], object, depth + 1);
135 |         }
136 |         return result;
137 |       }
138 |
139 |       case "array": {
140 |         let array = value as Array<unknown>;
141 |         let len = array.length;
142 |         let result = new Array<unknown>(len);
143 |         for (let i = 0; i < len; i++) {
144 |           result[i] = this.devaluateImpl(array[i], array, depth + 1);
145 |         }
146 |         // Wrap literal arrays in an outer one-element array, to "escape" them.
147 |         return [result];
148 |       }
149 |
150 |       case "bigint":
151 |         return ["bigint", (value as bigint).toString()];
152 |
153 |       case "date":
154 |         return ["date", (value as Date).getTime()];
155 |
156 |       case "bytes": {
157 |         let bytes = value as Uint8Array;
158 |         if (bytes.toBase64) {
159 |           return ["bytes", bytes.toBase64({omitPadding: true})];
160 |         } else {
161 |           let binary = Array.from(bytes, b => String.fromCharCode(b)).join("");
162 |           return ["bytes", btoa(binary).replace(/=+$/, "")];  // strip padding from the encoding
163 |         }
164 |       }
165 |
166 |       case "error": {
167 |         let e = value as Error;
168 |
169 |         // TODO:
170 |         // - Determine type by checking prototype rather than `name`, which can be overridden?
171 |         // - Serialize cause / suppressed error / etc.
172 |         // - Serialize added properties.
173 |
174 |         let rewritten = this.exporter.onSendError(e);
175 |         if (rewritten) {
176 |           e = rewritten;
177 |         }
178 |
179 |         let result = ["error", e.name, e.message];
180 |         if (rewritten && rewritten.stack) {
181 |           result.push(rewritten.stack);  // the stack is sent only for a rewritten error
182 |         }
183 |         return result;
184 |       }
185 |
186 |       case "undefined":
187 |         return ["undefined"];
188 |
189 |       case "stub":
190 |       case "rpc-promise": {
191 |         if (!this.source) {
192 |           throw new Error("Can't serialize RPC stubs in this context.");
193 |         }
194 |
195 |         let {hook, pathIfPromise} = unwrapStubAndPath(value as RpcStub);
196 |         let importId = this.exporter.getImport(hook);
197 |         if (importId !== undefined) {
198 |           if (pathIfPromise) {
199 |             // It's a promise pointing back to the peer, so we are doing pipelining here.
200 |             if (pathIfPromise.length > 0) {
201 |               return ["pipeline", importId, pathIfPromise];
202 |             } else {
203 |               return ["pipeline", importId];
204 |             }
205 |           } else {
206 |             return ["import", importId];
207 |           }
208 |         }
209 |
210 |         if (pathIfPromise) {
211 |           hook = hook.get(pathIfPromise);
212 |         } else {
213 |           hook = hook.dup();
214 |         }
215 |
216 |         return this.devaluateHook(pathIfPromise ? "promise" : "export", hook);
217 |       }
218 |
219 |       case "function":
220 |       case "rpc-target": {
221 |         if (!this.source) {
222 |           throw new Error("Can't serialize RPC stubs in this context.");
223 |         }
224 |
225 |         let hook = this.source.getHookForRpcTarget(value, parent);
226 |         return this.devaluateHook("export", hook);
227 |       }
228 |
229 |       case "rpc-thenable": {
230 |         if (!this.source) {
231 |           throw new Error("Can't serialize RPC stubs in this context.");
232 |         }
233 |
234 |         let hook = this.source.getHookForRpcTarget(value, parent);
235 |         return this.devaluateHook("promise", hook);
236 |       }
237 |
238 |       default:
239 |         kind satisfies never;
240 |         throw new Error("unreachable");
241 |     }
242 |   }
243 |   // Registers `hook` with the exporter, records the new ID for rollback, returns [type, id].
244 |   private devaluateHook(type: "export" | "promise", hook: StubHook): unknown {
245 |     if (!this.exports) this.exports = [];
246 |     let exportId = type === "promise" ? this.exporter.exportPromise(hook)
247 |                                       : this.exporter.exportStub(hook);
248 |     this.exports.push(exportId);
249 |     return [type, exportId];
250 |   }
251 | }
252 |
253 | /**
254 |  * Encodes `value` as JSON using Cap'n Web's wire format. Basic data only: RPC stubs throw.
255 |  */
256 | export function serialize(value: unknown): string {
257 |   let devaluated = Devaluator.devaluate(value);
258 |   return JSON.stringify(devaluated);
259 | }
260 |
261 | // =======================================================================================
262 |
263 | export interface Importer {
264 | importStub(idx: ImportId): StubHook; // used by Evaluator for an incoming ["export", id]
265 | importPromise(idx: ImportId): StubHook; // used by Evaluator for an incoming ["promise", id]
266 | getExport(idx: ExportId): StubHook | undefined; // looks up one of our own exports; undefined if absent
267 | }
268 |
269 | class NullImporter implements Importer {
270 |   // Without an RPC session there is nothing to import from, so both import flavors refuse.
271 |   private refuse(): never {
272 |     throw new Error("Cannot deserialize RPC stubs without an RPC session.");
273 |   }
274 |   importStub(idx: ImportId): never { return this.refuse(); }
275 |   importPromise(idx: ImportId): never { return this.refuse(); }
276 |
277 |   // There is no exports table either, so every lookup misses.
278 |   getExport(idx: ExportId): StubHook | undefined { return undefined; }
279 | }
280 |
281 | const NULL_IMPORTER = new NullImporter();
282 |
283 | // Takes object trees parsed from JSON and converts them into fully-hydrated JavaScript objects
284 | // for delivery to the app. This is used to implement deserialization, except that it doesn't
285 | // actually start from a raw string.
286 | export class Evaluator {
287 |   constructor(private importer: Importer) {}
288 |   // Stubs and promises created while evaluating; shared with the payload built by evaluate().
289 |   private stubs: RpcStub[] = [];
290 |   private promises: LocatedPromise[] = [];
291 |   // Evaluates `value` in place (the input tree is mutated). Disposes the payload on failure.
292 |   public evaluate(value: unknown): RpcPayload {
293 |     let payload = RpcPayload.forEvaluate(this.stubs, this.promises);
294 |     try {
295 |       payload.value = this.evaluateImpl(value, payload, "value");
296 |       return payload;
297 |     } catch (err) {
298 |       payload.dispose();
299 |       throw err;
300 |     }
301 |   }
302 |
303 |   // Evaluate the value without destroying it.
304 |   public evaluateCopy(value: unknown): RpcPayload {
305 |     return this.evaluate(structuredClone(value));
306 |   }
307 |   // Recursive worker. `parent[property]` is where `value` lives; recorded with each promise.
308 |   private evaluateImpl(value: unknown, parent: object, property: string | number): unknown {
309 |     if (value instanceof Array) {
310 |       if (value.length == 1 && value[0] instanceof Array) {
311 |         // Escaped array. Evaluate the contents.
312 |         let result = value[0];
313 |         for (let i = 0; i < result.length; i++) {
314 |           result[i] = this.evaluateImpl(result[i], result, i);
315 |         }
316 |         return result;
317 |       } else switch (value[0]) {
318 |         case "bigint":
319 |           if (typeof value[1] == "string") {
320 |             return BigInt(value[1]);
321 |           }
322 |           break;
323 |         case "date":
324 |           if (typeof value[1] == "number") {
325 |             return new Date(value[1]);
326 |           }
327 |           break;
328 |         case "bytes": {
329 |           let b64 = Uint8Array as FromBase64;
330 |           if (typeof value[1] == "string") {
331 |             if (b64.fromBase64) {
332 |               return b64.fromBase64(value[1]);
333 |             } else {
334 |               let bs = atob(value[1]);
335 |               let len = bs.length;
336 |               let bytes = new Uint8Array(len);
337 |               for (let i = 0; i < len; i++) {
338 |                 bytes[i] = bs.charCodeAt(i);
339 |               }
340 |               return bytes;
341 |             }
342 |           }
343 |           break;
344 |         }
345 |         case "error":
346 |           if (value.length >= 3 && typeof value[1] === "string" && typeof value[2] === "string") {
347 |             let cls = ERROR_TYPES[value[1]] || Error;
348 |             let result = new cls(value[2]);
349 |             if (typeof value[3] === "string") {
350 |               result.stack = value[3];
351 |             }
352 |             return result;
353 |           }
354 |           break;
355 |         case "undefined":
356 |           if (value.length === 1) {
357 |             return undefined;
358 |           }
359 |           break;
360 |         case "inf":
361 |           return Infinity;
362 |         case "-inf":
363 |           return -Infinity;
364 |         case "nan":
365 |           return NaN;
366 |
367 |         case "import":
368 |         case "pipeline": {
369 |           // It's an "import" from the perspective of the sender, so it's an export from our
370 |           // side. In other words, the sender is passing our own object back to us.
371 |
372 |           if (value.length < 2 || value.length > 4) {
373 |             break;  // report error below
374 |           }
375 |
376 |           // First parameter is import ID (from the sender's perspective, so export ID from
377 |           // ours).
378 |           if (typeof value[1] != "number") {
379 |             break;  // report error below
380 |           }
381 |
382 |           let hook = this.importer.getExport(value[1]);
383 |           if (!hook) {
384 |             throw new Error(`no such entry on exports table: ${value[1]}`);
385 |           }
386 |
387 |           let isPromise = value[0] == "pipeline";
388 |
389 |           let addStub = (hook: StubHook) => {
390 |             if (isPromise) {
391 |               let promise = new RpcPromise(hook, []);
392 |               this.promises.push({promise, parent, property});
393 |               return promise;
394 |             } else {
395 |               let stub = new RpcStub(hook);  // "import" yields a plain stub, like "export" does
396 |               this.stubs.push(stub);
397 |               return stub;
398 |             }
399 |           };
400 |
401 |           if (value.length == 2) {
402 |             // Just referencing the export itself.
403 |             if (isPromise) {
404 |               // We need to use hook.get([]) to make sure we get a promise hook.
405 |               return addStub(hook.get([]));
406 |             } else {
407 |               // dup() returns a stub hook.
408 |               return addStub(hook.dup());
409 |             }
410 |           }
411 |
412 |           // Second parameter, if given, is a property path.
413 |           let path = value[2];
414 |           if (!(path instanceof Array)) {
415 |             break;  // report error below
416 |           }
417 |           if (!path.every(
418 |               part => { return typeof part == "string" || typeof part == "number"; })) {
419 |             break;  // report error below
420 |           }
421 |
422 |           if (value.length == 3) {
423 |             // Just referencing the path, not a call.
424 |             return addStub(hook.get(path));
425 |           }
426 |
427 |           // Third parameter, if given, is call arguments. The sender has identified a function
428 |           // and wants us to call it.
429 |           //
430 |           // Usually this is used with "pipeline", in which case we evaluate to an
431 |           // RpcPromise. However, this can be used with "import", in which case the caller is
432 |           // asking that the result be coerced to RpcStub. This distinction matters if the
433 |           // result of this evaluation is to be passed as arguments to another call -- promises
434 |           // must be resolved in advance, but stubs can be passed immediately.
435 |           let args = value[3];
436 |           if (!(args instanceof Array)) {
437 |             break;  // report error below
438 |           }
439 |
440 |           // We need a new evaluator for the args, to build a separate payload.
441 |           let subEval = new Evaluator(this.importer);
442 |           args = subEval.evaluate([args]);
443 |
444 |           return addStub(hook.call(path, args));
445 |         }
446 |
447 |         case "remap": {
448 |           if (value.length !== 5 ||
449 |               typeof value[1] !== "number" ||
450 |               !(value[2] instanceof Array) ||
451 |               !(value[3] instanceof Array) ||
452 |               !(value[4] instanceof Array)) {
453 |             break;  // report error below
454 |           }
455 |
456 |           let hook = this.importer.getExport(value[1]);
457 |           if (!hook) {
458 |             throw new Error(`no such entry on exports table: ${value[1]}`);
459 |           }
460 |
461 |           let path = value[2];
462 |           if (!path.every(
463 |               part => { return typeof part == "string" || typeof part == "number"; })) {
464 |             break;  // report error below
465 |           }
466 |           // NOTE(review): if a later capture throws, hooks created for earlier ones aren't disposed.
467 |           let captures: StubHook[] = value[3].map(cap => {
468 |             if (!(cap instanceof Array) ||
469 |                 cap.length !== 2 ||
470 |                 (cap[0] !== "import" && cap[0] !== "export") ||
471 |                 typeof cap[1] !== "number") {
472 |               throw new TypeError(`unknown map capture: ${JSON.stringify(cap)}`);
473 |             }
474 |
475 |             if (cap[0] === "export") {
476 |               return this.importer.importStub(cap[1]);
477 |             } else {
478 |               let exp = this.importer.getExport(cap[1]);
479 |               if (!exp) {
480 |                 throw new Error(`no such entry on exports table: ${cap[1]}`);
481 |               }
482 |               return exp.dup();
483 |             }
484 |           });
485 |
486 |           let instructions = value[4];
487 |
488 |           let resultHook = hook.map(path, captures, instructions);
489 |
490 |           let promise = new RpcPromise(resultHook, []);
491 |           this.promises.push({promise, parent, property});
492 |           return promise;
493 |         }
494 |
495 |         case "export":
496 |         case "promise":
497 |           // It's an "export" from the perspective of the sender, i.e. they sent us a new object
498 |           // which we want to import.
499 |           //
500 |           // "promise" is same as "export" but should not be delivered to the application. If any
501 |           // promises appear in a value, they must be resolved and substituted with their results
502 |           // before delivery. Note that if the value being evaluated appeared in call params, or
503 |           // appeared in a resolve message for a promise that is being pulled, then the new promise
504 |           // is automatically also being pulled, otherwise it is not.
505 |           if (typeof value[1] == "number") {
506 |             if (value[0] == "promise") {
507 |               let hook = this.importer.importPromise(value[1]);
508 |               let promise = new RpcPromise(hook, []);
509 |               this.promises.push({parent, property, promise});
510 |               return promise;
511 |             } else {
512 |               let hook = this.importer.importStub(value[1]);
513 |               let stub = new RpcStub(hook);
514 |               this.stubs.push(stub);
515 |               return stub;
516 |             }
517 |           }
518 |           break;
519 |       }
520 |       throw new TypeError(`unknown special value: ${JSON.stringify(value)}`);
521 |     } else if (value instanceof Object) {
522 |       let result = value as Record<string, unknown>;
523 |       for (let key in result) {
524 |         if (key in Object.prototype || key === "toJSON") {
525 |           // Out of an abundance of caution, we will ignore properties that override properties
526 |           // of Object.prototype. It's especially important that we don't allow `__proto__` as it
527 |           // may lead to prototype pollution. We also would rather not allow, e.g., `toString()`,
528 |           // as overriding this could lead to various mischief.
529 |           //
530 |           // We also block `toJSON()` for similar reasons -- even though Object.prototype doesn't
531 |           // actually define it, `JSON.stringify()` treats it specially and we don't want someone
532 |           // snooping on JSON calls.
533 |           //
534 |           // We do still evaluate the inner value so that we can properly release any stubs.
535 |           this.evaluateImpl(result[key], result, key);
536 |           delete result[key];
537 |         } else {
538 |           result[key] = this.evaluateImpl(result[key], result, key);
539 |         }
540 |       }
541 |       return result;
542 |     } else {
543 |       // Other JSON types just pass through.
544 |       return value;
545 |     }
546 |   }
547 | }
548 |
549 | /** Inverse of serialize(): parses the JSON text and evaluates it without an RPC session. */
550 | export function deserialize(value: string): unknown {
551 |   let evaluator = new Evaluator(NULL_IMPORTER);
552 |   let payload = evaluator.evaluate(JSON.parse(value));
553 |   // Expected to be a no-op, but release anything the payload might hold just in case.
554 |   payload.dispose();
555 |   return payload.value;
556 | }
557 |
--------------------------------------------------------------------------------
/src/rpc.ts:
--------------------------------------------------------------------------------
1 | // Copyright (c) 2025 Cloudflare, Inc.
2 | // Licensed under the MIT license found in the LICENSE.txt file or at:
3 | // https://opensource.org/license/mit
4 |
5 | import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStubHook, RpcTarget, unwrapStubAndPath } from "./core.js";
6 | import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize } from "./serialize.js";
7 |
8 | /**
9 |  * Interface for an RPC transport, which is a simple bidirectional message stream. Implement this
10 |  * interface if the built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs.
11 |  */
12 | export interface RpcTransport {
13 |   /**
14 |    * Sends a message to the other end. If the returned promise rejects, the session is aborted.
15 |    */
16 |   send(message: string): Promise<void>;
17 |
18 |   /**
19 |    * Receives a message sent by the other end.
20 |    *
21 |    * If and when the transport becomes disconnected, this will reject. The thrown error will be
22 |    * propagated to all outstanding calls and future calls on any stubs associated with the session.
23 |    * If there are no outstanding calls (and none are made in the future), then the error does not
24 |    * propagate anywhere -- this is considered a "clean" shutdown.
25 |    */
26 |   receive(): Promise<string>;
27 |
28 |   /**
29 |    * Indicates that the RPC system has suffered an error that prevents the session from continuing.
30 |    * The transport should ideally try to send any queued messages if it can, and then close the
31 |    * connection. (It's not strictly necessary to deliver queued messages, but the last message sent
32 |    * before abort() is called is often an "abort" message, which communicates the error to the
33 |    * peer, so if that is dropped, the peer may have less information about what happened.)
34 |    */
35 |   abort?(reason: any): void;
36 | }
37 |
38 | // Entry on the exports table.
39 | type ExportTableEntry = {
40 |   hook: StubHook,       // the exported capability; disposed when refcount reaches zero
41 |   refcount: number,     // references held by the peer; lowered by releaseExport()
42 |   pull?: Promise<void>  // set once resolution has been started (see ensureResolvingExport)
43 | };
44 |
45 | // Entry on the imports table: the state shared by every RpcImportHook for one import ID.
46 | class ImportTableEntry {
47 |   constructor(public session: RpcSessionImpl, public importId: number, pulling: boolean) {
48 |     if (pulling) {
49 |       this.activePull = Promise.withResolvers<void>();
50 |     }
51 |   }
52 |
53 |   public localRefcount: number = 0;   // live RpcImportHooks that point at this entry
54 |   public remoteRefcount: number = 1;  // references to give back to the peer on release
55 |
56 |   private activePull?: PromiseWithResolvers<void>;  // settled by resolve() or abort()
57 |   public resolution?: StubHook;  // once set, operations are forwarded to this hook
58 |
59 |   // List of integer indexes into session.onBrokenCallbacks which are callbacks registered on
60 |   // this import. Initialized on first use (so `undefined` is the same as an empty list).
61 |   private onBrokenRegistrations?: number[];
62 |
63 |   resolve(resolution: StubHook) {
64 |     // TODO: Need embargo handling here? PayloadStubHook needs to be wrapped in a
65 |     // PromiseStubHook awaiting the embargo I suppose. Previous notes on embargoes:
66 |     // - Resolve message specifies last call that was received before the resolve. The introducer is
67 |     //   responsible for any embargoes up to that point.
68 |     // - Any further calls forwarded by the introducer after that point MUST immediately resolve to
69 |     //   a forwarded call. The caller is responsible for ensuring the last of these is handed off
70 |     //   before direct calls can be delivered.
71 |
72 |     if (this.localRefcount == 0) {
73 |       // Already disposed (canceled), so ignore the resolution and don't send a redundant release.
74 |       resolution.dispose();
75 |       return;
76 |     }
77 |
78 |     this.resolution = resolution;
79 |     this.sendRelease();
80 |
81 |     if (this.onBrokenRegistrations) {
82 |       // Delete all our callback registrations from this session and re-register them on the
83 |       // target stub.
84 |       for (let i of this.onBrokenRegistrations) {
85 |         let callback = this.session.onBrokenCallbacks[i];
86 |         let endIndex = this.session.onBrokenCallbacks.length;
87 |         resolution.onBroken(callback);
88 |         if (this.session.onBrokenCallbacks[endIndex] === callback) {
89 |           // Oh, calling onBroken() just registered the callback back on this connection again.
90 |           // But when the connection dies, we want all the callbacks to be called in the order in
91 |           // which they were registered. So we don't want this one pushed to the back of the line
92 |           // here. So, let's remove the newly-added registration and keep the original.
93 |           // TODO: This is quite hacky, think about whether this is really the right answer.
94 |           delete this.session.onBrokenCallbacks[endIndex];
95 |         } else {
96 |           // The callback is now registered elsewhere, so delete it from our session.
97 |           delete this.session.onBrokenCallbacks[i];
98 |         }
99 |       }
100 |       this.onBrokenRegistrations = undefined;
101 |     }
102 |
103 |     if (this.activePull) {
104 |       this.activePull.resolve();
105 |       this.activePull = undefined;
106 |     }
107 |   }
108 |   // Sends a pull unless one is active, waits for resolve()/abort(), then pulls the resolution.
109 |   async awaitResolution(): Promise<RpcPayload> {
110 |     if (!this.activePull) {
111 |       this.session.sendPull(this.importId);
112 |       this.activePull = Promise.withResolvers<void>();
113 |     }
114 |     await this.activePull.promise;
115 |     return this.resolution!.pull();
116 |   }
117 |   // Called by RpcImportHook.dispose() when the last local hook goes away.
118 |   dispose() {
119 |     if (this.resolution) {
120 |       this.resolution.dispose();
121 |     } else {
122 |       this.abort(new Error("RPC was canceled because the RpcPromise was disposed."));
123 |       this.sendRelease();
124 |     }
125 |   }
126 |   // Marks the import as broken with `error`; does nothing if a resolution is already set.
127 |   abort(error: any) {
128 |     if (!this.resolution) {
129 |       this.resolution = new ErrorStubHook(error);
130 |
131 |       if (this.activePull) {
132 |         this.activePull.reject(error);
133 |         this.activePull = undefined;
134 |       }
135 |
136 |       // The RpcSession itself will have called all our callbacks so we don't need to track the
137 |       // registrations anymore.
138 |       this.onBrokenRegistrations = undefined;
139 |     }
140 |   }
141 |   // Registers `callback` on the resolution if there is one, otherwise on the session.
142 |   onBroken(callback: (error: any) => void): void {
143 |     if (this.resolution) {
144 |       this.resolution.onBroken(callback);
145 |     } else {
146 |       let index = this.session.onBrokenCallbacks.length;
147 |       this.session.onBrokenCallbacks.push(callback);
148 |
149 |       if (!this.onBrokenRegistrations) this.onBrokenRegistrations = [];
150 |       this.onBrokenRegistrations.push(index);
151 |     }
152 |   }
153 |   // Gives all outstanding remote references back to the peer; no-op once the count is zero.
154 |   private sendRelease() {
155 |     if (this.remoteRefcount > 0) {
156 |       this.session.sendRelease(this.importId, this.remoteRefcount);
157 |       this.remoteRefcount = 0;
158 |     }
159 |   }
160 | }
161 |
162 | class RpcImportHook extends StubHook {
163 |   public entry?: ImportTableEntry;  // undefined when we're disposed
164 |
165 |   // `isPromise` is true if this hook stands for a promise and may therefore be pulled, and
166 |   // false if it is a plain stub, for which pull() throws.
167 |   constructor(public isPromise: boolean, entry: ImportTableEntry) {
168 |     super();
169 |     ++entry.localRefcount;
170 |     this.entry = entry;
171 |   }
172 |   // The path argument is ignored; the hook itself is returned.
173 |   collectPath(path: PropertyPath): RpcImportHook {
174 |     return this;
175 |   }
176 |   // Returns the table entry, throwing if this hook has already been disposed.
177 |   getEntry(): ImportTableEntry {
178 |     if (this.entry) {
179 |       return this.entry;
180 |     } else {
181 |       // Shouldn't get here in practice since the holding stub should have replaced the hook when
182 |       // disposed.
183 |       throw new Error("This RpcImportHook was already disposed.");
184 |     }
185 |   }
186 |
187 |   // -------------------------------------------------------------------------------------
188 |   // implements StubHook
189 |
190 |   call(path: PropertyPath, args: RpcPayload): StubHook {
191 |     let entry = this.getEntry();
192 |     if (entry.resolution) {
193 |       return entry.resolution.call(path, args);
194 |     } else {
195 |       return entry.session.sendCall(entry.importId, path, args);
196 |     }
197 |   }
198 |   // Disposes `captures` itself when this hook turns out to be already disposed.
199 |   map(path: PropertyPath, captures: StubHook[], instructions: unknown[]): StubHook {
200 |     let entry: ImportTableEntry;
201 |     try {
202 |       entry = this.getEntry();
203 |     } catch (err) {
204 |       for (let cap of captures) {
205 |         cap.dispose();
206 |       }
207 |       throw err;
208 |     }
209 |
210 |     if (entry.resolution) {
211 |       return entry.resolution.map(path, captures, instructions);
212 |     } else {
213 |       return entry.session.sendMap(entry.importId, path, captures, instructions);
214 |     }
215 |   }
216 |
217 |   get(path: PropertyPath): StubHook {
218 |     let entry = this.getEntry();
219 |     if (entry.resolution) {
220 |       return entry.resolution.get(path);
221 |     } else {
222 |       return entry.session.sendCall(entry.importId, path);
223 |     }
224 |   }
225 |   // The duplicate is always a non-promise hook sharing this hook's table entry.
226 |   dup(): RpcImportHook {
227 |     return new RpcImportHook(false, this.getEntry());
228 |   }
229 |
230 |   pull(): RpcPayload | Promise<RpcPayload> {
231 |     let entry = this.getEntry();
232 |
233 |     if (!this.isPromise) {
234 |       throw new Error("Can't pull this hook because it's not a promise hook.");
235 |     }
236 |
237 |     if (entry.resolution) {
238 |       return entry.resolution.pull();
239 |     }
240 |
241 |     return entry.awaitResolution();
242 |   }
243 |
244 |   ignoreUnhandledRejections(): void {
245 |     // We don't actually have to do anything here because this method only has to ignore rejections
246 |     // if pull() is *not* called, and if pull() is not called then we won't generate any rejections
247 |     // anyway.
248 |   }
249 |   // Drops this hook's reference; the entry itself is disposed when the last hook goes away.
250 |   dispose(): void {
251 |     let entry = this.entry;
252 |     this.entry = undefined;
253 |     if (entry) {
254 |       if (--entry.localRefcount === 0) {
255 |         entry.dispose();
256 |       }
257 |     }
258 |   }
259 |   // Silently ignored when this hook has already been disposed.
260 |   onBroken(callback: (error: any) => void): void {
261 |     if (this.entry) {
262 |       this.entry.onBroken(callback);
263 |     }
264 |   }
265 | }
266 |
267 | class RpcMainHook extends RpcImportHook {
268 |   private session?: RpcSessionImpl;
269 |
270 |   constructor(entry: ImportTableEntry) {
271 |     super(false, entry);
272 |     this.session = entry.session;
273 |   }
274 |   // Disposing the main hook shuts down the whole session, at most once.
275 |   dispose(): void {
276 |     let owner = this.session;
277 |     if (!owner) return;  // already disposed
278 |
279 |     this.session = undefined;
280 |     owner.shutdown();
281 |   }
282 | }
283 |
284 | /**
285 | * Options to customize behavior of an RPC session. All functions which start a session should
286 | * optionally accept this.
287 | */
288 | export type RpcSessionOptions = {
289 | /**
290 | * If provided, this function will be called whenever an `Error` object is serialized (for any
291 | * reason, not just because it was thrown). This can be used to log errors, and also to redact
292 | * them.
293 | *
294 | * If `onSendError` returns an Error object, that object will be substituted in place of the
295 | * original. If it has a stack property, the stack will be sent to the client.
296 | *
297 | * If `onSendError` doesn't return anything (or is not provided at all), the default behavior is
298 | * to serialize the error with the stack omitted.
299 | */
300 | onSendError?: (error: Error) => Error | void;
301 | };
302 |
303 | class RpcSessionImpl implements Importer, Exporter {
304 |   private exports: Array<ExportTableEntry> = [];
305 |   private reverseExports: Map<StubHook, ExportId> = new Map();
306 |   private imports: Array<ImportTableEntry> = [];
307 |   private abortReason?: any;
308 |   private cancelReadLoop: (error: any) => void;
309 |
310 |   // We assign positive numbers to imports we initiate, and negative numbers to exports we
311 |   // initiate. So the next import ID is just `imports.length`, but the next export ID needs
312 |   // to be tracked explicitly.
313 |   private nextExportId = -1;
314 |
315 |   // If set, call this when all incoming calls are complete.
316 |   private onBatchDone?: Omit<PromiseWithResolvers<void>, "promise">;
317 |
318 |   // How many promises is our peer expecting us to resolve?
319 |   private pullCount = 0;
320 |
321 |   // Sparse array of onBrokenCallback registrations. Items are strictly appended to the end but
322 |   // may be deleted from the middle (hence leaving the array sparse).
323 |   onBrokenCallbacks: ((error: any) => void)[] = [];
324 |
325 |   constructor(private transport: RpcTransport, mainHook: StubHook,
326 |       private options: RpcSessionOptions) {
327 |     // Export zero is automatically the bootstrap object.
328 |     this.exports.push({hook: mainHook, refcount: 1});
329 |
330 |     // Import zero is the other side's bootstrap object.
331 |     this.imports.push(new ImportTableEntry(this, 0, false));
332 |
333 |     // The read loop is handed this promise; calling `cancelReadLoop` rejects it.
334 |     let readLoopAbort = Promise.withResolvers<never>();
335 |     this.cancelReadLoop = readLoopAbort.reject;
336 |
337 |     this.readLoop(readLoopAbort.promise).catch(err => this.abort(err));
338 |   }
339 |
340 |   getMainImport(): RpcImportHook {
341 |     let bootstrapEntry = this.imports[0];
342 |     return new RpcMainHook(bootstrapEntry);  // only call once, right after construction
343 |   }
344 |
345 |   shutdown(): void {
346 |     // TODO(someday): Add a real "clean shutdown" mechanism? Aborting does the job for now.
347 |     let reason = new Error("RPC session was shut down by disposing the main stub");
348 |     this.abort(reason, false);
349 |   }
350 |
351 |   exportStub(hook: StubHook): ExportId {
352 |     if (this.abortReason) throw this.abortReason;
353 |
354 |     let knownId = this.reverseExports.get(hook);
355 |     if (knownId !== undefined) {
356 |       // Already exported: hand out the same ID and count one more reference.
357 |       this.exports[knownId].refcount += 1;
358 |       return knownId;
359 |     }
360 |     // TODO: Use onBroken().
361 |     let freshId = this.nextExportId--;
362 |     this.exports[freshId] = { hook, refcount: 1 };
363 |     this.reverseExports.set(hook, freshId);
364 |     return freshId;
365 |   }
366 |
367 |   exportPromise(hook: StubHook): ExportId {
368 |     if (this.abortReason) {
369 |       throw this.abortReason;
370 |     }
371 |     // A promise never shares an ID: with a reused one the recipient could miss the resolution.
372 |     let promiseId = this.nextExportId--;
373 |     this.reverseExports.set(hook, promiseId);
374 |     this.exports[promiseId] = { hook, refcount: 1 };
375 |     // Resolution of every promise we send starts right away.
376 |     this.ensureResolvingExport(promiseId);
377 |     return promiseId;
378 |   }
379 |
380 |   unexport(ids: Array<ExportId>): void {
381 |     // Gives back exactly one reference per listed ID, e.g. to roll back the exports made by a
382 |     // devaluation that failed part-way (see Devaluator.devaluate).
383 |     for (let id of ids) this.releaseExport(id, 1);
384 |   }
385 |
386 |   private releaseExport(exportId: ExportId, refcount: number) {
387 |     let exp = this.exports[exportId];
388 |     if (!exp) throw new Error(`no such export ID: ${exportId}`);
389 |
390 |     let remaining = exp.refcount - refcount;
391 |     if (remaining < 0) {
392 |       throw new Error(`refcount would go negative: ${exp.refcount} < ${refcount}`);
393 |     }
394 |     exp.refcount = remaining;
395 |     if (remaining !== 0) return;
396 |     // That was the last reference: forget the entry and dispose its hook.
397 |     delete this.exports[exportId];
398 |     this.reverseExports.delete(exp.hook);
399 |     exp.hook.dispose();
400 |   }
401 |
402 |   onSendError(error: Error): Error | void {
403 |     // Forwards to the `onSendError` session option when one was supplied; without it the
404 |     // result is undefined.
405 |     return this.options.onSendError?.(error);
406 |   }
407 | // Starts (at most once per export) the pull that ends in a "resolve" or "reject" message.
408 | private ensureResolvingExport(exportId: ExportId) {
409 | let exp = this.exports[exportId];
410 | if (!exp) {
411 | throw new Error(`no such export ID: ${exportId}`);
412 | }
413 | if (!exp.pull) { // `pull` doubles as the "already started" marker
414 | let resolve = async () => {
415 | let hook = exp.hook;
416 | for (;;) {
417 | let payload = await hook.pull();
418 | if (payload.value instanceof RpcStub) {
419 | let {hook: inner, pathIfPromise} = unwrapStubAndPath(payload.value);
420 | if (pathIfPromise && pathIfPromise.length == 0) {
421 | if (this.getImport(hook) === undefined) { // NOTE(review): tests `hook`, not `inner` -- confirm
422 | // Optimization: The resolution is just another promise, and it is not a promise
423 | // pointing back to the peer. So if we send a resolve message, it's just going to
424 | // resolve to another new promise export, which is just going to have to wait for
425 | // another resolve message later. This intermediate resolve message gives the peer
426 | // no useful information, so let's skip it and just wait for the chained
427 | // resolution.
428 | hook = inner;
429 | continue;
430 | }
431 | }
432 | }
433 |
434 | return payload;
435 | }
436 | };
437 |
438 | ++this.pullCount; // balanced by the decrement in finally() below
439 | exp.pull = resolve().then(
440 | payload => {
441 | // We don't transfer ownership of stubs in the payload since the payload
442 | // belongs to the hook which sticks around to handle pipelined requests.
443 | let value = Devaluator.devaluate(payload.value, undefined, this, payload);
444 | this.send(["resolve", exportId, value]);
445 | },
446 | error => {
447 | this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]);
448 | }
449 | ).catch(
450 | error => {
451 | // If serialization failed, report the serialization error, which should
452 | // itself always be serializable.
453 | try {
454 | this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]);
455 | } catch (error2) {
456 | // TODO: Shouldn't happen, now what?
457 | this.abort(error2);
458 | }
459 | }
460 | ).finally(() => {
461 | if (--this.pullCount === 0) {
462 | if (this.onBatchDone) {
463 | this.onBatchDone.resolve(); // no pulls left outstanding
464 | }
465 | }
466 | });
467 | }
468 | }
469 |
470 |   getImport(hook: StubHook): ImportId | undefined {
471 |     // Only a live import hook that belongs to this very session maps to an import ID.
472 |     if (!(hook instanceof RpcImportHook)) return undefined;
473 |     let entry = hook.entry;
474 |     if (!entry || entry.session !== this) return undefined;
475 |     return entry.importId;
476 |   }
477 |
478 |   importStub(idx: ImportId): RpcImportHook {
479 |     if (this.abortReason) throw this.abortReason;
480 |
481 |     let entry = this.imports[idx];
482 |     if (!entry) {
483 |       // First sighting of this ID: create its (non-pulling) table entry and record it.
484 |       entry = this.imports[idx] = new ImportTableEntry(this, idx, false);
485 |     }
486 |     return new RpcImportHook(/*isPromise=*/false, entry);
487 |   }
488 |
489 |   importPromise(idx: ImportId): StubHook {
490 |     if (this.abortReason) throw this.abortReason;
491 |
492 |     if (this.imports[idx]) {
493 |       // A promise must arrive under a fresh ID; a reused one is answered with a broken hook.
494 |       let bug = new Error(
495 |           "Bug in RPC system: The peer sent a promise reusing an existing export ID.");
496 |       return new ErrorStubHook(bug);
497 |     }
498 |
499 |     // The entry starts out already pulling, and the hook is a promise hook.
500 |     let pullingEntry = this.imports[idx] = new ImportTableEntry(this, idx, true);
501 |     return new RpcImportHook(/*isPromise=*/true, pullingEntry);
502 |   }
503 |
504 |   getExport(idx: ExportId): StubHook | undefined {
505 |     return this.exports[idx] ? this.exports[idx].hook : undefined;  // undefined: no such export
506 |   }
507 |
508 |   private send(msg: any) {
509 |     // Once the session has aborted, outgoing messages are silently dropped.
510 |     if (this.abortReason !== undefined) return;
511 |
512 |     let encoded: string;
513 |     try {
514 |       encoded = JSON.stringify(msg);
515 |     } catch (err) {
516 |       // The devaluator is supposed to emit only JSON-safe values, so failing here points at a
517 |       // bug on our side. Abort the session (best effort) and surface the original error.
518 |       try {
519 |         this.abort(err);
520 |       } catch (abortErr) {
521 |         // ignored: the stringify error is the one that gets rethrown
522 |       }
523 |       throw err;
524 |     }
525 |     // A failed send aborts the session without an abort message, which would probably fail too.
526 |     let sending = this.transport.send(encoded);
527 |     sending.catch(err => this.abort(err, false));
528 |   }
529 |
530 | sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook { // Pushes a "pipeline" expression for import `id` at `path` and returns a hook for its result.
531 | if (this.abortReason) throw this.abortReason;
532 |
533 | let value: Array = ["pipeline", id, path]; // Without `args`, this three-element form is sent as is.
534 | if (args) {
535 | let devalue = Devaluator.devaluate(args.value, undefined, this, args);
536 |
537 | // HACK: Since the args are an array, the devaluator wraps them in a second array. Need to unwrap.
538 | // TODO: Clean this up somehow.
539 | value.push((>devalue)[0]);
540 |
541 | // Serializing the payload takes ownership of all stubs within, so the payload itself does
542 | // not need to be disposed.
543 | }
544 | this.send(["push", value]);
545 |
546 | let entry = new ImportTableEntry(this, this.imports.length, false); // The result takes the next index at the end of the imports table.
547 | this.imports.push(entry);
548 | return new RpcImportHook(/*isPromise=*/true, entry);
549 | }
550 |
551 | sendMap(id: ImportId, path: PropertyPath, captures: StubHook[], instructions: unknown[])
552 | : RpcImportHook { // Pushes a "remap" expression for import `id` at `path` and returns a hook for its result.
553 | if (this.abortReason) {
554 | for (let cap of captures) { // The captures are disposed here before the abort reason is thrown.
555 | cap.dispose();
556 | }
557 | throw this.abortReason;
558 | }
559 |
560 | let devaluedCaptures = captures.map(hook => { // Encode each captured hook as an "import" or "export" reference.
561 | let importId = this.getImport(hook);
562 | if (importId !== undefined) {
563 | return ["import", importId]; // Already one of this session's imports: refer to it by ID.
564 | } else {
565 | return ["export", this.exportStub(hook)]; // Otherwise export it and send whatever exportStub() returns.
566 | }
567 | });
568 |
569 | let value = ["remap", id, path, devaluedCaptures, instructions];
570 |
571 | this.send(["push", value]);
572 |
573 | let entry = new ImportTableEntry(this, this.imports.length, false); // The result takes the next index at the end of the imports table.
574 | this.imports.push(entry);
575 | return new RpcImportHook(/*isPromise=*/true, entry);
576 | }
577 |
578 | sendPull(id: ImportId) { // Sends a "pull" message for import `id`.
579 | if (this.abortReason) throw this.abortReason; // Throws the recorded reason if the session has aborted.
580 |
581 | this.send(["pull", id]);
582 | }
583 |
584 | sendRelease(id: ImportId, remoteRefcount: number) { // Sends a "release" for import `id` and removes it from the imports table.
585 | if (this.abortReason) return; // After an abort this is silently a no-op rather than an error.
586 |
587 | this.send(["release", id, remoteRefcount]);
588 | delete this.imports[id]; // `delete` leaves a hole, which is why the table is treated as sparse.
589 | }
590 |
591 | abort(error: any, trySendAbortMessage: boolean = true) { // Tears the session down with `error`; only the first call has any effect.
592 | // Don't double-abort.
593 | if (this.abortReason !== undefined) return;
594 |
595 | this.cancelReadLoop(error);
596 |
597 | if (trySendAbortMessage) {
598 | try { // Best effort: both synchronous and asynchronous send failures are ignored.
599 | this.transport.send(JSON.stringify(["abort", Devaluator
600 | .devaluate(error, undefined, this)]))
601 | .catch(err => {});
602 | } catch (err) {
603 | // ignore, probably the whole reason we're aborting is because the transport is broken
604 | }
605 | }
606 |
607 | if (error === undefined) {
608 | // Shouldn't happen, but if it does, avoid setting `abortReason` to `undefined`.
609 | error = "undefined";
610 | }
611 |
612 | this.abortReason = error; // From here on every abortReason check in this class sees the session as aborted.
613 | if (this.onBatchDone) {
614 | this.onBatchDone.reject(error); // Fail a pending drain().
615 | }
616 |
617 | if (this.transport.abort) {
618 | // Call transport's abort handler, but guard against buggy app code.
619 | try {
620 | this.transport.abort(error);
621 | } catch (err) {
622 | // Treat as unhandled rejection: surface the error without interrupting the teardown.
623 | Promise.reject(err);
624 | }
625 | }
626 |
627 | // WATCH OUT: these are sparse arrays. `for...of` would walk every index from 0 up to `length`,
628 | // including deleted slots, and would miss negative indexes. Use `for...in` instead.
629 | for (let i in this.onBrokenCallbacks) {
630 | try {
631 | this.onBrokenCallbacks[i](error);
632 | } catch (err) {
633 | // Treat as unhandled rejection: one throwing callback must not stop the others.
634 | Promise.reject(err);
635 | }
636 | }
637 | for (let i in this.imports) {
638 | this.imports[i].abort(error);
639 | }
640 | for (let i in this.exports) {
641 | this.exports[i].hook.dispose();
642 | }
643 | }
644 |
645 | private async readLoop(abortPromise: Promise) { // Receives and dispatches peer messages until the session aborts; throws on a malformed message.
646 | while (!this.abortReason) {
647 | let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise])); // Raced, presumably so abort() can interrupt a pending receive().
648 | if (this.abortReason) break; // check again before processing
649 |
650 | if (msg instanceof Array) {
651 | switch (msg[0]) { // Handled messages `continue` the loop; a `break` falls through to the throw below.
652 | case "push": // ["push", Expression]
653 | if (msg.length > 1) {
654 | let payload = new Evaluator(this).evaluate(msg[1]);
655 | let hook = new PayloadStubHook(payload);
656 |
657 | // It's possible for a rejection to occur before the client gets a chance to send
658 | // a "pull" message or to use the promise in a pipeline. We don't want that to be
659 | // treated as an unhandled rejection on our end.
660 | hook.ignoreUnhandledRejections();
661 |
662 | this.exports.push({ hook, refcount: 1 });
663 | continue;
664 | }
665 | break;
666 |
667 | case "pull": { // ["pull", ImportId]
668 | let exportId = msg[1];
669 | if (typeof exportId == "number") {
670 | this.ensureResolvingExport(exportId);
671 | continue;
672 | }
673 | break;
674 | }
675 |
676 | case "resolve": // ["resolve", ExportId, Expression]
677 | case "reject": { // ["reject", ExportId, Expression]
678 | let importId = msg[1];
679 | if (typeof importId == "number" && msg.length > 2) {
680 | let imp = this.imports[importId];
681 | if (imp) {
682 | if (msg[0] == "resolve") {
683 | imp.resolve(new PayloadStubHook(new Evaluator(this).evaluate(msg[2])));
684 | } else {
685 | // HACK: We expect errors are always simple values (no stubs) so we can just
686 | // pull the value out of the payload.
687 | let payload = new Evaluator(this).evaluate(msg[2]);
688 | payload.dispose(); // just in case -- should be no-op
689 | imp.resolve(new ErrorStubHook(payload.value));
690 | }
691 | } else {
692 | // Import ID is not found on the table. Probably we released it already, in which
693 | // case we do not care about the resolution, so whatever.
694 |
695 | if (msg[0] == "resolve") {
696 | // We need to evaluate the resolution and immediately dispose it so that we
697 | // release any stubs it contains.
698 | new Evaluator(this).evaluate(msg[2]).dispose();
699 | }
700 | }
701 | continue;
702 | }
703 | break;
704 | }
705 |
706 | case "release": { // ["release", ImportId, refcount]
707 | let exportId = msg[1];
708 | let refcount = msg[2];
709 | if (typeof exportId == "number" && typeof refcount == "number") {
710 | this.releaseExport(exportId, refcount);
711 | continue;
712 | }
713 | break;
714 | }
715 |
716 | case "abort": { // ["abort", Expression]
717 | let payload = new Evaluator(this).evaluate(msg[1]);
718 | payload.dispose(); // just in case -- should be no-op
719 | this.abort(payload.value, false); // Unwrap the error as the "reject" case does; `false` = don't echo an abort back.
720 | break; // Falls through to the throw below; abort() has already recorded the reason.
721 | }
722 | }
723 | }
724 |
725 | throw new Error(`bad RPC message: ${JSON.stringify(msg)}`); // Reached for non-array messages, unknown types, and malformed fields.
726 | }
727 | }
728 |
729 | async drain(): Promise { // Resolves once no pulls are outstanding; rejects if the session aborts first.
730 | if (this.abortReason) {
731 | throw this.abortReason;
732 | }
733 |
734 | if (this.pullCount > 0) { // With nothing outstanding, return immediately.
735 | let {promise, resolve, reject} = Promise.withResolvers();
736 | this.onBatchDone = {resolve, reject}; // Settled elsewhere: resolved when `pullCount` drops to zero, rejected by abort().
737 | await promise;
738 | }
739 | }
740 |
741 | getStats(): {imports: number, exports: number} { // Counts the entries currently present in the import and export tables.
742 | let result = {imports: 0, exports: 0};
743 | // We can't just use `.length` because the arrays can be sparse and can have negative indexes.
744 | for (let i in this.imports) { // `for...in` visits only keys that actually exist, so holes are not counted.
745 | ++result.imports;
746 | }
747 | for (let i in this.exports) {
748 | ++result.exports;
749 | }
750 | return result;
751 | }
752 | }
753 |
754 | // Public interface that wraps RpcSessionImpl and hides private implementation details (even from
755 | // JavaScript with no type enforcement).
756 | export class RpcSession {
757 | #session: RpcSessionImpl;
758 | #mainStub: RpcStub;
759 |
760 | constructor(transport: RpcTransport, localMain?: any, options: RpcSessionOptions = {}) {
761 | let mainHook: StubHook;
762 | if (localMain) { // NOTE: truthiness test, so a falsy `localMain` is treated the same as omitting it.
763 | mainHook = new PayloadStubHook(RpcPayload.fromAppReturn(localMain));
764 | } else {
765 | mainHook = new ErrorStubHook(new Error("This connection has no main object.")); // No local main: use a hook carrying this error.
766 | }
767 | this.#session = new RpcSessionImpl(transport, mainHook, options);
768 | this.#mainStub = new RpcStub(this.#session.getMainImport());
769 | }
770 |
771 | getRemoteMain(): RpcStub { // Returns the stub built in the constructor from the session's main import.
772 | return this.#mainStub;
773 | }
774 |
775 | getStats(): {imports: number, exports: number} { // Forwards to the wrapped session.
776 | return this.#session.getStats();
777 | }
778 |
779 | drain(): Promise { // Forwards to the wrapped session.
780 | return this.#session.drain();
781 | }
782 | }
783 |
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
1 | # Cap'n Web: A JavaScript-native RPC system
2 |
3 | Cap'n Web is a spiritual sibling to [Cap'n Proto](https://capnproto.org) (and is created by the same author), but designed to play nice in the web stack. That means:
4 | * Like Cap'n Proto, it is an object-capability protocol. ("Cap'n" is short for "capabilities and".) We'll get into this more below, but it's incredibly powerful.
5 | * Unlike Cap'n Proto, Cap'n Web has no schemas. In fact, it has almost no boilerplate whatsoever. This means it works more like the [JavaScript-native RPC system in Cloudflare Workers](https://blog.cloudflare.com/javascript-native-rpc/).
6 | * That said, it integrates nicely with TypeScript.
7 | * Also unlike Cap'n Proto, Cap'n Web's underlying serialization is human-readable. In fact, it's just JSON, with a little pre-/post-processing.
8 | * It works over HTTP, WebSocket, and postMessage() out-of-the-box, with the ability to extend it to other transports easily.
9 | * It works in all major browsers, Cloudflare Workers, Node.js, and other modern JavaScript runtimes.
10 | The whole thing compresses (minify+gzip) to under 10kB with no dependencies.
11 |
12 | Cap'n Web is more expressive than almost every other RPC system, because it implements an object-capability RPC model. That means it:
13 | * Supports bidirectional calling. The client can call the server, and the server can also call the client.
14 | * Supports passing functions by reference: If you pass a function over RPC, the recipient receives a "stub". When they call the stub, they actually make an RPC back to you, invoking the function where it was created. This is how bidirectional calling happens: the client passes a callback to the server, and then the server can call it later.
15 | * Similarly, supports passing objects by reference: If a class extends the special marker type `RpcTarget`, then instances of that class are passed by reference, with method calls calling back to the location where the object was created.
16 | * Supports promise pipelining. When you start an RPC, you get back a promise. Instead of awaiting it, you can immediately use the promise in dependent RPCs, thus performing a chain of calls in a single network round trip.
17 | * Supports capability-based security patterns.
18 |
19 | ## Installation
20 |
21 | [Cap'n Web is an npm package.](https://www.npmjs.com/package/capnweb)
22 |
23 | ```
24 | npm i capnweb
25 | ```
26 |
27 | ## Example
28 |
29 | A client looks like this:
30 |
31 | ```js
32 | import { newWebSocketRpcSession } from "capnweb";
33 |
34 | // One-line setup.
35 | let api = newWebSocketRpcSession("wss://example.com/api");
36 |
37 | // Call a method on the server!
38 | let result = await api.hello("World");
39 |
40 | console.log(result);
41 | ```
42 |
43 | Here's the server:
44 |
45 | ```js
46 | import { RpcTarget, newWorkersRpcResponse } from "capnweb";
47 |
48 | // This is the server implementation.
49 | class MyApiServer extends RpcTarget {
50 | hello(name) {
51 | return `Hello, ${name}!`
52 | }
53 | }
54 |
55 | // Standard Cloudflare Workers HTTP handler.
56 | //
57 | // (Node and other runtimes are supported too; see below.)
58 | export default {
59 | fetch(request, env, ctx) {
60 | // Parse URL for routing.
61 | let url = new URL(request.url);
62 |
63 | // Serve API at `/api`.
64 | if (url.pathname === "/api") {
65 | return newWorkersRpcResponse(request, new MyApiServer());
66 | }
67 |
68 | // You could serve other endpoints here...
69 | return new Response("Not found", {status: 404});
70 | }
71 | }
72 | ```
73 |
74 | ### More complicated example
75 |
76 | Here's an example that:
77 | * Uses TypeScript
78 | * Sends multiple calls, where the second call depends on the result of the first, in one round trip.
79 |
80 | We declare our interface in a shared types file:
81 |
82 | ```ts
83 | interface PublicApi {
84 | // Authenticate the API token, and return the authenticated API.
85 | authenticate(apiToken: string): AuthedApi;
86 |
87 | // Get a given user's public profile info. (Doesn't require authentication.)
88 | getUserProfile(userId: string): Promise<UserProfile>;
89 | }
90 |
91 | interface AuthedApi {
92 | getUserId(): number;
93 |
94 | // Get the user IDs of all the user's friends.
95 | getFriendIds(): number[];
96 | }
97 |
98 | type UserProfile = {
99 | name: string;
100 | photoUrl: string;
101 | }
102 | ```
103 |
104 | (Note: you don't _have to_ declare your interface separately. The client could just use `import("./server").ApiServer` as the type.)
105 |
106 | On the server, we implement the interface as an RpcTarget:
107 |
108 | ```ts
109 | import { newWorkersRpcResponse, RpcTarget } from "capnweb";
110 |
111 | class ApiServer extends RpcTarget implements PublicApi {
112 | // ... implement PublicApi ...
113 | }
114 |
115 | export default {
116 | async fetch(req, env, ctx) {
117 | // ... same as previous example ...
118 | }
119 | }
120 | ```
121 |
122 | On the client, we can use it in a batch request:
123 |
124 | ```ts
125 | import { newHttpBatchRpcSession } from "capnweb";
126 |
127 | let api = newHttpBatchRpcSession("https://example.com/api");
128 |
129 | // Call authenticate(), but don't await it. We can use the returned promise
130 | // to make "pipelined" calls without waiting.
131 | let authedApi: RpcPromise<AuthedApi> = api.authenticate(apiToken);
132 |
133 | // Make a pipelined call to get the user's ID. Again, don't await it.
134 | let userIdPromise: RpcPromise<number> = authedApi.getUserId();
135 |
136 | // Make another pipelined call to fetch the user's public profile, based on
137 | // the user ID. Notice how we can use `RpcPromise<T>` in the parameters of a
138 | // call anywhere a `T` is expected. The promise will be replaced with its
139 | // resolution before delivering the call.
140 | let profilePromise = api.getUserProfile(userIdPromise);
141 |
142 | // Make another call to get the user's friends.
143 | let friendsPromise = authedApi.getFriendIds();
144 |
145 | // That only returns an array of user IDs, but we want all the profile info
146 | // too, so use the magic .map() function to get them, too! Still one round
147 | // trip.
148 | let friendProfilesPromise = friendsPromise.map((id: RpcPromise<number>) => {
149 | return { id, profile: api.getUserProfile(id) };
150 | });
151 |
152 | // Now await the promises. The batch is sent at this point. It's important
153 | // to simultaneously await all promises for which you actually want the
154 | // result. If you don't actually await a promise before the batch is sent,
155 | // the system detects this and doesn't actually ask the server to send the
156 | // return value back!
157 | let [profile, friendProfiles] =
158 | await Promise.all([profilePromise, friendProfilesPromise]);
159 |
160 | console.log(`Hello, ${profile.name}!`);
161 |
162 | // Note that at this point, the `api` and `authedApi` stubs no longer work,
163 | // because the batch is done. You must start a new batch.
164 | ```
165 |
166 | Alternatively, for a long-running interactive application, we can set up a persistent WebSocket connection:
167 |
168 | ```ts
169 | import { newWebSocketRpcSession } from "capnweb";
170 |
171 | // We declare `api` with `using` so that it'll be disposed at the end of the
172 | // scope, which closes the connection. `using` is a fairly new JavaScript
173 | // feature, part of the "explicit resource management" spec. Alternatively,
174 | // we could declare `api` with `let` or `const` and make sure to call
175 | // `api[Symbol.dispose]()` to dispose it and close the connection later.
176 | using api = newWebSocketRpcSession("wss://example.com/api");
177 |
178 | // Usage is exactly the same, except we don't have to await all the promises
179 | // at once.
180 |
181 | // Authenticate and get the user ID in one round trip. Note we use `using`
182 | // again so that `authedApi` will be disposed when we're done with it. In
183 | // this case, it won't close the connection (since it's not the main stub),
184 | // but disposing it does release the `AuthedApi` object on the server side.
185 | using authedApi: RpcPromise<AuthedApi> = api.authenticate(apiToken);
186 | let userId: number = await authedApi.getUserId();
187 |
188 | // ... continue calling other methods, now or in the future ...
189 | ```
190 |
191 | ## RPC Basics
192 |
193 | ### Pass-by-value types
194 |
195 | The following types can be passed over RPC (in arguments or return values), and will be passed "by value", meaning the content is serialized, producing a copy at the receiving end:
196 |
197 | * Primitive values: strings, numbers, booleans, null, undefined
198 | * Plain objects (e.g., from object literals)
199 | * Arrays
200 | * `bigint`
201 | * `Date`
202 | * `Uint8Array`
203 | * `Error` and its well-known subclasses
204 |
205 | The following types are not supported as of this writing, but may be added in the future:
206 | * `Map` and `Set`
207 | * `ArrayBuffer` and typed arrays other than `Uint8Array`
208 | * `RegExp`
209 | * `ReadableStream` and `WritableStream`, with automatic flow control.
210 | * `Headers`, `Request`, and `Response`
211 |
212 | The following are intentionally NOT supported:
213 | * Application-defined classes that do not extend `RpcTarget`.
214 | * Cyclic values. Messages are serialized strictly as trees (like JSON).
215 |
216 | ### `RpcTarget`
217 |
218 | To export an interface over RPC, you must write a class that `extends RpcTarget`. Extending `RpcTarget` tells the RPC system: instances of this class are _pass-by-reference_. When an instance is passed over RPC, the object should NOT be serialized. Instead, the RPC message will contain a "stub" that points back to the original target object. Invoking this stub calls back over RPC.
219 |
220 | When you send someone an `RpcTarget` reference, they will be able to call any class method over RPC, including getters. They will not, however, be able to access "own" properties. In precise JavaScript terms, they can access prototype properties but not instance properties. This policy is intended to "do the right thing" for typical JavaScript code, where private members are typically stored as instance properties.
221 |
222 | WARNING: If you are using TypeScript, note that declaring a method `private` does not hide it from RPC, because TypeScript annotations are "erased" at runtime, so cannot be enforced. To actually make methods private, you must prefix their names with `#`, which makes them private for JavaScript (not just TypeScript). Names prefixed with `#` are never available over RPC.
223 |
224 | ### Functions
225 |
226 | When a plain function is passed over RPC, it will be treated similarly to an `RpcTarget`. The function will be replaced by a stub which, when invoked, calls back over RPC to the original function object.
227 |
228 | If the function has any own properties, those will be available over RPC. Note that this differs from `RpcTarget`: With `RpcTarget`, own properties are not exposed, but with functions, _only_ own properties are exposed. Generally functions don't have properties anyway, making the point moot.
229 |
230 | ### `RpcStub`
231 |
232 | When a type `T` which extends `RpcTarget` (or is a function) is sent as part of an RPC message (in the arguments to a call, or in the return value), it is replaced with a stub of type `RpcStub<T>`.
233 |
234 | Stubs are implemented using JavaScript `Proxy`s. A stub appears to have every possible method and property name. The stub does not know at runtime which properties actually exist on the server side. If you use a property that doesn't exist, an error will not be produced until you await the results.
235 |
236 | TypeScript, however, will know which properties exist from type parameter `T`. Thus, if you are using TypeScript, you will get full compile-time type checking, auto-complete, etc. Hooray!
237 |
238 | To read a property from the remote object (as opposed to calling a method), simply `await` the property, like `let foo = await stub.foo;`.
239 |
240 | A stub can be passed across RPC again, including over independent connections. If Alice is connected to Bob and Carol, and Alice receives a stub from Bob, Alice can pass the stub in an RPC to Carol, thus allowing Carol to call Bob. (As of this writing, any such calls will be proxied through Alice, but in the future we may support "three-party handoff" such that Carol can make a direct connection to Bob.)
241 |
242 | You may construct a stub explicitly without an RPC connection, using `new RpcStub(target)`. This is sometimes useful to be able to perform local calls as if they were remote, or to help manage disposal (see below).
243 |
244 | ### `RpcPromise`
245 |
246 | Calling an RPC method returns an `RpcPromise` rather than a regular `Promise`. You can use an `RpcPromise` in all the ways a regular `Promise` can be used, that is, you can `await` it, call `.then()`, pass it to `Promise.resolve()`, etc. (This is all possible because `RpcPromise` is a ["thenable"](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise#thenables).)
247 |
248 | However, you can do more with `RpcPromise`. `RpcPromise` supports _Promise Pipelining_:
249 |
250 | 1. An `RpcPromise` also acts as a _stub_ for the eventual result of the promise. That means, you can access properties and invoke methods on it, without awaiting the promise first.
251 |
252 | ```ts
253 | // In a single round trip, authenticate the user, and fetch their notifications.
254 | let user = api.authenticate(cookie);
255 | let notifications = await user.getNotifications();
256 | ```
257 |
258 | 2. An `RpcPromise` (or its properties) can be passed as parameters to other RPC calls.
259 |
260 | ```ts
261 | // In a single round trip, authenticate the user, and fetch their public profile
262 | // given their ID.
263 | let user = api.authenticate(cookie);
264 | let profile = await api.getUserProfile(user.id);
265 | ```
266 |
267 | Whenever an `RpcPromise` is passed in the parameters to an RPC, or returned as part of the result, the promise will be replaced with its resolution before delivery to the receiving application. So, you can use an `RpcPromise<T>` anywhere a `T` is required!
268 |
269 | ### The magic `map()` method
270 |
271 | Every RPC promise has a special method `.map()` which can be used to remotely transform a value, without pulling it back locally. Here's an example:
272 |
273 | ```ts
274 | // Get a list of user IDs.
275 | let idsPromise = api.listUserIds();
276 |
277 | // Look up the username for each one.
278 | let names = await idsPromise.map(id => [id, api.getUserName(id)]);
279 | ```
280 |
281 | This example calls one API method to get a list of user IDs, then, for each user ID in the list, makes another RPC call to look up the user's name, producing a list of id/name pairs.
282 |
283 | **All this happens in a single network round trip!**
284 |
285 | `promise.map(func)` transfers a representation of `func` to the server, where it is executed on the promise's result. Specifically:
286 |
287 | * If the promise resolves to an array, the mapper function executes on each element of the array. The overall `.map()` operation returns a promise for an array of the results.
288 | * If the promise resolves to `null` or `undefined`, the map function is not executed at all. The result is the same value.
289 | * If the promise resolves to any other value, the map function executes once on that value, returning the result.
290 |
291 | Thus, `map()` can be used both for handling arrays, and for handling nullable values.
292 |
293 | There are some restrictions:
294 |
295 | * The callback must have no side effects other than calling RPCs.
296 | * The callback must be synchronous. It cannot await anything.
297 | * The input to the callback is an `RpcPromise`, hence the callback cannot actually operate on it, other than to invoke its RPC methods, or to use it in the params of other RPC methods.
298 | * Any stubs which you use in the callback -- and any parameters you pass to them -- will be sent to the peer. Be warned, a malicious peer can use these stubs for anything, not just calling your callback. Typically, it only makes sense to invoke stubs that came from the same peer originally, since this is what saves round-trips.
299 |
300 | **How the heck does that work?**
301 |
302 | Cap'n Web does NOT send arbitrary code over the wire!
303 |
304 | The trick here is record-replay: On the calling side, Cap'n Web will invoke your callback once, in a special "recording" mode, passing in a special placeholder stub which records what you do with it. During the invocation, any RPCs invoked by the callback (on *any* stub) will not actually be executed, but will be recorded as an action the callback performs. Any stubs you use during the recording are "captured" as well. Once the callback returns, the recording and the capture list can then be sent to the peer, where the recording can then be replayed as needed to process individual results.
305 |
306 | Since all of the not-yet-determined values seen by the callback are represented as `RpcPromise`s, the callback's behavior is deterministic. Any actual computation (arithmetic, branching, etc.) can't possibly use these promises as (meaningful) inputs, so would logically produce the same results for every invocation of the callback. Any such computation will actually end up being performed on the sending side, just once, with the results being imbued into the recording.
307 |
308 | ### Cloudflare Workers RPC interoperability
309 |
310 | Cap'n Web works on any JavaScript platform. But, on Cloudflare Workers specifically, it's designed to play nicely with [the built-in RPC system](https://blog.cloudflare.com/javascript-native-rpc/). The two have basically the same semantics, the only difference being that Workers RPC is a built-in API provided by the Workers Runtime, whereas Cap'n Web is implemented in pure JavaScript.
311 |
312 | To facilitate interoperability:
313 | * On Workers, the `RpcTarget` class exported by "capnweb" is just an alias of the built-in one, so you can use them interchangeably.
314 | * RPC stubs and promises originating from one RPC system can be passed over the other. This will automatically set up proxying.
315 | * You can also send Workers Service Bindings and Durable Object stubs over Cap'n Web -- again, this sets up proxying.
316 |
317 | So basically, it "just works".
318 |
319 | With that said, as of this writing, the feature set is not exactly the same between the two. We aim to fix this over time, by adding missing features to both sides until they match. In particular, as of this writing:
320 | * Workers RPC supports some types that Cap'n Web does not yet, like `Map`, streams, etc.
321 | * Workers RPC supports sending values that contain aliases and cycles. This can cause problems, so we actually plan to *remove* this feature from Workers RPC (with a compatibility flag, of course).
322 | * Workers RPC does not yet support placing an `RpcPromise` into the parameters of a request, to be replaced by its resolution.
323 | * Workers RPC does not yet support the magic `.map()` method.
324 |
325 | ## Resource Management and Disposal
326 |
327 | Unfortunately, garbage collection does not work well when remote resources are involved, for two reasons:
328 |
329 | 1. Many JavaScript runtimes only run the garbage collector when they sense "memory pressure" -- if memory is not running low, then they figure there's no need to try to reclaim any. However, the runtime has no way to know if the other side of an RPC connection is suffering memory pressure.
330 |
331 | 2. Garbage collectors need to trace the full object graph in order to detect which objects are unreachable, especially when those objects contain cyclic references. However, the garbage collector can only see local objects; it has no ability to trace through the remote graph to discover cycles that may cross RPC connections.
332 |
333 | Both of these problems might be solvable with sufficient work, but the problem seems exceedingly difficult. We make no attempt to solve it in this library.
334 |
335 | Instead, you may choose one of two strategies:
336 |
337 | 1. Explicitly dispose stubs when you are done with them. This notifies the remote end that it can release the associated resources.
338 |
339 | 2. Use short-lived sessions. When the session ends, all stubs are implicitly disposed. In particular, when using HTTP batch requests, there's generally no need to dispose stubs. When using long-lived WebSocket sessions, however, disposal may be important.
340 |
341 | Note: We might extend Cap'n Web to use `FinalizationRegistry` to automatically dispose abandoned stubs in the future, but even if we do, it should not be relied upon, due to problems discussed above.
342 |
343 | ### How to dispose
344 |
345 | Stubs integrate with JavaScript's [explicit resource management](https://v8.dev/features/explicit-resource-management), which became widely available in mid-2025 (and has been supported via transpilers and polyfills going back a few years earlier). In short:
346 |
347 | * Disposable objects (including stubs) have a method `[Symbol.dispose]`. You can call this like `stub[Symbol.dispose]()`.
348 | * You can arrange for a stub to be disposed automatically at the end of a function scope by assigning it to a `using` variable, like `using stub = api.getStub();`. The disposer will automatically be invoked when the variable goes out-of-scope.
349 |
350 | ### Automatic disposal
351 |
352 | This library implements several rules to help make resource management more manageable. These rules may appear a bit complicated, but are intended to implement the behavior you would naturally expect.
353 |
354 | The basic principle is: **The caller is responsible for disposing all stubs.** That is:
355 | * Stubs passed in the params of a call remain the property of the caller, and must be disposed by the caller, not by the callee.
356 | * Stubs returned in the result of a call have their ownership transferred from the callee to the caller, and must be disposed by the caller.
357 |
358 | In practice, though, the callee and caller do not actually share the same stubs. When stubs are passed over RPC, they are _duplicated_, and the target object is only disposed when all duplicates of the stub are disposed. Thus, to achieve the rule that only the caller needs to dispose stubs, the RPC system implicitly disposes the callee's duplicates of all stubs when the call completes. That is:
359 | * Any stubs the callee receives in the parameters are implicitly disposed when the call completes.
360 | * Any stubs returned in the results are implicitly disposed some time after the call completes. (Specifically, the RPC system will dispose them once it knows there will be no more pipelined calls.)
361 |
362 | Some additional wonky details:
363 | * Disposing an `RpcPromise` will automatically dispose the future result. (It may also cause the promise to be canceled and rejected, though this is not guaranteed.) If you don't intend to await an RPC promise, you should dispose it.
364 | * Passing an `RpcPromise` in params or the return value of a call has the same ownership / disposal rules as passing an `RpcStub`.
365 | * When you access a property of an `RpcStub` or `RpcPromise`, the result is itself an `RpcPromise`. However, this `RpcPromise` does not have its own disposer; you must dispose the stub or promise it came from. You can pass such properties in params or return values, but doing so will never lead to anything being implicitly disposed.
366 | * The caller of an RPC may dispose any stubs used in the parameters immediately after initiating the RPC, without waiting for the RPC to complete. All stubs are duplicated at the moment of the call, so the callee is not responsible for keeping them alive.
367 | * If the final result of an RPC returned to the caller is an object, it will always have a disposer. Disposing it will dispose all stubs found in that response. It's a good idea to always dispose return values even if you don't expect them to contain any stubs, just in case the server changes the API in the future to add stubs to the result.
368 |
369 | WARNING: The ownership behavior of calls differs from the original behavior in the native RPC implementation built into the Cloudflare Workers Runtime. In the original Workers behavior, the callee loses ownership of stubs passed in a call's parameters. We plan to change the Workers Runtime to match Cap'n Web's behavior, as the original behavior has proven more problematic than helpful.
370 |
371 | ### Duplicating stubs
372 |
373 | Sometimes you need to pass a stub somewhere where it will be disposed, but also keep the stub for later use. To prevent the disposer from disabling your copy of the stub, you can duplicate the stub by calling `stub.dup()`. The stub's target will only be disposed when all duplicates of the stub have been disposed.
374 |
375 | Hint: You can call `.dup()` on a property of a stub or promise, in order to create a stub backed by that property. This is particularly useful when you know in advance that the property is going to resolve to a stub: calling `.dup()` on it gives you a stub you can start using immediately, that otherwise behaves exactly the same as the eventual stub would if you awaited it.
376 |
377 | ### Listening for disposal
378 |
379 | An `RpcTarget` may declare a `Symbol.dispose` method. If it does, the RPC system will automatically invoke it when a stub pointing at it (and all its duplicates) has been disposed.
380 |
381 | Note that if you pass the same `RpcTarget` instance to RPC multiple times -- thus creating multiple stubs -- you will eventually get a separate dispose call for each one. To avoid this, you could use `new RpcStub(target)` to create a single stub upfront, and then pass that stub across multiple RPCs. In this case, you will receive only one call to the target's disposer when all stubs are disposed.
382 |
383 | ### Listening for disconnect
384 |
385 | You can monitor any stub for "brokenness" with its `onRpcBroken()` method:
386 |
387 | ```ts
388 | stub.onRpcBroken((error: any) => {
389 | console.error(error);
390 | });
391 | ```
392 |
393 | If anything happens to the stub that would cause all further method calls and property accesses to throw exceptions, then the callback will be called. In particular, this happens if:
394 | * The stub's underlying connection is lost.
395 | * The stub is a promise, and the promise rejects.
396 |
397 | ## Security Considerations
398 |
399 | * The WebSocket API in browsers always permits cross-site connections, and does not permit setting headers. Because of this, you generally cannot use cookies or other headers for authentication. Instead, we highly recommend the pattern shown in the second example above, in which authentication happens in-band via an RPC method that returns the authenticated API.
400 |
401 | * Cap'n Web's pipelining can make it easy for a malicious client to enqueue a large amount of work to occur on a server. To mitigate this, we recommend implementing rate limits on expensive operations. If using Cloudflare Workers, you may also consider configuring [per-request CPU limits](https://developers.cloudflare.com/workers/wrangler/configuration/#limits) to be lower than the default 30s. Note that in stateless Workers (i.e. not Durable Objects), the system considers an entire WebSocket session to be one "request" for CPU limits purposes.
402 |
403 | * Cap'n Web currently does not provide any runtime type checking. When using TypeScript, keep in mind that types are checked only at compile time. A malicious client can send types you did not expect, and this could cause your application to behave in unexpected ways. For example, MongoDB uses special property names to express queries; placing attacker-provided values directly into queries can result in query injection vulnerabilities (similar to SQL injection). Of course, JSON has always had the same problem, and there exists tooling to solve it. You might consider using a runtime type-checking framework like Zod to check your inputs. In the future, we hope to explore auto-generating type-checking code based on TypeScript types.
404 |
405 | ## Setting up a session
406 |
407 | ### HTTP batch client
408 |
409 | In HTTP batch mode, a batch of RPC calls can be made in a single HTTP request, with the server returning a batch of results.
410 |
411 | **Cap'n Web has a magic trick:** The results of one call in the batch can be used in the parameters to later calls in the same batch, even though the entire batch is sent at once. If you simply take the Promise returned by one call and use it in the parameters to another call, the Promise will be replaced with its resolution before delivering it to the callee. **This is called Promise Pipelining.**
412 |
413 | ```ts
414 | import { RpcTarget, RpcStub, newHttpBatchRpcSession } from "capnweb";
415 |
416 | // Declare our RPC interface.
417 | interface MyApi extends RpcTarget {
418 | // Returns information about the logged-in user.
419 | getUserInfo(): UserInfo;
420 |
421 | // Returns a friendly greeting for a user with the given name.
422 | greet(name: string): string;
423 | };
424 |
425 | // Start a batch request using this interface.
426 | using stub: RpcStub<MyApi> = newHttpBatchRpcSession<MyApi>("https://example.com/api");
427 |
428 | // The batch will be sent on the next I/O tick (i.e. using setTimeout(sendBatch, 0)). You have
429 | // until then to add calls to the batch.
430 | //
431 | // We can make any number of calls as part of the batch, as long as we store the promises without
432 | // awaiting them yet.
433 | let promise1 = stub.greet("Alice");
434 | let promise2 = stub.greet("Bob");
435 |
436 | // Note that a promise returned by one call can be used in the input to another call. The first
437 | // call's result will be substituted into the second call's parameters on the server side. If the
438 | // first call returns an object, you can even specify a property of the object to pass to the
439 | // second call, as shown here.
440 | let userInfoPromise = stub.getUserInfo();
441 | let promise3 = stub.greet(userInfoPromise.name);
442 |
443 | // Use Promise.all() to wait on all the promises at once. NOTE: You don't necessarily have to
444 | // use Promise.all(), but you must make sure you have explicitly awaited (or called `.then()` on)
445 | // all promises before the batch is sent. The system will only ask the server to send back
446 | // results for the promises you explicitly await. In this example, we have not awaited
447 | // `userInfoPromise` -- we only used it as a parameter to another call -- so the result will
448 | // not actually be returned.
449 | let [greeting1, greeting2, greeting3] = await Promise.all([promise1, promise2, promise3]);
450 |
451 | // Now we can do stuff with the results.
452 | console.log(greeting1);
453 | console.log(greeting2);
454 | console.log(greeting3);
455 | ```
456 |
457 | ### WebSocket client
458 |
459 | In WebSocket mode, the client forms a long-lived connection to the server, allowing us to make many calls over a long period of time. In this mode, the server can even make asynchronous calls back to the client.
460 |
461 | ```ts
462 | import { RpcTarget, RpcStub, newWebSocketRpcSession } from "capnweb";
463 |
464 | // Declare our RPC interface.
465 | interface MyApi extends RpcTarget {
466 | // Returns information about the logged-in user.
467 | getUserInfo(): UserInfo;
468 |
469 | // Returns a friendly greeting for a user with the given name.
470 | greet(name: string): string;
471 | };
472 |
473 | // Start a WebSocket session.
474 | //
475 | // (Note that disposing the root stub will close the connection. Here we declare it with `using` so
476 | // that the connection will be closed when the stub goes out of scope, but you can also call
477 | // `stub[Symbol.dispose]()` directly.)
478 | using stub: RpcStub<MyApi> = newWebSocketRpcSession<MyApi>("wss://example.com/api");
479 |
480 | // With a WebSocket, we can freely make calls over time.
481 | console.log(await stub.greet("Alice"));
482 | console.log(await stub.greet("Bob"));
483 |
484 | // But we can still use Promise Pipelining to reduce round trips. Note that we should use `using`
485 | // with promises we don't intend to await so that the system knows when we don't need them anymore.
486 | {
487 | using userInfoPromise = stub.getUserInfo();
488 | console.log(await stub.greet(userInfoPromise.name));
489 | }
490 |
491 | // Note that since we never awaited `userInfoPromise`, the server won't even bother sending the
492 | // response back over the wire.
493 | ```
494 |
495 | ### HTTP server on Cloudflare Workers
496 |
497 | The helper function `newWorkersRpcResponse()` makes it easy to implement an HTTP server that accepts both the HTTP batch and WebSocket APIs at once:
498 |
499 | ```ts
500 | import { RpcTarget, newWorkersRpcResponse } from "capnweb";
501 |
502 | // Define our server implementation.
503 | class MyApiImpl extends RpcTarget implements MyApi {
504 | constructor(private userInfo: UserInfo) { super(); }
505 |
506 | getUserInfo(): UserInfo {
507 | return this.userInfo;
508 | }
509 |
510 | greet(name: string): string {
511 | return `Hello, ${name}!`;
512 | }
513 | };
514 |
515 | // Define our Worker HTTP handler.
516 | export default {
517 | fetch(request: Request, env, ctx) {
518 | let userInfo: UserInfo = authenticateFromCookie(request);
519 | let url = new URL(request.url);
520 |
521 | // Serve API at `/api`.
522 | if (url.pathname === "/api") {
523 | return newWorkersRpcResponse(request, new MyApiImpl(userInfo));
524 | }
525 |
526 | return new Response("Not found", {status: 404});
527 | }
528 | }
529 | ```
530 |
531 | ### HTTP server on Node.js
532 |
533 | A server on Node.js is a bit more involved, due to the awkward handling of WebSockets in Node's HTTP library.
534 |
535 | ```ts
536 | import http from "node:http";
537 | import { WebSocketServer } from 'ws'; // npm package
538 | import { RpcTarget, newWebSocketRpcSession, nodeHttpBatchRpcResponse } from "capnweb";
539 |
540 | class MyApiImpl extends RpcTarget implements MyApi {
541 | // ... define API, same as above ...
542 | }
543 |
544 | // Run standard HTTP server on a port.
545 | const httpServer = http.createServer(async (request, response) => {
546 | if (request.headers.upgrade?.toLowerCase() === 'websocket') {
547 | // Ignore, should be handled by WebSocketServer instead.
548 | return;
549 | }
550 |
551 | // Accept Cap'n Web requests at `/api`.
552 | if (request.url === "/api") {
553 | try {
554 | await nodeHttpBatchRpcResponse(request, response, new MyApiImpl(), {
555 | // If you are accepting WebSockets, then you might as well accept cross-origin HTTP, since
556 | // WebSockets always permit cross-origin request anyway. But, see security considerations
557 | // for further discussion.
558 | headers: { "Access-Control-Allow-Origin": "*" }
559 | });
560 | } catch (err) {
561 | response.writeHead(500, { 'content-type': 'text/plain' });
562 | response.end(String(err?.stack || err));
563 | }
564 | return;
565 | }
566 |
567 | response.writeHead(404, { 'content-type': 'text/plain' });
568 | response.end("Not Found");
569 | });
570 |
571 | // Arrange to handle WebSockets as well, using the `ws` package. You can skip this if you only
572 | // want to handle HTTP batch requests.
573 | const wsServer = new WebSocketServer({ server: httpServer })
574 | wsServer.on('connection', (ws) => {
575 | // The `as any` here is because the `ws` module seems to have its own `WebSocket` type
576 | // declaration that's incompatible with the standard one. In practice, though, they are
577 | // compatible enough for Cap'n Web!
578 | newWebSocketRpcSession(ws as any, new MyApiImpl());
579 | })
580 |
581 | // Accept requests on port 8080.
582 | httpServer.listen(8080);
583 | ```
584 |
585 | ### HTTP server on Deno
586 | ```ts
587 | import {
588 | newHttpBatchRpcResponse,
589 | newWebSocketRpcSession,
590 | RpcTarget,
591 | } from "npm:capnweb";
592 |
593 | // This is the server implementation.
594 | class MyApiImpl extends RpcTarget implements MyApi {
595 | // ... define API, same as above ...
596 | }
597 |
598 | Deno.serve(async (req) => {
599 | const url = new URL(req.url);
600 | if (url.pathname === "/api") {
601 | if (req.headers.get("upgrade") === "websocket") {
602 | const { socket, response } = Deno.upgradeWebSocket(req);
603 | socket.addEventListener("open", () => {
604 | newWebSocketRpcSession(socket, new MyApiImpl());
605 | });
606 | return response;
607 | } else {
608 | const response = await newHttpBatchRpcResponse(req, new MyApiImpl());
609 | // If you are accepting WebSockets, then you might as well accept cross-origin HTTP, since
610 | // WebSockets always permit cross-origin requests anyway. But, see security considerations
611 | // for further discussion.
612 | response.headers.set("Access-Control-Allow-Origin", "*");
613 | return response;
614 | }
615 | }
616 |
617 | return new Response("Not Found", { status: 404 });
618 | });
619 | ```
620 |
621 | ### HTTP server on other runtimes
622 |
623 | Every runtime does HTTP handling and WebSockets a little differently, although most modern runtimes use the standard `Request` and `Response` types from the Fetch API, as well as the standard `WebSocket` API. You should be able to use these two functions (exported by `capnweb`) to implement both HTTP batch and WebSocket handling on all platforms:
624 |
625 | ```ts
626 | // Run a single HTTP batch.
627 | function newHttpBatchRpcResponse(
628 | request: Request, yourApi: RpcTarget, options?: RpcSessionOptions)
629 | : Promise<Response>;
630 |
631 | // Run a WebSocket session.
632 | //
633 | // This is actually the same function as is used on the client side! But on the
634 | // server, you should pass in a `WebSocket` object representing the already-open
635 | // connection, instead of a URL string, and you pass your API implementation as
636 | // the second parameter.
637 | //
638 | // You can dispose the returned `Disposable` to close the connection, or just
639 | // let it run until the client closes it.
640 | function newWebSocketRpcSession(
641 | webSocket: WebSocket, yourApi: RpcTarget, options?: RpcSessionOptions)
642 | : Disposable;
643 | ```
644 |
645 | ### HTTP server using Hono
646 |
647 | If your app is built on [Hono](https://hono.dev/) (on any runtime it supports), check out [`@hono/capnweb`](https://github.com/honojs/middleware/tree/main/packages/capnweb).
648 |
649 | ### MessagePort
650 |
651 | Cap'n Web can also talk over MessagePorts. This can be used in a browser to talk to Web Workers, iframes, etc.
652 |
653 | ```ts
654 | import { RpcTarget, RpcStub, newMessagePortRpcSession } from "capnweb";
655 |
656 | // Declare our RPC interface.
657 | class Greeter extends RpcTarget {
658 | greet(name: string): string {
659 | return `Hello, ${name}!`;
660 | }
661 | };
662 |
663 | // Create a MessageChannel (pair of MessagePorts).
664 | let channel = new MessageChannel()
665 |
666 | // Initialize the server on port1.
667 | newMessagePortRpcSession(channel.port1, new Greeter());
668 |
669 | // Initialize the client on port2.
670 | using stub: RpcStub<Greeter> = newMessagePortRpcSession<Greeter>(channel.port2);
671 |
672 | // Now you can make calls.
673 | console.log(await stub.greet("Alice"));
674 | console.log(await stub.greet("Bob"));
675 | ```
676 |
677 | Of course, in a real-world scenario, you'd probably want to send one of the two ports to another context. A `MessagePort` can itself be transferred to another context using `postMessage()`, e.g. `window.postMessage()`, `worker.postMessage()`, or even `port.postMessage()` on some other existing `MessagePort`.
678 |
679 | Note that you should not use a `Window` object itself as a port for RPC -- you should always create a new `MessageChannel` and send one of the ports over. This is because anyone can `postMessage()` to a window, and the RPC system does not authenticate that messages came from the expected sender. You need to verify that you received the port itself from the expected sender first, then let the RPC system take over.
680 |
681 | ### Custom transports
682 |
683 | You can implement a custom RPC transport across any bidirectional stream. To do so, implement the interface `RpcTransport`, which is defined as follows:
684 |
685 | ```ts
686 | // Interface for an RPC transport, which is a simple bidirectional message stream.
687 | export interface RpcTransport {
688 | // Sends a message to the other end.
689 | send(message: string): Promise<void>;
690 |
691 | // Receives a message sent by the other end.
692 | //
693 | // If and when the transport becomes disconnected, this will reject. The thrown error will be
694 | // propagated to all outstanding calls and future calls on any stubs associated with the session.
695 | // If there are no outstanding calls (and none are made in the future), then the error does not
696 | // propagate anywhere -- this is considered a "clean" shutdown.
697 | receive(): Promise<string>;
698 |
699 | // Indicates that the RPC system has suffered an error that prevents the session from continuing.
700 | // The transport should ideally try to send any queued messages if it can, and then close the
701 | // connection. (It's not strictly necessary to deliver queued messages, but the last message sent
702 | // before abort() is called is often an "abort" message, which communicates the error to the
703 | // peer, so if that is dropped, the peer may have less information about what happened.)
704 | abort?(reason: any): void;
705 | }
706 | ```
707 |
708 | You can then set up a connection over it:
709 |
710 | ```ts
711 | // Create the transport.
712 | let transport: RpcTransport = new MyTransport();
713 |
714 | // Create the main interface we will expose to the other end.
715 | let localMain: RpcTarget = new MyMainInterface();
716 |
717 | // Start the session.
718 | let session = new RpcSession<RemoteMainInterface>(transport, localMain);
719 |
720 | // Get a stub for the other end's main interface.
721 | let stub: RemoteMainInterface = session.getRemoteMain();
722 |
723 | // Now we can call methods on the stub.
724 | ```
725 |
726 | Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.
727 |
--------------------------------------------------------------------------------