/ packages / client / mod.ts
mod.ts
  1  // SPDX-FileCopyrightText: 2024 Mass Labs
  2  //
  3  // SPDX-License-Identifier: MIT
  4  import { assert } from "@std/assert";
  5  import {
  6    type Account,
  7    type Hex,
  8    hexToBigInt,
  9    hexToBytes,
 10    numberToBytes,
 11    pad,
 12    recoverMessageAddress,
 13    recoverPublicKey,
 14    type WalletClient,
 15  } from "viem";
 16  import { parseAccount } from "viem/accounts";
 17  import { createSiweMessage } from "viem/siwe";
 18  import { hashMessage } from "viem/utils";
 19  import { ProjectivePoint } from "@noble/secp256k1";
 20  import { getLogger } from "@logtape/logtape";
 21  import LockMap from "@nullradix/lockmap";
 22  import schema, { EnvelopMessageTypes } from "@massmarket/schema";
 23  import { decodeBufferToString, hexToBase64 } from "@massmarket/utils";
 24  import {
 25    type CodecKey,
 26    type CodecValue,
 27    decode,
 28    encode,
 29  } from "@massmarket/utils/codec";
 30  
// Module-scoped logger; everything logged from this file is emitted under the
// ["mass-market", "relay-client"] category.
const logger = getLogger(["mass-market", "relay-client"]);
 32  
/**
 * Describes the relay a client talks to.
 */
export interface IRelayEndpoint {
  // Base URL of the relay. `RelayClient.connect` appends "/sessions" to it for
  // the websocket, and `RelayClient.enrollKeycard` derives an http(s) URL from it.
  url: URL; // the websocket URL to talk to
  // Hex string identifying the relay; `RelayClient.enrollKeycard` converts it to
  // a bigint and embeds it in the SIWE message as the `mass-relayid` resource.
  tokenId: `0x${string}`;
}
 37  
/**
 * Constructor options for `RelayClient`.
 */
export interface IRelayClientOptions {
  // Relay to connect to.
  relayEndpoint: IRelayEndpoint;
  // Wallet client used for every `signMessage` call the client makes
  // (authentication challenge, patch-set headers, public-key recovery).
  walletClient: WalletClient;
  // Account passed as `account` to those `signMessage` calls.
  keycard: Hex | Account;
  //TODO: make ID part of the path
  // Shop identifier; sent in subscription requests and written into every
  // patch-set header as "ShopID".
  shopId: bigint;
  // Starting value for the header nonce. Defaults to 0 when omitted; the
  // client pre-increments it before each write.
  keyCardNonce?: number;
}
 46  
/**
 * A single patch operation.
 *
 * Every patch carries a `Path` (presumably the location of the value being
 * modified — confirm against the relay's patch specification) plus one of
 * three shapes selected by `Op`:
 * - "add" | "replace" | "append": carries an arbitrary codec `Value`.
 * - "increment" | "decrement": carries a numeric `Value`.
 * - "remove": carries no `Value`.
 */
export type Patch =
  & {
    Path: CodecKey[];
  }
  & (
    | {
      Op: "add" | "replace" | "append";
      Value: CodecValue;
    }
    | {
      Op: "increment" | "decrement";
      Value: number;
    }
    | {
      Op: "remove";
    }
  );
 64  
/**
 * A patch set received from the relay through a subscription, as enqueued on
 * the subscription's readable stream by `RelayClient`.
 */
export type PushedPatchSet = {
  // Address recovered from the pushed signature over the raw (still encoded) header.
  signer: Hex;
  // The pushed patches, each decoded and converted from a Map into a plain object.
  patches: Patch[];
  // The decoded header of the patch set.
  header: CodecValue;
  // The relay's `shopSeqNo` for this patch set, normalised to a number.
  sequence: number;
};
 71  
/**
 * A locally built patch set, as produced by `RelayClient.createWriteStream`
 * before it is encoded and sent to the relay.
 */
export type SignedPatchSet = {
  // Header map; the write stream fills in "KeyCardNonce", "Timestamp",
  // "ShopID" and "RootHash".
  Header: Map<string, CodecValue>;
  // Patches converted to Maps via `Object.entries`.
  Patches: Map<string, CodecValue>[];
  // Bytes of the signature over the encoded header.
  Signature: Uint8Array;
};
 77  
 78  export class RelayResponseError extends Error {
 79    constructor(
 80      public override cause: {
 81        message: string;
 82        id: unknown;
 83        requestType: string;
 84        code: number;
 85        additionalInfo?: {
 86          objectId: number | bigint;
 87        };
 88      },
 89    ) {
 90      logger
 91        .info`RelayResponseError:network request ${cause.requestType} cause id ${cause.id} failed with error[${cause.code}]: ${cause.message}`;
 92      super(
 93        `network request ${cause.requestType} failed with error[${cause.code}]: ${cause.message}`,
 94      );
 95    }
 96  }
 97  
 98  export class ClientWriteError extends Error {
 99    constructor(
100      public originalError: Error,
101      public patchSet: SignedPatchSet,
102    ) {
103      super(originalError.message);
104    }
105  }
106  
/**
 * Websocket client for a relay.
 *
 * The client owns a single websocket (`connection`), correlates responses with
 * requests through their request id, answers pings, fans pushed patch sets out
 * to subscription streams, and signs/sends locally produced patches.
 */
export class RelayClient {
  /** The current websocket, or `null` before `connect()` / after the socket closed. */
  connection: WebSocket | null = null;
  /** Nonce written into each patch-set header; pre-incremented on every write. */
  keyCardNonce: number;
  private pingsReceived: number = 0;
  private lastPingReceived: Date = new Date(0);
  readonly walletClient: WalletClient;
  readonly keycard;
  readonly relayEndpoint;
  readonly shopId;
  // TODO; we can use the subscription path for the id
  // Keyed by the string form (`toString()`) of the subscription id bytes.
  #subscriptions: Map<string, ReadableStreamDefaultController<PushedPatchSet>> =
    new Map();
  // Id assigned to the next envelope that does not carry its own request id.
  #requestCounter;
  // Pending requests, keyed by the string form of their request id.
  #waitingMessagesResponse: LockMap<string, schema.Envelope> = new LockMap();
  // Sentinel meaning "no authentication attempt is in flight".
  #initialAuthPromise = Promise.resolve(false);
  #authenticationPromise: Promise<boolean> = this.#initialAuthPromise;
  #isAuthenticated = false;

  /**
   * @param params Relay endpoint, wallet, keycard, shop id and optional
   *   starting nonce (defaults to 0). No network activity happens here.
   */
  constructor(params: IRelayClientOptions) {
    this.walletClient = params.walletClient;
    this.relayEndpoint = params.relayEndpoint;
    this.keyCardNonce = params.keyCardNonce ?? 0;
    this.shopId = params.shopId;
    this.keycard = params.keycard;
    this.#requestCounter = 1;
    this.#authenticationPromise = this.#initialAuthPromise;
    this.#isAuthenticated = false;
  }

  /** Snapshot of internal counters, intended for diagnostics. */
  get stats() {
    return {
      pingsReceived: this.pingsReceived,
      lastPingReceived: this.lastPingReceived,
      subscriptions: this.#subscriptions.size,
      waitingMessagesResponse: this.#waitingMessagesResponse.size,
      requestCounter: this.#requestCounter,
    };
  }

  /**
   * Like `encodeAndSend` but doesn't wait for a response.
   *
   * Assigns the next request id when the envelope has none, verifies and
   * encodes the envelope and writes it to the socket.
   *
   * @returns The request id the envelope was sent with.
   * @throws If the envelope fails schema verification or no connection exists.
   */
  encodeAndSendNoWait(envelope: schema.IEnvelope = {}): schema.RequestId {
    if (!envelope.requestId) {
      envelope.requestId = { raw: this.#requestCounter };
    }
    const err = schema.Envelope.verify(envelope);
    if (err) {
      throw new Error(`unable to verify envelope: ${err}`);
    }
    const payload = schema.Envelope.encode(envelope).finish();
    assert(this.connection, "Connection is not established");
    this.connection.send(payload);
    const requestType =
      Object.keys(envelope).filter((k) => k !== "requestId")[0];
    const reqId = envelope.requestId!.raw;
    logger.debug`sent[${reqId}] ${requestType}`;
    // NOTE: the counter advances on every send, including replies that reuse
    // the relay's request id.
    this.#requestCounter++;
    return schema.RequestId.create(envelope.requestId);
  }

  /**
   * Encode and send a message and then wait for its response.
   *
   * @returns The response envelope when it carries no error.
   * @throws RelayResponseError when the response carries an error.
   */
  async encodeAndSend(
    envelope: schema.IEnvelope = {},
  ): Promise<schema.Envelope> {
    const id = this.encodeAndSendNoWait(envelope);
    const { promise } = this.#waitingMessagesResponse.lock(
      id.raw.toString(),
    )!;
    const response = await promise;
    const requestType =
      Object.keys(response).filter((k) => k !== "requestId")[0];
    logger.debug`recvt[${id.raw}] ${requestType}`;
    if (response.response?.error) {
      const { code, message, additionalInfo } = response.response.error;
      assert(code, "code is required");
      assert(message, "message is required");
      let unpackedExtraInfo: {
        objectId: number | bigint;
      } | undefined;
      if (additionalInfo && additionalInfo.objectId) {
        // TODO: pretty ugly Long > number conversion.
        unpackedExtraInfo = {
          objectId: Number(additionalInfo.objectId),
        };
      }
      throw new RelayResponseError(
        {
          id: id.raw,
          message,
          code,
          requestType,
          additionalInfo: unpackedExtraInfo,
        },
      );
    } else {
      return response;
    }
  }

  /**
   * Decodes one incoming websocket message and dispatches it:
   * pings are answered, subscription pushes are forwarded to the matching
   * stream controller (and acknowledged), anything else unlocks the pending
   * request with the same request id.
   */
  async #decodeMessage(me: MessageEvent) {
    const data = me.data instanceof Blob
      ? await new Response(me.data).arrayBuffer()
      : me.data;
    const payload = new Uint8Array(data);
    const envelope = schema.Envelope.decode(payload);
    assert(envelope.requestId?.raw, "requestId is required");
    const requestType =
      Object.keys(envelope).filter((k) => k !== "requestId")[0];
    const reqId = envelope.requestId!.raw;
    if (requestType === "response") {
      const isError = envelope.response?.error ? "error" : "okay";
      logger.debug`unbox[${reqId}] ${isError}`;
    } else if (requestType === "subscriptionPushRequest") {
      logger.debug`unbox[${reqId}] ${requestType}`;
    }

    switch (envelope.message) {
      case EnvelopMessageTypes.PingRequest:
        this.#handlePingRequest(envelope);
        break;
      case EnvelopMessageTypes.SubscriptionPushRequest:
        assert(
          envelope.subscriptionPushRequest,
          "subscriptionPushRequest is required",
        );
        {
          const subscriptionId = envelope
            .subscriptionPushRequest
            .subscriptionId!.toString();
          const controller = this.#subscriptions.get(subscriptionId);
          if (!controller) {
            // Pushes for unknown subscriptions are dropped without a reply.
            logger.warn`invalid subscription recv. id: ${subscriptionId}`;
            return;
          }
          const sets = envelope.subscriptionPushRequest.sets ?? [];
          logger
            .debug`unbox[${reqId}] subscriptionPushRequest[${subscriptionId}]. SetCount: ${sets.length}`;
          try {
            for (const ppset of sets) {
              const header = decode(ppset.header!);
              const sequence = typeof ppset!.shopSeqNo! === "number"
                ? ppset!.shopSeqNo!
                : ppset!.shopSeqNo!.toNumber();
              const patches = ppset.patches!.map((patch) =>
                Object.fromEntries(
                  decode(patch) as Map<string, CodecValue>,
                ) as Patch
              );

              // This doesn't really need to be async
              // viem does an async import of @noble/secp256k1
              const signer = await recoverMessageAddress({
                message: { raw: ppset.header! },
                signature: ppset.signature!,
              });
              controller.enqueue({
                patches,
                header,
                signer,
                sequence,
              });
            }
          } catch (e) {
            // A malformed set errors the subscriber's stream; the push is
            // still acknowledged below.
            controller.error(e);
          }
          // TODO: properly handle backpressure
          // we should implement `pull` for the read stream, in the pull we should request the next chunk
          this.encodeAndSendNoWait({
            requestId: envelope.requestId,
            response: {},
          });
        }
        break;
      default:
        this.#waitingMessagesResponse.unlock(
          envelope.requestId!.raw!.toString(),
          envelope,
        );
        break;
    }
  }

  /** Answers a ping with an empty response and updates the ping statistics. */
  #handlePingRequest(ping: schema.Envelope) {
    // relay ends connection if ping is not responded to 3 times.
    this.encodeAndSendNoWait({
      requestId: ping.requestId,
      response: {},
    });
    this.pingsReceived++;
    this.lastPingReceived = new Date();
  }

  /**
   * Requests a subscription for this shop starting at `seqNo`.
   *
   * @param seqNo Shop sequence number to start from (default 0).
   * @param controller When given, pushed patch sets for the new subscription
   *   are enqueued on it.
   * @returns The relay's response; its `payload` is the subscription id.
   */
  async createSubscription(
    seqNo = 0,
    controller?: ReadableStreamDefaultController<PushedPatchSet>,
  ) {
    logger.debug`createSubscription seqNo: ${seqNo}`;
    const { response } = await this.encodeAndSend({
      subscriptionRequest: {
        startShopSeqNo: seqNo,
        shopId: { raw: pad(numberToBytes(this.shopId)) },
      },
    });

    assert(response?.payload, "response.payload is required");
    if (controller) {
      const id = response.payload!;
      this.#subscriptions.set(id.toString(), controller);
      logger.debug`registered subscription[${id.toString()}]`;
    }
    return response;
  }

  /**
   * Asks the relay to cancel the subscription with the given id.
   * The reason is currently not transmitted.
   */
  // TODO implement sending reason
  cancelSubscriptionRequest(id: Uint8Array, _reason: unknown) {
    return this.encodeAndSend({
      subscriptionCancelRequest: {
        subscriptionId: id,
      },
    });
  }

  /**
   * Creates a readable stream of patch sets pushed by the relay, starting at
   * `seqNum`. Starting the stream connects and subscribes; cancelling it
   * unregisters the controller and cancels the subscription on the relay.
   */
  createSubscriptionStream(seqNum: number) {
    let id: Uint8Array | undefined;
    // Set when the stream is cancelled while `start` is still waiting for the
    // subscription id, so `start` can undo the subscription once it has it.
    let cancelledBeforeStart: { reason: unknown } | undefined;
    return new ReadableStream<PushedPatchSet>({
      start: async (c) => {
        await this.connect();
        const { payload } = await this.createSubscription(seqNum, c);
        id = payload!;
        if (cancelledBeforeStart) {
          this.#subscriptions.delete(id.toString());
          await this.cancelSubscriptionRequest(
            id,
            cancelledBeforeStart.reason,
          );
        }
      },
      cancel: async (reason) => {
        if (!id) {
          // The cancel algorithm may run before `start` has resolved; there is
          // no subscription id to cancel yet.
          cancelledBeforeStart = { reason };
          return;
        }
        this.#subscriptions.delete(id.toString());
        await this.cancelSubscriptionRequest(id, reason);
      },
    });
  }

  /**
   * Creates a writable stream that signs and sends patches to the relay.
   *
   * Each write builds a header (nonce, timestamp, shop id, SHA-256 root hash
   * of the encoded patch), signs the encoded header with the keycard and
   * sends the resulting patch set.
   *
   * NOTE(review): only `patches[0]` of each written chunk is sent; any
   * further entries are ignored and an empty chunk throws. Sending more than
   * one patch would change what the root hash covers, so confirm the relay's
   * expectations before changing this.
   *
   * @throws ClientWriteError (via the stream) for non-relay failures while
   *   sending; RelayResponseError is passed through unchanged.
   */
  createWriteStream() {
    return new WritableStream<Patch[]>({
      // Why do we even need to authenticate here?
      start: async () => {
        await this.connect();
        await this.authenticate();
      },
      write: async (patches) => {
        const patch = new Map(Object.entries(patches[0]));
        // TODO: add MMR
        const rootHash = await crypto.subtle.digest(
          "SHA-256",
          encode(patch),
        );
        const header = new Map<string, CodecValue>([
          ["KeyCardNonce", ++this.keyCardNonce],
          ["Timestamp", new Date()],
          ["ShopID", this.shopId],
          ["RootHash", rootHash],
        ]);
        // TODO: use embedded cbor, or COSE
        const encodedHeader = encode(header);
        const sig = await this.walletClient.signMessage({
          account: this.keycard,
          message: { raw: encodedHeader },
        });
        const signedPatchSet = {
          Header: header,
          Patches: [patch],
          Signature: hexToBytes(sig),
        };
        const encodedPatchSet = encode(new Map(Object.entries(signedPatchSet)));
        const envelope = {
          patchSetWriteRequest: {
            patchSet: encodedPatchSet,
          },
        };
        try {
          await this.encodeAndSend(envelope);
        } catch (error: unknown) {
          if (
            error instanceof Error && !(error instanceof RelayResponseError)
          ) {
            throw new ClientWriteError(error, signedPatchSet);
          } else {
            throw error;
          }
        }
      },
    });
  }

  /**
   * Authenticates the keycard with the relay: sends the keycard's public key,
   * signs the challenge returned by the relay and submits the signature.
   *
   * Concurrent callers share one in-flight attempt through
   * `#authenticationPromise`.
   *
   * @returns `true` once authenticated.
   * @throws If the relay rejects the auth request or the challenge solution,
   *   or if signing/sending fails.
   */
  // TODO: this is a bit of a mess.
  // What these promises are trying to achieve would usually be a mutex/lock,
  // to make sure we are not running multiple authentication attempts at the same time.
  async authenticate(): Promise<boolean> {
    // 1. Already authenticated? Return true.
    if (this.#isAuthenticated) {
      return true;
    }

    // 2. Authentication already in progress? Wait for it.
    // Grab the promise *before* potentially creating a new one.
    const currentAuthAttempt = this.#authenticationPromise;
    if (currentAuthAttempt !== this.#initialAuthPromise) {
      // An attempt is/was in progress. Wait for it.
      try {
        await currentAuthAttempt;
        // If it succeeded, #isAuthenticated should now be true.
        // If it failed, an error is thrown.
        return this.#isAuthenticated; // Return the final state
      } catch (_error) {
        // The ongoing attempt failed. The state should have been reset
        // by the catch block of that attempt. We fall through to retry.
        // Ensure state is reset if the original call didn't handle it properly.
        if (this.#authenticationPromise === currentAuthAttempt) {
          this.#authenticationPromise = this.#initialAuthPromise;
          this.#isAuthenticated = false;
        }
      }
    }
    // Check again if authentication succeeded while waiting
    if (this.#isAuthenticated) {
      return true;
    }

    // 3. No authentication in progress, or previous attempt failed. Start a new one.
    const { promise, resolve, reject } = Promise.withResolvers<boolean>();
    // The shared promise is rejected on failure even when no other caller is
    // waiting on it. Attach a no-op handler so that case does not surface as
    // an unhandled rejection; waiters that `await promise` still see the error.
    promise.catch(() => {});
    // IMPORTANT: Set the shared promise *before* the first await
    this.#authenticationPromise = promise;

    try {
      // Perform authentication process
      const publicKey = await getAccountPublicKey(
        this.walletClient,
        this.keycard,
      );

      // Use a bypass function for auth requests to avoid potential recursive auth checks
      // within encodeAndSend if it calls authenticate itself.
      // Unlike encodeAndSend, this does NOT throw on error responses, so each
      // caller below has to inspect `response.error` itself.
      const authRequestFunction = async (env: schema.IEnvelope) => {
        const id = this.encodeAndSendNoWait(env);
        const { promise: waitPromise } = this.#waitingMessagesResponse.lock(
          id.raw.toString(),
        )!;
        // Ensure the lock exists before awaiting
        assert(waitPromise, `Lock not found for request ID: ${id.raw}`);
        return await waitPromise;
      };

      const { response: authResponse } = await authRequestFunction({
        authRequest: {
          publicKey: {
            raw: hexToBytes(`0x${publicKey}`),
          },
        },
      });

      // either authResponse.payload or authResponse.error is required
      if (!authResponse?.payload) {
        assert(
          authResponse?.error,
          "Authentication response error is required",
        );
        const { code, message } = authResponse.error;
        assert(code, "error.code is required");
        assert(message, "error.message is required");
        throw new Error(`Authentication failed with code: ${code}: ${message}`);
      }

      const sig = await this.walletClient.signMessage({
        account: this.keycard,
        message: {
          raw: authResponse.payload,
        },
      });

      const { response: challengeResponse } = await authRequestFunction({
        challengeSolutionRequest: {
          signature: { raw: hexToBytes(sig) },
        },
      });
      // A rejected challenge solution must not mark the client as authenticated.
      if (challengeResponse?.error) {
        const { code, message } = challengeResponse.error;
        throw new Error(`Authentication failed with code: ${code}: ${message}`);
      }

      // Mark as authenticated
      this.#isAuthenticated = true;
      resolve(true); // Resolve the promise we created
      return true;
    } catch (error) {
      // Mark as not authenticated
      this.#isAuthenticated = false;
      // Reset the main promise *only if* it's still our promise.
      // This prevents a late failure from overwriting a newer attempt's promise.
      if (this.#authenticationPromise === promise) {
        this.#authenticationPromise = this.#initialAuthPromise;
      }
      reject(error); // Reject the promise we created
      throw error; // Rethrow
    }
  }

  /**
   * Opens the websocket to `<relayEndpoint.url>/sessions` unless a usable one
   * (connecting or open) already exists, and wires up the error, close and
   * message handlers.
   *
   * @param onError Optional replacement for the default error listener,
   *   which only logs.
   * @returns A promise resolving with the socket's `open` event, or with a
   *   synthetic "already open" event when the socket is already open.
   *   NOTE(review): the promise never settles if the socket fails before
   *   opening.
   */
  connect(onError?: (error: Event) => void): Promise<Event> {
    if (
      !this.connection ||
      this.connection.readyState === WebSocket.CLOSING ||
      this.connection.readyState === WebSocket.CLOSED
    ) {
      const socket = new WebSocket(this.relayEndpoint.url + "/sessions");
      this.connection = socket;

      // Reset authentication state when creating a new connection
      this.#isAuthenticated = false;
      this.#authenticationPromise = this.#initialAuthPromise; // Use the initial promise instance

      socket.addEventListener(
        "error",
        onError ? onError : (errEv: Event) => {
          // Not every runtime delivers an ErrorEvent here; fall back to a
          // generic description instead of throwing inside the listener.
          const details =
            typeof ErrorEvent !== "undefined" && errEv instanceof ErrorEvent
              ? errEv
              : undefined;
          const error = new Error(details?.message ?? "WebSocket error");
          error.name = details?.error?.name ?? "WebSocketError";
          logger.error`WebSocket error! ${error}`;
        },
      );

      socket.addEventListener(
        "close",
        (ev: CloseEvent) => {
          if (!ev.wasClean) {
            logger.warn`WebSocket closed uncleanly`;
          }
          // connect() replaces sockets that are still CLOSING, so this event
          // can arrive after a newer socket has been installed. Only touch the
          // shared state when it still belongs to this socket.
          if (this.connection !== socket) {
            return;
          }
          this.#isAuthenticated = false;
          this.#authenticationPromise = this.#initialAuthPromise;
          this.connection = null;
        },
      );

      let messageDecoding: Promise<void> = Promise.resolve();
      socket.addEventListener(
        "message",
        (data) => {
          // Here we wait to decode the message until the previous message is fully processed
          messageDecoding = messageDecoding
            .then(() => this.#decodeMessage(data))
            // Without this, one failing message would leave the chain
            // rejected and every later message would be skipped.
            .catch((err: unknown) => {
              logger.error`failed to process relay message: ${err}`;
            });
        },
      );
    }
    return new Promise((resolve) => {
      if (this.connection!.readyState === WebSocket.OPEN) {
        resolve(new Event("already open"));
      } else {
        this.connection!.addEventListener("open", (evt: Event) => {
          logger.debug`WebSocket opened`;
          // TODO: unbox event to concrete values
          resolve(evt);
        });
      }
    });
  }

  /**
   * Closes the websocket with code 1000.
   *
   * @returns A promise resolving with the `close` event, or with a string
   *   when there was nothing left to close.
   */
  disconnect(): Promise<CloseEvent | string> {
    return new Promise((resolve) => {
      const socket = this.connection;
      if (
        !socket || // Check if connection exists before accessing readyState
        socket.readyState === WebSocket.CLOSED
      ) {
        resolve("already closed");
        return;
      }
      // Add listener before closing
      socket.addEventListener("close", (event) => {
        // Reset authentication state on disconnect, unless a newer socket has
        // been installed in the meantime (its state must not be clobbered).
        if (this.connection === null || this.connection === socket) {
          this.#isAuthenticated = false;
          this.#authenticationPromise = this.#initialAuthPromise; // Use the initial promise instance
        }
        resolve(event); // Resolve with the close event
      });
      // Handle cases where close might happen before listener is added (less likely but possible)
      if (
        socket.readyState === WebSocket.CLOSING ||
        socket.readyState === WebSocket.CLOSED
      ) {
        // If already closing/closed after check but before listener, resolve immediately after resetting state
        this.#isAuthenticated = false;
        this.#authenticationPromise = this.#initialAuthPromise;
        resolve("closed before listener attached");
        return;
      }

      socket.close(1000);
      // State reset now happens in the 'close' event listener
    });
  }

  /**
   * Enrolls this client's keycard with the relay over HTTP.
   *
   * Builds a SIWE message naming the relay, the shop and the keycard's public
   * key, has `wallet` sign it with `account`, and POSTs message and signature
   * to `<relay>/enroll_key_card`.
   *
   * @param wallet Wallet client that signs the SIWE message.
   * @param account Account that signs; its address goes into the message.
   * @param isGuest Sent as `guest=1` / `guest=0` in the query string.
   * @param location Optional URL used for the SIWE `domain` and `uri`;
   *   defaults to the enrollment URL.
   * @returns The `fetch` response of the enrollment request (status is not checked here).
   */
  async enrollKeycard(
    wallet: WalletClient,
    account: Hex | Account,
    isGuest: boolean = true,
    location?: URL,
  ) {
    const parsedAccount = parseAccount(account);
    const address = parsedAccount.address;
    const publicKey = await getAccountPublicKey(
      this.walletClient,
      this.keycard,
    );
    const endpointURL = new URL(this.relayEndpoint.url);
    endpointURL.protocol = this.relayEndpoint.url.protocol.includes("wss")
      ? "https"
      : "http";
    endpointURL.pathname += `/enroll_key_card`;
    endpointURL.search = `guest=${isGuest ? 1 : 0}`;
    const signInURL: URL = location ?? endpointURL;

    const message = createSiweMessage({
      address,
      chainId: 1, // not really used
      domain: signInURL.host,
      nonce: "00000000",
      uri: signInURL.href,
      version: "1",
      resources: [
        `mass-relayid:${hexToBigInt(this.relayEndpoint.tokenId)}`,
        `mass-shopid:${this.shopId}`,
        `mass-keycard:${publicKey}`,
      ],
    });

    const signature = await wallet.signMessage({
      account: parsedAccount,
      message,
    });
    const body = JSON.stringify({
      message,
      signature: hexToBase64(signature),
    });
    return fetch(endpointURL.href, {
      method: "POST",
      body,
    });
  }

  /**
   * Uploads a blob: asks the relay for an upload URL, then POSTs the form
   * data to it.
   *
   * @returns The parsed JSON body of the upload response.
   * @throws If no upload URL could be obtained or the upload does not answer
   *   with HTTP 201.
   */
  async uploadBlob(blob: FormData) {
    await this.connect();
    await this.authenticate();
    const envelope = await this.encodeAndSend({
      getBlobUploadUrlRequest: {},
    });
    assert(envelope.response, "envelope.response is required");

    // encodeAndSend already throws on error responses; kept as a safeguard.
    if (envelope.response.error) {
      const { code, message } = envelope.response.error;
      throw new Error(
        `Failed to get blob upload URL - code: ${code} message: ${message}`,
      );
    }
    assert(envelope.response?.payload, "envelope.response.payload is required");

    const url = decodeBufferToString(envelope.response.payload);
    const uploadResp = await fetch(url, {
      method: "POST",
      body: blob,
    });
    if (uploadResp.status !== 201) {
      throw new Error(
        `unexpected status: ${uploadResp.statusText} (${uploadResp.status})`,
      );
    }
    return uploadResp.json();
  }
}
671  
/**
 * Derives an account's public key by signing and recovering.
 *
 * The eth rpc has no method that returns an account's public key, so the
 * wallet signs the empty message and the key is recovered from that
 * signature and the hash of the same empty message.
 *
 * @param wallet Wallet client that produces the signature.
 * @param account Account (or address) to sign with.
 * @returns The recovered key re-encoded through `ProjectivePoint.toHex()`
 *   (presumably the compressed form — confirm against @noble/secp256k1),
 *   without a `0x` prefix.
 */
async function getAccountPublicKey(
  wallet: WalletClient,
  account: Hex | Account,
): Promise<string> {
  const emptyMessage = "";
  const signature = await wallet.signMessage({
    account,
    message: emptyMessage,
  });
  const recoveredKey = await recoverPublicKey({
    signature,
    hash: hashMessage(emptyMessage),
  });
  // Drop the leading "0x" before handing the hex string to noble.
  const point = ProjectivePoint.fromHex(recoveredKey.slice(2));
  return point.toHex();
}
688  
// testing helper
/**
 * Looks up a relay's token id through its testing discovery endpoint.
 *
 * The discovery URL is derived from `url` by switching the leading `ws`/`wss`
 * scheme to `http`/`https` and replacing the first "/v4" with
 * "/testing/discovery".
 *
 * @param url Websocket URL of the relay.
 * @returns The endpoint: `url` parsed as a URL plus the `relay_token_id`
 *   reported by the discovery endpoint.
 * @throws If the discovery request does not answer with a 2xx status, or if
 *   `url` is not a valid URL.
 */
export async function discoverRelay(url: string): Promise<IRelayEndpoint> {
  const discoveryURL = url
    // Anchor to the start so only the scheme is rewritten, never a later
    // "ws" inside the host or path.
    .replace(/^ws/, "http")
    .replace("/v4", "/testing/discovery");
  const testingResponse = await fetch(discoveryURL);
  if (!testingResponse.ok) {
    throw new Error(
      `relay discovery failed: ${testingResponse.statusText} (${testingResponse.status})`,
    );
  }
  const testingData = await testingResponse.json();
  return {
    url: new URL(url),
    tokenId: testingData.relay_token_id,
  };
}