/ plugin / loader / loader.go
loader.go
  1  package loader
  2  
  3  import (
  4  	"encoding/json"
  5  	"errors"
  6  	"fmt"
  7  	"io"
  8  	"os"
  9  	"path/filepath"
 10  	"runtime"
 11  	"strings"
 12  
 13  	config "github.com/ipfs/kubo/config"
 14  	"github.com/ipld/go-ipld-prime/multicodec"
 15  
 16  	"github.com/ipfs/kubo/core"
 17  	"github.com/ipfs/kubo/core/coreapi"
 18  	plugin "github.com/ipfs/kubo/plugin"
 19  	fsrepo "github.com/ipfs/kubo/repo/fsrepo"
 20  
 21  	logging "github.com/ipfs/go-log/v2"
 22  	opentracing "github.com/opentracing/opentracing-go"
 23  )
 24  
 25  var preloadPlugins []plugin.Plugin
 26  
 27  // Preload adds one or more plugins to the preload list. This should _only_ be called during init.
 28  func Preload(plugins ...plugin.Plugin) {
 29  	preloadPlugins = append(preloadPlugins, plugins...)
 30  }
 31  
 32  var log = logging.Logger("plugin/loader")
 33  
 34  var loadPluginFunc = func(string) ([]plugin.Plugin, error) {
 35  	return nil, fmt.Errorf("unsupported platform %s", runtime.GOOS)
 36  }
 37  
 38  type loaderState int
 39  
 40  const (
 41  	loaderLoading loaderState = iota
 42  	loaderInitializing
 43  	loaderInitialized
 44  	loaderInjecting
 45  	loaderInjected
 46  	loaderStarting
 47  	loaderStarted
 48  	loaderClosing
 49  	loaderClosed
 50  	loaderFailed
 51  )
 52  
 53  func (ls loaderState) String() string {
 54  	switch ls {
 55  	case loaderLoading:
 56  		return "Loading"
 57  	case loaderInitializing:
 58  		return "Initializing"
 59  	case loaderInitialized:
 60  		return "Initialized"
 61  	case loaderInjecting:
 62  		return "Injecting"
 63  	case loaderInjected:
 64  		return "Injected"
 65  	case loaderStarting:
 66  		return "Starting"
 67  	case loaderStarted:
 68  		return "Started"
 69  	case loaderClosing:
 70  		return "Closing"
 71  	case loaderClosed:
 72  		return "Closed"
 73  	case loaderFailed:
 74  		return "Failed"
 75  	default:
 76  		return "Unknown"
 77  	}
 78  }
 79  
 80  // PluginLoader keeps track of loaded plugins.
 81  //
 82  // To use:
 83  //  1. Load any desired plugins with Load and LoadDirectory. Preloaded plugins
 84  //     will automatically be loaded.
 85  //  2. Call Initialize to run all initialization logic.
 86  //  3. Call Inject to register the plugins.
 87  //  4. Optionally call Start to start plugins.
 88  //  5. Call Close to close all plugins.
 89  type PluginLoader struct {
 90  	state   loaderState
 91  	plugins []plugin.Plugin
 92  	started []plugin.Plugin
 93  	config  config.Plugins
 94  	repo    string
 95  }
 96  
 97  // NewPluginLoader creates new plugin loader.
 98  func NewPluginLoader(repo string) (*PluginLoader, error) {
 99  	loader := &PluginLoader{plugins: make([]plugin.Plugin, 0, len(preloadPlugins)), repo: repo}
100  	if repo != "" {
101  		switch plugins, err := readPluginsConfig(repo, config.DefaultConfigFile); {
102  		case err == nil:
103  			loader.config = plugins
104  		case os.IsNotExist(err):
105  		default:
106  			return nil, err
107  		}
108  	}
109  
110  	for _, v := range preloadPlugins {
111  		if err := loader.Load(v); err != nil {
112  			return nil, err
113  		}
114  	}
115  
116  	if err := loader.LoadDirectory(filepath.Join(repo, "plugins")); err != nil {
117  		return nil, err
118  	}
119  	return loader, nil
120  }
121  
122  // readPluginsConfig reads the Plugins section of the IPFS config, avoiding
123  // reading anything other than the Plugin section. That way, we're free to
124  // make arbitrary changes to all _other_ sections in migrations.
125  func readPluginsConfig(repoRoot string, userConfigFile string) (config.Plugins, error) {
126  	var cfg struct {
127  		Plugins config.Plugins
128  	}
129  
130  	cfgPath, err := config.Filename(repoRoot, userConfigFile)
131  	if err != nil {
132  		return config.Plugins{}, err
133  	}
134  
135  	cfgFile, err := os.Open(cfgPath)
136  	if err != nil {
137  		return config.Plugins{}, err
138  	}
139  	defer cfgFile.Close()
140  
141  	err = json.NewDecoder(cfgFile).Decode(&cfg)
142  	if err != nil {
143  		return config.Plugins{}, err
144  	}
145  
146  	return cfg.Plugins, nil
147  }
148  
149  func (loader *PluginLoader) assertState(state loaderState) error {
150  	if loader.state != state {
151  		return fmt.Errorf("loader state must be %s, was %s", state, loader.state)
152  	}
153  	return nil
154  }
155  
156  func (loader *PluginLoader) transition(from, to loaderState) error {
157  	if err := loader.assertState(from); err != nil {
158  		return err
159  	}
160  	loader.state = to
161  	return nil
162  }
163  
164  // Load loads a plugin into the plugin loader.
165  func (loader *PluginLoader) Load(pl plugin.Plugin) error {
166  	if err := loader.assertState(loaderLoading); err != nil {
167  		return err
168  	}
169  
170  	name := pl.Name()
171  
172  	for _, p := range loader.plugins {
173  		if p.Name() == name {
174  			// plugin is already loaded
175  			return fmt.Errorf(
176  				"plugin: %s, is duplicated in version: %s, "+
177  					"while trying to load dynamically: %s",
178  				name, p.Version(), pl.Version())
179  		}
180  	}
181  
182  	if loader.config.Plugins[name].Disabled {
183  		log.Infof("not loading disabled plugin %s", name)
184  		return nil
185  	}
186  	loader.plugins = append(loader.plugins, pl)
187  	return nil
188  }
189  
190  // LoadDirectory loads a directory of plugins into the plugin loader.
191  func (loader *PluginLoader) LoadDirectory(pluginDir string) error {
192  	if err := loader.assertState(loaderLoading); err != nil {
193  		return err
194  	}
195  	newPls, err := loadDynamicPlugins(pluginDir)
196  	if err != nil {
197  		return err
198  	}
199  
200  	for _, pl := range newPls {
201  		if err := loader.Load(pl); err != nil {
202  			return err
203  		}
204  	}
205  	return nil
206  }
207  
208  func loadDynamicPlugins(pluginDir string) ([]plugin.Plugin, error) {
209  	_, err := os.Stat(pluginDir)
210  	if os.IsNotExist(err) {
211  		return nil, nil
212  	}
213  	if err != nil {
214  		return nil, err
215  	}
216  
217  	var plugins []plugin.Plugin
218  
219  	err = filepath.Walk(pluginDir, func(fi string, info os.FileInfo, err error) error {
220  		if err != nil {
221  			return err
222  		}
223  		if info.IsDir() {
224  			if fi != pluginDir {
225  				log.Warnf("found directory inside plugins directory: %s", fi)
226  			}
227  			return nil
228  		}
229  
230  		if info.Mode().Perm()&0o111 == 0 {
231  			// file is not executable let's not load it
232  			// this is to prevent loading plugins from for example non-executable
233  			// mounts, some /tmp mounts are marked as such for security
234  			log.Errorf("non-executable file in plugins directory: %s", fi)
235  			return nil
236  		}
237  
238  		if newPlugins, err := loadPluginFunc(fi); err == nil {
239  			plugins = append(plugins, newPlugins...)
240  		} else {
241  			return fmt.Errorf("loading plugin %s: %s", fi, err)
242  		}
243  		return nil
244  	})
245  
246  	return plugins, err
247  }
248  
249  // Initialize initializes all loaded plugins.
250  func (loader *PluginLoader) Initialize() error {
251  	if err := loader.transition(loaderLoading, loaderInitializing); err != nil {
252  		return err
253  	}
254  	for _, p := range loader.plugins {
255  		err := p.Init(&plugin.Environment{
256  			Repo:   loader.repo,
257  			Config: loader.config.Plugins[p.Name()].Config,
258  		})
259  		if err != nil {
260  			loader.state = loaderFailed
261  			return err
262  		}
263  	}
264  
265  	return loader.transition(loaderInitializing, loaderInitialized)
266  }
267  
268  // Inject hooks all the plugins into the appropriate subsystems.
269  func (loader *PluginLoader) Inject() error {
270  	if err := loader.transition(loaderInitialized, loaderInjecting); err != nil {
271  		return err
272  	}
273  
274  	for _, pl := range loader.plugins {
275  		if pl, ok := pl.(plugin.PluginIPLD); ok {
276  			err := injectIPLDPlugin(pl)
277  			if err != nil {
278  				loader.state = loaderFailed
279  				return err
280  			}
281  		}
282  		if pl, ok := pl.(plugin.PluginTracer); ok {
283  			err := injectTracerPlugin(pl)
284  			if err != nil {
285  				loader.state = loaderFailed
286  				return err
287  			}
288  		}
289  		if pl, ok := pl.(plugin.PluginDatastore); ok {
290  			err := injectDatastorePlugin(pl)
291  			if err != nil {
292  				loader.state = loaderFailed
293  				return err
294  			}
295  		}
296  		if pl, ok := pl.(plugin.PluginFx); ok {
297  			err := injectFxPlugin(pl)
298  			if err != nil {
299  				loader.state = loaderFailed
300  				return err
301  			}
302  		}
303  	}
304  
305  	return loader.transition(loaderInjecting, loaderInjected)
306  }
307  
308  // Start starts all long-running plugins.
309  func (loader *PluginLoader) Start(node *core.IpfsNode) error {
310  	if err := loader.transition(loaderInjected, loaderStarting); err != nil {
311  		return err
312  	}
313  	iface, err := coreapi.NewCoreAPI(node)
314  	if err != nil {
315  		return err
316  	}
317  	for _, pl := range loader.plugins {
318  		if pl, ok := pl.(plugin.PluginDaemon); ok {
319  			err := pl.Start(iface)
320  			if err != nil {
321  				_ = loader.Close()
322  				return err
323  			}
324  			loader.started = append(loader.started, pl)
325  		}
326  		if pl, ok := pl.(plugin.PluginDaemonInternal); ok {
327  			err := pl.Start(node)
328  			if err != nil {
329  				_ = loader.Close()
330  				return err
331  			}
332  			loader.started = append(loader.started, pl)
333  		}
334  	}
335  
336  	return loader.transition(loaderStarting, loaderStarted)
337  }
338  
339  // Close stops all long-running plugins.
340  func (loader *PluginLoader) Close() error {
341  	switch loader.state {
342  	case loaderClosing, loaderFailed, loaderClosed:
343  		// nothing to do.
344  		return nil
345  	}
346  	loader.state = loaderClosing
347  
348  	var errs []string
349  	started := loader.started
350  	loader.started = nil
351  	for _, pl := range started {
352  		if closer, ok := pl.(io.Closer); ok {
353  			err := closer.Close()
354  			if err != nil {
355  				errs = append(errs, fmt.Sprintf(
356  					"error closing plugin %s: %s",
357  					pl.Name(),
358  					err.Error(),
359  				))
360  			}
361  		}
362  	}
363  	if errs != nil {
364  		loader.state = loaderFailed
365  		return errors.New(strings.Join(errs, "\n"))
366  	}
367  	loader.state = loaderClosed
368  	return nil
369  }
370  
371  func injectDatastorePlugin(pl plugin.PluginDatastore) error {
372  	return fsrepo.AddDatastoreConfigHandler(pl.DatastoreTypeName(), pl.DatastoreConfigParser())
373  }
374  
375  func injectIPLDPlugin(pl plugin.PluginIPLD) error {
376  	return pl.Register(multicodec.DefaultRegistry)
377  }
378  
379  func injectTracerPlugin(pl plugin.PluginTracer) error {
380  	log.Warn("Tracer plugins are deprecated, it's recommended to configure an OpenTelemetry collector instead.")
381  	tracer, err := pl.InitTracer()
382  	if err != nil {
383  		return err
384  	}
385  	opentracing.SetGlobalTracer(tracer)
386  	return nil
387  }
388  
389  func injectFxPlugin(pl plugin.PluginFx) error {
390  	core.RegisterFXOptionFunc(pl.Options)
391  	return nil
392  }