// lib.ts
import * as Dialog from "@dialog-db/query/interface";
import { Constant, Link, Task, type Predicate, type FactSchema } from "@dialog-db/query";
import { entity } from "@dialog-db/query/source/memory";
const { Bytes } = Constant;

export { Task };

/**
 * Subscriber is a function to be called with query results when some facts
 * are asserted or retracted.
 */
export type Subscriber<Fact> = (facts: Fact[]) => unknown;

/**
 * Represents a query subscription that can be polled or cancelled.
 */
export interface Subscription {
  readonly cancelled: boolean;
  cancel(): void;
  poll(source: Dialog.Querier): Task.Task<{}, Error>;
}

/**
 * A database session backed by a `Deno.Kv` store. It can be queried via
 * `select`, mutated via `transact` and released via `close`.
 */
export class Session implements Dialog.Querier {
  /**
   * Task that opens a `Deno.Kv` store (at `address` when given, otherwise
   * whatever `Deno.openKv(undefined)` resolves to) and wraps it in a session.
   *
   * @throws Error when `Deno.Kv` is not available in the current runtime.
   */
  static *open(
    address?: URL,
  ): Task.Task<Session, Error> {
    if (!Deno.Kv) {
      throw new Error(
        "Deno.Kv is not available, please pass --unstable-kv flag to the deno process to enable",
      );
    } else {
      const store = yield* Task.wait(Deno.openKv(address?.href));
      return new this(store);
    }
  }

  #subscriptions = new Set<Subscription>();

  constructor(public store: Deno.Kv) {
  }

  /**
   * Returns a task that selects the stored facts matching `selector`.
   */
  select(selector: Dialog.FactsSelector) {
    return select(this, selector);
  }

  /**
   * Takes changes and transacts them atomically into this database.
   */
  transact(changes: Changes): Task.Invocation<Revision, Error> {
    return Task.perform(transact(this, changes));
  }

  /**
   * Subscribes to the provided query & calls `subscriber` every time changes
   * are transacted in this session, allowing subscribers to react to changes.
   *
   * NOTE(review): within this class the subscription set is only added to
   * (here) and cleared (in `close`); `transact` does not poll it. As written
   * the subscriber therefore only runs when a caller invokes
   * `subscription.poll(session)` themselves - confirm whether automatic
   * polling after each transaction was intended.
   */
  subscribe<Fact>(
    query: Predicate<Fact, string, FactSchema>,
    subscriber: Subscriber<Fact>
  ): Subscription {
    const subscription = new QuerySubscription(query, subscriber);
    this.#subscriptions.add(subscription);
    return subscription;
  }

  /**
   * Drops all registered subscriptions and closes the underlying store.
   */
  close(): Dialog.Task<Dialog.Unit, Error> {
    this.#subscriptions.clear();
    return close(this);
  }
}

/**
 * Represents a database revision via an IPLD link formatted as string.
 */
export interface Revision {
  toString(): string;
}

export type Assertion = Dialog.Assertion;
/**
 * Change that retracts set of facts, which is usually a set corresponding to
 * one relation model.
 */
export interface Retraction extends Iterable<{ retract: Dialog.Fact }> {}

/**
 * Change is either an assertion or a retraction.
 */
export type Change = Assertion | Retraction;

/**
 * Changes are set of changes that can be transacted atomically.
 */
export interface Changes extends Iterable<Change> {}

/**
 * Minimal shape the free functions below need: anything exposing a
 * `Deno.Kv` store (a `Session` qualifies).
 */
type DataSource = { store: Deno.Kv };

/**
 * Opens a session (optionally at `address`) and starts performing the task.
 */
export const open = (
  address?: URL,
) => Task.perform(Session.open(address));

/**
 * Open a database connection using DATABASE_URL environment variable if set,
 * otherwise falls back to the default behavior
 */
export const openWithEnv = () => {
  // Get DATABASE_URL from environment (supporting both system env and .env file)
  const getDatabaseUrl = () => {
    try {
      // Try system environment first
      const systemUrl = Deno.env.get("DATABASE_URL");
      if (systemUrl) return systemUrl;
    } catch {
      // Reading the environment failed (e.g. access was denied). Keep going
      // so the values loaded from a .env file are still consulted.
    }

    try {
      // Fall back to .env file loaded values
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped global populated elsewhere
      const envFileUrl = (globalThis as any).__loaded_env?.["DATABASE_URL"];
      if (envFileUrl) return envFileUrl;
    } catch {
      // Best effort only: treat an unreadable global as "not configured".
    }

    return undefined;
  };

  const databaseUrl = getDatabaseUrl();

  if (!databaseUrl) {
    console.log("🗄️ Using default database connection (no DATABASE_URL set)");
    return Task.perform(Session.open());
  }

  console.log(`🗄️ Connecting to database: ${databaseUrl}`);

  // Only URL parsing is guarded, so that an unrelated failure is never
  // reported as a malformed DATABASE_URL.
  let url: URL;
  try {
    url = new URL(databaseUrl);
  } catch {
    console.warn(`⚠️ Invalid DATABASE_URL format: ${databaseUrl}. Using default connection.`);
    return Task.perform(Session.open());
  }

  return Task.perform(Session.open(url));
};

/**
 * Builds a `Search` from `selector` and runs it against `source.store`.
 */
export const select = (
  source: DataSource,
  selector: Dialog.FactsSelector,
) => Search.from(selector).select(source.store);

/**
 * Task that applies all `changes` through a single `Deno.Kv` atomic operation.
 *
 * Every asserted / retracted fact is stamped with the same `cause`: a
 * timestamp obtained from `Time.now()`. On success the returned revision is
 * `Link.of` that timestamp; otherwise the task fails with an
 * `Error("Transaction failed")` whose `cause` is the commit result.
 */
export const transact = function* (
  source: DataSource,
  changes: Changes,
): Task.Task<Revision, Error> {
  const time = Time.now();

  const cause = time;
  const mutation = source.store.atomic();

  for (const change of instructions(changes)) {
    if (change.assert) {
      Fact.from({ ...change.assert, cause }).assert(mutation);
    } else if (change.retract) {
      Fact.from({ ...change.retract, cause }).retract(mutation);
    }
  }

  const result = yield* Task.wait(mutation.commit());
  if (result.ok) {
    return Link.of(time);
  } else {
    return yield* Task.fail(
      Object.assign(new Error("Transaction failed"), { cause: result }),
    );
  }
};

/**
 * Task that closes the store of the given `source`.
 */
export const close = function* (
  source: DataSource,
): Task.Task<Dialog.Unit, Error> {
  yield* Task.wait(source.store.close());
  return {};
};

/**
 * Shape in which a fact is stored as a KV value.
 */
type FactJSON = {
  the: Dialog.The;
  of: { "/": string };
  is: Dialog.Scalar;
  cause: number;
};

/**
 * In-memory shape of a fact.
 */
type FactModel = {
  the: Dialog.The;
  of: Dialog.Entity;
  is: Dialog.Scalar;
  cause: number;
};

/**
 * A fact prepared for storage: it precomputes the string components used to
 * build index keys and the JSON value stored under each of them.
 */
class Fact {
  /**
   * Converts a fact into its stored (JSON) representation.
   */
  static toJSON({ the, of, is, cause }: FactModel): FactJSON {
    return {
      the,
      of: Link.toJSON(of),
      is: Constant.toJSON(is) as Dialog.Scalar,
      cause,
    };
  }

  /**
   * Converts a stored (JSON) representation back into a fact.
   */
  static fromJSON({ the, of, is, cause }: FactJSON): FactModel {
    return {
      the,
      of: Link.fromJSON(of),
      is: decodeScalar(is),
      cause,
    };
  }

  static from(fact: FactModel) {
    return new Fact(fact);
  }

  /** String form of the fact's entity (`of`). */
  entity: string;
  /** The fact's attribute (`the`) split into namespace and name. */
  attribute: Attribute;
  /** String form of `Link.of` the fact's value (`is`). */
  value: string;
  /** The JSON stored under every index key. */
  json: FactJSON;

  constructor(public fact: FactModel) {
    this.entity = fact.of.toString();
    this.attribute = parseAttribute(fact.the);
    this.value = Link.of(this.fact.is).toString();

    this.json = Fact.toJSON(fact);
  }
  get cause() {
    return this.fact.cause;
  }

  /**
   * Keys under which this fact lives, one per maintained index, in the order
   * EAVT, AEVT, VAET. Shared by `assert` and `retract` so both always touch
   * exactly the same entries.
   */
  keys(): Deno.KvKey[] {
    const { entity, value } = this;
    const { namespace, name } = this.attribute;

    return [
      [Index.EAVT, entity, namespace, name, value],
      [Index.AEVT, namespace, name, entity, value],
      [Index.VAET, value, namespace, name, entity],
    ];
  }

  /**
   * Queues deletion of this fact from every maintained index.
   */
  retract(operation: Deno.AtomicOperation) {
    for (const key of this.keys()) {
      operation.delete(key);
    }
  }

  /**
   * Queues writing this fact into every maintained index.
   */
  assert(operation: Deno.AtomicOperation) {
    for (const key of this.keys()) {
      operation.set(key, this.json);
    }

    // operation.set([
    //   Index.TEAV,
    //   this.cause,
    //   this.entity,
    //   this.attribute.namespace,
    //   this.attribute.name,
    //   this.value,
    // ], this.json);
  }
}

/**
 * Restores a scalar from its stored form: `{ "/": { bytes: string } }` is
 * decoded through `Bytes.fromJSON`, `{ "/": string }` through
 * `Link.fromJSON`, anything else is returned untouched.
 */
const decodeScalar = (
  scalar?: Dialog.Scalar,
): Dialog.Scalar => {
  const source = scalar as
    | undefined
    | Record<string, undefined | Record<string, unknown>>;

  if (typeof source?.["/"]?.["bytes"] === "string") {
    return Bytes.fromJSON(source as Constant.Bytes.JSON);
  } else if (typeof source?.["/"] === "string") {
    return Link.fromJSON({ "/": source["/"] });
  } else {
    return scalar as Dialog.Scalar;
  }
};

/**
 * Identifies an index; the numeric value is the first component of every key
 * in that index. Only EAVT, AEVT and VAET are written by `Fact.assert`.
 */
enum Index {
  EAVT = 1,
  AEVT = 2,
  VAET = 3,
  TEAV = 4,
}

/**
 * Index to scan plus the (stringified) components a key has to match.
 */
type FactFilter = {
  index: Index;
  entity?: string;
  attribute?: Attribute;
  value?: string;
};

/**
 * Derives the key prefix to list for `filter`.
 *
 * @throws Error when `filter.index` has no prefix builder (e.g. TEAV).
 */
const toPrefix = (filter: FactFilter): Deno.KvKey => {
  switch (filter.index) {
    case Index.EAVT:
      return toPrefixByEntity(filter);
    case Index.AEVT:
      return toPrefixByAttribute(filter);
    case Index.VAET:
      return toPrefixByValue(filter);
    default:
      throw new Error(`Unsupported index: ${filter.index}`);
  }
};

/**
 * Prefix for the EAVT index: `[index, entity, namespace, name, value]`, cut
 * at the first missing component. Without an entity the prefix is `[index]`,
 * so the whole index is listed and `Search.match` does the filtering.
 */
const toPrefixByEntity = (
  { index, entity, attribute, value }: FactFilter,
): Deno.KvKey => {
  const prefix: Deno.KvKeyPart[] = [index];
  if (entity === undefined) {
    return prefix;
  }

  prefix.push(entity);
  if (attribute) {
    prefix.push(attribute.namespace);
    prefix.push(attribute.name);

    if (value !== undefined) {
      prefix.push(value);
    }
  }

  return prefix;
};

/**
 * Prefix for the AEVT index: `[index, namespace, name, entity, value]`, cut
 * at the first missing component (`[index]` when there is no attribute).
 */
const toPrefixByAttribute = (
  { index, entity, attribute, value }: FactFilter,
): Deno.KvKey => {
  const prefix: Deno.KvKeyPart[] = [index];
  if (!attribute) {
    return prefix;
  }

  prefix.push(attribute.namespace);
  prefix.push(attribute.name);
  if (entity !== undefined) {
    prefix.push(entity);
    if (value !== undefined) {
      prefix.push(value);
    }
  }

  return prefix;
};

/**
 * Prefix for the VAET index: `[index, value, namespace, name, entity]`, cut
 * at the first missing component (`[index]` when there is no value).
 */
const toPrefixByValue = (
  { index, entity, attribute, value }: FactFilter,
): Deno.KvKey => {
  const prefix: Deno.KvKeyPart[] = [index];
  if (value === undefined) {
    return prefix;
  }

  prefix.push(value);
  if (attribute) {
    prefix.push(attribute.namespace);
    prefix.push(attribute.name);

    if (entity !== undefined) {
      prefix.push(entity);
    }
  }

  return prefix;
};

/**
 * A prepared lookup: a filter, plus the key prefix derived from it.
 */
class Search {
  /**
   * Translates a facts selector into a search, picking the index by the
   * leading component the selector provides:
   *
   * - `of` given         -> EAVT
   * - else `is` given    -> VAET
   * - else `the` given   -> AEVT
   * - nothing given      -> EAVT with a bare `[index]` prefix (every fact)
   *
   * `of` and `the` are tested by truthiness, `is` against `undefined`.
   */
  static from({ the, of, is }: Dialog.FactsSelector) {
    const filter: FactFilter = { index: Index.EAVT };

    if (of) {
      filter.entity = of.toString();
    }

    if (the) {
      filter.attribute = parseAttribute(the);
    }

    if (is !== undefined) {
      filter.value = Link.of(is).toString();
    }

    // Choose the index from what was actually set on the filter, so the
    // prefix builder for that index always has its leading component.
    if (filter.entity !== undefined) {
      filter.index = Index.EAVT;
    } else if (filter.value !== undefined) {
      filter.index = Index.VAET;
    } else if (filter.attribute) {
      filter.index = Index.AEVT;
    } else {
      filter.index = Index.EAVT;
    }

    return new this(filter);
  }

  prefix: Deno.KvKey;
  constructor(public filter: FactFilter) {
    this.prefix = toPrefix(filter);
  }

  /**
   * Splits an index key back into named components according to the layout
   * of the index named by its first element.
   *
   * @throws Error when the first element is not a known index.
   */
  decodeKey([index, first, second, third, fourth, fifth]: Deno.KvKey) {
    switch (index) {
      case Index.EAVT:
        return {
          index,
          entity: first,
          namespace: second,
          name: third,
          value: fourth,
        };
      case Index.AEVT:
        return {
          index,
          entity: third,
          namespace: first,
          name: second,
          value: fourth,
        };
      case Index.VAET:
        return {
          index,
          entity: fourth,
          namespace: second,
          name: third,
          value: first,
        };
      case Index.TEAV:
        return {
          index,
          entity: second,
          namespace: third,
          name: fourth,
          value: fifth,
        };

      default:
        throw new Error(`Unknown index ${String(index)}`);
    }
  }

  /**
   * Tells whether `key` satisfies every component set on the filter. Needed
   * on top of the prefix because the prefix stops at the first missing
   * component and so may not encode the whole filter.
   */
  match(key: Deno.KvKey) {
    const { index, entity, namespace, name, value } = this.decodeKey(key);
    const { filter } = this;
    if (index !== filter.index) {
      return false;
    }

    if (filter.entity !== undefined && filter.entity !== entity) {
      return false;
    }

    if (filter.attribute) {
      if (namespace !== filter.attribute.namespace) {
        return false;
      }
      if (name !== filter.attribute.name) {
        return false;
      }
    }

    if (filter.value !== undefined && filter.value !== value) {
      return false;
    }

    return true;
  }

  /**
   * Task that lists `store` entries under `this.prefix` and collects the
   * decoded facts of those whose key passes `match`.
   */
  *select(store: Deno.Kv): Task.Task<Dialog.Datum[], Error> {
    const entries = store.list<FactJSON>({ prefix: this.prefix });
    const facts: FactModel[] = [];
    while (true) {
      const next = yield* Task.wait(entries.next());
      if (next.done) {
        break;
      } else if (this.match(next.value.key)) {
        facts.push(Fact.fromJSON(next.value.value));
      }
    }

    return facts as unknown as Dialog.Datum[];
  }
}

/**
 * Attribute split at its first "/" into namespace and name.
 */
type Attribute = {
  namespace: string;
  name: string;
};

/**
 * Source of strictly increasing millisecond timestamps.
 */
class Time {
  /** Last timestamp handed out by `now`. */
  static since: number = 0;

  /**
   * Returns the current time in milliseconds, or one more than the last
   * returned value when the clock has not advanced past it. This keeps the
   * result strictly increasing even across many calls in one millisecond.
   */
  static now() {
    const now = Math.max(Date.now(), this.since + 1);
    this.since = now;
    return now;
  }
}

/**
 * Splits `the` at its first "/" into namespace and name; without a "/" the
 * namespace is empty and the whole string is the name.
 */
const parseAttribute = (the: string): Attribute => {
  const delimiter = the.indexOf("/");
  return delimiter >= 0
    ? {
      namespace: the.slice(0, delimiter),
      name: the.slice(delimiter + 1),
    }
    : {
      namespace: "",
      name: the,
    };
};

/**
 * Flattens changes into the individual instructions they yield.
 */
function* instructions(
  changes: Iterable<Assertion | Retraction>,
): Iterable<Dialog.Instruction> {
  for (const change of changes) {
    yield* change;
  }
}

/**
 * Subscription pairing a query predicate with the subscriber that receives
 * its results whenever the subscription is polled.
 */
class QuerySubscription<Fact> implements Subscription {
  #cancelled = false;
  constructor(
    public predicate: Predicate<Fact, string, FactSchema>,
    public subscriber: (facts: Fact[]) => unknown
  ) {
    // Bound so `cancel` can be passed around detached from the instance.
    this.cancel = this.cancel.bind(this);
  }

  get cancelled() {
    return this.#cancelled;
  }

  /**
   * Unless cancelled, runs the query against `source` and hands the results
   * to the subscriber.
   */
  *poll(source: Dialog.Querier) {
    if (!this.#cancelled) {
      const facts = yield* this.predicate.query({ from: source });
      this.subscriber(facts);
    }
    return {};
  }

  /**
   * Marks the subscription cancelled; later polls become no-ops.
   */
  cancel() {
    this.#cancelled = true;
  }
}