/ tests / support / peerManager.ts
peerManager.ts
  1  /* eslint-disable @typescript-eslint/naming-convention */
  2  import type { Config, BaseUrl } from "@http-client";
  3  import type * as Execa from "execa";
  4  
  5  import * as Fs from "node:fs/promises";
  6  import * as Os from "node:os";
  7  import * as Path from "node:path";
  8  import * as Stream from "node:stream";
  9  import * as Util from "node:util";
 10  import * as readline from "node:readline/promises";
 11  import getPort from "get-port";
 12  import matches from "lodash/matches.js";
 13  import waitOn from "wait-on";
 14  import { configSchema } from "@http-client/lib/shared.js";
 15  import { defaultConfig } from "@tests/support/fixtures.js";
 16  import { execa } from "execa";
 17  import { logPrefix } from "@tests/support/logPrefix.js";
 18  import { randomTag } from "@tests/support/support.js";
 19  import { sleep } from "@app/lib/sleep.js";
 20  
 21  export type RefsUpdate =
 22    | { updated: { name: string; old: string; new: string } }
 23    | { created: { name: string; oid: string } }
 24    | { deleted: { name: string; oid: string } }
 25    | { skipped: { name: string; oid: string } };
 26  
 27  export type NodeEvent =
 28    | {
 29        type: "refsFetched";
 30        remote: string;
 31        rid: string;
 32        updated: RefsUpdate[];
 33      }
 34    | {
 35        type: "refsSynced";
 36        remote: string;
 37        rid: string;
 38      }
 39    | {
 40        type: "seedDiscovered";
 41        rid: string;
 42        nid: string;
 43      }
 44    | {
 45        type: "seedDropped";
 46        nid: string;
 47        rid: string;
 48      }
 49    | {
 50        type: "peerConnected";
 51        nid: string;
 52      }
 53    | {
 54        type: "peerDisconnected";
 55        nid: string;
 56        reason: string;
 57      };
 58  
 59  export interface RoutingEntry {
 60    nid: string;
 61    rid: string;
 62  }
 63  
 64  interface PeerManagerParams {
 65    dataPath: string;
 66    radSeed: string;
 67    // Name for easy identification. Used on file system and in logs.
 68    name: string;
 69    gitOptions?: Record<string, string>;
 70    outputLog: Stream.Writable;
 71  }
 72  
 73  export interface PeerManager {
 74    createPeer(params: {
 75      name: string;
 76      gitOptions?: Record<string, string>;
 77    }): Promise<RadiclePeer>;
 78    /**
 79     * Kill all processes spawned by any of the peers
 80     */
 81    shutdown(): Promise<void>;
 82  }
 83  
 84  export async function createPeerManager(createParams: {
 85    dataDir: string;
 86    outputLog?: Stream.Writable;
 87  }): Promise<PeerManager> {
 88    let outputLog: Stream.Writable;
 89    let outputLogFile: Fs.FileHandle;
 90    if (createParams.outputLog) {
 91      outputLog = createParams.outputLog;
 92    } else {
 93      outputLogFile = await Fs.open(
 94        Path.join(createParams.dataDir, "peerManager.log"),
 95        "a",
 96      );
 97      outputLog = outputLogFile.createWriteStream();
 98    }
 99  
100    const peers: RadiclePeer[] = [];
101    return {
102      async createPeer(params) {
103        const peer = await RadiclePeer.create({
104          dataPath: createParams.dataDir,
105          name: params.name,
106          gitOptions: params.gitOptions,
107          radSeed: Array(64)
108            .fill((peers.length + 1).toString())
109            .join(""),
110          outputLog,
111        });
112        peers.push(peer);
113  
114        return peer;
115      },
116      async shutdown() {
117        await Promise.all(peers.map(peer => peer.shutdown()));
118      },
119    };
120  }
121  
// Signature of the per-line transform installed on `stdout` and `stderr`.
type LineTransform = (line: unknown) => AsyncGenerator<string, void, void>;

// Specialize the return type of `execa()` to guarantee that `stdout` and
// `stderr` are strings.
type SpawnResult = Execa.ResultPromise<
  SpawnOptions & {
    stdout: LineTransform;
    stderr: LineTransform;
    encoding: "utf8";
  }
>;
131  
// `execa()` options that callers of `spawn()` are not allowed to pass.
type ReservedSpawnOption = "stdin" | "stdout" | "stderr" | "lines" | "encoding";

// Options accepted by `RadiclePeer.spawn()`, `.git()` and `.rad()`: all
// `execa()` options minus the reserved ones.
type SpawnOptions = Omit<Execa.Options, ReservedSpawnOption>;
136  
/**
 * A Radicle peer with its own home directory, checkout directory and control
 * socket. It can start `radicle-node` and `radicle-httpd`, run `rad` and
 * `git` commands in the peer's environment and record the node's events.
 *
 * Instances are created with `RadiclePeer.create()`.
 */
export class RadiclePeer {
  // `<dataPath>/<name>/copy`, created by `create()`.
  public checkoutPath: string;
  // Output of `rad self --nid` for this peer.
  public nodeId: string;

  #radSeed: string;
  #socket: string;
  #radHome: string;
  // Every event parsed from `rad node events` since `startNode()`.
  #eventRecords: NodeEvent[] = [];
  #outputLog: Stream.Writable;
  #gitOptions?: Record<string, string>;
  #listenSocketAddr?: string;
  #httpdBaseUrl?: BaseUrl;
  #nodeProcess?: SpawnResult;
  // Name for easy identification. Used on file system and in logs.
  #name: string;
  // Everything started through `spawn()`; killed by `shutdown()`.
  #childProcesses: SpawnResult[] = [];

  private constructor(props: {
    checkoutPath: string;
    nodeId: string;
    radSeed: string;
    socket: string;
    gitOptions?: Record<string, string>;
    radHome: string;
    logFile: Stream.Writable;
    name: string;
  }) {
    const {
      checkoutPath,
      nodeId,
      gitOptions,
      radSeed,
      socket,
      radHome,
      logFile,
      name,
    } = props;

    this.checkoutPath = checkoutPath;
    this.nodeId = nodeId;
    this.#gitOptions = gitOptions;
    this.#radSeed = radSeed;
    this.#socket = socket;
    this.#radHome = radHome;
    this.#outputLog = logFile;
    this.#name = name;
  }

  /**
   * Poll the recorded node events every 100 ms until one of them matches
   * `searchEvent` (partial deep match via lodash `matches`).
   *
   * @throws Error when no matching event showed up within `timeoutInMs`.
   */
  public async waitForEvent(searchEvent: NodeEvent, timeoutInMs: number) {
    const startedAt = Date.now();

    while (!this.#eventRecords.some(matches(searchEvent))) {
      const elapsed = Date.now() - startedAt;
      if (elapsed > timeoutInMs) {
        const description = Util.inspect(searchEvent, { depth: null });
        throw new Error(
          `Timeout waiting for event on node ${this.#name} ${description}`,
        );
      }
      await sleep(100);
    }
  }

  /**
   * Set up the directories and the identity of a new peer.
   *
   * Creates `<dataPath>/<name>/copy` and `<dataPath>/<name>/home`, reserves a
   * temporary directory for the control socket, runs `rad auth` and reads the
   * node ID back with `rad self --nid`.
   */
  public static async create({
    dataPath,
    name,
    gitOptions,
    radSeed,
    outputLog,
  }: PeerManagerParams): Promise<RadiclePeer> {
    const checkoutPath = Path.join(dataPath, name, "copy");
    await Fs.mkdir(checkoutPath, { recursive: true });
    const radHome = Path.join(dataPath, name, "home");
    await Fs.mkdir(radHome, { recursive: true });

    const socketDirPrefix = Path.join(Os.tmpdir(), `radicle-${randomTag()}`);
    const socketDir = await Fs.mkdtemp(socketDirPrefix);
    const socket = Path.join(socketDir, "control.sock");

    // The `RAD_*` entries come last so that `gitOptions` cannot override
    // them.
    const env = {
      ...gitOptions,
      RAD_HOME: radHome,
      RAD_PASSPHRASE: "asdf",
      RAD_KEYGEN_SEED: radSeed,
      RAD_SOCKET: socket,
    };

    await execa("rad", ["auth", "--alias", name], { env });
    const selfResult = await execa("rad", ["self", "--nid"], { env });

    return new RadiclePeer({
      checkoutPath,
      gitOptions,
      radSeed,
      socket,
      nodeId: selfResult.stdout,
      radHome,
      logFile: outputLog,
      name,
    });
  }

  /**
   * Start `radicle-httpd` on `127.0.0.1` and wait up to two seconds for the
   * port to accept connections. A free port is picked when `port` is missing
   * (or `0`).
   */
  public async startHttpd(port?: number): Promise<void> {
    const baseUrl: BaseUrl = {
      hostname: "127.0.0.1",
      port: port || (await getPort()),
      scheme: "http",
    };
    this.#httpdBaseUrl = baseUrl;

    const listenAddr = `${baseUrl.hostname}:${baseUrl.port}`;
    void this.spawn("radicle-httpd", ["--listen", listenAddr]);

    await waitOn({
      resources: [`tcp:${listenAddr}`],
      timeout: 2000,
    });
  }

  /**
   * Write the node configuration, start `radicle-node` on a free port, wait
   * up to two seconds for its control socket and start recording the output
   * of `rad node events`.
   */
  public async startNode(config: Partial<Config> = defaultConfig) {
    const listenPort = await getPort();
    const listenAddr = `0.0.0.0:${listenPort}`;
    this.#listenSocketAddr = listenAddr;

    await updateConfig(this.#radHome, config);

    this.#nodeProcess = this.spawn("radicle-node", ["--listen", listenAddr]);

    await waitOn({
      resources: [`socket:${this.#socket}`],
      timeout: 2000,
    });

    const eventsProcess = this.rad(["node", "events"], {
      cwd: this.#radHome,
    });
    const eventsOutput = eventsProcess.stdout;

    if (!eventsOutput) {
      throw new Error("Could not get stdout to track events");
    }

    const eventLines = readline.createInterface({
      input: eventsOutput,
      terminal: false,
    });

    // Each output line is expected to be one JSON encoded event.
    eventLines.on("line", rawLine => {
      let parsed: NodeEvent;
      try {
        parsed = JSON.parse(rawLine);
      } catch {
        console.log("Error parsing event", rawLine);
        return;
      }

      this.#eventRecords.push(parsed);

      const rendered = Util.inspect(parsed, { depth: null });
      for (const renderedLine of rendered.split("\n")) {
        this.#outputLog.write(
          `${logPrefix(`${this.#name} node events`)} ${renderedLine}\n`,
        );
      }
    });
  }

  /**
   * Send `SIGTERM` to the node process (if any) and wait up to two seconds
   * for the control socket to disappear.
   */
  public async stopNode() {
    const nodeProcess = this.#nodeProcess;
    if (nodeProcess) {
      // Don’t leak unhandled rejections when forcefully killing the process
      // eslint-disable-next-line @typescript-eslint/no-empty-function
      nodeProcess.catch(() => {});
      nodeProcess.kill("SIGTERM");
    }

    await waitOn({
      resources: [`socket:${this.#socket}`],
      reverse: true,
      timeout: 2000,
    });
  }

  /**
   * Kill all child processes created with `spawn()`, including the node and
   * httpd processes.
   */
  public async shutdown() {
    // We don’t care about proper cleanup. We just want to make sure that no
    // processes are running anymore.
    for (const child of this.#childProcesses) {
      // Don’t leak unhandled rejections when forcefully killing the process
      // eslint-disable-next-line @typescript-eslint/no-empty-function
      child.catch(() => {});
      child.kill("SIGKILL");
    }
  }

  /**
   * `<nodeId>@<listen address>` of the node.
   *
   * @throws Error when `startNode()` has not assigned a listen address yet.
   */
  public get address(): string {
    const listenAddr = this.#listenSocketAddr;
    if (!listenAddr) {
      throw new Error("Remote node has no listen addr yet");
    }
    return `${this.nodeId}@${listenAddr}`;
  }

  /**
   * Path `/nodes/<host>:<port>` for this peer's httpd.
   *
   * @throws Error when httpd has not been started.
   */
  public uiUrl(): string {
    const { hostname, port } = this.httpdBaseUrl;

    return `/nodes/${hostname}:${port}`;
  }

  /**
   * Path `/nodes/<host>:<port>/<rid>` for this peer's httpd.
   *
   * @throws Error when httpd has not been started.
   */
  public ridUrl(rid: string): string {
    const { hostname, port } = this.httpdBaseUrl;

    return `/nodes/${hostname}:${port}/${rid}`;
  }

  /**
   * Base URL assigned by `startHttpd()`.
   *
   * @throws Error when httpd has not been started.
   */
  public get httpdBaseUrl(): BaseUrl {
    const baseUrl = this.#httpdBaseUrl;
    if (!baseUrl) {
      throw new Error("No httpd service running");
    }

    return baseUrl;
  }

  /** Run `git` with `args` in this peer's environment. See `spawn()`. */
  public git(args: string[] = [], opts?: SpawnOptions): SpawnResult {
    return this.spawn("git", args, { ...opts });
  }

  /** Run `rad` with `args` in this peer's environment. See `spawn()`. */
  public rad(args: string[] = [], opts?: SpawnOptions): SpawnResult {
    return this.spawn("rad", args, { ...opts });
  }

  /**
   * Run `cmd` with this peer's environment. Every line written to stdout or
   * stderr is also copied to the output log with a `<name> <cmd>` prefix. The
   * process is remembered so that `shutdown()` can kill it.
   */
  public spawn(
    cmd: string,
    args: string[] = [],
    opts?: SpawnOptions,
  ): SpawnResult {
    const linePrefix = logPrefix(`${this.#name} ${cmd}`);
    const log = this.#outputLog;

    // Transform that passes every chunk through unchanged and mirrors string
    // chunks to the output log.
    function* logWithPrefix(line: unknown) {
      if (typeof line === "string") {
        log.write(`${linePrefix} ${line}\n`, "utf8");
      }
      yield line;
    }

    // Later entries win: caller supplied variables override the defaults and
    // the peer's `gitOptions` override both.
    const env = {
      GIT_CONFIG_GLOBAL: "/dev/null",
      GIT_CONFIG_NOSYSTEM: "1",
      RAD_HOME: this.#radHome,
      RAD_PASSPHRASE: "asdf",
      RAD_LOCAL_TIME: "1671125284",
      RAD_KEYGEN_SEED: this.#radSeed,
      RAD_SOCKET: this.#socket,
      ...opts?.env,
      ...this.#gitOptions,
    };

    const childProcess = execa(cmd, args, {
      ...opts,
      env,
      encoding: "utf8",
      stdout: logWithPrefix,
      stderr: logWithPrefix,
    });

    this.#childProcesses.push(childProcess);

    return childProcess;
  }
}
403  
/**
 * Rewrite `config.json` inside `radHome` for use in tests.
 *
 * The existing file is read, layered on top of `defaultConfig`, validated with
 * `configSchema` and written back with these changes:
 * - `preferredSeeds` is emptied,
 * - `web` and `node` are shallowly merged with `configParams.web` and
 *   `configParams.node`,
 * - `node.network` is always `"test"`.
 *
 * @param radHome - Directory that contains `config.json`.
 * @param configParams - Overrides for the `web` and `node` sections.
 */
async function updateConfig(radHome: string, configParams: Partial<Config>) {
  const configPath = Path.join(radHome, "config.json");
  const configFile = await Fs.readFile(configPath, "utf-8");
  const fileConfig: Record<string, unknown> = JSON.parse(configFile);
  const config = configSchema.parse({
    // Spread the defaults so that they act as a base for the values from the
    // file. The property shorthand `defaultConfig` would only add a stray
    // `defaultConfig` key to the parsed object.
    ...defaultConfig,
    ...fileConfig,
  });
  config.preferredSeeds = [];
  config.web = { ...config.web, ...configParams.web };
  config.node = {
    ...config.node,
    ...configParams.node,
    // Applied last so that callers cannot change the network.
    network: "test",
  };
  await Fs.writeFile(configPath, JSON.stringify(config), "utf-8");
}