loader.go
1 package loader 2 3 import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "io" 8 "os" 9 "path/filepath" 10 "runtime" 11 "strings" 12 13 config "github.com/ipfs/kubo/config" 14 "github.com/ipld/go-ipld-prime/multicodec" 15 16 "github.com/ipfs/kubo/core" 17 "github.com/ipfs/kubo/core/coreapi" 18 plugin "github.com/ipfs/kubo/plugin" 19 fsrepo "github.com/ipfs/kubo/repo/fsrepo" 20 21 logging "github.com/ipfs/go-log/v2" 22 opentracing "github.com/opentracing/opentracing-go" 23 ) 24 25 var preloadPlugins []plugin.Plugin 26 27 // Preload adds one or more plugins to the preload list. This should _only_ be called during init. 28 func Preload(plugins ...plugin.Plugin) { 29 preloadPlugins = append(preloadPlugins, plugins...) 30 } 31 32 var log = logging.Logger("plugin/loader") 33 34 var loadPluginFunc = func(string) ([]plugin.Plugin, error) { 35 return nil, fmt.Errorf("unsupported platform %s", runtime.GOOS) 36 } 37 38 type loaderState int 39 40 const ( 41 loaderLoading loaderState = iota 42 loaderInitializing 43 loaderInitialized 44 loaderInjecting 45 loaderInjected 46 loaderStarting 47 loaderStarted 48 loaderClosing 49 loaderClosed 50 loaderFailed 51 ) 52 53 func (ls loaderState) String() string { 54 switch ls { 55 case loaderLoading: 56 return "Loading" 57 case loaderInitializing: 58 return "Initializing" 59 case loaderInitialized: 60 return "Initialized" 61 case loaderInjecting: 62 return "Injecting" 63 case loaderInjected: 64 return "Injected" 65 case loaderStarting: 66 return "Starting" 67 case loaderStarted: 68 return "Started" 69 case loaderClosing: 70 return "Closing" 71 case loaderClosed: 72 return "Closed" 73 case loaderFailed: 74 return "Failed" 75 default: 76 return "Unknown" 77 } 78 } 79 80 // PluginLoader keeps track of loaded plugins. 81 // 82 // To use: 83 // 1. Load any desired plugins with Load and LoadDirectory. Preloaded plugins 84 // will automatically be loaded. 85 // 2. Call Initialize to run all initialization logic. 86 // 3. Call Inject to register the plugins. 87 // 4. Optionally call Start to start plugins. 88 // 5. Call Close to close all plugins. 89 type PluginLoader struct { 90 state loaderState 91 plugins []plugin.Plugin 92 started []plugin.Plugin 93 config config.Plugins 94 repo string 95 } 96 97 // NewPluginLoader creates new plugin loader. 98 func NewPluginLoader(repo string) (*PluginLoader, error) { 99 loader := &PluginLoader{plugins: make([]plugin.Plugin, 0, len(preloadPlugins)), repo: repo} 100 if repo != "" { 101 switch plugins, err := readPluginsConfig(repo, config.DefaultConfigFile); { 102 case err == nil: 103 loader.config = plugins 104 case os.IsNotExist(err): 105 default: 106 return nil, err 107 } 108 } 109 110 for _, v := range preloadPlugins { 111 if err := loader.Load(v); err != nil { 112 return nil, err 113 } 114 } 115 116 if err := loader.LoadDirectory(filepath.Join(repo, "plugins")); err != nil { 117 return nil, err 118 } 119 return loader, nil 120 } 121 122 // readPluginsConfig reads the Plugins section of the IPFS config, avoiding 123 // reading anything other than the Plugin section. That way, we're free to 124 // make arbitrary changes to all _other_ sections in migrations. 125 func readPluginsConfig(repoRoot string, userConfigFile string) (config.Plugins, error) { 126 var cfg struct { 127 Plugins config.Plugins 128 } 129 130 cfgPath, err := config.Filename(repoRoot, userConfigFile) 131 if err != nil { 132 return config.Plugins{}, err 133 } 134 135 cfgFile, err := os.Open(cfgPath) 136 if err != nil { 137 return config.Plugins{}, err 138 } 139 defer cfgFile.Close() 140 141 err = json.NewDecoder(cfgFile).Decode(&cfg) 142 if err != nil { 143 return config.Plugins{}, err 144 } 145 146 return cfg.Plugins, nil 147 } 148 149 func (loader *PluginLoader) assertState(state loaderState) error { 150 if loader.state != state { 151 return fmt.Errorf("loader state must be %s, was %s", state, loader.state) 152 } 153 return nil 154 } 155 156 func (loader *PluginLoader) transition(from, to loaderState) error { 157 if err := loader.assertState(from); err != nil { 158 return err 159 } 160 loader.state = to 161 return nil 162 } 163 164 // Load loads a plugin into the plugin loader. 165 func (loader *PluginLoader) Load(pl plugin.Plugin) error { 166 if err := loader.assertState(loaderLoading); err != nil { 167 return err 168 } 169 170 name := pl.Name() 171 172 for _, p := range loader.plugins { 173 if p.Name() == name { 174 // plugin is already loaded 175 return fmt.Errorf( 176 "plugin: %s, is duplicated in version: %s, "+ 177 "while trying to load dynamically: %s", 178 name, p.Version(), pl.Version()) 179 } 180 } 181 182 if loader.config.Plugins[name].Disabled { 183 log.Infof("not loading disabled plugin %s", name) 184 return nil 185 } 186 loader.plugins = append(loader.plugins, pl) 187 return nil 188 } 189 190 // LoadDirectory loads a directory of plugins into the plugin loader. 191 func (loader *PluginLoader) LoadDirectory(pluginDir string) error { 192 if err := loader.assertState(loaderLoading); err != nil { 193 return err 194 } 195 newPls, err := loadDynamicPlugins(pluginDir) 196 if err != nil { 197 return err 198 } 199 200 for _, pl := range newPls { 201 if err := loader.Load(pl); err != nil { 202 return err 203 } 204 } 205 return nil 206 } 207 208 func loadDynamicPlugins(pluginDir string) ([]plugin.Plugin, error) { 209 _, err := os.Stat(pluginDir) 210 if os.IsNotExist(err) { 211 return nil, nil 212 } 213 if err != nil { 214 return nil, err 215 } 216 217 var plugins []plugin.Plugin 218 219 err = filepath.Walk(pluginDir, func(fi string, info os.FileInfo, err error) error { 220 if err != nil { 221 return err 222 } 223 if info.IsDir() { 224 if fi != pluginDir { 225 log.Warnf("found directory inside plugins directory: %s", fi) 226 } 227 return nil 228 } 229 230 if info.Mode().Perm()&0o111 == 0 { 231 // file is not executable let's not load it 232 // this is to prevent loading plugins from for example non-executable 233 // mounts, some /tmp mounts are marked as such for security 234 log.Errorf("non-executable file in plugins directory: %s", fi) 235 return nil 236 } 237 238 if newPlugins, err := loadPluginFunc(fi); err == nil { 239 plugins = append(plugins, newPlugins...) 240 } else { 241 return fmt.Errorf("loading plugin %s: %s", fi, err) 242 } 243 return nil 244 }) 245 246 return plugins, err 247 } 248 249 // Initialize initializes all loaded plugins. 250 func (loader *PluginLoader) Initialize() error { 251 if err := loader.transition(loaderLoading, loaderInitializing); err != nil { 252 return err 253 } 254 for _, p := range loader.plugins { 255 err := p.Init(&plugin.Environment{ 256 Repo: loader.repo, 257 Config: loader.config.Plugins[p.Name()].Config, 258 }) 259 if err != nil { 260 loader.state = loaderFailed 261 return err 262 } 263 } 264 265 return loader.transition(loaderInitializing, loaderInitialized) 266 } 267 268 // Inject hooks all the plugins into the appropriate subsystems. 269 func (loader *PluginLoader) Inject() error { 270 if err := loader.transition(loaderInitialized, loaderInjecting); err != nil { 271 return err 272 } 273 274 for _, pl := range loader.plugins { 275 if pl, ok := pl.(plugin.PluginIPLD); ok { 276 err := injectIPLDPlugin(pl) 277 if err != nil { 278 loader.state = loaderFailed 279 return err 280 } 281 } 282 if pl, ok := pl.(plugin.PluginTracer); ok { 283 err := injectTracerPlugin(pl) 284 if err != nil { 285 loader.state = loaderFailed 286 return err 287 } 288 } 289 if pl, ok := pl.(plugin.PluginDatastore); ok { 290 err := injectDatastorePlugin(pl) 291 if err != nil { 292 loader.state = loaderFailed 293 return err 294 } 295 } 296 if pl, ok := pl.(plugin.PluginFx); ok { 297 err := injectFxPlugin(pl) 298 if err != nil { 299 loader.state = loaderFailed 300 return err 301 } 302 } 303 } 304 305 return loader.transition(loaderInjecting, loaderInjected) 306 } 307 308 // Start starts all long-running plugins. 309 func (loader *PluginLoader) Start(node *core.IpfsNode) error { 310 if err := loader.transition(loaderInjected, loaderStarting); err != nil { 311 return err 312 } 313 iface, err := coreapi.NewCoreAPI(node) 314 if err != nil { 315 return err 316 } 317 for _, pl := range loader.plugins { 318 if pl, ok := pl.(plugin.PluginDaemon); ok { 319 err := pl.Start(iface) 320 if err != nil { 321 _ = loader.Close() 322 return err 323 } 324 loader.started = append(loader.started, pl) 325 } 326 if pl, ok := pl.(plugin.PluginDaemonInternal); ok { 327 err := pl.Start(node) 328 if err != nil { 329 _ = loader.Close() 330 return err 331 } 332 loader.started = append(loader.started, pl) 333 } 334 } 335 336 return loader.transition(loaderStarting, loaderStarted) 337 } 338 339 // Close stops all long-running plugins. 340 func (loader *PluginLoader) Close() error { 341 switch loader.state { 342 case loaderClosing, loaderFailed, loaderClosed: 343 // nothing to do. 344 return nil 345 } 346 loader.state = loaderClosing 347 348 var errs []string 349 started := loader.started 350 loader.started = nil 351 for _, pl := range started { 352 if closer, ok := pl.(io.Closer); ok { 353 err := closer.Close() 354 if err != nil { 355 errs = append(errs, fmt.Sprintf( 356 "error closing plugin %s: %s", 357 pl.Name(), 358 err.Error(), 359 )) 360 } 361 } 362 } 363 if errs != nil { 364 loader.state = loaderFailed 365 return errors.New(strings.Join(errs, "\n")) 366 } 367 loader.state = loaderClosed 368 return nil 369 } 370 371 func injectDatastorePlugin(pl plugin.PluginDatastore) error { 372 return fsrepo.AddDatastoreConfigHandler(pl.DatastoreTypeName(), pl.DatastoreConfigParser()) 373 } 374 375 func injectIPLDPlugin(pl plugin.PluginIPLD) error { 376 return pl.Register(multicodec.DefaultRegistry) 377 } 378 379 func injectTracerPlugin(pl plugin.PluginTracer) error { 380 log.Warn("Tracer plugins are deprecated, it's recommended to configure an OpenTelemetry collector instead.") 381 tracer, err := pl.InitTracer() 382 if err != nil { 383 return err 384 } 385 opentracing.SetGlobalTracer(tracer) 386 return nil 387 } 388 389 func injectFxPlugin(pl plugin.PluginFx) error { 390 core.RegisterFXOptionFunc(pl.Options) 391 return nil 392 }