/ lib.ts
lib.ts
  1  import * as Dialog from "@dialog-db/query/interface";
  2  import { Constant, Link, Task, type Predicate, type FactSchema } from "@dialog-db/query";
  3  import { entity } from "@dialog-db/query/source/memory";
  4  const { Bytes } = Constant;
  5  
  6  export { Task };
  7  
  8  /**
  9   * Subscriber is a function to be called with query results when some facts
 10   * are asserted or retracted.
 11   */
 12  export type Subscriber<Fact> = (facts: Fact[]) => unknown;
 13  
 14  /**
 15   * Represents a query subscription that can be polled or cancelled.
 16   */
 17  export interface Subscription {
 18    readonly cancelled: boolean;
 19    cancel(): void;
 20    poll(source: Dialog.Querier): Task.Task<{}, Error>;
 21  }
 22  
 23  export class Session implements Dialog.Querier {
 24    static *open(
 25      address?: URL,
 26    ): Task.Task<Session, Error> {
 27      if (!Deno.Kv) {
 28        throw new Error(
 29          "Deno.Kv is not available, please pass --unstable-kv flag to the deno process to enable",
 30        );
 31      } else {
 32        const store = yield* Task.wait(Deno.openKv(address?.href));
 33        return new this(store);
 34      }
 35    }
 36  
 37    #subscriptions = new Set<Subscription>();
 38    
 39    constructor(public store: Deno.Kv) {
 40    }
 41  
 42    select(selector: Dialog.FactsSelector) {
 43      return select(this, selector);
 44    }
 45  
 46    /**
 47     * Takes changes and transacts them atomically into this database.
 48     */
 49    transact(changes: Changes): Task.Invocation<Revision, Error> {
 50      return Task.perform(transact(this, changes));
 51    }
 52  
 53    /**
 54     * Subscribes to the provided query & calls `subscriber` every time changes
 55     * are transacted in this session allowing subscribes to react to changes.
 56     */
 57    subscribe<Fact>(
 58      query: Predicate<Fact, string, FactSchema>,
 59      subscriber: Subscriber<Fact>
 60    ): Subscription {
 61      const subscription = new QuerySubscription(query, subscriber);
 62      this.#subscriptions.add(subscription);
 63      return subscription;
 64    }
 65  
 66    close(): Dialog.Task<Dialog.Unit, Error> {
 67      this.#subscriptions.clear();
 68      return close(this);
 69    }
 70  }
 71  
 72  /**
 73   * Represents a database revision using via IPLD link formatted as string.
 74   */
 75  export interface Revision {
 76    toString(): string;
 77  }
 78  
 79  export type Assertion = Dialog.Assertion;
 80  /**
 81   * Change that retracts set of facts, which is usually a set corresponding to
 82   * one relation model.
 83   */
 84  export interface Retraction extends Iterable<{ retract: Dialog.Fact }> {}
 85  
 86  /**
 87   * Change is either assertion or a rtercation.
 88   */
 89  export type Change = Assertion | Retraction;
 90  
 91  /**
 92   * Changes are set of changes that can be transacted atomically.
 93   */
 94  export interface Changes extends Iterable<Change> {}
 95  
 96  type DataSource = { store: Deno.Kv };
 97  
 98  export const open = (
 99    address?: URL,
100  ) => Task.perform(Session.open(address));
101  
102  /**
103   * Open a database connection using DATABASE_URL environment variable if set,
104   * otherwise falls back to the default behavior
105   */
106  export const openWithEnv = () => {
107    // Get DATABASE_URL from environment (supporting both system env and .env file)
108    const getDatabaseUrl = () => {
109      try {
110        // Try system environment first
111        const systemUrl = Deno.env.get("DATABASE_URL");
112        if (systemUrl) return systemUrl;
113        
114        // Fall back to .env file loaded values
115        const envFileUrl = (globalThis as any).__loaded_env?.["DATABASE_URL"];
116        if (envFileUrl) return envFileUrl;
117        
118        return undefined;
119      } catch {
120        return undefined;
121      }
122    };
123  
124    const databaseUrl = getDatabaseUrl();
125    
126    if (databaseUrl) {
127      console.log(`🗄️ Connecting to database: ${databaseUrl}`);
128      try {
129        const url = new URL(databaseUrl);
130        return Task.perform(Session.open(url));
131      } catch (error) {
132        console.warn(`⚠️ Invalid DATABASE_URL format: ${databaseUrl}. Using default connection.`);
133        return Task.perform(Session.open());
134      }
135    } else {
136      console.log("🗄️ Using default database connection (no DATABASE_URL set)");
137      return Task.perform(Session.open());
138    }
139  };
140  
141  export const select = (
142    source: DataSource,
143    selector: Dialog.FactsSelector,
144  ) => Search.from(selector).select(source.store);
145  
146  export const transact = function* (
147    source: DataSource,
148    changes: Changes,
149  ): Task.Task<Revision, Error> {
150    const time = Time.now();
151  
152    const cause = time;
153    const mutation = source.store.atomic();
154  
155    for (const change of instructions(changes)) {
156      if (change.assert) {
157        Fact.from({ ...change.assert, cause }).assert(mutation);
158      } else if (change.retract) {
159        Fact.from({ ...change.retract, cause }).retract(mutation);
160      }
161    }
162  
163    const result = yield* Task.wait(mutation.commit());
164    if (result.ok) {
165      return Link.of(time);
166    } else {
167      return yield* Task.fail(
168        Object.assign(new Error("Transaction failed"), { cause: result }),
169      );
170    }
171  };
172  
173  export const close = function* (
174    source: DataSource,
175  ): Task.Task<Dialog.Unit, Error> {
176    yield* Task.wait(source.store.close());
177    return {};
178  };
179  
180  type FactJSON = {
181    the: Dialog.The;
182    of: { "/": string };
183    is: Dialog.Scalar;
184    cause: number;
185  };
186  
187  type FactModel = {
188    the: Dialog.The;
189    of: Dialog.Entity;
190    is: Dialog.Scalar;
191    cause: number;
192  };
193  
194  class Fact {
195    static toJSON({ the, of, is, cause }: FactModel): FactJSON {
196      return {
197        the,
198        of: Link.toJSON(of),
199        is: Constant.toJSON(is) as Dialog.Scalar,
200        cause,
201      };
202    }
203  
204    static fromJSON({ the, of, is, cause }: FactJSON): FactModel {
205      return {
206        the,
207        of: Link.fromJSON(of),
208        is: decodeScalar(is),
209        cause,
210      };
211    }
212  
213    static from(fact: FactModel) {
214      return new Fact(fact);
215    }
216  
217    entity: string;
218    attribute: Attribute;
219    value: string;
220    json: FactJSON;
221  
222    constructor(public fact: FactModel) {
223      this.entity = fact.of.toString();
224      this.attribute = parseAttribute(fact.the);
225      this.value = Link.of(this.fact.is).toString();
226  
227      this.json = Fact.toJSON(fact);
228    }
229    get cause() {
230      return this.fact.cause;
231    }
232  
233    retract(operation: Deno.AtomicOperation) {
234      operation.delete([
235        Index.EAVT,
236        this.entity,
237        this.attribute.namespace,
238        this.attribute.name,
239        this.value,
240      ]);
241  
242      operation.delete([
243        Index.AEVT,
244        this.attribute.namespace,
245        this.attribute.name,
246        this.entity,
247        this.value,
248      ]);
249  
250      operation.delete([
251        Index.VAET,
252        this.value,
253        this.attribute.namespace,
254        this.attribute.name,
255        this.entity,
256      ]);
257    }
258    assert(operation: Deno.AtomicOperation) {
259      operation.set([
260        Index.EAVT,
261        this.entity,
262        this.attribute.namespace,
263        this.attribute.name,
264        this.value,
265      ], this.json);
266  
267      operation.set([
268        Index.AEVT,
269        this.attribute.namespace,
270        this.attribute.name,
271        this.entity,
272        this.value,
273      ], this.json);
274  
275      operation.set([
276        Index.VAET,
277        this.value,
278        this.attribute.namespace,
279        this.attribute.name,
280        this.entity,
281      ], this.json);
282  
283      // operation.set([
284      //   Index.TEAV,
285      //   this.cause,
286      //   this.entity,
287      //   this.attribute.namespace,
288      //   this.attribute.name,
289      //   this.value,
290      // ], this.json);
291    }
292  }
293  
294  const decodeScalar = (
295    scalar?: Dialog.Scalar,
296  ): Dialog.Scalar => {
297    const source = scalar as
298      | undefined
299      | Record<string, undefined | Record<string, unknown>>;
300  
301    if (typeof source?.["/"]?.["bytes"] === "string") {
302      return Bytes.fromJSON(source as Constant.Bytes.JSON);
303    } else if (typeof source?.["/"] === "string") {
304      return Link.fromJSON({ "/": source["/"] });
305    } else {
306      return scalar as Dialog.Scalar;
307    }
308  };
309  
310  enum Index {
311    EAVT = 1,
312    AEVT = 2,
313    VAET = 3,
314    TEAV = 4,
315  }
316  
317  type FactFilter = {
318    index: Index;
319    entity?: string;
320    attribute?: Attribute;
321    value?: string;
322  };
323  
324  const toPrefix = (filter: FactFilter): Deno.KvKey => {
325    switch (filter.index) {
326      case Index.EAVT:
327        return toPrefixByEntity(filter);
328      case Index.AEVT:
329        return toPrefixByAttribute(filter);
330      case Index.VAET:
331        return toPrefixByValue(filter);
332      default:
333        throw new Error(`Unsupported index: ${filter.index}`);
334    }
335  };
336  
337  const toPrefixByEntity = (
338    { index, entity, attribute, value }: FactFilter,
339  ): Deno.KvKey => {
340    const prefix = [index, entity!];
341    if (attribute) {
342      prefix.push(attribute.namespace);
343      prefix.push(attribute.name);
344  
345      if (value !== undefined) {
346        prefix.push(value);
347      }
348    }
349  
350    return prefix;
351  };
352  
353  const toPrefixByAttribute = (
354    { index, entity, attribute, value }: FactFilter,
355  ): Deno.KvKey => {
356    const prefix = [index, attribute!.namespace, attribute!.name];
357    if (entity !== undefined) {
358      prefix.push(entity);
359      if (value !== undefined) {
360        prefix.push(value);
361      }
362    }
363  
364    return prefix;
365  };
366  
367  const toPrefixByValue = (
368    { index, entity, attribute, value }: FactFilter,
369  ): Deno.KvKey => {
370    const prefix = [index, value!];
371    if (attribute) {
372      prefix.push(attribute.namespace);
373      prefix.push(attribute.name);
374  
375      if (entity !== undefined) {
376        prefix.push(entity);
377      }
378    }
379  
380    return prefix;
381  };
382  
383  class Search {
384    static from({ the, of, is }: Dialog.FactsSelector) {
385      const filter: FactFilter = { index: Index.EAVT };
386  
387      if (of) {
388        filter.index = Index.EAVT;
389        filter.entity = of.toString();
390      } else if (is !== undefined) {
391        filter.index = Index.AEVT;
392      } else if (the !== undefined) {
393        filter.index = Index.AEVT;
394      } else {
395        filter.index = Index.EAVT;
396      }
397  
398      if (the) {
399        filter.attribute = parseAttribute(the);
400      }
401  
402      if (is !== undefined) {
403        filter.value = Link.of(is).toString();
404      }
405  
406      return new this(filter);
407    }
408  
409    prefix: Deno.KvKey;
410    constructor(public filter: FactFilter) {
411      this.prefix = toPrefix(filter);
412    }
413  
414    decodeKey([index, first, second, third, fourth, fifth]: Deno.KvKey) {
415      switch (index) {
416        case Index.EAVT:
417          return {
418            index,
419            entity: first,
420            namespace: second,
421            name: third,
422            value: fourth,
423          };
424        case Index.AEVT:
425          return {
426            index,
427            entity: third,
428            namespace: first,
429            name: second,
430            value: fourth,
431          };
432        case Index.VAET:
433          return {
434            index,
435            entity: fourth,
436            namespace: second,
437            name: third,
438            value: first,
439          };
440        case Index.TEAV:
441          return {
442            index,
443            entity: second,
444            namespace: third,
445            name: fourth,
446            value: fifth,
447          };
448  
449        default:
450          throw new Error(`Unknown index ${String(index)}`);
451      }
452    }
453  
454    match(key: Deno.KvKey) {
455      const { index, entity, namespace, name, value } = this.decodeKey(key);
456      const { filter } = this;
457      if (index !== filter.index) {
458        return false;
459      }
460  
461      if (filter.entity !== undefined && filter.entity !== entity) {
462        return false;
463      }
464  
465      if (filter.attribute) {
466        if (namespace !== filter.attribute.namespace) {
467          return false;
468        }
469        if (name !== filter.attribute.name) {
470          return false;
471        }
472      }
473  
474      if (filter.value !== undefined && filter.value !== value) {
475        return false;
476      }
477  
478      return true;
479    }
480  
481    *select(store: Deno.Kv): Task.Task<Dialog.Datum[], Error> {
482      const entries = store.list<FactJSON>({ prefix: this.prefix });
483      const facts = [];
484      while (true) {
485        const next = yield* Task.wait(entries.next());
486        if (next.done) {
487          break;
488        } else if (this.match(next.value.key)) {
489          facts.push(Fact.fromJSON(next.value.value));
490        }
491      }
492  
493      return facts as unknown as Dialog.Datum[];
494    }
495  }
496  
497  type Attribute = {
498    namespace: string;
499    name: string;
500  };
501  
502  class Time {
503    static since: number = 0;
504    static now() {
505      let now = Date.now();
506      if (this.since >= now) {
507        now++;
508      }
509      this.since = now;
510      return now;
511    }
512  }
513  
514  const parseAttribute = (the: string): Attribute => {
515    const delimiter = the.indexOf("/");
516    return delimiter >= 0
517      ? {
518        namespace: the.slice(0, delimiter),
519        name: the.slice(delimiter + 1),
520      }
521      : {
522        namespace: "",
523        name: the,
524      };
525  };
526  
527  function* instructions(
528    changes: Iterable<Assertion | Retraction>,
529  ): Iterable<Dialog.Instruction> {
530    for (const change of changes) {
531      yield* change;
532    }
533  }
534  
535  class QuerySubscription<Fact> implements Subscription {
536    #cancelled = false;
537    constructor(
538      public predicate: Predicate<Fact, string, FactSchema>,
539      public subscriber: (facts: Fact[]) => unknown
540    ) {
541      this.cancel = this.cancel.bind(this);
542    }
543  
544    get cancelled() {
545      return this.#cancelled;
546    }
547  
548    *poll(source: Dialog.Querier) {
549      if (!this.#cancelled) {
550        const facts = yield* this.predicate.query({ from: source });
551        this.subscriber(facts);
552      }
553      return {};
554    }
555  
556    cancel() {
557      this.#cancelled = true;
558    }
559  }