// Connection.res
// SPDX-License-Identifier: AGPL-3.0-or-later
// SPDX-FileCopyrightText: 2025 Jonathan D.A. Jewell
// BEAM connection management
//
// Spawns an Erlang VM as a child process and exchanges External Term Format
// messages with it over stdin/stdout, each framed with a 4-byte big-endian
// length prefix (matching the `{packet, 4}` option of the Erlang port).

open Types

// Options accepted by `connect`.
type connectionOptions = {
  node: string,
  cookie: string,
  mode?: string, // "port" or "distribution"
  portProgram?: string,
}

// Lifecycle of a connection.
type connectionState =
  | Disconnected
  | Connecting
  | Connected
  | Draining

// Opaque handle to the spawned child process (a Deno.ChildProcess at runtime).
// A record field cannot carry an unbound type variable, so the handle is
// modelled as an abstract type instead.
type childProcess

// Settlement callbacks of one in-flight `call`, stored under its call id.
type pendingCall = {
  resolve: erlangTerm => unit,
  reject: string => unit,
}

// Mutable connection record shared by every function in this module.
type t = {
  mutable state: connectionState,
  options: connectionOptions,
  mutable process: option<childProcess>,
  mutable callId: int,
  pendingCalls: Dict.t<pendingCall>,
}

// Builds a fresh, disconnected connection record for `options`.
let make = (options: connectionOptions): t => {
  {
    state: Disconnected,
    options,
    process: None,
    callId: 0,
    pendingCalls: Dict.make(),
  }
}

// Spawns `program` with `args`, piping stdin/stdout and inheriting stderr.
// Written as a raw *function* so that every value crosses the ReScript/JS
// boundary as an explicit argument rather than by captured local name.
let spawnPortProcess: (string, array<string>) => childProcess = %raw(`
  function (program, args) {
    return new Deno.Command(program, {
      args: args,
      stdin: "piped",
      stdout: "piped",
      stderr: "inherit",
    }).spawn();
  }
`)

// Sends SIGTERM to the child. `kill` throws when the process has already
// exited; that is ignored so that `close` can always finish its cleanup.
let terminateProcess: childProcess => unit = %raw(`
  function (proc) {
    try {
      proc.kill("SIGTERM");
    } catch (_err) {
      // Process already gone: nothing left to terminate.
    }
  }
`)

// Frames `payload` with a 4-byte big-endian length prefix and queues it on the
// child's stdin. The returned promise settles when the write settles.
// `payload` is whatever `Etf.encode` returns; it is only required to expose
// `length` and to be accepted by `Uint8Array.prototype.set`.
let writePacket = (proc: childProcess, payload: 'bytes): promise<unit> => {
  let send: (childProcess, 'bytes) => promise<unit> = %raw(`
    function (proc, payload) {
      try {
        const packet = new Uint8Array(4 + payload.length);
        new DataView(packet.buffer).setUint32(0, payload.length, false);
        packet.set(payload, 4);

        // getWriter() locks the stream, so the lock must be released or the
        // next write would throw. Releasing right after queueing the chunk is
        // allowed: the pending write still completes, and writes issued
        // back-to-back never observe a locked stream.
        const writer = proc.stdin.getWriter();
        const done = writer.write(packet);
        writer.releaseLock();
        return done;
      } catch (err) {
        return Promise.reject(err);
      }
    }
  `)
  send(proc, payload)
}

// Renders `name` as a quoted Erlang atom, escaping backslashes and single
// quotes so the value cannot break out of the atom literal.
let quoteAtom = (name: string): string => {
  let escaped = name->String.replaceAll("\\", "\\\\")->String.replaceAll("'", "\\'")
  `'${escaped}'`
}

// Erlang expression sequence passed to `erl -eval`.
// `-eval` only accepts expressions, so the receive loop and the request
// handler are funs rather than function definitions, and the result of
// `net_kernel:connect_node/1` is matched against `true` (it returns
// `true | false | ignored`).
let portLoopCode = (node: string): string => {
  let nodeAtom = quoteAtom(node)
  `
    true = net_kernel:connect_node(${nodeAtom}),
    Port = open_port({fd, 0, 1}, [binary, {packet, 4}]),
    Handle = fun
      ({call, Id, M, F, A}) ->
        Result = try apply(M, F, A) catch _:E -> {error, E} end,
        Port ! {self(), {command, term_to_binary({reply, Id, Result})}};
      ({cast, M, F, A}) ->
        spawn(fun() -> apply(M, F, A) end)
    end,
    Serve = fun Loop() ->
      receive
        {Port, {data, Data}} ->
          Handle(binary_to_term(Data)),
          Loop();
        stop -> ok
      end
    end,
    Serve().
  `
}

// Returns the live child process, or raises "Not connected" when the
// connection is not in the `Connected` state.
let requireProcess = (conn: t): childProcess =>
  switch (conn.state, conn.process) {
  | (Connected, Some(proc)) => proc
  | _ => Exn.raiseError("Not connected")
  }

// Extracts a printable message from a rejected write.
let writeFailureMessage = (err: exn): string =>
  switch err {
  | Exn.Error(jsError) => Option.getOr(Exn.message(jsError), "Failed to write to port")
  | _ => "Failed to write to port"
  }

// Opens a connection described by `options` and returns it in the `Connected`
// state. Only port mode is implemented: `options.portProgram` (default "erl")
// is spawned as a child process that connects to `options.node` using
// `options.cookie`.
// Raises when `options.mode` is "distribution".
let connect = async (options: connectionOptions): t => {
  let conn = make(options)
  conn.state = Connecting

  let mode = Option.getOr(options.mode, "port")

  if mode == "distribution" {
    Exn.raiseError("Distribution protocol not yet implemented")
  }

  // Port mode implementation
  let portProgram = Option.getOr(options.portProgram, "erl")
  let erlCode = portLoopCode(options.node)

  conn.process = Some(
    spawnPortProcess(
      portProgram,
      ["-noshell", "-sname", "deno_client", "-setcookie", options.cookie, "-eval", erlCode],
    ),
  )

  conn.state = Connected
  conn
}

// Sends `{call, Id, Module, Function, Args}` to the Erlang side and returns a
// promise that is settled through the entry stored in `conn.pendingCalls`
// under the call id.
// Raises "Not connected" unless the connection is `Connected`.
// NOTE(review): nothing visible in this file reads the child's stdout or
// resolves `pendingCalls` when a `{reply, Id, Result}` arrives; presumably a
// reader elsewhere does so. Confirm, otherwise calls only settle on write
// failure or `close`.
let call = async (
  conn: t,
  moduleName: string,
  func: string,
  args: array<erlangTerm>,
): erlangTerm => {
  let proc = requireProcess(conn)

  let id = conn.callId
  conn.callId = conn.callId + 1
  let key = Int.toString(id)

  let term = Tuple([Atom("call"), Integer(id), Atom(moduleName), Atom(func), List(args)])
  let encoded = Etf.encode(term)

  await Promise.make((resolve, reject) => {
    let pending: pendingCall = {resolve, reject}
    Dict.set(conn.pendingCalls, key, pending)

    writePacket(proc, encoded)
    ->Promise.catch(err => {
      // The request never left: drop the bookkeeping entry and fail the call.
      Dict.delete(conn.pendingCalls, key)
      reject(writeFailureMessage(err))
      Promise.resolve()
    })
    ->ignore
  })
}

// Sends a fire-and-forget `{cast, Module, Function, Args}` to the Erlang side.
// The returned promise settles once the packet has been written.
// Raises "Not connected" unless the connection is `Connected`.
let cast = async (conn: t, moduleName: string, func: string, args: array<erlangTerm>): unit => {
  let proc = requireProcess(conn)

  let term = Tuple([Atom("cast"), Atom(moduleName), Atom(func), List(args)])
  let encoded = Etf.encode(term)

  await writePacket(proc, encoded)
}

// Terminates the child process (if any), marks the connection `Disconnected`
// and rejects every call still waiting for a reply with "Connection closed".
let close = async (conn: t): unit => {
  conn.state = Draining

  switch conn.process {
  | Some(proc) => terminateProcess(proc)
  | None => ()
  }

  conn.process = None
  conn.state = Disconnected

  // Reject all pending calls. Iterate over a snapshot so entries can be
  // removed while walking, leaving the dictionary empty afterwards.
  Dict.toArray(conn.pendingCalls)->Array.forEach(((key, pending)) => {
    Dict.delete(conn.pendingCalls, key)
    pending.reject("Connection closed")
  })
}

// True when the connection is in the `Connected` state.
let isConnected = (conn: t): bool => conn.state == Connected