// peerManager.ts
/* eslint-disable @typescript-eslint/naming-convention */
import type { Config, BaseUrl } from "@http-client";
import type * as Execa from "execa";

import * as Fs from "node:fs/promises";
import * as Os from "node:os";
import * as Path from "node:path";
import * as Stream from "node:stream";
import * as Util from "node:util";
import * as readline from "node:readline/promises";
import getPort from "get-port";
import matches from "lodash/matches.js";
import waitOn from "wait-on";
import { configSchema } from "@http-client/lib/shared.js";
import { defaultConfig } from "@tests/support/fixtures.js";
import { execa } from "execa";
import { logPrefix } from "@tests/support/logPrefix.js";
import { randomTag } from "@tests/support/support.js";
import { sleep } from "@app/lib/sleep.js";

/**
 * A single reference change, as listed in the `updated` field of a
 * `refsFetched` node event.
 */
export type RefsUpdate =
  | { updated: { name: string; old: string; new: string } }
  | { created: { name: string; oid: string } }
  | { deleted: { name: string; oid: string } }
  | { skipped: { name: string; oid: string } };

/**
 * Shape of the events that `RadiclePeer` records from the output of
 * `rad node events`. Discriminated by the `type` field.
 */
export type NodeEvent =
  | {
      type: "refsFetched";
      remote: string;
      rid: string;
      updated: RefsUpdate[];
    }
  | {
      type: "refsSynced";
      remote: string;
      rid: string;
    }
  | {
      type: "seedDiscovered";
      rid: string;
      nid: string;
    }
  | {
      type: "seedDropped";
      nid: string;
      rid: string;
    }
  | {
      type: "peerConnected";
      nid: string;
    }
  | {
      type: "peerDisconnected";
      nid: string;
      reason: string;
    };

export interface RoutingEntry {
  nid: string;
  rid: string;
}

/**
 * Parameters for `RadiclePeer.create()`.
 */
interface PeerManagerParams {
  // Directory under which the peer's `<name>/copy` and `<name>/home`
  // directories are created.
  dataPath: string;
  // Value passed to `rad` as `RAD_KEYGEN_SEED`.
  radSeed: string;
  // Name for easy identification. Used on file system and in logs.
  name: string;
  // Extra environment variables applied to every process the peer spawns.
  gitOptions?: Record<string, string>;
  // Stream that receives the prefixed output of spawned processes.
  outputLog: Stream.Writable;
}

export interface PeerManager {
  /**
   * Create a new peer whose data lives under the manager's data directory.
   */
  createPeer(params: {
    name: string;
    gitOptions?: Record<string, string>;
  }): Promise<RadiclePeer>;
  /**
   * Kill all processes spawned by any of the peers
   */
  shutdown(): Promise<void>;
}

/**
 * Create a `PeerManager` that keeps track of every peer it creates so that
 * all of them can be shut down together.
 *
 * @param createParams.dataDir Directory that holds the peers' data.
 * @param createParams.outputLog Stream that receives the peers' process
 *   output. When omitted, output is appended to `peerManager.log` inside
 *   `dataDir`.
 */
export async function createPeerManager(createParams: {
  dataDir: string;
  outputLog?: Stream.Writable;
}): Promise<PeerManager> {
  const outputLog: Stream.Writable =
    createParams.outputLog ??
    (
      await Fs.open(Path.join(createParams.dataDir, "peerManager.log"), "a")
    ).createWriteStream();

  const peers: RadiclePeer[] = [];
  // Number of key seeds handed out so far. The seed index is reserved
  // synchronously, before the first `await`, so that overlapping
  // `createPeer()` calls never receive the same seed. Deriving it from
  // `peers.length` is not safe because a peer is only pushed once its
  // asynchronous creation has finished. Note that a failed creation still
  // consumes its seed index.
  let seedCounter = 0;

  return {
    async createPeer(params) {
      seedCounter += 1;
      const seedDigits = seedCounter.toString();

      const peer = await RadiclePeer.create({
        dataPath: createParams.dataDir,
        name: params.name,
        gitOptions: params.gitOptions,
        // The decimal seed index repeated 64 times, e.g. "111…1" for the
        // first peer.
        radSeed: seedDigits.repeat(64),
        outputLog,
      });
      peers.push(peer);

      return peer;
    },
    async shutdown() {
      await Promise.all(peers.map(peer => peer.shutdown()));
    },
  };
}

// Specialize the return type of `execa()` to guarantee that `stdout` and
// `stderr` are strings.
type SpawnResult = Execa.ResultPromise<
  SpawnOptions & {
    stdout: (line: unknown) => AsyncGenerator<string, void, void>;
    stderr: (line: unknown) => AsyncGenerator<string, void, void>;
    encoding: "utf8";
  }
>;

// Options callers may pass to `RadiclePeer.spawn()`. The stream and encoding
// related options are excluded; `spawn()` sets `stdout`, `stderr` and
// `encoding` itself.
type SpawnOptions = Omit<
  Execa.Options,
  "stdin" | "stdout" | "stderr" | "lines" | "encoding"
>;

/**
 * A peer with its own `RAD_HOME`, control socket and key seed. It can start
 * `radicle-node` and `radicle-httpd`, run `rad` and `git` commands in the
 * peer's environment, and records the node events it observes.
 */
export class RadiclePeer {
  public checkoutPath: string;
  public nodeId: string;

  #radSeed: string;
  #socket: string;
  #radHome: string;
  // Events parsed from `rad node events`, in arrival order.
  #eventRecords: NodeEvent[] = [];
  #outputLog: Stream.Writable;
  #gitOptions?: Record<string, string>;
  #listenSocketAddr?: string;
  #httpdBaseUrl?: BaseUrl;
  #nodeProcess?: SpawnResult;
  // Name for easy identification. Used on file system and in logs.
  #name: string;
  // Every process started through `spawn()`; killed by `shutdown()`.
  #childProcesses: SpawnResult[] = [];

  private constructor(props: {
    checkoutPath: string;
    nodeId: string;
    radSeed: string;
    socket: string;
    gitOptions?: Record<string, string>;
    radHome: string;
    logFile: Stream.Writable;
    name: string;
  }) {
    this.checkoutPath = props.checkoutPath;
    this.nodeId = props.nodeId;
    this.#gitOptions = props.gitOptions;
    this.#radSeed = props.radSeed;
    this.#socket = props.socket;
    this.#radHome = props.radHome;
    this.#outputLog = props.logFile;
    this.#name = props.name;
  }

  /**
   * Resolve once an event matching `searchEvent` has been recorded. Events
   * recorded before the call count as well. Polls every 100ms.
   *
   * @throws Error when no matching event shows up within `timeoutInMs`.
   */
  public async waitForEvent(
    searchEvent: NodeEvent,
    timeoutInMs: number,
  ): Promise<void> {
    const isSearchedEvent = matches(searchEvent);
    const startedAt = Date.now();

    while (!this.#eventRecords.some(isSearchedEvent)) {
      if (Date.now() - startedAt > timeoutInMs) {
        throw new Error(
          `Timeout waiting for event on node ${this.#name} ${Util.inspect(
            searchEvent,
            { depth: null },
          )}`,
        );
      }
      await sleep(100);
    }
  }

  /**
   * Set up the peer's directories (`<dataPath>/<name>/copy` and
   * `<dataPath>/<name>/home`), run `rad auth` and read back the node ID
   * with `rad self --nid`. No node or httpd process is started.
   */
  public static async create({
    dataPath,
    name,
    gitOptions,
    radSeed,
    outputLog,
  }: PeerManagerParams): Promise<RadiclePeer> {
    const checkoutPath = Path.join(dataPath, name, "copy");
    await Fs.mkdir(checkoutPath, { recursive: true });
    const radHome = Path.join(dataPath, name, "home");
    await Fs.mkdir(radHome, { recursive: true });

    // The control socket lives in a fresh directory under the OS temp dir
    // rather than under `dataPath`.
    const socketDir = await Fs.mkdtemp(
      Path.join(Os.tmpdir(), `radicle-${randomTag()}`),
    );
    const socket = Path.join(socketDir, "control.sock");

    const env = {
      ...gitOptions,
      RAD_HOME: radHome,
      RAD_PASSPHRASE: "asdf",
      RAD_KEYGEN_SEED: radSeed,
      RAD_SOCKET: socket,
    };

    await execa("rad", ["auth", "--alias", name], { env });
    const { stdout: nodeId } = await execa("rad", ["self", "--nid"], { env });

    return new RadiclePeer({
      checkoutPath,
      gitOptions,
      radSeed,
      socket,
      nodeId,
      radHome,
      logFile: outputLog,
      name,
    });
  }

  /**
   * Start `radicle-httpd` listening on `127.0.0.1` and wait up to two
   * seconds for the TCP port to accept connections.
   *
   * @param port Port to listen on. A free port is picked when omitted (or 0).
   */
  public async startHttpd(port?: number): Promise<void> {
    // `||` is deliberate: a falsy port (undefined or 0) means "pick one".
    const listenPort = port || (await getPort());
    const baseUrl: BaseUrl = {
      hostname: "127.0.0.1",
      port: listenPort,
      scheme: "http",
    };
    this.#httpdBaseUrl = baseUrl;

    const listenAddr = `${baseUrl.hostname}:${baseUrl.port}`;
    void this.spawn("radicle-httpd", ["--listen", listenAddr]);

    await waitOn({
      resources: [`tcp:${listenAddr}`],
      timeout: 2000,
    });
  }

  /**
   * Write the node configuration, start `radicle-node` on a free port and
   * begin recording the events emitted by `rad node events`.
   *
   * @param config Overrides merged into the peer's `config.json` before the
   *   node starts.
   */
  public async startNode(
    config: Partial<Config> = defaultConfig,
  ): Promise<void> {
    const listenPort = await getPort();
    this.#listenSocketAddr = `0.0.0.0:${listenPort}`;

    await updateConfig(this.#radHome, config);

    this.#nodeProcess = this.spawn("radicle-node", [
      "--listen",
      this.#listenSocketAddr,
    ]);

    // The node is considered up once its control socket exists.
    await waitOn({
      resources: [`socket:${this.#socket}`],
      timeout: 2000,
    });

    const { stdout } = this.rad(["node", "events"], {
      cwd: this.#radHome,
    });

    if (!stdout) {
      throw new Error("Could not get stdout to track events");
    }

    const eventLogPrefix = logPrefix(`${this.#name} node events`);
    readline
      .createInterface({
        input: stdout,
        terminal: false,
      })
      .on("line", line => {
        // Each line is expected to be one JSON encoded event. The parsed
        // value is recorded as is, without validating its shape.
        let event: NodeEvent;
        try {
          event = JSON.parse(line);
        } catch {
          console.log("Error parsing event", line);
          return;
        }

        this.#eventRecords.push(event);
        const rendered = Util.inspect(event, { depth: null });
        for (const renderedLine of rendered.split("\n")) {
          this.#outputLog.write(`${eventLogPrefix} ${renderedLine}\n`);
        }
      });
  }

  /**
   * Send `SIGTERM` to the node process and wait up to two seconds for its
   * control socket to disappear.
   */
  public async stopNode(): Promise<void> {
    // Don’t leak unhandled rejections when forcefully killing the process
    // eslint-disable-next-line @typescript-eslint/no-empty-function
    this.#nodeProcess?.catch(() => {});
    this.#nodeProcess?.kill("SIGTERM");

    await waitOn({
      resources: [`socket:${this.#socket}`],
      reverse: true,
      timeout: 2000,
    });
  }

  /**
   * Kill all child processes created with `spawn()`, including the node and
   * httpd processes.
   */
  public async shutdown(): Promise<void> {
    // We don’t care about proper cleanup. We just want to make sure that no
    // processes are running anymore.
    for (const childProcess of this.#childProcesses) {
      // Don’t leak unhandled rejections when forcefully killing the process
      // eslint-disable-next-line @typescript-eslint/no-empty-function
      childProcess.catch(() => {});
      childProcess.kill("SIGKILL");
    }
  }

  /**
   * Address of the node in the form `<nodeId>@<listen address>`.
   *
   * @throws Error when `startNode()` has not been called yet.
   */
  public get address(): string {
    if (!this.#listenSocketAddr) {
      throw new Error("Remote node has no listen addr yet");
    }
    return `${this.nodeId}@${this.#listenSocketAddr}`;
  }

  /**
   * Path of this peer's node page: `/nodes/<hostname>:<port>`.
   *
   * @throws Error when `startHttpd()` has not been called yet.
   */
  public uiUrl(): string {
    const { hostname, port } = this.httpdBaseUrl;
    return `/nodes/${hostname}:${port}`;
  }

  /**
   * Path of the page for `rid` on this peer's node:
   * `/nodes/<hostname>:<port>/<rid>`.
   *
   * @throws Error when `startHttpd()` has not been called yet.
   */
  public ridUrl(rid: string): string {
    const { hostname, port } = this.httpdBaseUrl;
    return `/nodes/${hostname}:${port}/${rid}`;
  }

  /**
   * Base URL the httpd process was started with.
   *
   * @throws Error when `startHttpd()` has not been called yet.
   */
  public get httpdBaseUrl(): BaseUrl {
    if (!this.#httpdBaseUrl) {
      throw new Error("No httpd service running");
    }

    return this.#httpdBaseUrl;
  }

  /** Run `git` with the given arguments in this peer's environment. */
  public git(args: string[] = [], opts?: SpawnOptions): SpawnResult {
    return this.spawn("git", args, { ...opts });
  }

  /** Run `rad` with the given arguments in this peer's environment. */
  public rad(args: string[] = [], opts?: SpawnOptions): SpawnResult {
    return this.spawn("rad", args, { ...opts });
  }

  /**
   * Start `cmd` with this peer's environment. Every string line written to
   * stdout or stderr is copied to the output log with a
   * `<peer name> <cmd>` prefix. The process is remembered so that
   * `shutdown()` can kill it.
   */
  public spawn(
    cmd: string,
    args: string[] = [],
    opts?: SpawnOptions,
  ): SpawnResult {
    const prefix = logPrefix(`${this.#name} ${cmd}`);
    const outputLog = this.#outputLog;

    // Transform applied to stdout and stderr: log the line, then pass it
    // through unchanged.
    function* logWithPrefix(line: unknown) {
      if (typeof line === "string") {
        outputLog.write(`${prefix} ${line}\n`, "utf8");
      }
      yield line;
    }

    const childProcess = execa(cmd, args, {
      ...opts,
      // Later entries win: `opts.env` overrides the defaults below and the
      // peer's `gitOptions` override both.
      env: {
        GIT_CONFIG_GLOBAL: "/dev/null",
        GIT_CONFIG_NOSYSTEM: "1",
        RAD_HOME: this.#radHome,
        RAD_PASSPHRASE: "asdf",
        RAD_LOCAL_TIME: "1671125284",
        RAD_KEYGEN_SEED: this.#radSeed,
        RAD_SOCKET: this.#socket,
        ...opts?.env,
        ...this.#gitOptions,
      },
      encoding: "utf8",
      stdout: logWithPrefix,
      stderr: logWithPrefix,
    });

    this.#childProcesses.push(childProcess);

    return childProcess;
  }
}

/**
 * Rewrite `config.json` inside `radHome`: clear `preferredSeeds`, merge the
 * `web` and `node` sections of `configParams` over the existing ones and
 * force the node's network to `"test"`.
 */
async function updateConfig(
  radHome: string,
  configParams: Partial<Config>,
): Promise<void> {
  const configPath = Path.join(radHome, "config.json");
  const configFile = await Fs.readFile(configPath, "utf-8");
  const config = configSchema.parse({
    // Use the default config as the base and let the values from the file
    // take precedence. This used to be the shorthand property
    // `defaultConfig`, which nests the defaults under a `defaultConfig` key
    // instead of merging them.
    ...defaultConfig,
    ...JSON.parse(configFile),
  });
  config.preferredSeeds = [];
  config.web = { ...config.web, ...configParams.web };
  config.node = {
    ...config.node,
    ...configParams.node,
    network: "test",
  };
  await Fs.writeFile(configPath, JSON.stringify(config), "utf-8");
}