// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <mp/proxy.h>

#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/proxy.capnp.h>
#include <mp/type-threadmap.h>
#include <mp/util.h>

#include <atomic>
#include <capnp/capability.h>
#include <capnp/common.h> // IWYU pragma: keep
#include <capnp/rpc.h>
#include <condition_variable>
#include <functional>
#include <future>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/async-prelude.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/function.h>
#include <kj/memory.h>
#include <kj/string.h>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <tuple>
#include <unistd.h>
#include <utility>

namespace mp {

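// Per-thread state used by the proxy code: the thread's name, a loop_thread
// flag set while EventLoop::loop() is running on the thread, and (for worker
// threads created by ProxyServer<ThreadMap>::makeThread below) the Waiter and
// the request/callback thread maps.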
thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)

void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
    MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
}

EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
{
    auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
    loop_lock->assert_locked(m_loop->m_mutex);
    m_loop->m_num_clients += 1;
}

// Due to the conditionals in this function, MP_NO_TSA is required to avoid
// error "error: mutex 'loop_lock' is not held on every path through here
// [-Wthread-safety-analysis]"
void EventLoopRef::reset(bool relock) MP_NO_TSA
{
    if (auto* loop{m_loop}) {
        m_loop = nullptr;
        auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
        loop_lock->assert_locked(loop->m_mutex);
        assert(loop->m_num_clients > 0);
        loop->m_num_clients -= 1;
        if (loop->done()) {
            loop->m_cv.notify_all();
            int post_fd{loop->m_post_fd};
            loop_lock->unlock();
            char buffer = 0;
            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
            // By default, do not try to relock `loop_lock` after writing,
            // because the event loop could wake up and destroy itself and the
            // mutex might no longer exist.
            if (relock) loop_lock->lock();
        }
    }
}
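// Illustrative sketch (not code from this file) of the RAII pattern above: an
// EventLoopRef keeps m_num_clients nonzero for as long as some work is
// outstanding, so EventLoop::loop() keeps running, and dropping the reference
// wakes the loop via post_fd once done() becomes true:
//
//     Lock lock(loop.m_mutex);
//     EventLoopRef ref(loop, &lock); // m_num_clients += 1
//     Unlock(lock, do_work);         // hypothetical work done outside the lock
//     ref.reset(/*relock=*/true);    // m_num_clients -= 1, may wake the loop
//
// EventLoop::startAsyncThread() below uses this same sequence.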

ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}

Connection::~Connection()
{
    // Connection destructor is always called on the event loop thread. If this
    // is a local disconnect, it will trigger I/O, so this needs to run on the
    // event loop thread, and if there was a remote disconnect, this is called
    // by an onDisconnect callback directly from the event loop thread.
    assert(std::this_thread::get_id() == m_loop->m_thread_id);

    // Try to cancel any calls that may be executing.
    m_canceler.cancel("Interrupted by disconnect");

    // Shut down RPC system first, since this will garbage collect any
    // ProxyServer objects that were not freed before the connection was closed.
    // Typically all ProxyServer objects associated with this connection will be
    // freed before this call returns. However that will not be the case if
    // there are asynchronous IPC calls over this connection still currently
    // executing. In that case, Cap'n Proto will destroy the ProxyServer objects
    // after the calls finish.
    m_rpc_system.reset();

    // ProxyClient cleanup handlers are in the sync list, and ProxyServer cleanup
    // handlers are in the async list.
    //
    // The ProxyClient cleanup handlers are synchronous because they are fast
    // and don't do anything besides release capnp resources and reset state so
    // future calls to client methods immediately throw exceptions instead of
    // trying to communicate across the socket. The synchronous callbacks set
    // ProxyClient capability pointers to null, so new method calls on client
    // objects fail without triggering i/o or relying on the event loop, which
    // may go out of scope or trigger obscure capnp i/o errors.
    //
    // The ProxyServer cleanup handlers call user-defined destructors on the
    // server object, which can run arbitrary blocking bitcoin code, so they
    // have to run asynchronously in a different thread. The asynchronous
    // cleanup functions intentionally aren't started until after the
    // synchronous cleanup functions run, so client objects are fully
    // disconnected before bitcoin code in the destructors is run. This way, if
    // the bitcoin code tries to make client requests, the requests will just
    // fail immediately instead of sending i/o or accessing the event loop.
    //
    // The context where Connection objects are destroyed and this destructor is
    // invoked is different depending on whether this is an outgoing connection
    // being used to make Init.makeX() calls (e.g. Init.makeNode or
    // Init.makeWalletClient) or an incoming connection implementing the Init
    // interface and handling the Init.makeX() calls.
    //
    // Either way, when a connection is closed, capnp behavior is to call all
    // ProxyServer object destructors first, and then trigger an onDisconnect
    // callback.
    //
    // On the incoming side of the connection, the onDisconnect callback is
    // written to delete the Connection object from the m_incoming_connections
    // list and call this destructor, which calls Connection::disconnect.
    //
    // On the outgoing side, the Connection object is owned by the top-level
    // client object, which the onDisconnect handler doesn't have ready access
    // to, so the onDisconnect handler just calls Connection::disconnect
    // directly instead.
    //
    // Either way, the disconnect code runs in the event loop thread and is
    // called on both clean and unclean shutdowns. In the unclean shutdown case,
    // when the connection is broken, the sync and async cleanup lists will be
    // filled with callbacks. In the clean shutdown case both lists will be empty.
    Lock lock{m_loop->m_mutex};
    while (!m_sync_cleanup_fns.empty()) {
        CleanupList fn;
        fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
        Unlock(lock, fn.front());
    }
}

CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
    const Lock lock(m_loop->m_mutex);
    // Add cleanup callbacks to the front of the list, so sync cleanup functions
    // run in LIFO order. This is a good approach because sync cleanup functions
    // are added as client objects are created, and it is natural to clean up
    // objects in the reverse order they were created. In practice, however,
    // order should not be significant because the cleanup callbacks run
    // synchronously in a single batch when the connection is broken, and they
    // only reset the connection pointers in the client objects without actually
    // deleting the client objects.
    return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
}
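// Illustrative sketch (hypothetical client object, not code from this file) of
// what a sync cleanup callback registered above typically does: it only clears
// connection state on the client object so later calls fail fast, without
// performing any i/o:
//
//     CleanupIt it = connection->addSyncCleanup([&client] {
//         client.m_context.connection = nullptr; // future calls fail instead of using the socket
//     });
//     // ... and if the client object is destroyed before the connection:
//     connection->removeSyncCleanup(it);
//
// SetThread() below registers a real callback of this kind for
// ProxyClient<Thread> objects.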

void Connection::removeSyncCleanup(CleanupIt it)
{
    // Require cleanup functions to be removed on the event loop thread to avoid
    // needing to deal with them being removed in the middle of a disconnect.
    assert(std::this_thread::get_id() == m_loop->m_thread_id);
    const Lock lock(m_loop->m_mutex);
    m_sync_cleanup_fns.erase(it);
}

void EventLoop::addAsyncCleanup(std::function<void()> fn)
{
    const Lock lock(m_mutex);
    // Add async cleanup callbacks to the back of the list. Unlike the sync
    // cleanup list, this list order is more significant because it determines
    // the order server objects are destroyed when there is a sudden disconnect,
    // and it is possible objects may need to be destroyed in a certain order.
    // This function is called in ProxyServerBase destructors, and since capnp
    // destroys ProxyServer objects in LIFO order, we should preserve this
    // order, and add cleanup callbacks to the end of the list so they can be
    // run starting from the beginning of the list.
    //
    // In Bitcoin Core, running these callbacks in the right order is
    // particularly important for the wallet process, because it uses blocking
    // shared_ptrs and requires Chain::Notification pointers owned by the node
    // process to be destroyed before the WalletLoader objects owned by the node
    // process, otherwise shared pointer counts of the CWallet objects (which
    // inherit from Chain::Notification) will not be 1 when the WalletLoader
    // destructor runs and it will wait forever for them to be released.
    m_async_fns->emplace_back(std::move(fn));
    startAsyncThread();
}
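// Illustrative sketch (hypothetical member names, not code from this file) of
// how a ProxyServerBase destructor described above might hand blocking work to
// the async thread instead of running it on the event loop thread:
//
//     m_context.loop->addAsyncCleanup([impl = std::move(server_impl)]() mutable {
//         impl.reset(); // may run arbitrary blocking destructor code
//     });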

EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
    : m_exe_name(exe_name),
      m_io_context(kj::setupAsyncIo()),
      m_task_set(new kj::TaskSet(m_error_handler)),
      m_log_opts(std::move(log_opts)),
      m_context(context)
{
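    // Create a socketpair used to wake the event loop: loop() reads from
    // m_wait_fd (fds[0]), and post() / EventLoopRef::reset() write a byte to
    // m_post_fd (fds[1]) to signal it.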
    int fds[2];
    KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
    m_wait_fd = fds[0];
    m_post_fd = fds[1];
}

EventLoop::~EventLoop()
{
    if (m_async_thread.joinable()) m_async_thread.join();
    const Lock lock(m_mutex);
    KJ_ASSERT(m_post_fn == nullptr);
    KJ_ASSERT(!m_async_fns);
    KJ_ASSERT(m_wait_fd == -1);
    KJ_ASSERT(m_post_fd == -1);
    KJ_ASSERT(m_num_clients == 0);

    // Spin event loop. Wait for any promises triggered by RPC shutdown.
    // auto cleanup = kj::evalLater([]{});
    // cleanup.wait(m_io_context.waitScope);
}

void EventLoop::loop()
{
    assert(!g_thread_context.loop_thread);
    g_thread_context.loop_thread = true;
    KJ_DEFER(g_thread_context.loop_thread = false);

    {
        const Lock lock(m_mutex);
        assert(!m_async_fns);
        m_async_fns.emplace();
    }

    kj::Own<kj::AsyncIoStream> wait_stream{
        m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
    int post_fd{m_post_fd};
    char buffer = 0;
    for (;;) {
        const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
        Lock lock(m_mutex);
        if (m_post_fn) {
            Unlock(lock, *m_post_fn);
            m_post_fn = nullptr;
            m_cv.notify_all();
        } else if (done()) {
            // Intentionally do not break if m_post_fn was set, even if done()
            // would return true, to ensure that the EventLoopRef write(post_fd)
            // call always succeeds and the loop does not exit between the time
            // that the done condition is set and the write call is made.
            break;
        }
    }
    MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
    m_task_set.reset();
    MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
    wait_stream = nullptr;
    KJ_SYSCALL(::close(post_fd));
    const Lock lock(m_mutex);
    m_wait_fd = -1;
    m_post_fd = -1;
    m_async_fns.reset();
    m_cv.notify_all();
}

void EventLoop::post(kj::Function<void()> fn)
{
    if (std::this_thread::get_id() == m_thread_id) {
        fn();
        return;
    }
    Lock lock(m_mutex);
    EventLoopRef ref(*this, &lock);
    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
    m_post_fn = &fn;
    int post_fd{m_post_fd};
    Unlock(lock, [&] {
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1));
    });
    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
}
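// Illustrative usage sketch (not code from this file): other threads use
// post() above (or the related sync() helper used by
// ProxyClient<Thread>::~ProxyClient below) to run code on the event loop
// thread, e.g.
//
//     loop.post([&] { /* touch capnp objects on the event loop thread */ });
//
// The call blocks until the event loop has executed the function.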

void EventLoop::startAsyncThread()
{
    assert(std::this_thread::get_id() == m_thread_id);
    if (m_async_thread.joinable()) {
        // Notify to wake up the async thread if it is already running.
        m_cv.notify_all();
    } else if (!m_async_fns->empty()) {
        m_async_thread = std::thread([this] {
            Lock lock(m_mutex);
            while (m_async_fns) {
                if (!m_async_fns->empty()) {
                    EventLoopRef ref{*this, &lock};
                    const std::function<void()> fn = std::move(m_async_fns->front());
                    m_async_fns->pop_front();
                    Unlock(lock, fn);
                    // Important to relock because of the wait() call below.
                    ref.reset(/*relock=*/true);
                    // Continue without waiting in case there are more async_fns
                    continue;
                }
                m_cv.wait(lock.m_lock);
            }
        });
    }
}

bool EventLoop::done() const
{
    assert(m_num_clients >= 0);
    return m_num_clients == 0 && m_async_fns->empty();
}

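// Look up the ProxyClient<Thread> entry for `connection` in the `threads` map,
// creating it with `make_thread` if it does not exist yet, and register a sync
// cleanup callback so the entry is removed if the connection is destroyed
// first. Returns the map iterator and whether a new entry was inserted.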
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
{
    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
    ConnThread thread;
    bool inserted;
    {
        const Lock lock(threads.mutex);
        std::tie(thread, inserted) = threads.ref.try_emplace(connection);
    }
    if (inserted) {
        thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
        thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
            // Note: it is safe to use the `thread` iterator in this cleanup
            // function, because the iterator would only be invalid if the map entry
            // was removed, and if the map entry is removed the ProxyClient<Thread>
            // destructor unregisters the cleanup.

            // The connection is being destroyed before the thread client is, so
            // reset the thread client's m_disconnect_cb member so the thread client
            // destructor does not try to unregister this callback after the
            // connection is destroyed.
            thread->second->m_disconnect_cb.reset();

            // Remove the connection pointer about to be destroyed from the map.
            const Lock lock(threads.mutex);
            threads.ref.erase(thread);
        });
    }
    return {thread, inserted};
}

ProxyClient<Thread>::~ProxyClient()
{
    // If thread is being destroyed before connection is destroyed, remove the
    // cleanup callback that was registered to handle the connection being
    // destroyed before the thread being destroyed.
    if (m_disconnect_cb) {
        // Remove disconnect callback on the event loop thread with
        // loop->sync(), so if the connection is broken there is not a race
        // between this thread trying to remove the callback and the disconnect
        // handler attempting to call it.
        m_context.loop->sync([&]() {
            if (m_disconnect_cb) {
                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
            }
        });
    }
}

ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
    : m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
{
    assert(m_thread_context.waiter.get() != nullptr);
}

ProxyServer<Thread>::~ProxyServer()
{
    if (!m_thread.joinable()) return;
    // Stop async thread and wait for it to exit. Need to wait because the
    // m_thread handle needs to outlive the thread to avoid "terminate called
    // without an active exception" error. An alternative to waiting would be to
    // detach the thread, but this would introduce nondeterminism which could
    // make code harder to debug or extend.
    assert(m_thread_context.waiter.get());
    std::unique_ptr<Waiter> waiter;
    {
        const Lock lock(m_thread_context.waiter->m_mutex);
        //! Reset thread context waiter pointer, as shutdown signal for done
        //! lambda passed as waiter->wait() argument in makeThread code below.
        waiter = std::move(m_thread_context.waiter);
        //! Assert waiter is idle. This destructor shouldn't be getting called if it is busy.
        assert(!waiter->m_fn);
        // Clear client maps now to avoid deadlock in m_thread.join() call
        // below. The maps contain Thread::Client objects that need to be
        // destroyed from the event loop thread (this thread), which can't
        // happen if this thread is busy calling join.
        m_thread_context.request_threads.clear();
        m_thread_context.callback_threads.clear();
        //! Ping waiter.
        waiter->m_cv.notify_all();
    }
    m_thread.join();
}

kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
{
    context.getResults().setResult(m_thread_context.thread_name);
    return kj::READY_NOW;
}

ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}

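// Handle a ThreadMap.makeThread IPC call by spawning a dedicated std::thread
// for the caller. The new thread sets up its own g_thread_context and Waiter,
// then blocks in waiter->wait() until the waiter pointer is cleared by the
// ProxyServer<Thread> destructor above. A Thread capability wrapping the new
// thread is returned to the caller.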
kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
    const std::string from = context.getParams().getName();
    std::promise<ThreadContext*> thread_context;
    std::thread thread([&thread_context, from, this]() {
        g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
        g_thread_context.waiter = std::make_unique<Waiter>();
        thread_context.set_value(&g_thread_context);
        Lock lock(g_thread_context.waiter->m_mutex);
        // Wait for shutdown signal from ProxyServer<Thread> destructor (the
        // signal is just the waiter getting set to null).
        g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
    });
    auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
    auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
    context.getResults().setResult(kj::mv(thread_client));
    return kj::READY_NOW;
}

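// Counter of server-side IPC requests, defined here for use by request-handling
// code elsewhere in the library (it is not modified in this file).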
std::atomic<int> server_reqs{0};

std::string LongThreadName(const char* exe_name)
{
    return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

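// Stringification hook so Log values can be formatted by KJ logging and
// kj::str()-style helpers.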
kj::StringPtr KJ_STRINGIFY(Log v)
{
    switch (v) {
        case Log::Trace:   return "Trace";
        case Log::Debug:   return "Debug";
        case Log::Info:    return "Info";
        case Log::Warning: return "Warning";
        case Log::Error:   return "Error";
        case Log::Raise:   return "Raise";
    }
    return "<Log?>";
}
} // namespace mp