/ cli / src / plugins.rs
plugins.rs
  1  use std::{
  2      env, fs,
  3      io::ErrorKind,
  4      os::unix::prelude::PermissionsExt,
  5      path::{Path, PathBuf},
  6      process,
  7  };
  8  
  9  use async_compression::tokio::bufread::GzipDecoder;
 10  use clap::{Args, Subcommand};
 11  use colored::*;
 12  use error_stack::{Result, ResultExt};
 13  use futures::stream::TryStreamExt;
 14  use reqwest::Url;
 15  use tabled::{settings::Style, Table, Tabled};
 16  use tokio_util::io::StreamReader;
 17  
 18  use crate::{error::CliError, paths::plugins_dir};
 19  
 20  static GITHUB_REPO_ORG: &str = "apibara";
 21  static GITHUB_REPO_NAME: &str = "dna";
 22  
 23  #[derive(Debug, Args)]
 24  pub struct PluginsArgs {
 25      #[command(subcommand)]
 26      subcommand: Command,
 27  }
 28  
 29  #[derive(Debug, Subcommand)]
 30  pub enum Command {
 31      /// Install a new plugin.
 32      Install(InstallArgs),
 33      /// List all installed plugins.
 34      List(ListArgs),
 35      /// Remove an installed plugin.
 36      Remove(RemoveArgs),
 37  }
 38  
 39  #[derive(Debug, Tabled)]
 40  #[tabled(rename_all = "SCREAMING_SNAKE_CASE")]
 41  struct PluginInfo {
 42      name: String,
 43      kind: String,
 44      version: String,
 45  }
 46  
 47  #[derive(Debug, Args)]
 48  pub struct InstallArgs {
 49      /// The name of the plugin to install, e.g. `sink-postgres`.
 50      name: Option<String>,
 51      /// Install the plugin from the given file.
 52      #[arg(long, short = 'f')]
 53      file: Option<PathBuf>,
 54  }
 55  
 56  #[derive(Debug, Args)]
 57  pub struct ListArgs {}
 58  
 59  #[derive(Debug, Args)]
 60  pub struct RemoveArgs {
 61      /// The type of plugin to remove, e.g. `sink`.
 62      kind: String,
 63      /// The name of the plugin to remove, e.g. `mongo`.
 64      name: String,
 65  }
 66  
 67  pub async fn run(args: PluginsArgs) -> Result<(), CliError> {
 68      match args.subcommand {
 69          Command::Install(args) => run_install(args).await,
 70          Command::List(args) => run_list(args),
 71          Command::Remove(args) => run_remove(args),
 72      }
 73  }
 74  
 75  async fn run_install(args: InstallArgs) -> Result<(), CliError> {
 76      let dir = plugins_dir();
 77      fs::create_dir_all(dir)
 78          .change_context(CliError)
 79          .attach_printable("failed to created plugins directory")?;
 80      let cwd = env::current_dir().change_context(CliError)?;
 81  
 82      if let Some(file) = args.file {
 83          install_from_file(cwd.join(file))?;
 84      } else if let Some(name) = args.name {
 85          install_from_github(name).await?;
 86      }
 87  
 88      Ok(())
 89  }
 90  
 91  async fn install_from_github(name: String) -> Result<(), CliError> {
 92      let (kind, name) = name
 93          .split_once('-')
 94          .ok_or(CliError)
 95          .attach_printable("Plugin name must be in the format <kind>-<name>")?;
 96  
 97      let releases = octocrab::instance()
 98          .repos(GITHUB_REPO_ORG, GITHUB_REPO_NAME)
 99          .releases()
100          .list()
101          .per_page(50)
102          .send()
103          .await
104          .change_context(CliError)
105          .attach_printable("failed to fetch GitHub releases")?;
106  
107      let tag_prefix = format!("{}-{}/", kind, name);
108      let mut plugin_release = None;
109      let all_releases = octocrab::instance()
110          .all_pages(releases)
111          .await
112          .change_context(CliError)
113          .attach_printable("failed to fetch GitHub releases")?;
114      for release in all_releases {
115          if !release.prerelease && release.tag_name.starts_with(&tag_prefix) {
116              plugin_release = Some(release);
117              break;
118          }
119      }
120  
121      let plugin_release = plugin_release.ok_or(CliError).attach_printable_lazy(|| {
122          format!(
123              "No release found for plugin {}-{}. Did you spell it correctly?",
124              kind, name
125          )
126      })?;
127  
128      println!(
129          "Found release {}",
130          plugin_release
131              .name
132              .unwrap_or(plugin_release.tag_name)
133              .green()
134      );
135  
136      let info = PluginInfo::from_kind_name(kind.to_string(), name.to_string());
137  
138      let artifact_name = info.artifact_name(env::consts::OS, env::consts::ARCH);
139      let asset = plugin_release
140          .assets
141          .iter()
142          .find(|asset| asset.name == artifact_name)
143          .ok_or(CliError)
144          .attach_printable_lazy(|| {
145              format!(
146                  "No asset found for plugin {}-{} for your platform. OS={}, ARCH={}",
147                  kind,
148                  name,
149                  env::consts::OS,
150                  env::consts::ARCH
151              )
152          })?;
153  
154      println!("Downloading {}...", asset.name.blue());
155  
156      let target = plugins_dir().join(info.binary_name());
157      download_artifact_to_path(asset.browser_download_url.clone(), &target).await?;
158  
159      println!("Plugin {} installed to {}", info.name, target.display());
160  
161      Ok(())
162  }
163  
164  fn install_from_file(file: impl AsRef<Path>) -> Result<(), CliError> {
165      let (name, version) = get_binary_name_version(&file)?;
166  
167      println!("Installing {} v{}", name, version);
168  
169      let target = plugins_dir().join(name);
170      // Copy the binary content to a new file to avoid copying the permissions.
171      let content = fs::read(&file)
172          .change_context(CliError)
173          .attach_printable_lazy(|| format!("failed to read content of file {:?}", file.as_ref()))?;
174      fs::write(&target, content)
175          .change_context(CliError)
176          .attach_printable_lazy(|| format!("failed to write plugin to file {:?}", &target))?;
177      fs::set_permissions(&target, fs::Permissions::from_mode(0o755))
178          .change_context(CliError)
179          .attach_printable_lazy(|| {
180              format!("failed to update plugin permissions at {:?}", &target)
181          })?;
182  
183      println!("Plugin installed to {}", target.display());
184  
185      Ok(())
186  }
187  
188  fn run_list(_args: ListArgs) -> Result<(), CliError> {
189      let dir = plugins_dir();
190      let plugins = get_plugins(dir)?;
191  
192      let table = Table::new(plugins).with(Style::rounded()).to_string();
193      println!("{}", table);
194  
195      Ok(())
196  }
197  
198  fn run_remove(args: RemoveArgs) -> Result<(), CliError> {
199      let dir = plugins_dir();
200      let plugin = PluginInfo::from_kind_name(args.kind, args.name);
201      let plugin_path = dir.join(plugin.binary_name());
202  
203      let (name, version) = get_binary_name_version(&plugin_path)?;
204  
205      println!("Removing {} v{}", name, version);
206      fs::remove_file(plugin_path.clone())
207          .change_context(CliError)
208          .attach_printable_lazy(|| format!("failed to remove plugin at {:?}", plugin_path))?;
209  
210      Ok(())
211  }
212  
213  fn get_plugins(dir: impl AsRef<Path>) -> Result<Vec<PluginInfo>, CliError> {
214      if !dir.as_ref().is_dir() {
215          return Ok(vec![]);
216      }
217  
218      let mut plugins = Vec::default();
219      for file in fs::read_dir(dir).change_context(CliError)? {
220          let file = file.change_context(CliError)?;
221  
222          let metadata = file.metadata().change_context(CliError)?;
223          if !metadata.is_file() || !metadata.permissions().mode() & 0o111 != 0 {
224              eprintln!(
225                  "{} {:?}",
226                  "Plugins directory contains non-executable file".yellow(),
227                  file.path()
228              );
229              continue;
230          }
231  
232          let (name, version) = get_binary_name_version(file.path()).attach_printable_lazy(|| {
233              format!(
234                  "Failed to get plugin version: {}",
235                  file.file_name().to_string_lossy()
236              )
237          })?;
238          let info = PluginInfo::from_name_version(name, version)?;
239          plugins.push(info);
240      }
241  
242      Ok(plugins)
243  }
244  
245  /// Runs the given plugin binary to extract the name and version.
246  fn get_binary_name_version(file: impl AsRef<Path>) -> Result<(String, String), CliError> {
247      let output = process::Command::new(file.as_ref())
248          .arg("--version")
249          .output()
250          .change_context(CliError)?;
251  
252      let output = String::from_utf8(output.stdout).change_context(CliError)?;
253      let (name, version) = output
254          .trim()
255          .split_once(' ')
256          .ok_or(CliError)
257          .attach_printable("Plugin --version output does not match spec")?;
258      Ok((name.to_string(), version.to_string()))
259  }
260  
261  impl PluginInfo {
262      pub fn from_kind_name(kind: String, name: String) -> Self {
263          Self {
264              name,
265              kind,
266              version: String::default(),
267          }
268      }
269  
270      pub fn from_name_version(name: String, version: String) -> Result<Self, CliError> {
271          let mut parts = name.splitn(3, '-');
272          let _ = parts
273              .next()
274              .ok_or(CliError)
275              .attach_printable("Plugin name is empty")?;
276          let kind = parts
277              .next()
278              .ok_or(CliError)
279              .attach_printable("Plugin name does not contain a kind")?
280              .to_string();
281          let name = parts
282              .next()
283              .ok_or(CliError)
284              .attach_printable("Plugin name does not contain a kind")?
285              .to_string();
286  
287          Ok(Self {
288              name,
289              version,
290              kind,
291          })
292      }
293  
294      pub fn binary_name(&self) -> String {
295          format!("apibara-{}-{}", self.kind, self.name)
296      }
297  
298      pub fn artifact_name(&self, os: &str, arch: &str) -> String {
299          format!("{}-{}-{}-{}.gz", self.kind, self.name, arch, os)
300      }
301  }
302  
303  async fn download_artifact_to_path(url: Url, dest: impl AsRef<Path>) -> Result<(), CliError> {
304      let response = reqwest::get(url.clone())
305          .await
306          .change_context(CliError)
307          .attach_printable_lazy(|| format!("failed to GET {url}"))?;
308      let stream = response
309          .bytes_stream()
310          .map_err(|err| std::io::Error::new(ErrorKind::Other, err));
311  
312      let stream_reader = StreamReader::new(stream);
313      let mut decompressed = GzipDecoder::new(stream_reader);
314  
315      let mut file = tokio::fs::File::create(&dest)
316          .await
317          .change_context(CliError)
318          .attach_printable_lazy(|| format!("failed to create file {:?}", dest.as_ref()))?;
319  
320      tokio::io::copy(&mut decompressed, &mut file)
321          .await
322          .change_context(CliError)
323          .attach_printable("failed to copy artifact content")?;
324  
325      file.set_permissions(fs::Permissions::from_mode(0o755))
326          .await
327          .change_context(CliError)
328          .attach_printable("failed to set permissions on artifact file")?;
329  
330      Ok(())
331  }