db.js
1 /** 2 * Database Utilities — PostgreSQL (pg.Pool) 3 * 4 * Single `mmo` database with schemas: 5 * m333 — 333Method application data (was sites.db) 6 * ops — operations (was ops.db: cron_jobs, cron_locks, pipeline_control, settings, migrations) 7 * tel — telemetry (was telemetry.db: llm_usage, agent_*, system_*, pipeline_metrics) 8 * msgs — shared cross-project (was messages.db + suppression.db) 9 * 10 * Schema-prefixed queries (ops.cron_jobs, tel.llm_usage, msgs.messages) work 11 * unchanged — search_path resolves unqualified names to m333 first. 12 * 13 * Environment variables: 14 * DATABASE_URL — PostgreSQL connection string (e.g. postgresql:///mmo) 15 * PG_SEARCH_PATH — schema search path (default: m333, ops, tel, msgs) 16 * PG_POOL_MAX — max pool connections (default: 20) 17 * PGHOST, PGDATABASE, PGUSER, PG_PASSWORD — fallback individual vars 18 */ 19 20 import pg from 'pg'; 21 22 let pool; 23 24 /** 25 * Validate search_path schemas against injection. 26 * SET doesn't support parameterised values, so we whitelist identifiers. 27 */ 28 function validateSearchPath(path) { 29 const schemas = path.split(',').map(s => s.trim()); 30 const valid = /^[a-zA-Z_][a-zA-Z0-9_]*$/; 31 for (const s of schemas) { 32 if (!valid.test(s)) throw new Error(`Invalid schema name in PG_SEARCH_PATH: "${s}"`); 33 } 34 return schemas.join(', '); 35 } 36 37 /** 38 * Get or create the shared connection pool. 39 * @returns {pg.Pool} 40 */ 41 export function getPool() { 42 if (!pool) { 43 const poolMax = parseInt(process.env.PG_POOL_MAX || '20', 10); 44 45 const poolConfig = { 46 max: poolMax, 47 idleTimeoutMillis: 300_000, // 5 min — long-running pipeline processes 48 connectionTimeoutMillis: 5_000, 49 }; 50 51 if (process.env.DATABASE_URL) { 52 poolConfig.connectionString = process.env.DATABASE_URL; 53 } else { 54 poolConfig.host = process.env.PGHOST || '/run/postgresql'; 55 poolConfig.database = process.env.PGDATABASE || 'mmo'; 56 poolConfig.user = process.env.PGUSER || 'jason'; 57 if (process.env.PG_PASSWORD) poolConfig.password = process.env.PG_PASSWORD; 58 } 59 60 pool = new pg.Pool(poolConfig); 61 62 const searchPath = validateSearchPath( 63 process.env.PG_SEARCH_PATH || 'm333, ops, tel, msgs' 64 ); 65 66 pool.on('connect', async (client) => { 67 try { 68 await client.query(`SET search_path TO ${searchPath}, public`); 69 await client.query("SET timezone TO 'UTC'"); 70 await client.query('SET statement_timeout = 30000'); 71 } catch (err) { 72 console.error('[db] Failed to configure client:', err.message); 73 } 74 }); 75 76 // Prevent process crash on idle client errors 77 pool.on('error', (err) => { 78 console.error('[db] Pool error on idle client:', err.message); 79 }); 80 } 81 return pool; 82 } 83 84 /** 85 * Run a query and return the full pg Result. 86 * @param {string} text — SQL with $1, $2 placeholders 87 * @param {any[]} [params] 88 * @returns {Promise<pg.QueryResult>} 89 */ 90 export async function query(text, params) { 91 return await getPool().query(text, params); 92 } 93 94 /** 95 * Run a SELECT and return the first row, or null. 96 * @param {string} text 97 * @param {any[]} [params] 98 * @returns {Promise<object|null>} 99 */ 100 export async function getOne(text, params) { 101 const { rows } = await query(text, params); 102 return rows[0] || null; 103 } 104 105 /** 106 * Run a SELECT and return all rows. 107 * @param {string} text 108 * @param {any[]} [params] 109 * @returns {Promise<object[]>} 110 */ 111 export async function getAll(text, params) { 112 const { rows } = await query(text, params); 113 return rows; 114 } 115 116 /** 117 * Run an INSERT/UPDATE/DELETE and return { changes, lastInsertRowid }. 118 * For INSERTs that need the new id, add RETURNING id to the SQL. 119 * @param {string} text 120 * @param {any[]} [params] 121 * @returns {Promise<{changes: number, lastInsertRowid: number|undefined}>} 122 */ 123 export async function run(text, params) { 124 const { rowCount, rows } = await query(text, params); 125 return { changes: rowCount, lastInsertRowid: rows[0]?.id }; 126 } 127 128 /** 129 * Execute a function inside a transaction. 130 * The callback receives a dedicated client (not from the pool query shortcut). 131 * On error, ROLLBACK is issued and the error re-thrown. 132 * 133 * @param {(client: pg.PoolClient) => Promise<T>} fn 134 * @returns {Promise<T>} 135 * @template T 136 */ 137 export async function withTransaction(fn) { 138 const client = await getPool().connect(); 139 let failed = false; 140 try { 141 await client.query('BEGIN'); 142 await client.query('SET LOCAL statement_timeout = 30000'); 143 const result = await fn(client); 144 await client.query('COMMIT'); 145 return result; 146 } catch (e) { 147 failed = true; 148 try { await client.query('ROLLBACK'); } catch { /* connection may be dead */ } 149 throw e; 150 } finally { 151 // Destroy broken connections instead of returning them to pool 152 client.release(failed); 153 } 154 } 155 156 /** 157 * Gracefully shut down the pool. Call from process exit handlers. 158 * @returns {Promise<void>} 159 */ 160 export async function closePool() { 161 if (pool) { 162 await pool.end(); 163 pool = null; 164 } 165 } 166 167 // ─── Legacy compatibility ──────────────────────────────────────────────────── 168 // These are exported to make grep-based migration easier. 169 // `createDatabaseConnection` is no longer meaningful — the pool is a singleton. 170 // `closeDatabaseConnection` maps to closePool. 171 172 /** @deprecated Use getPool() — pool is a singleton, no need to "create" connections */ 173 export function createDatabaseConnection() { 174 return getPool(); 175 } 176 177 /** @deprecated Use closePool() */ 178 export async function closeDatabaseConnection() { 179 return await closePool(); 180 } 181 182 export default { 183 getPool, 184 query, 185 getOne, 186 getAll, 187 run, 188 withTransaction, 189 closePool, 190 createDatabaseConnection, 191 closeDatabaseConnection, 192 };