mod.ts
1 import { assert } from "@std/assert"; 2 3 import { DAG, type RootValue } from "@massmarket/merkle-dag-builder"; 4 import type { AbstractStore } from "@massmarket/store"; 5 import EventTree from "@massmarket/eventTree"; 6 import type { Patch, PushedPatchSet, RelayClient } from "@massmarket/client"; 7 import { type codec, get, type Hash, set } from "@massmarket/utils"; 8 import { BaseClass } from "@massmarket/schema/utils"; 9 10 type HashOrValue = Hash | codec.CodecValue; 11 12 interface IStoredState { 13 // holds a map of subscription paths to sequence numbers 14 // example: { "/accounts/": 1000, "/orders/": 12222 } 15 // This should actually be a radix tree. Since if we subscribe to the root path that sequence number will overwrite the children paths, 16 // so for example if a subscription to the root path is made the map will be 17 // {"/": 1000} 18 // where the sequence number is the lowest sequence number of all children paths 19 // TODO: we are currently using a string, but need to use CodecKey[] 20 subscriptionSequenceNumber: number; 21 keycardNonce: number; 22 root: Promise<codec.CodecValue>; 23 } 24 25 export default class StateManager { 26 readonly events = new EventTree<codec.CodecValue>(new Map()); 27 readonly graph: DAG; 28 client?: RelayClient; 29 readonly id: bigint; 30 #streamsWriters: Set<WritableStreamDefaultWriter<Patch[]>> = new Set(); 31 // very simple cache, we always want a reference to the same object 32 #state?: IStoredState; 33 #defaultState: RootValue; 34 constructor( 35 params: { 36 store: AbstractStore; 37 id: bigint; 38 defaultState?: RootValue; 39 }, 40 ) { 41 this.id = params.id; 42 this.graph = new DAG(params.store); 43 this.#defaultState = params?.defaultState ?? new Map(); 44 } 45 46 get root(): RootValue { 47 assert(this.#state, "open not finished"); 48 return this.#state.root; 49 } 50 51 async open() { 52 const storedState = await this.graph.store.objStore.get(this.id); 53 const restored: IStoredState = storedState instanceof Map 54 ? Object.fromEntries(storedState) 55 : { 56 subscriptionTrees: new Map(), 57 keycardNonce: 0, 58 root: this.#defaultState, 59 // TODO: add class for shop at some point 60 // root: v.getDefaults(this.params.schema) as CborValue, 61 }; 62 this.#state = restored; 63 } 64 65 async close() { 66 const state = this.#state; 67 this.#state = undefined; 68 assert(state, "open not finished"); 69 let clientClosing = Promise.resolve<unknown>(undefined); 70 if (this.client) { 71 state.keycardNonce = this.client.keyCardNonce; 72 clientClosing = this.client.disconnect(); 73 } 74 // wait for root to be resolved 75 const realState = { 76 ...state, 77 root: await state.root, 78 }; 79 return Promise.all([ 80 clientClosing, 81 this.graph.store.objStore.set( 82 this.id, 83 new Map(Object.entries(realState)), 84 ), 85 ]); 86 } 87 88 createWriteStream() { 89 const state = this.#state; 90 assert(state, "open not finished"); 91 return new WritableStream<PushedPatchSet>({ 92 write: async (patchSet) => { 93 // TODO: validate the Operation's schema 94 // const _validityRange = await this.graph.get(state.root, [ 95 // "Account", 96 // patchSet.signer, 97 // ]) as Map<string, string>; 98 99 // TODO: Validate keycard for a given time range 100 // throw new Error("Invalid keycard"); 101 for (const patch of patchSet.patches) { 102 // TODO validate the Operation's value if any 103 if (patch.Op === "add") { 104 state.root = this.graph.add( 105 state.root, 106 patch.Path, 107 patch.Value, 108 ); 109 } else if (patch.Op === "replace") { 110 state.root = this.graph.set( 111 state.root, 112 patch.Path, 113 patch.Value, 114 ); 115 } else if (patch.Op === "append") { 116 state.root = this.graph.append( 117 state.root, 118 patch.Path, 119 patch.Value, 120 ); 121 } else if (patch.Op === "remove") { 122 state.root = this.graph.remove( 123 state.root, 124 patch.Path, 125 ); 126 } else if (patch.Op === "increment") { 127 state.root = this.graph.addNumber( 128 state.root, 129 patch.Path, 130 patch.Value, 131 ); 132 } else if (patch.Op === "decrement") { 133 state.root = this.graph.addNumber( 134 state.root, 135 patch.Path, 136 -patch.Value, 137 ); 138 } else { 139 throw new Error(`Unimplemented operation type: ${patch.Op}`); 140 } 141 } 142 143 state.subscriptionSequenceNumber = patchSet.sequence; 144 // we want to wait to resolve the promise before emitting the new state 145 const realState = await state.root; 146 this.events.emit(realState); 147 // TODO: check stateroot 148 // TODO: we are saving the patches here 149 // incase we want to replay the log, but we have no way to get them out 150 // this.graph.store.objStore.append( 151 // [patchSet.signer, "patches"], 152 // patchSet.patches, 153 // ); 154 }, 155 }); 156 } 157 158 #addClientsWriteStream(client: RelayClient) { 159 const remoteWritable = client.createWriteStream(); 160 const writer = remoteWritable.getWriter(); 161 writer.closed.catch((_error) => { 162 // is this an error we can recover from? 163 // if so do the following 164 this.#streamsWriters.delete(writer); 165 this.#addClientsWriteStream(client); 166 }); 167 this.#streamsWriters.add(writer); 168 } 169 170 addConnection(client: RelayClient) { 171 assert(this.#state, "open not finished"); 172 client.keyCardNonce = this.#state.keycardNonce; 173 this.client = client; 174 // TODO: implement dynamic subscriptions 175 // currently we subscribe to the root when any event is subscribed to 176 const remoteReadable = client.createSubscriptionStream( 177 this.#state.subscriptionSequenceNumber, 178 ); 179 const ourWritable = this.createWriteStream(); 180 this.#addClientsWriteStream(client); 181 const connection = remoteReadable.pipeTo(ourWritable); 182 return { connection }; 183 } 184 185 #sendPatch(patch: Patch) { 186 // send patch to peers 187 return Promise.all( 188 [...this.#streamsWriters].map((writer) => writer.write([patch])), 189 ); 190 } 191 192 // TODO: these need to be implemented in createWriteStream first 193 // async increment(path: codec.Path, value: codec.CodecValue) { 194 // const state = this.#state; 195 // assert(state, "open not finished"); 196 // await this.#sendPatch({ Op: "increment", Path: path, Value: value }); 197 // state.root = await this.graph.set(state.root, path, value); 198 // this.events.emit(state.root); 199 // } 200 201 // async decrement(path: codec.Path, value: codec.CodecValue) { 202 // const state = this.#state; 203 // assert(state, "open not finished"); 204 // await this.#sendPatch({ Op: "decrement", Path: path, Value: value }); 205 // state.root = await this.graph.set(state.root, path, value); 206 // this.events.emit(state.root); 207 // } 208 209 async set(path: codec.Path, value: codec.CodecValue | BaseClass) { 210 if (BaseClass.isBaseClass(value)) { 211 value = value.asCBORMap() as codec.CodecValue; 212 } 213 const state = this.#state; 214 assert(state, "open not finished"); 215 const oldStateRoot = state.root; 216 state.root = this.graph.set(state.root, path, async (parent, p) => { 217 const v = get(parent, p); 218 set(parent, p, value); 219 const op = v === undefined ? "add" : "replace"; 220 await this.#sendPatch( 221 { Op: op, Path: path, Value: value }, 222 ); 223 }); 224 try { 225 // TODO (@nullradix 2025-04-28) If we revert the state 226 // here, we would potentaily be reverting multiple operations 227 // for example, not awaiting here 228 // dag.set(path, bad-value) 229 // dag.set(path, good-value) 230 // would lead to the both sets failing 231 // we could take care of this in the merkle-dag builder by catching 232 // the error there (??) 233 const r = await state.root; 234 this.events.emit(r); 235 } catch (e) { 236 state.root = oldStateRoot; 237 throw e; 238 } 239 } 240 241 get( 242 path: codec.Path, 243 ): Promise<codec.CodecValue | undefined> { 244 // wait for any pending writes to complete 245 const state = this.#state; 246 assert(state, "open not finished"); 247 return this.graph.get(state.root, path); 248 } 249 }