/ src / Connection.res
Connection.res
  1  // SPDX-License-Identifier: AGPL-3.0-or-later
  2  // SPDX-FileCopyrightText: 2025 Jonathan D.A. Jewell
  3  // BEAM connection management
  4  
  5  open Types
  6  
  7  type connectionOptions = {
  8    node: string,
  9    cookie: string,
 10    mode?: string, // "port" or "distribution"
 11    portProgram?: string,
 12  }
 13  
 14  type connectionState =
 15    | Disconnected
 16    | Connecting
 17    | Connected
 18    | Draining
 19  
 20  type t = {
 21    mutable state: connectionState,
 22    options: connectionOptions,
 23    mutable process: option<'process>,
 24    mutable callId: int,
 25    pendingCalls: Dict.t<{resolve: erlangTerm => unit, reject: string => unit}>,
 26  }
 27  
 28  let make = (options: connectionOptions): t => {
 29    {
 30      state: Disconnected,
 31      options: options,
 32      process: None,
 33      callId: 0,
 34      pendingCalls: Dict.make(),
 35    }
 36  }
 37  
 38  let connect = async (options: connectionOptions): t => {
 39    let conn = make(options)
 40    conn.state = Connecting
 41  
 42    let mode = Option.getOr(options.mode, "port")
 43  
 44    if mode == "distribution" {
 45      Exn.raiseError("Distribution protocol not yet implemented")
 46    }
 47  
 48    // Port mode implementation
 49    let portProgram = Option.getOr(options.portProgram, "erl")
 50    let erlCode = `
 51      {ok, _} = net_kernel:connect_node('${options.node}'),
 52      Port = open_port({fd, 0, 1}, [binary, {packet, 4}]),
 53      loop(Port).
 54      loop(Port) ->
 55        receive
 56          {Port, {data, Data}} ->
 57            Term = binary_to_term(Data),
 58            handle_call(Port, Term),
 59            loop(Port);
 60          stop -> ok
 61        end.
 62      handle_call(Port, {call, Id, M, F, A}) ->
 63        Result = try apply(M, F, A) catch _:E -> {error, E} end,
 64        Port ! {self(), {command, term_to_binary({reply, Id, Result})}};
 65      handle_call(Port, {cast, M, F, A}) ->
 66        spawn(fun() -> apply(M, F, A) end).
 67    `
 68  
 69    conn.process = Some(
 70      %raw(`
 71        const command = new Deno.Command(portProgram, {
 72          args: ["-noshell", "-sname", "deno_client", "-setcookie", options.cookie, "-eval", erlCode],
 73          stdin: "piped",
 74          stdout: "piped",
 75          stderr: "inherit",
 76        });
 77        command.spawn()
 78      `),
 79    )
 80  
 81    conn.state = Connected
 82    conn
 83  }
 84  
 85  let call = async (conn: t, module: string, func: string, args: array<erlangTerm>): erlangTerm => {
 86    if conn.state != Connected {
 87      Exn.raiseError("Not connected")
 88    }
 89  
 90    let id = conn.callId
 91    conn.callId = conn.callId + 1
 92  
 93    let term = Tuple([Atom("call"), Integer(id), Atom(module), Atom(func), List(args)])
 94    let encoded = Etf.encode(term)
 95  
 96    await %raw(`
 97      new Promise((resolve, reject) => {
 98        conn.pendingCalls[id] = { resolve, reject };
 99  
100        const packet = new Uint8Array(4 + encoded.length);
101        new DataView(packet.buffer).setUint32(0, encoded.length, false);
102        packet.set(encoded, 4);
103  
104        conn.process.stdin.getWriter().write(packet).catch(reject);
105      })
106    `)
107  }
108  
109  let cast = async (conn: t, module: string, func: string, args: array<erlangTerm>): unit => {
110    if conn.state != Connected {
111      Exn.raiseError("Not connected")
112    }
113  
114    let term = Tuple([Atom("cast"), Atom(module), Atom(func), List(args)])
115    let encoded = Etf.encode(term)
116  
117    await %raw(`
118      const packet = new Uint8Array(4 + encoded.length);
119      new DataView(packet.buffer).setUint32(0, encoded.length, false);
120      packet.set(encoded, 4);
121      conn.process.stdin.getWriter().write(packet)
122    `)
123  }
124  
125  let close = async (conn: t): unit => {
126    conn.state = Draining
127  
128    switch conn.process {
129    | Some(proc) => %raw(`proc.kill("SIGTERM")`)
130    | None => ()
131    }
132  
133    conn.process = None
134    conn.state = Disconnected
135  
136    // Reject all pending calls
137    Dict.forEachWithKey(conn.pendingCalls, (_, pending) => {
138      pending.reject("Connection closed")
139    })
140  }
141  
142  let isConnected = (conn: t): bool => conn.state == Connected