mod.ts
1 // SPDX-FileCopyrightText: 2024 Mass Labs 2 // 3 // SPDX-License-Identifier: MIT 4 import { assert } from "@std/assert"; 5 import { 6 type Account, 7 type Hex, 8 hexToBigInt, 9 hexToBytes, 10 numberToBytes, 11 pad, 12 recoverMessageAddress, 13 recoverPublicKey, 14 type WalletClient, 15 } from "viem"; 16 import { parseAccount } from "viem/accounts"; 17 import { createSiweMessage } from "viem/siwe"; 18 import { hashMessage } from "viem/utils"; 19 import { ProjectivePoint } from "@noble/secp256k1"; 20 import { getLogger } from "@logtape/logtape"; 21 import LockMap from "@nullradix/lockmap"; 22 import schema, { EnvelopMessageTypes } from "@massmarket/schema"; 23 import { decodeBufferToString, hexToBase64 } from "@massmarket/utils"; 24 import { 25 type CodecKey, 26 type CodecValue, 27 decode, 28 encode, 29 } from "@massmarket/utils/codec"; 30 31 const logger = getLogger(["mass-market", "relay-client"]); 32 33 export interface IRelayEndpoint { 34 url: URL; // the websocket URL to talk to 35 tokenId: `0x${string}`; 36 } 37 38 export interface IRelayClientOptions { 39 relayEndpoint: IRelayEndpoint; 40 walletClient: WalletClient; 41 keycard: Hex | Account; 42 //TODO: make ID part of the path 43 shopId: bigint; 44 keyCardNonce?: number; 45 } 46 47 export type Patch = 48 & { 49 Path: CodecKey[]; 50 } 51 & ( 52 | { 53 Op: "add" | "replace" | "append"; 54 Value: CodecValue; 55 } 56 | { 57 Op: "increment" | "decrement"; 58 Value: number; 59 } 60 | { 61 Op: "remove"; 62 } 63 ); 64 65 export type PushedPatchSet = { 66 signer: Hex; 67 patches: Patch[]; 68 header: CodecValue; 69 sequence: number; 70 }; 71 72 export type SignedPatchSet = { 73 Header: Map<string, CodecValue>; 74 Patches: Map<string, CodecValue>[]; 75 Signature: Uint8Array; 76 }; 77 78 export class RelayResponseError extends Error { 79 constructor( 80 public override cause: { 81 message: string; 82 id: unknown; 83 requestType: string; 84 code: number; 85 additionalInfo?: { 86 objectId: number | bigint; 87 }; 88 }, 89 ) { 90 logger 91 .info`RelayResponseError:network request ${cause.requestType} cause id ${cause.id} failed with error[${cause.code}]: ${cause.message}`; 92 super( 93 `network request ${cause.requestType} failed with error[${cause.code}]: ${cause.message}`, 94 ); 95 } 96 } 97 98 export class ClientWriteError extends Error { 99 constructor( 100 public originalError: Error, 101 public patchSet: SignedPatchSet, 102 ) { 103 super(originalError.message); 104 } 105 } 106 107 export class RelayClient { 108 connection: WebSocket | null = null; 109 keyCardNonce: number; 110 private pingsReceived: number = 0; 111 private lastPingReceived: Date = new Date(0); 112 readonly walletClient: WalletClient; 113 readonly keycard; 114 readonly relayEndpoint; 115 readonly shopId; 116 // TODO; we can use the subscription path for the id 117 #subscriptions: Map<string, ReadableStreamDefaultController<PushedPatchSet>> = 118 new Map(); 119 #requestCounter; 120 #waitingMessagesResponse: LockMap<string, schema.Envelope> = new LockMap(); 121 #initialAuthPromise = Promise.resolve(false); 122 #authenticationPromise: Promise<boolean> = this.#initialAuthPromise; 123 #isAuthenticated = false; 124 125 constructor(params: IRelayClientOptions) { 126 this.walletClient = params.walletClient; 127 this.relayEndpoint = params.relayEndpoint; 128 this.keyCardNonce = params.keyCardNonce ?? 0; 129 this.shopId = params.shopId; 130 this.keycard = params.keycard; 131 this.#requestCounter = 1; 132 this.#authenticationPromise = this.#initialAuthPromise; 133 this.#isAuthenticated = false; 134 } 135 136 get stats() { 137 return { 138 pingsReceived: this.pingsReceived, 139 lastPingReceived: this.lastPingReceived, 140 subscriptions: this.#subscriptions.size, 141 waitingMessagesResponse: this.#waitingMessagesResponse.size, 142 requestCounter: this.#requestCounter, 143 }; 144 } 145 146 // like encodeAndSend but doesn't wait for a response. 147 encodeAndSendNoWait(envelope: schema.IEnvelope = {}): schema.RequestId { 148 if (!envelope.requestId) { 149 envelope.requestId = { raw: this.#requestCounter }; 150 } 151 const err = schema.Envelope.verify(envelope); 152 if (err) { 153 throw new Error(`unable to verify envelope: ${err}`); 154 } 155 const payload = schema.Envelope.encode(envelope).finish(); 156 assert(this.connection, "Connection is not established"); 157 this.connection.send(payload); 158 const requestType = 159 Object.keys(envelope).filter((k) => k !== "requestId")[0]; 160 const reqId = envelope.requestId!.raw; 161 logger.debug`sent[${reqId}] ${requestType}`; 162 this.#requestCounter++; 163 return schema.RequestId.create(envelope.requestId); 164 } 165 166 // encode and send a message and then wait for a response 167 async encodeAndSend( 168 envelope: schema.IEnvelope = {}, 169 ): Promise<schema.Envelope> { 170 const id = this.encodeAndSendNoWait(envelope); 171 const { promise } = this.#waitingMessagesResponse.lock( 172 id.raw.toString(), 173 )!; 174 const response = await promise; 175 const requestType = 176 Object.keys(response).filter((k) => k !== "requestId")[0]; 177 logger.debug`recvt[${id.raw}] ${requestType}`; 178 if (response.response?.error) { 179 const { code, message, additionalInfo } = response.response.error; 180 assert(code, "code is required"); 181 assert(message, "message is required"); 182 let unpackedExtraInfo: { 183 objectId: number | bigint; 184 } | undefined; 185 if (additionalInfo && additionalInfo.objectId) { 186 // TODO: pretty ugly Long > number conversion. 187 unpackedExtraInfo = { 188 objectId: Number(additionalInfo.objectId), 189 }; 190 } 191 throw new RelayResponseError( 192 { 193 id: id.raw, 194 message, 195 code, 196 requestType, 197 additionalInfo: unpackedExtraInfo, 198 }, 199 ); 200 } else { 201 return response; 202 } 203 } 204 205 async #decodeMessage(me: MessageEvent) { 206 const data = me.data instanceof Blob 207 ? await new Response(me.data).arrayBuffer() 208 : me.data; 209 const payload = new Uint8Array(data); 210 const envelope = schema.Envelope.decode(payload); 211 assert(envelope.requestId?.raw, "requestId is required"); 212 const requestType = 213 Object.keys(envelope).filter((k) => k !== "requestId")[0]; 214 const reqId = envelope.requestId!.raw; 215 if (requestType === "response") { 216 const isError = envelope.response?.error ? "error" : "okay"; 217 logger.debug`unbox[${reqId}] ${isError}`; 218 } else if (requestType === "subscriptionPushRequest") { 219 logger.debug`unbox[${reqId}] ${requestType}`; 220 } 221 222 switch (envelope.message) { 223 case EnvelopMessageTypes.PingRequest: 224 this.#handlePingRequest(envelope); 225 break; 226 case EnvelopMessageTypes.SubscriptionPushRequest: 227 assert( 228 envelope.subscriptionPushRequest, 229 "subscriptionPushRequest is required", 230 ); 231 { 232 const subscriptionId = envelope 233 .subscriptionPushRequest 234 .subscriptionId!.toString(); 235 const controller = this.#subscriptions.get(subscriptionId); 236 if (!controller) { 237 logger.warn`invalid subscription recv. id: ${subscriptionId}`; 238 return; 239 } 240 const sets = envelope.subscriptionPushRequest.sets ?? []; 241 logger 242 .debug`unbox[${reqId}] subscriptionPushRequest[${subscriptionId}]. SetCount: ${sets.length}`; 243 try { 244 for (const ppset of sets) { 245 const header = decode(ppset.header!); 246 const sequence = typeof ppset!.shopSeqNo! === "number" 247 ? ppset!.shopSeqNo! 248 : ppset!.shopSeqNo!.toNumber(); 249 const patches = ppset.patches!.map((patch) => 250 Object.fromEntries( 251 decode(patch) as Map<string, CodecValue>, 252 ) as Patch 253 ); 254 255 // This doesn't really need to be async 256 // viem does an async import of @noble/secp256k1 257 const signer = await recoverMessageAddress({ 258 message: { raw: ppset.header! }, 259 signature: ppset.signature!, 260 }); 261 controller.enqueue({ 262 patches, 263 header, 264 signer, 265 sequence, 266 }); 267 } 268 } catch (e) { 269 controller.error(e); 270 } 271 // TODO: properly handle backpressure 272 // we should implement `pull` for the read stream, in the pull we should request the next chunk 273 this.encodeAndSendNoWait({ 274 requestId: envelope.requestId, 275 response: {}, 276 }); 277 } 278 break; 279 default: 280 this.#waitingMessagesResponse.unlock( 281 envelope.requestId!.raw!.toString(), 282 envelope, 283 ); 284 break; 285 } 286 } 287 288 #handlePingRequest(ping: schema.Envelope) { 289 // relay ends connection if ping is not responded to 3 times. 290 this.encodeAndSendNoWait({ 291 requestId: ping.requestId, 292 response: {}, 293 }); 294 this.pingsReceived++; 295 this.lastPingReceived = new Date(); 296 } 297 298 async createSubscription( 299 seqNo = 0, 300 controller?: ReadableStreamDefaultController<PushedPatchSet>, 301 ) { 302 logger.debug`createSubscription seqNo: ${seqNo}`; 303 const { response } = await this.encodeAndSend({ 304 subscriptionRequest: { 305 startShopSeqNo: seqNo, 306 shopId: { raw: pad(numberToBytes(this.shopId)) }, 307 }, 308 }); 309 310 assert(response?.payload, "response.payload is required"); 311 if (controller) { 312 const id = response.payload!; 313 this.#subscriptions.set(id.toString(), controller); 314 logger.debug`registered subscription[${id.toString()}]`; 315 } 316 return response; 317 } 318 319 // TODO implement sending reason 320 cancelSubscriptionRequest(id: Uint8Array, _reason: unknown) { 321 return this.encodeAndSend({ 322 subscriptionCancelRequest: { 323 subscriptionId: id, 324 }, 325 }); 326 } 327 328 createSubscriptionStream(seqNum: number) { 329 let id: Uint8Array; 330 return new ReadableStream<PushedPatchSet>({ 331 start: async (c) => { 332 await this.connect(); 333 const { payload } = await this.createSubscription(seqNum, c); 334 id = payload!; 335 }, 336 cancel: async (reason) => { 337 this.#subscriptions.delete(id.toString()); 338 await this.cancelSubscriptionRequest(id, reason); 339 }, 340 }); 341 } 342 343 createWriteStream() { 344 return new WritableStream<Patch[]>({ 345 // Why do we even need to authenticate here? 346 start: async () => { 347 await this.connect(); 348 await this.authenticate(); 349 }, 350 write: async (patches) => { 351 const patch = new Map(Object.entries(patches[0])); 352 // TODO: add MMR 353 const rootHash = await crypto.subtle.digest( 354 "SHA-256", 355 encode(patch), 356 ); 357 const header = new Map<string, CodecValue>([ 358 ["KeyCardNonce", ++this.keyCardNonce], 359 ["Timestamp", new Date()], 360 ["ShopID", this.shopId], 361 ["RootHash", rootHash], 362 ]); 363 // TODO: use embedded cbor, or COSE 364 const encodedHeader = encode(header); 365 const sig = await this.walletClient.signMessage({ 366 account: this.keycard, 367 message: { raw: encodedHeader }, 368 }); 369 const signedPatchSet = { 370 Header: header, 371 Patches: [patch], 372 Signature: hexToBytes(sig), 373 }; 374 const encodedPatchSet = encode(new Map(Object.entries(signedPatchSet))); 375 const envelope = { 376 patchSetWriteRequest: { 377 patchSet: encodedPatchSet, 378 }, 379 }; 380 try { 381 await this.encodeAndSend(envelope); 382 } catch (error: unknown) { 383 if ( 384 error instanceof Error && !(error instanceof RelayResponseError) 385 ) { 386 throw new ClientWriteError(error, signedPatchSet); 387 } else { 388 throw error; 389 } 390 } 391 }, 392 }); 393 } 394 395 // TODO: this is a bit of a mess. 396 // What these promises are trying to achieve would usually be a mutex/lock, 397 // to make sure we are not running multiple authentication attempts at the same time. 398 async authenticate(): Promise<boolean> { 399 // 1. Already authenticated? Return true. 400 if (this.#isAuthenticated) { 401 return true; 402 } 403 404 // 2. Authentication already in progress? Wait for it. 405 // Grab the promise *before* potentially creating a new one. 406 const currentAuthAttempt = this.#authenticationPromise; 407 if (currentAuthAttempt !== this.#initialAuthPromise) { 408 // An attempt is/was in progress. Wait for it. 409 try { 410 await currentAuthAttempt; 411 // If it succeeded, #isAuthenticated should now be true. 412 // If it failed, an error is thrown. 413 return this.#isAuthenticated; // Return the final state 414 } catch (_error) { 415 // The ongoing attempt failed. The state should have been reset 416 // by the catch block of that attempt. We fall through to retry. 417 // Ensure state is reset if the original call didn't handle it properly. 418 if (this.#authenticationPromise === currentAuthAttempt) { 419 this.#authenticationPromise = this.#initialAuthPromise; 420 this.#isAuthenticated = false; 421 } 422 } 423 } 424 // Check again if authentication succeeded while waiting 425 if (this.#isAuthenticated) { 426 return true; 427 } 428 429 // 3. No authentication in progress, or previous attempt failed. Start a new one. 430 const { promise, resolve, reject } = Promise.withResolvers<boolean>(); 431 // IMPORTANT: Set the shared promise *before* the first await 432 this.#authenticationPromise = promise; 433 434 try { 435 // Perform authentication process 436 const publicKey = await getAccountPublicKey( 437 this.walletClient, 438 this.keycard, 439 ); 440 441 // Use a bypass function for auth requests to avoid potential recursive auth checks 442 // within encodeAndSend if it calls authenticate itself. 443 const authRequestFunction = async (env: schema.IEnvelope) => { 444 const id = this.encodeAndSendNoWait(env); 445 const { promise: waitPromise } = this.#waitingMessagesResponse.lock( 446 id.raw.toString(), 447 )!; 448 // Ensure the lock exists before awaiting 449 assert(waitPromise, `Lock not found for request ID: ${id.raw}`); 450 return await waitPromise; 451 }; 452 453 const { response: authResponse } = await authRequestFunction({ 454 authRequest: { 455 publicKey: { 456 raw: hexToBytes(`0x${publicKey}`), 457 }, 458 }, 459 }); 460 461 // either authResponse.payload or authResponse.error is required 462 if (!authResponse?.payload) { 463 assert( 464 authResponse?.error, 465 "Authentication response error is required", 466 ); 467 const { code, message } = authResponse.error; 468 assert(code, "error.code is required"); 469 assert(message, "error.message is required"); 470 throw new Error(`Authentication failed with code: ${code}: ${message}`); 471 } 472 473 const sig = await this.walletClient.signMessage({ 474 account: this.keycard, 475 message: { 476 raw: authResponse.payload, 477 }, 478 }); 479 480 await authRequestFunction({ 481 challengeSolutionRequest: { 482 signature: { raw: hexToBytes(sig) }, 483 }, 484 }); 485 486 // Mark as authenticated 487 this.#isAuthenticated = true; 488 resolve(true); // Resolve the promise we created 489 return true; 490 } catch (error) { 491 // Mark as not authenticated 492 this.#isAuthenticated = false; 493 // Reset the main promise *only if* it's still our promise. 494 // This prevents a late failure from overwriting a newer attempt's promise. 495 if (this.#authenticationPromise === promise) { 496 this.#authenticationPromise = this.#initialAuthPromise; 497 } 498 reject(error); // Reject the promise we created 499 throw error; // Rethrow 500 } 501 } 502 503 connect(onError?: (error: Event) => void): Promise<Event> { 504 if ( 505 !this.connection || 506 this.connection.readyState === WebSocket.CLOSING || 507 this.connection.readyState === WebSocket.CLOSED 508 ) { 509 this.connection = new WebSocket(this.relayEndpoint.url + "/sessions"); 510 511 // Reset authentication state when creating a new connection 512 this.#isAuthenticated = false; 513 this.#authenticationPromise = this.#initialAuthPromise; // Use the initial promise instance 514 515 this.connection.addEventListener( 516 "error", 517 onError ? onError : (errEv: Event) => { 518 assert(errEv instanceof ErrorEvent, "error event is required"); 519 const error = new Error(errEv.message); 520 error.name = errEv.error?.name ?? "WebSocketError"; 521 logger.error`WebSocket error! ${error}`; 522 }, 523 ); 524 525 this.connection.addEventListener( 526 "close", 527 (ev: CloseEvent) => { 528 if (!ev.wasClean) { 529 logger.warn`WebSocket closed uncleanly`; 530 } 531 this.#isAuthenticated = false; 532 this.#authenticationPromise = this.#initialAuthPromise; 533 this.connection = null; 534 }, 535 ); 536 537 let messageDecoding = Promise.resolve(); 538 this.connection.addEventListener( 539 "message", 540 (data) => { 541 // Here we wait to decode the message until the previous message is fully processed 542 messageDecoding = messageDecoding.then(() => 543 this.#decodeMessage(data) 544 ); 545 }, 546 ); 547 } 548 return new Promise((resolve) => { 549 if (this.connection!.readyState === WebSocket.OPEN) { 550 resolve(new Event("already open")); 551 } else { 552 this.connection!.addEventListener("open", (evt: Event) => { 553 logger.debug`WebSocket opened`; 554 // TODO: unbox event to concrete values 555 resolve(evt); 556 }); 557 } 558 }); 559 } 560 561 disconnect(): Promise<CloseEvent | string> { 562 return new Promise((resolve) => { 563 if ( 564 !this.connection || // Check if connection exists before accessing readyState 565 this.connection.readyState === WebSocket.CLOSED 566 ) { 567 resolve("already closed"); 568 return; 569 } 570 // Add listener before closing 571 this.connection.addEventListener("close", (event) => { 572 // Reset authentication state on disconnect 573 this.#isAuthenticated = false; 574 this.#authenticationPromise = this.#initialAuthPromise; // Use the initial promise instance 575 resolve(event); // Resolve with the close event 576 }); 577 // Handle cases where close might happen before listener is added (less likely but possible) 578 if ( 579 this.connection.readyState === WebSocket.CLOSING || 580 this.connection.readyState === WebSocket.CLOSED 581 ) { 582 // If already closing/closed after check but before listener, resolve immediately after resetting state 583 this.#isAuthenticated = false; 584 this.#authenticationPromise = this.#initialAuthPromise; 585 resolve("closed before listener attached"); 586 return; 587 } 588 589 this.connection.close(1000); 590 // State reset now happens in the 'close' event listener 591 }); 592 } 593 594 async enrollKeycard( 595 wallet: WalletClient, 596 account: Hex | Account, 597 isGuest: boolean = true, 598 location?: URL, 599 ) { 600 const parsedAccount = parseAccount(account); 601 const address = parsedAccount.address; 602 const publicKey = await getAccountPublicKey( 603 this.walletClient, 604 this.keycard, 605 ); 606 const endpointURL = new URL(this.relayEndpoint.url); 607 endpointURL.protocol = this.relayEndpoint.url.protocol.includes("wss") 608 ? "https" 609 : "http"; 610 endpointURL.pathname += `/enroll_key_card`; 611 endpointURL.search = `guest=${isGuest ? 1 : 0}`; 612 const signInURL: URL = location ?? endpointURL; 613 614 const message = createSiweMessage({ 615 address, 616 chainId: 1, // not really used 617 domain: signInURL.host, 618 nonce: "00000000", 619 uri: signInURL.href, 620 version: "1", 621 resources: [ 622 `mass-relayid:${hexToBigInt(this.relayEndpoint.tokenId)}`, 623 `mass-shopid:${this.shopId}`, 624 `mass-keycard:${publicKey}`, 625 ], 626 }); 627 628 const signature = await wallet.signMessage({ 629 account: parsedAccount, 630 message, 631 }); 632 const body = JSON.stringify({ 633 message, 634 signature: hexToBase64(signature), 635 }); 636 return fetch(endpointURL.href, { 637 method: "POST", 638 body, 639 }); 640 } 641 642 async uploadBlob(blob: FormData) { 643 await this.connect(); 644 await this.authenticate(); 645 const envelope = await this.encodeAndSend({ 646 getBlobUploadUrlRequest: {}, 647 }); 648 assert(envelope.response, "envelope.response is required"); 649 650 if (envelope.response.error) { 651 const { code, message } = envelope.response.error; 652 throw new Error( 653 `Failed to get blob upload URL - code: ${code} message: ${message}`, 654 ); 655 } 656 assert(envelope.response?.payload, "envelope.response.payload is required"); 657 658 const url = decodeBufferToString(envelope.response.payload); 659 const uploadResp = await fetch(url, { 660 method: "POST", 661 body: blob, 662 }); 663 if (uploadResp.status !== 201) { 664 throw new Error( 665 `unexpected status: ${uploadResp.statusText} (${uploadResp.status})`, 666 ); 667 } 668 return uploadResp.json(); 669 } 670 } 671 672 /** 673 * Get the public key of an account. We have to sign then recover the public key, 674 * because the eth rpc doesn't have a method to get the public key of an account. 675 */ 676 async function getAccountPublicKey( 677 wallet: WalletClient, 678 account: Hex | Account, 679 ): Promise<string> { 680 const signature = await wallet.signMessage({ 681 account, 682 message: "", 683 }); 684 const hash = hashMessage(""); 685 const publicKey = await recoverPublicKey({ signature, hash }); 686 return ProjectivePoint.fromHex(publicKey.slice(2)).toHex(); 687 } 688 689 // testing helper 690 export async function discoverRelay(url: string): Promise<IRelayEndpoint> { 691 const discoveryURL = url 692 .replace("ws", "http") 693 .replace("/v4", "/testing/discovery"); 694 const testingResponse = await fetch(discoveryURL); 695 const testingData = await testingResponse.json(); 696 return { 697 url: new URL(url), 698 tokenId: testingData.relay_token_id, 699 }; 700 }