// dctrlDb.js
import Kefir from 'kefir'
import dbengine from 'better-sqlite3'
import chalk from 'chalk'
import cryptoUtils from '../crypto.js'
import { config } from './configParser.js'

// Port whose instance uses the default `database.sqlite3` file name.
const DEFAULT_PORT = 8003

const PORT = process.env.PORT || DEFAULT_PORT

// Resolve the database file:
//   - default port  -> <aodir>/database.sqlite3
//   - any other port -> <aodir>/<PORT>.sqlite3
//   - DATABASE env var, when set, overrides both.
let dblocation = config.aodir + '/database.sqlite3'
if (parseInt(PORT, 10) !== DEFAULT_PORT) {
  // Build the path directly instead of string-replacing 'database', which
  // would rewrite the wrong part of the path if aodir itself contains it.
  dblocation = config.aodir + '/' + PORT + '.sqlite3'
}
if (process.env.DATABASE) {
  dblocation = process.env.DATABASE
}

// Prepared statements, populated by createStatements().
const preparedStmts = {}

// conn is set by startDb(); the emitters are set when Kefir runs the
// corresponding stream's subscribe function below.
let conn, eventEmitter, shadowEmitter

// Stream of parsed event documents, fed by the `eventFeed` SQL function.
const changeFeed = Kefir.stream(e => {
  eventEmitter = e
})

// Stream fed manually through triggerShadow().
const shadowFeed = Kefir.stream(e => {
  shadowEmitter = e
})

/**
 * Push a value onto shadowFeed.
 * Does nothing if the stream's emitter has not been captured yet
 * (NOTE(review): per Kefir docs that happens on first subscription).
 * @param {*} x value to emit
 */
function triggerShadow(x) {
  if (shadowEmitter) {
    shadowEmitter.emit(x)
  }
}

/**
 * Body of the `eventFeed` SQL function: parse the stored JSON document and
 * push it onto changeFeed. Guarded so that an insert does not fail merely
 * because the emitter has not been captured yet.
 * @param {string} doc JSON text of the inserted event
 */
function emitEvent(doc) {
  if (eventEmitter) {
    eventEmitter.emit(JSON.parse(doc))
  }
}

/** Register `eventFeed` on the current connection. */
function registerEventFeed() {
  conn.function('eventFeed', emitEvent)
}

/**
 * Create the `events` and `backups` tables in a fresh database, prepare the
 * statements and install the insert trigger.
 * @param {function(Error|null, object)} cb called with (err, conn); err is
 *   null on success.
 */
function initializeSqlite(cb) {
  console.log('initializing new sqlite3')
  try {
    const initDb = conn.prepare("CREATE TABLE `events` ( `document` BLOB NOT NULL, `timestamp` INTEGER UNIQUE, PRIMARY KEY(`timestamp`) )")
    const initBackups = conn.prepare("CREATE TABLE `backups` ( `document` BLOB NOT NULL, `timestamp` INTEGER UNIQUE, PRIMARY KEY(`timestamp`) )")
    initDb.run()
    initBackups.run()
    createStatements()
    // Inside the try so a trigger-creation failure reaches cb as well.
    startFeed()
  } catch (actualErr) {
    console.log("err from table initialize: ", actualErr)
    cb(actualErr, conn)
    return
  }
  cb(null, conn)
}

/**
 * Register the `eventFeed` SQL function and prepare every statement used by
 * this module. Requires the `events` and `backups` tables to exist.
 */
function createStatements() {
  registerEventFeed()
  preparedStmts.getAll = conn.prepare('SELECT document FROM events WHERE (timestamp > ?) ORDER BY timestamp')
  preparedStmts.insertEvent = conn.prepare("INSERT INTO events VALUES (?, ?)")
  preparedStmts.insertBackup = conn.prepare("INSERT INTO backups VALUES (?, ?)")
  preparedStmts.recover = conn.prepare("SELECT document from backups ORDER BY timestamp DESC LIMIT 1")
}

/**
 * Run a prepared SELECT and return the JSON-parsed `document` column of
 * every row.
 * @param {object} stmt prepared statement exposing iterate()
 * @param {...*} params bound parameters
 * @returns {Array<object>} parsed documents, in row order
 */
function readDocuments(stmt, ...params) {
  const docs = []
  for (const row of stmt.iterate(...params)) {
    docs.push(JSON.parse(row.document))
  }
  return docs
}

/**
 * Load the most recent backup.
 * @param {function(Error|null, Array<object>=)} callback receives
 *   (null, docs) where docs holds at most one backup, or (err) if the read
 *   or JSON parse failed. Exceptions thrown by the callback itself propagate
 *   to the caller of recover().
 */
function recover(callback) {
  let all
  try {
    all = readDocuments(preparedStmts.recover)
  } catch (err) {
    console.log('err caught recover ' + err)
    // Report the failure instead of leaving the caller waiting forever.
    return callback(err)
  }
  callback(null, all)
}

/**
 * Load every event with a timestamp strictly greater than `timestamp`,
 * ordered by timestamp.
 * @param {number} timestamp exclusive lower bound
 * @param {function(Error|null, Array<object>=)} callback receives
 *   (null, events) or (err) if the read or JSON parse failed. Exceptions
 *   thrown by the callback itself propagate to the caller of getAll().
 */
function getAll(timestamp, callback) {
  let all
  try {
    all = readDocuments(preparedStmts.getAll, timestamp)
  } catch (err) {
    console.log(err)
    console.log('err caught getAll ' + err)
    // Report the failure instead of leaving the caller waiting forever.
    return callback(err)
  }
  callback(null, all)
}

/**
 * Install the trigger that calls eventFeed(NEW.document) after each insert
 * into `events`. Only called from initializeSqlite(); when the tables
 * already exist, startDb() re-registers the function via createStatements()
 * and does not recreate the trigger.
 */
function startFeed() {
  registerEventFeed()
  conn.prepare('CREATE TRIGGER updateHook AFTER INSERT ON events BEGIN SELECT eventFeed(NEW.document); END').run()
}

/**
 * Persist an event. Sets ev.timestamp to Date.now() when it is missing.
 * @param {object} ev event to store (mutated: timestamp may be assigned)
 * @param {function(*, object|null)=} callback optional; receives
 *   (err, result). err is the string "No db connection" when startDb() has
 *   not run, the caught error on failure, or null on success.
 */
function insertEvent(ev, callback) {
  if (!conn) {
    // callback is optional here just like on the normal path below.
    return callback ? callback("No db connection") : undefined
  }
  if (!ev.timestamp) {
    ev.timestamp = Date.now()
  }
  let err = null
  let result = null
  try {
    result = preparedStmts.insertEvent.run(JSON.stringify(ev), ev.timestamp)
  } catch (actualErr) {
    // this can be reached by nondb errors (ie in mutations) ?
    console.log("insertEvent try", {actualErr, ev})
    err = actualErr
  }
  if (callback) {
    callback(err, result)
  }
}

/**
 * Persist a state snapshot, stamped with the current time.
 * @param {object} state snapshot to store (mutated: timestamp is overwritten)
 * @param {function(*, object|null)=} callback optional; receives
 *   (err, result) with the same conventions as insertEvent().
 * @returns {*} whatever the callback returns, or undefined without one
 */
function insertBackup(state, callback) {
  if (!conn) {
    return callback ? callback("No db connection") : undefined
  }

  state.timestamp = Date.now()

  let err = null
  let result = null
  try {
    result = preparedStmts.insertBackup.run(JSON.stringify(state), state.timestamp)
  } catch (actualErr) {
    err = actualErr
  }
  if (callback) return callback(err, result)
}

/**
 * Open the database at the resolved location, creating the schema when the
 * `events` table is missing, otherwise just preparing the statements.
 * @param {function(Error|null, object)} callback receives (err, conn)
 */
function startDb(callback) {
  conn = dbengine(dblocation, { })
  const checkTable = conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='events'")
  console.log('Database at ', chalk.bold.red(dblocation))
  if (checkTable.all().length === 0) {
    initializeSqlite(callback)
  } else {
    createStatements()
    callback(null, conn)
  }
}

/** @returns {object|undefined} the open connection, or undefined before startDb(). */
function getConn() {
  return conn
}

export default {
  // Getter so the property reflects the connection opened by startDb();
  // a plain `conn: conn` would capture `undefined` at module load.
  get conn() {
    return conn
  },
  startDb,
  getAll,
  changeFeed,
  shadowFeed,
  triggerShadow,
  insertEvent,
  insertBackup,
  getConn,
  recover,
}