/ src / server / dctrlDb.js
dctrlDb.js
  1  import Kefir from 'kefir'
  2  import dbengine from 'better-sqlite3'
  3  import chalk from 'chalk'
  4  import cryptoUtils from '../crypto.js'
  5  import { config } from './configParser.js'
  6  
  7  let PORT = process.env.PORT || 8003
  8  let dblocation = config.aodir + '/database.sqlite3'
  9  if (parseInt(PORT) !== 8003){
 10      dblocation = dblocation.replace('database', PORT)
 11  }
 12  if (process.env.DATABASE){
 13      dblocation = process.env.DATABASE
 14  }
 15  
 16  const preparedStmts = {};
 17  var conn, eventEmitter, shadowEmitter
 18  
 19  const changeFeed = Kefir.stream(e => {
 20    eventEmitter = e
 21  })
 22  
 23  const shadowFeed = Kefir.stream(e => {
 24    shadowEmitter = e
 25  })
 26  
 27  function triggerShadow(x){
 28      shadowEmitter.emit(x)
 29  }
 30  
 31  function initializeSqlite(cb) {
 32    console.log('initializing new sqlite3');
 33    var err = null;
 34    try {
 35      var initDb = conn.prepare("CREATE TABLE `events` ( `document` BLOB NOT NULL, `timestamp` INTEGER UNIQUE, PRIMARY KEY(`timestamp`) )");
 36      var initBackups = conn.prepare("CREATE TABLE `backups` ( `document` BLOB NOT NULL, `timestamp` INTEGER UNIQUE, PRIMARY KEY(`timestamp`) )");
 37      initDb.run();
 38      initBackups.run();
 39      createStatements()
 40    } catch(actualErr) {
 41      console.log("err from table initialize: ", actualErr);
 42      err = actualErr;
 43    }
 44    if (err) {
 45      cb(err, conn);
 46    } else {
 47      startFeed();
 48      cb(null, conn);
 49    }
 50  }
 51  
 52  function createStatements() {
 53      conn.function('eventFeed', (doc) => {
 54          eventEmitter.emit(JSON.parse(doc))
 55      })
 56      preparedStmts.getAll = conn.prepare('SELECT document FROM events WHERE (timestamp > ?) ORDER BY timestamp')
 57      preparedStmts.insertEvent = conn.prepare("INSERT INTO events VALUES (?, ?)")
 58      preparedStmts.insertBackup = conn.prepare("INSERT INTO backups VALUES (?, ?)")
 59      preparedStmts.recover  = conn.prepare("SELECT document from backups ORDER BY timestamp DESC LIMIT 1")
 60  }
 61  
 62  function recover(callback){
 63      try {
 64        let all = [];
 65        for (const ev of preparedStmts.recover.iterate()) {
 66            all.push(JSON.parse(ev.document))
 67        }
 68        callback(null, all)
 69      } catch(err){
 70        console.log('err caught recover ' + err)
 71      }
 72  }
 73  
 74  function getAll(timestamp, callback) {
 75      try {
 76        let all = [];
 77  
 78        for (const ev of preparedStmts.getAll.iterate(timestamp)) {
 79            all.push(JSON.parse(ev.document))
 80        }
 81        callback(null, all)
 82      } catch(err) {
 83        console.log(err)
 84        console.log('err caught getAll ' + err)
 85      }
 86  }
 87  
 88  function startFeed() {
 89    conn.function('eventFeed', (doc) => {
 90        eventEmitter.emit(JSON.parse(doc))
 91    })
 92    conn.prepare('CREATE TRIGGER updateHook AFTER INSERT ON events BEGIN SELECT eventFeed(NEW.document); END').run()
 93  }
 94  
 95  function insertEvent(ev, callback) {
 96      if (!conn) return callback("No db connection")
 97      if (!ev.timestamp) {
 98          ev.timestamp = Date.now()
 99      }
100      var err = null;
101      var result = null;
102      try{
103        result = preparedStmts.insertEvent.run(JSON.stringify(ev), ev.timestamp);
104      } catch(actualErr) {
105        console.log("insertEvent try", {actualErr, ev}) // this can be reached by nondb errors (ie in mutations) ?
106        err = actualErr
107      }
108      if (callback) {
109         callback(err, result)
110      }
111  }
112  
/**
 * Stores a snapshot in the `backups` table as a JSON document keyed by the
 * current time.
 *
 * Note: `state` is mutated. Its `timestamp` is always overwritten with
 * Date.now() before it is serialized.
 *
 * @param {object} state - Snapshot to store.
 * @param {function} [callback] - Optional. Called as callback(err, result),
 *   where `result` is the value returned by the prepared statement's run()
 *   (null on failure) and `err` is null on success. When there is no
 *   database connection it is called with the string "No db connection".
 * @returns {*} Whatever the callback returns, or undefined without one.
 */
function insertBackup(state, callback) {
    if (!conn) {
        // callback is optional on every other path, so it must not be called
        // unconditionally here.
        if (callback) return callback("No db connection")
        console.log("insertBackup: No db connection")
        return
    }

    state.timestamp = Date.now()

    let err = null
    let result = null
    try {
        result = preparedStmts.insertBackup.run(JSON.stringify(state), state.timestamp)
    } catch(actualErr) {
        err = actualErr
        // With no callback to receive it, log the failure rather than
        // dropping it without a trace.
        if (!callback) console.log("insertBackup try", {actualErr})
    }
    if (callback) return callback(err, result)
}
127  
/**
 * Opens the database at `dblocation` and stores the handle in module-level
 * `conn`. If an `events` table already exists, only the statements are
 * prepared; otherwise the schema is created via initializeSqlite().
 *
 * Errors from opening the database or preparing statements are thrown, not
 * passed to the callback.
 *
 * @param {function} callback - Called as callback(null, conn) for an existing
 *   database, or forwarded to initializeSqlite() for a new one.
 */
function startDb(callback){
    conn = dbengine(dblocation, { })
    const eventsTableQuery = conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='events'")
    console.log('Database at ', chalk.bold.red(dblocation))

    const matches = eventsTableQuery.all()
    if (matches.length > 0) {
        // Existing database: schema and trigger are assumed to be in place.
        createStatements()
        callback(null, conn)
        return
    }
    initializeSqlite(callback)
}
139  
/**
 * Returns the current database handle.
 *
 * @returns {*} The module-level `conn`; undefined until startDb() has
 *   assigned it.
 */
function getConn(){
    const current = conn
    return current
}
143  
// Public API of the module.
export default {
  // `conn` is exposed through a getter so it reflects the live connection.
  // A plain `conn: conn` property captures the value when this object is
  // created, which is always undefined because `conn` is only assigned
  // inside startDb().
  get conn() {
    return conn
  },
  startDb,
  getAll,
  changeFeed,
  shadowFeed,
  triggerShadow,
  insertEvent,
  insertBackup,
  getConn,
  recover,
}