mod.ts
  1  import { assert } from "@std/assert";
  2  
  3  import { DAG, type RootValue } from "@massmarket/merkle-dag-builder";
  4  import type { AbstractStore } from "@massmarket/store";
  5  import EventTree from "@massmarket/eventTree";
  6  import type { Patch, PushedPatchSet, RelayClient } from "@massmarket/client";
  7  import { type codec, get, type Hash, set } from "@massmarket/utils";
  8  import { BaseClass } from "@massmarket/schema/utils";
  9  
 10  type HashOrValue = Hash | codec.CodecValue;
 11  
 12  interface IStoredState {
 13    // holds a map of subscription paths to sequence numbers
 14    // example: { "/accounts/": 1000, "/orders/": 12222  }
 15    // This should actually be a radix tree. Since if we subscribe to the root path that sequence number will overwrite the children paths,
 16    // so for example if a subscription to the root path is made the map will be
 17    // {"/": 1000}
 18    // where the sequence number is the lowest sequence number of all children paths
 19    // TODO: we are currently using a string, but need to use CodecKey[]
 20    subscriptionSequenceNumber: number;
 21    keycardNonce: number;
 22    root: Promise<codec.CodecValue>;
 23  }
 24  
 25  export default class StateManager {
 26    readonly events = new EventTree<codec.CodecValue>(new Map());
 27    readonly graph: DAG;
 28    client?: RelayClient;
 29    readonly id: bigint;
 30    #streamsWriters: Set<WritableStreamDefaultWriter<Patch[]>> = new Set();
 31    // very simple cache, we always want a reference to the same object
 32    #state?: IStoredState;
 33    #defaultState: RootValue;
 34    constructor(
 35      params: {
 36        store: AbstractStore;
 37        id: bigint;
 38        defaultState?: RootValue;
 39      },
 40    ) {
 41      this.id = params.id;
 42      this.graph = new DAG(params.store);
 43      this.#defaultState = params?.defaultState ?? new Map();
 44    }
 45  
 46    get root(): RootValue {
 47      assert(this.#state, "open not finished");
 48      return this.#state.root;
 49    }
 50  
 51    async open() {
 52      const storedState = await this.graph.store.objStore.get(this.id);
 53      const restored: IStoredState = storedState instanceof Map
 54        ? Object.fromEntries(storedState)
 55        : {
 56          subscriptionTrees: new Map(),
 57          keycardNonce: 0,
 58          root: this.#defaultState,
 59          // TODO: add class for shop at some point
 60          // root: v.getDefaults(this.params.schema) as CborValue,
 61        };
 62      this.#state = restored;
 63    }
 64  
 65    async close() {
 66      const state = this.#state;
 67      this.#state = undefined;
 68      assert(state, "open not finished");
 69      let clientClosing = Promise.resolve<unknown>(undefined);
 70      if (this.client) {
 71        state.keycardNonce = this.client.keyCardNonce;
 72        clientClosing = this.client.disconnect();
 73      }
 74      // wait for root to be resolved
 75      const realState = {
 76        ...state,
 77        root: await state.root,
 78      };
 79      return Promise.all([
 80        clientClosing,
 81        this.graph.store.objStore.set(
 82          this.id,
 83          new Map(Object.entries(realState)),
 84        ),
 85      ]);
 86    }
 87  
 88    createWriteStream() {
 89      const state = this.#state;
 90      assert(state, "open not finished");
 91      return new WritableStream<PushedPatchSet>({
 92        write: async (patchSet) => {
 93          // TODO: validate the Operation's schema
 94          // const _validityRange = await this.graph.get(state.root, [
 95          //   "Account",
 96          //   patchSet.signer,
 97          // ]) as Map<string, string>;
 98  
 99          // TODO: Validate keycard for a given time range
100          //   throw new Error("Invalid keycard");
101          for (const patch of patchSet.patches) {
102            // TODO validate the Operation's value if any
103            if (patch.Op === "add") {
104              state.root = this.graph.add(
105                state.root,
106                patch.Path,
107                patch.Value,
108              );
109            } else if (patch.Op === "replace") {
110              state.root = this.graph.set(
111                state.root,
112                patch.Path,
113                patch.Value,
114              );
115            } else if (patch.Op === "append") {
116              state.root = this.graph.append(
117                state.root,
118                patch.Path,
119                patch.Value,
120              );
121            } else if (patch.Op === "remove") {
122              state.root = this.graph.remove(
123                state.root,
124                patch.Path,
125              );
126            } else if (patch.Op === "increment") {
127              state.root = this.graph.addNumber(
128                state.root,
129                patch.Path,
130                patch.Value,
131              );
132            } else if (patch.Op === "decrement") {
133              state.root = this.graph.addNumber(
134                state.root,
135                patch.Path,
136                -patch.Value,
137              );
138            } else {
139              throw new Error(`Unimplemented operation type: ${patch.Op}`);
140            }
141          }
142  
143          state.subscriptionSequenceNumber = patchSet.sequence;
144          // we want to wait to resolve the promise before emitting the new state
145          const realState = await state.root;
146          this.events.emit(realState);
147          // TODO: check stateroot
148          // TODO: we are saving the patches here
149          // incase we want to replay the log, but we have no way to get them out
150          // this.graph.store.objStore.append(
151          //   [patchSet.signer, "patches"],
152          //   patchSet.patches,
153          // );
154        },
155      });
156    }
157  
158    #addClientsWriteStream(client: RelayClient) {
159      const remoteWritable = client.createWriteStream();
160      const writer = remoteWritable.getWriter();
161      writer.closed.catch((_error) => {
162        // is this an error we can recover from?
163        // if so do the following
164        this.#streamsWriters.delete(writer);
165        this.#addClientsWriteStream(client);
166      });
167      this.#streamsWriters.add(writer);
168    }
169  
170    addConnection(client: RelayClient) {
171      assert(this.#state, "open not finished");
172      client.keyCardNonce = this.#state.keycardNonce;
173      this.client = client;
174      // TODO:  implement dynamic subscriptions
175      // currently we subscribe to the root when any event is subscribed to
176      const remoteReadable = client.createSubscriptionStream(
177        this.#state.subscriptionSequenceNumber,
178      );
179      const ourWritable = this.createWriteStream();
180      this.#addClientsWriteStream(client);
181      const connection = remoteReadable.pipeTo(ourWritable);
182      return { connection };
183    }
184  
185    #sendPatch(patch: Patch) {
186      // send patch to peers
187      return Promise.all(
188        [...this.#streamsWriters].map((writer) => writer.write([patch])),
189      );
190    }
191  
192    // TODO: these need to be implemented in createWriteStream first
193    // async increment(path: codec.Path, value: codec.CodecValue) {
194    //   const state = this.#state;
195    //   assert(state, "open not finished");
196    //   await this.#sendPatch({ Op: "increment", Path: path, Value: value });
197    //   state.root = await this.graph.set(state.root, path, value);
198    //   this.events.emit(state.root);
199    // }
200  
201    // async decrement(path: codec.Path, value: codec.CodecValue) {
202    //   const state = this.#state;
203    //   assert(state, "open not finished");
204    //   await this.#sendPatch({ Op: "decrement", Path: path, Value: value });
205    //   state.root = await this.graph.set(state.root, path, value);
206    //   this.events.emit(state.root);
207    // }
208  
209    async set(path: codec.Path, value: codec.CodecValue | BaseClass) {
210      if (BaseClass.isBaseClass(value)) {
211        value = value.asCBORMap() as codec.CodecValue;
212      }
213      const state = this.#state;
214      assert(state, "open not finished");
215      const oldStateRoot = state.root;
216      state.root = this.graph.set(state.root, path, async (parent, p) => {
217        const v = get(parent, p);
218        set(parent, p, value);
219        const op = v === undefined ? "add" : "replace";
220        await this.#sendPatch(
221          { Op: op, Path: path, Value: value },
222        );
223      });
224      try {
225        // TODO (@nullradix 2025-04-28) If we revert the state
226        // here, we would potentaily be reverting multiple operations
227        // for example, not awaiting here
228        // dag.set(path, bad-value)
229        // dag.set(path, good-value)
230        // would lead to the both sets failing
231        // we could take care of this in the merkle-dag builder by catching
232        // the error there (??)
233        const r = await state.root;
234        this.events.emit(r);
235      } catch (e) {
236        state.root = oldStateRoot;
237        throw e;
238      }
239    }
240  
241    get(
242      path: codec.Path,
243    ): Promise<codec.CodecValue | undefined> {
244      // wait for any pending writes to complete
245      const state = this.#state;
246      assert(state, "open not finished");
247      return this.graph.get(state.root, path);
248    }
249  }