/ src / utils / db.js
db.js
  1  /**
  2   * Database Utilities — PostgreSQL (pg.Pool)
  3   *
  4   * Single `mmo` database with schemas:
  5   *   m333 — 333Method application data (was sites.db)
  6   *   ops  — operations (was ops.db: cron_jobs, cron_locks, pipeline_control, settings, migrations)
  7   *   tel  — telemetry (was telemetry.db: llm_usage, agent_*, system_*, pipeline_metrics)
  8   *   msgs — shared cross-project (was messages.db + suppression.db)
  9   *
 10   * Schema-prefixed queries (ops.cron_jobs, tel.llm_usage, msgs.messages) work
 11   * unchanged — search_path resolves unqualified names to m333 first.
 12   *
 13   * Environment variables:
 14   *   DATABASE_URL       — PostgreSQL connection string (e.g. postgresql:///mmo)
 15   *   PG_SEARCH_PATH     — schema search path (default: m333, ops, tel, msgs)
 16   *   PG_POOL_MAX        — max pool connections (default: 20)
 17   *   PGHOST, PGDATABASE, PGUSER, PG_PASSWORD — fallback individual vars
 18   */
 19  
 20  import pg from 'pg';
 21  
 22  let pool;
 23  
 24  /**
 25   * Validate search_path schemas against injection.
 26   * SET doesn't support parameterised values, so we whitelist identifiers.
 27   */
 28  function validateSearchPath(path) {
 29    const schemas = path.split(',').map(s => s.trim());
 30    const valid = /^[a-zA-Z_][a-zA-Z0-9_]*$/;
 31    for (const s of schemas) {
 32      if (!valid.test(s)) throw new Error(`Invalid schema name in PG_SEARCH_PATH: "${s}"`);
 33    }
 34    return schemas.join(', ');
 35  }
 36  
 37  /**
 38   * Get or create the shared connection pool.
 39   * @returns {pg.Pool}
 40   */
 41  export function getPool() {
 42    if (!pool) {
 43      const poolMax = parseInt(process.env.PG_POOL_MAX || '20', 10);
 44  
 45      const poolConfig = {
 46        max: poolMax,
 47        idleTimeoutMillis: 300_000,     // 5 min — long-running pipeline processes
 48        connectionTimeoutMillis: 5_000,
 49      };
 50  
 51      if (process.env.DATABASE_URL) {
 52        poolConfig.connectionString = process.env.DATABASE_URL;
 53      } else {
 54        poolConfig.host = process.env.PGHOST || '/run/postgresql';
 55        poolConfig.database = process.env.PGDATABASE || 'mmo';
 56        poolConfig.user = process.env.PGUSER || 'jason';
 57        if (process.env.PG_PASSWORD) poolConfig.password = process.env.PG_PASSWORD;
 58      }
 59  
 60      pool = new pg.Pool(poolConfig);
 61  
 62      const searchPath = validateSearchPath(
 63        process.env.PG_SEARCH_PATH || 'm333, ops, tel, msgs'
 64      );
 65  
 66      pool.on('connect', async (client) => {
 67        try {
 68          await client.query(`SET search_path TO ${searchPath}, public`);
 69          await client.query("SET timezone TO 'UTC'");
 70          await client.query('SET statement_timeout = 30000');
 71        } catch (err) {
 72          console.error('[db] Failed to configure client:', err.message);
 73        }
 74      });
 75  
 76      // Prevent process crash on idle client errors
 77      pool.on('error', (err) => {
 78        console.error('[db] Pool error on idle client:', err.message);
 79      });
 80    }
 81    return pool;
 82  }
 83  
 84  /**
 85   * Run a query and return the full pg Result.
 86   * @param {string} text — SQL with $1, $2 placeholders
 87   * @param {any[]} [params]
 88   * @returns {Promise<pg.QueryResult>}
 89   */
 90  export async function query(text, params) {
 91    return await getPool().query(text, params);
 92  }
 93  
 94  /**
 95   * Run a SELECT and return the first row, or null.
 96   * @param {string} text
 97   * @param {any[]} [params]
 98   * @returns {Promise<object|null>}
 99   */
100  export async function getOne(text, params) {
101    const { rows } = await query(text, params);
102    return rows[0] || null;
103  }
104  
/**
 * Execute a statement and return every result row.
 * @param {string} text — SQL using $1, $2 … placeholders
 * @param {any[]} [params] — values bound to the placeholders
 * @returns {Promise<object[]>} the `rows` array of the pg Result
 */
export async function getAll(text, params) {
  const result = await query(text, params);
  return result.rows;
}
115  
/**
 * Run an INSERT/UPDATE/DELETE and return { changes, lastInsertRowid }.
 * For INSERTs that need the new id, add RETURNING id to the SQL.
 *
 * `changes` is the statement's rowCount; pg reports null for commands that
 * have no row count, and that is normalised to 0 so `changes` is always a
 * number. `lastInsertRowid` is the `id` column of the first returned row, or
 * undefined when no row (or no `id` column) came back.
 *
 * @param {string} text — SQL using $1, $2 … placeholders
 * @param {any[]} [params] — values bound to the placeholders
 * @returns {Promise<{changes: number, lastInsertRowid: number|undefined}>}
 */
export async function run(text, params) {
  const { rowCount, rows } = await query(text, params);
  return { changes: rowCount ?? 0, lastInsertRowid: rows[0]?.id };
}
127  
/**
 * Execute a function inside a transaction.
 *
 * A dedicated client is checked out of the pool, BEGIN is issued, and a
 * transaction-local 30 s statement_timeout is set before the callback runs.
 * If the callback resolves, COMMIT is issued and its value returned. If
 * anything throws, ROLLBACK is attempted and the original error is re-thrown.
 *
 * The client is returned to the pool after a successful COMMIT or ROLLBACK.
 * It is destroyed only when ROLLBACK itself fails, which is the signal that
 * the connection is no longer usable.
 *
 * @param {(client: pg.PoolClient) => Promise<T>} fn — receives the transaction's client
 * @returns {Promise<T>} whatever `fn` resolves to
 * @throws whatever BEGIN, `fn` or COMMIT threw
 * @template T
 */
export async function withTransaction(fn) {
  const client = await getPool().connect();
  let destroyClient = false;
  try {
    await client.query('BEGIN');
    await client.query('SET LOCAL statement_timeout = 30000');
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    try {
      await client.query('ROLLBACK');
    } catch (rollbackErr) {
      // The connection could not be reset, so it must not be reused.
      destroyClient = true;
      console.error(
        '[db] ROLLBACK failed, discarding connection:',
        rollbackErr?.message ?? rollbackErr
      );
    }
    // Always surface the original failure, not the rollback one.
    throw err;
  } finally {
    // A truthy argument tells the pool to destroy the client instead of reusing it.
    client.release(destroyClient);
  }
}
155  
/**
 * Gracefully shut down the pool. Call from process exit handlers.
 *
 * Does nothing when no pool exists. The module-level reference is cleared
 * before waiting for the pool to end, so overlapping calls (for example from
 * two signal handlers) end the pool only once, and a failed shutdown does not
 * leave an ended pool cached for getPool() to return.
 *
 * @returns {Promise<void>}
 */
export async function closePool() {
  if (!pool) return;

  // Detach first: everything from here on is asynchronous.
  const closing = pool;
  pool = null;
  await closing.end();
}
166  
167  // ─── Legacy compatibility ────────────────────────────────────────────────────
168  // These are exported to make grep-based migration easier.
169  // `createDatabaseConnection` is no longer meaningful — the pool is a singleton.
170  // `closeDatabaseConnection` maps to closePool.
171  
/**
 * Legacy alias kept for older call sites: returns the shared pool.
 * @deprecated Use getPool() — pool is a singleton, no need to "create" connections
 * @returns {pg.Pool}
 */
export function createDatabaseConnection() {
  const sharedPool = getPool();
  return sharedPool;
}
176  
/**
 * Legacy alias kept for older call sites: waits for closePool() and resolves
 * with its result.
 * @deprecated Use closePool()
 */
export async function closeDatabaseConnection() {
  const outcome = await closePool();
  return outcome;
}
181  
// Default export: every helper above gathered on a single object, for callers
// that import the module as one value instead of using named imports.
const dbApi = {
  getPool,
  query,
  getOne,
  getAll,
  run,
  withTransaction,
  closePool,
  createDatabaseConnection,
  closeDatabaseConnection,
};

export default dbApi;