plugins.rs
1 use std::{ 2 env, fs, 3 io::ErrorKind, 4 os::unix::prelude::PermissionsExt, 5 path::{Path, PathBuf}, 6 process, 7 }; 8 9 use async_compression::tokio::bufread::GzipDecoder; 10 use clap::{Args, Subcommand}; 11 use colored::*; 12 use error_stack::{Result, ResultExt}; 13 use futures::stream::TryStreamExt; 14 use reqwest::Url; 15 use tabled::{settings::Style, Table, Tabled}; 16 use tokio_util::io::StreamReader; 17 18 use crate::{error::CliError, paths::plugins_dir}; 19 20 static GITHUB_REPO_ORG: &str = "apibara"; 21 static GITHUB_REPO_NAME: &str = "dna"; 22 23 #[derive(Debug, Args)] 24 pub struct PluginsArgs { 25 #[command(subcommand)] 26 subcommand: Command, 27 } 28 29 #[derive(Debug, Subcommand)] 30 pub enum Command { 31 /// Install a new plugin. 32 Install(InstallArgs), 33 /// List all installed plugins. 34 List(ListArgs), 35 /// Remove an installed plugin. 36 Remove(RemoveArgs), 37 } 38 39 #[derive(Debug, Tabled)] 40 #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] 41 struct PluginInfo { 42 name: String, 43 kind: String, 44 version: String, 45 } 46 47 #[derive(Debug, Args)] 48 pub struct InstallArgs { 49 /// The name of the plugin to install, e.g. `sink-postgres`. 50 name: Option<String>, 51 /// Install the plugin from the given file. 52 #[arg(long, short = 'f')] 53 file: Option<PathBuf>, 54 } 55 56 #[derive(Debug, Args)] 57 pub struct ListArgs {} 58 59 #[derive(Debug, Args)] 60 pub struct RemoveArgs { 61 /// The type of plugin to remove, e.g. `sink`. 62 kind: String, 63 /// The name of the plugin to remove, e.g. `mongo`. 64 name: String, 65 } 66 67 pub async fn run(args: PluginsArgs) -> Result<(), CliError> { 68 match args.subcommand { 69 Command::Install(args) => run_install(args).await, 70 Command::List(args) => run_list(args), 71 Command::Remove(args) => run_remove(args), 72 } 73 } 74 75 async fn run_install(args: InstallArgs) -> Result<(), CliError> { 76 let dir = plugins_dir(); 77 fs::create_dir_all(dir) 78 .change_context(CliError) 79 .attach_printable("failed to created plugins directory")?; 80 let cwd = env::current_dir().change_context(CliError)?; 81 82 if let Some(file) = args.file { 83 install_from_file(cwd.join(file))?; 84 } else if let Some(name) = args.name { 85 install_from_github(name).await?; 86 } 87 88 Ok(()) 89 } 90 91 async fn install_from_github(name: String) -> Result<(), CliError> { 92 let (kind, name) = name 93 .split_once('-') 94 .ok_or(CliError) 95 .attach_printable("Plugin name must be in the format <kind>-<name>")?; 96 97 let releases = octocrab::instance() 98 .repos(GITHUB_REPO_ORG, GITHUB_REPO_NAME) 99 .releases() 100 .list() 101 .per_page(50) 102 .send() 103 .await 104 .change_context(CliError) 105 .attach_printable("failed to fetch GitHub releases")?; 106 107 let tag_prefix = format!("{}-{}/", kind, name); 108 let mut plugin_release = None; 109 let all_releases = octocrab::instance() 110 .all_pages(releases) 111 .await 112 .change_context(CliError) 113 .attach_printable("failed to fetch GitHub releases")?; 114 for release in all_releases { 115 if !release.prerelease && release.tag_name.starts_with(&tag_prefix) { 116 plugin_release = Some(release); 117 break; 118 } 119 } 120 121 let plugin_release = plugin_release.ok_or(CliError).attach_printable_lazy(|| { 122 format!( 123 "No release found for plugin {}-{}. Did you spell it correctly?", 124 kind, name 125 ) 126 })?; 127 128 println!( 129 "Found release {}", 130 plugin_release 131 .name 132 .unwrap_or(plugin_release.tag_name) 133 .green() 134 ); 135 136 let info = PluginInfo::from_kind_name(kind.to_string(), name.to_string()); 137 138 let artifact_name = info.artifact_name(env::consts::OS, env::consts::ARCH); 139 let asset = plugin_release 140 .assets 141 .iter() 142 .find(|asset| asset.name == artifact_name) 143 .ok_or(CliError) 144 .attach_printable_lazy(|| { 145 format!( 146 "No asset found for plugin {}-{} for your platform. OS={}, ARCH={}", 147 kind, 148 name, 149 env::consts::OS, 150 env::consts::ARCH 151 ) 152 })?; 153 154 println!("Downloading {}...", asset.name.blue()); 155 156 let target = plugins_dir().join(info.binary_name()); 157 download_artifact_to_path(asset.browser_download_url.clone(), &target).await?; 158 159 println!("Plugin {} installed to {}", info.name, target.display()); 160 161 Ok(()) 162 } 163 164 fn install_from_file(file: impl AsRef<Path>) -> Result<(), CliError> { 165 let (name, version) = get_binary_name_version(&file)?; 166 167 println!("Installing {} v{}", name, version); 168 169 let target = plugins_dir().join(name); 170 // Copy the binary content to a new file to avoid copying the permissions. 171 let content = fs::read(&file) 172 .change_context(CliError) 173 .attach_printable_lazy(|| format!("failed to read content of file {:?}", file.as_ref()))?; 174 fs::write(&target, content) 175 .change_context(CliError) 176 .attach_printable_lazy(|| format!("failed to write plugin to file {:?}", &target))?; 177 fs::set_permissions(&target, fs::Permissions::from_mode(0o755)) 178 .change_context(CliError) 179 .attach_printable_lazy(|| { 180 format!("failed to update plugin permissions at {:?}", &target) 181 })?; 182 183 println!("Plugin installed to {}", target.display()); 184 185 Ok(()) 186 } 187 188 fn run_list(_args: ListArgs) -> Result<(), CliError> { 189 let dir = plugins_dir(); 190 let plugins = get_plugins(dir)?; 191 192 let table = Table::new(plugins).with(Style::rounded()).to_string(); 193 println!("{}", table); 194 195 Ok(()) 196 } 197 198 fn run_remove(args: RemoveArgs) -> Result<(), CliError> { 199 let dir = plugins_dir(); 200 let plugin = PluginInfo::from_kind_name(args.kind, args.name); 201 let plugin_path = dir.join(plugin.binary_name()); 202 203 let (name, version) = get_binary_name_version(&plugin_path)?; 204 205 println!("Removing {} v{}", name, version); 206 fs::remove_file(plugin_path.clone()) 207 .change_context(CliError) 208 .attach_printable_lazy(|| format!("failed to remove plugin at {:?}", plugin_path))?; 209 210 Ok(()) 211 } 212 213 fn get_plugins(dir: impl AsRef<Path>) -> Result<Vec<PluginInfo>, CliError> { 214 if !dir.as_ref().is_dir() { 215 return Ok(vec![]); 216 } 217 218 let mut plugins = Vec::default(); 219 for file in fs::read_dir(dir).change_context(CliError)? { 220 let file = file.change_context(CliError)?; 221 222 let metadata = file.metadata().change_context(CliError)?; 223 if !metadata.is_file() || !metadata.permissions().mode() & 0o111 != 0 { 224 eprintln!( 225 "{} {:?}", 226 "Plugins directory contains non-executable file".yellow(), 227 file.path() 228 ); 229 continue; 230 } 231 232 let (name, version) = get_binary_name_version(file.path()).attach_printable_lazy(|| { 233 format!( 234 "Failed to get plugin version: {}", 235 file.file_name().to_string_lossy() 236 ) 237 })?; 238 let info = PluginInfo::from_name_version(name, version)?; 239 plugins.push(info); 240 } 241 242 Ok(plugins) 243 } 244 245 /// Runs the given plugin binary to extract the name and version. 246 fn get_binary_name_version(file: impl AsRef<Path>) -> Result<(String, String), CliError> { 247 let output = process::Command::new(file.as_ref()) 248 .arg("--version") 249 .output() 250 .change_context(CliError)?; 251 252 let output = String::from_utf8(output.stdout).change_context(CliError)?; 253 let (name, version) = output 254 .trim() 255 .split_once(' ') 256 .ok_or(CliError) 257 .attach_printable("Plugin --version output does not match spec")?; 258 Ok((name.to_string(), version.to_string())) 259 } 260 261 impl PluginInfo { 262 pub fn from_kind_name(kind: String, name: String) -> Self { 263 Self { 264 name, 265 kind, 266 version: String::default(), 267 } 268 } 269 270 pub fn from_name_version(name: String, version: String) -> Result<Self, CliError> { 271 let mut parts = name.splitn(3, '-'); 272 let _ = parts 273 .next() 274 .ok_or(CliError) 275 .attach_printable("Plugin name is empty")?; 276 let kind = parts 277 .next() 278 .ok_or(CliError) 279 .attach_printable("Plugin name does not contain a kind")? 280 .to_string(); 281 let name = parts 282 .next() 283 .ok_or(CliError) 284 .attach_printable("Plugin name does not contain a kind")? 285 .to_string(); 286 287 Ok(Self { 288 name, 289 version, 290 kind, 291 }) 292 } 293 294 pub fn binary_name(&self) -> String { 295 format!("apibara-{}-{}", self.kind, self.name) 296 } 297 298 pub fn artifact_name(&self, os: &str, arch: &str) -> String { 299 format!("{}-{}-{}-{}.gz", self.kind, self.name, arch, os) 300 } 301 } 302 303 async fn download_artifact_to_path(url: Url, dest: impl AsRef<Path>) -> Result<(), CliError> { 304 let response = reqwest::get(url.clone()) 305 .await 306 .change_context(CliError) 307 .attach_printable_lazy(|| format!("failed to GET {url}"))?; 308 let stream = response 309 .bytes_stream() 310 .map_err(|err| std::io::Error::new(ErrorKind::Other, err)); 311 312 let stream_reader = StreamReader::new(stream); 313 let mut decompressed = GzipDecoder::new(stream_reader); 314 315 let mut file = tokio::fs::File::create(&dest) 316 .await 317 .change_context(CliError) 318 .attach_printable_lazy(|| format!("failed to create file {:?}", dest.as_ref()))?; 319 320 tokio::io::copy(&mut decompressed, &mut file) 321 .await 322 .change_context(CliError) 323 .attach_printable("failed to copy artifact content")?; 324 325 file.set_permissions(fs::Permissions::from_mode(0o755)) 326 .await 327 .change_context(CliError) 328 .attach_printable("failed to set permissions on artifact file")?; 329 330 Ok(()) 331 }