proxy.cpp
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <mp/proxy.h>

#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/proxy.capnp.h>
#include <mp/type-threadmap.h>
#include <mp/util.h>

#include <atomic>
#include <capnp/capability.h>
#include <capnp/common.h> // IWYU pragma: keep
#include <capnp/rpc.h>
#include <condition_variable>
#include <functional>
#include <future>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/async-prelude.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/function.h>
#include <kj/memory.h>
#include <kj/string.h>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <tuple>
#include <unistd.h>
#include <utility>

namespace mp {

thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)

void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
    MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
}

EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
{
    auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
    loop_lock->assert_locked(m_loop->m_mutex);
    m_loop->m_num_clients += 1;
}

// Due to the conditionals in this function, MP_NO_TSA is required to avoid the
// error "mutex 'loop_lock' is not held on every path through here
// [-Wthread-safety-analysis]".
void EventLoopRef::reset(bool relock) MP_NO_TSA
{
    if (auto* loop{m_loop}) {
        m_loop = nullptr;
        auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
        loop_lock->assert_locked(loop->m_mutex);
        assert(loop->m_num_clients > 0);
        loop->m_num_clients -= 1;
        if (loop->done()) {
            loop->m_cv.notify_all();
            int post_fd{loop->m_post_fd};
            loop_lock->unlock();
            char buffer = 0;
            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
            // By default, do not try to relock `loop_lock` after writing,
            // because the event loop could wake up and destroy itself and the
            // mutex might no longer exist.
            if (relock) loop_lock->lock();
        }
    }
}

ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}

Connection::~Connection()
{
    // The Connection destructor is always called on the event loop thread. If
    // this is a local disconnect, it will trigger I/O, so it needs to run on
    // the event loop thread, and if there was a remote disconnect, it is
    // called by an onDisconnect callback directly from the event loop thread.
    assert(std::this_thread::get_id() == m_loop->m_thread_id);

    // Try to cancel any calls that may be executing.
    m_canceler.cancel("Interrupted by disconnect");

    // Shut down the RPC system first, since this will garbage collect any
    // ProxyServer objects that were not freed before the connection was
    // closed. Typically all ProxyServer objects associated with this
    // connection will be freed before this call returns. However, that will
    // not be the case if there are asynchronous IPC calls over this connection
    // still executing. In that case, Cap'n Proto will destroy the ProxyServer
    // objects after the calls finish.
    m_rpc_system.reset();

    // ProxyClient cleanup handlers are in the sync list, and ProxyServer
    // cleanup handlers are in the async list.
    //
    // The ProxyClient cleanup handlers are synchronous because they are fast
    // and don't do anything besides release capnp resources and reset state,
    // so future calls to client methods immediately throw exceptions instead
    // of trying to communicate across the socket. The synchronous callbacks
    // set ProxyClient capability pointers to null, so new method calls on
    // client objects fail without triggering I/O or relying on the event loop,
    // which may go out of scope or trigger obscure capnp I/O errors. (A sketch
    // of this registration pattern follows this destructor.)
    //
    // The ProxyServer cleanup handlers call user-defined destructors on the
    // server objects, which can run arbitrary blocking bitcoin code, so they
    // have to run asynchronously in a different thread. The asynchronous
    // cleanup functions intentionally aren't started until after the
    // synchronous cleanup functions run, so client objects are fully
    // disconnected before bitcoin code in the destructors is run. This way, if
    // the bitcoin code tries to make client requests, the requests will just
    // fail immediately instead of triggering I/O or accessing the event loop.
    //
    // The context where Connection objects are destroyed and this destructor
    // is invoked differs depending on whether this is an outgoing connection
    // being used to make Init.makeX() calls (e.g. Init.makeNode or
    // Init.makeWalletClient) or an incoming connection implementing the Init
    // interface and handling the Init.makeX() calls.
    //
    // Either way, when a connection is closed, capnp behavior is to call all
    // ProxyServer object destructors first, and then trigger an onDisconnect
    // callback.
    //
    // On the incoming side of the connection, the onDisconnect callback is
    // written to delete the Connection object from m_incoming_connections,
    // invoking this destructor, which calls Connection::disconnect.
    //
    // On the outgoing side, the Connection object is owned by the top-level
    // client object, which the onDisconnect handler doesn't have ready access
    // to, so the onDisconnect handler just calls Connection::disconnect
    // directly instead.
    //
    // Either way, the disconnect code runs in the event loop thread and is
    // called on both clean and unclean shutdowns. In the unclean shutdown
    // case, when the connection is broken, the sync and async cleanup lists
    // will be filled with callbacks. In the clean shutdown case both lists
    // will be empty.
    Lock lock{m_loop->m_mutex};
    while (!m_sync_cleanup_fns.empty()) {
        CleanupList fn;
        fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
        Unlock(lock, fn.front());
    }
}
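// A minimal sketch of how client classes are expected to use the sync cleanup
// list, mirroring SetThread() and ProxyClient<Thread>::~ProxyClient() below
// (`MyClient`, `m_connection`, and `m_cleanup_it` are hypothetical names used
// only for illustration):
//
//     MyClient::MyClient(Connection& connection) : m_connection(&connection)
//     {
//         // Arrange for the connection pointer to be nulled on disconnect.
//         m_cleanup_it = connection.addSyncCleanup([this] { m_connection = nullptr; });
//     }
//
//     MyClient::~MyClient()
//     {
//         // Unregister if still connected, so a later disconnect does not
//         // invoke a callback into a destroyed object.
//         if (m_connection) m_connection->removeSyncCleanup(m_cleanup_it);
//     }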
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
    const Lock lock(m_loop->m_mutex);
    // Add cleanup callbacks to the front of the list, so sync cleanup
    // functions run in LIFO order. This is a good approach because sync
    // cleanup functions are added as client objects are created, and it is
    // natural to clean up objects in the reverse order they were created. In
    // practice, however, the order should not be significant because the
    // cleanup callbacks run synchronously in a single batch when the
    // connection is broken, and they only reset the connection pointers in the
    // client objects without actually deleting the client objects.
    return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
}

void Connection::removeSyncCleanup(CleanupIt it)
{
    // Require cleanup functions to be removed on the event loop thread to
    // avoid needing to deal with them being removed in the middle of a
    // disconnect.
    assert(std::this_thread::get_id() == m_loop->m_thread_id);
    const Lock lock(m_loop->m_mutex);
    m_sync_cleanup_fns.erase(it);
}

void EventLoop::addAsyncCleanup(std::function<void()> fn)
{
    const Lock lock(m_mutex);
    // Add async cleanup callbacks to the back of the list. Unlike the sync
    // cleanup list, this list order is more significant because it determines
    // the order server objects are destroyed when there is a sudden
    // disconnect, and it is possible objects may need to be destroyed in a
    // certain order. This function is called in ProxyServerBase destructors,
    // and since capnp destroys ProxyServer objects in LIFO order, we should
    // preserve this order, adding cleanup callbacks to the end of the list so
    // they can be run starting from the beginning of the list. (See the sketch
    // after this function.)
    //
    // In bitcoin core, running these callbacks in the right order is
    // particularly important for the wallet process, because it uses blocking
    // shared_ptrs and requires Chain::Notification pointers owned by the node
    // process to be destroyed before the WalletLoader objects owned by the
    // node process; otherwise shared pointer counts of the CWallet objects
    // (which inherit from Chain::Notification) will not be 1 when the
    // WalletLoader destructor runs, and it will wait forever for them to be
    // released.
    m_async_fns->emplace_back(std::move(fn));
    startAsyncThread();
}
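// Illustrative sketch of the resulting destruction order on a sudden
// disconnect, assuming two hypothetical server objects created in the order
// A, then B:
//
//     capnp destroys ProxyServer objects LIFO:  ~ProxyServer(B), ~ProxyServer(A)
//     each destructor queues its cleanup:       m_async_fns = [destroy B, destroy A]
//     the async thread pops from the front:     destroy B, then destroy A
//
// So the wrapped server objects are destroyed in the same LIFO order capnp
// used for the ProxyServer objects, which is what the wallet-process
// requirement described above relies on.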
EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
    : m_exe_name(exe_name),
      m_io_context(kj::setupAsyncIo()),
      m_task_set(new kj::TaskSet(m_error_handler)),
      m_log_opts(std::move(log_opts)),
      m_context(context)
{
    int fds[2];
    KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
    m_wait_fd = fds[0];
    m_post_fd = fds[1];
}

EventLoop::~EventLoop()
{
    if (m_async_thread.joinable()) m_async_thread.join();
    const Lock lock(m_mutex);
    KJ_ASSERT(m_post_fn == nullptr);
    KJ_ASSERT(!m_async_fns);
    KJ_ASSERT(m_wait_fd == -1);
    KJ_ASSERT(m_post_fd == -1);
    KJ_ASSERT(m_num_clients == 0);

    // Spin the event loop to wait for any promises triggered by RPC shutdown.
    // auto cleanup = kj::evalLater([]{});
    // cleanup.wait(m_io_context.waitScope);
}
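// A minimal usage sketch (assuming LogOptions is default-constructible and
// that clients on other threads hold EventLoopRef references; exact setup
// varies by caller):
//
//     mp::EventLoop loop{"bitcoin-node", mp::LogOptions{}, /*context=*/nullptr};
//     std::thread loop_thread{[&] { loop.loop(); }};
//     // ... create connections and clients; each holds an EventLoopRef ...
//     loop_thread.join(); // loop() returns once the last EventLoopRef is
//                         // released and no async cleanups remain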
void EventLoop::loop()
{
    assert(!g_thread_context.loop_thread);
    g_thread_context.loop_thread = true;
    KJ_DEFER(g_thread_context.loop_thread = false);

    {
        const Lock lock(m_mutex);
        assert(!m_async_fns);
        m_async_fns.emplace();
    }

    kj::Own<kj::AsyncIoStream> wait_stream{
        m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
    int post_fd{m_post_fd};
    char buffer = 0;
    for (;;) {
        const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
        Lock lock(m_mutex);
        if (m_post_fn) {
            Unlock(lock, *m_post_fn);
            m_post_fn = nullptr;
            m_cv.notify_all();
        } else if (done()) {
            // Intentionally do not break if m_post_fn was set, even if done()
            // would return true, to ensure that the EventLoopRef write(post_fd)
            // call always succeeds and the loop does not exit between the time
            // that the done condition is set and the write call is made.
            break;
        }
    }
    MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
    m_task_set.reset();
    MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
    wait_stream = nullptr;
    KJ_SYSCALL(::close(post_fd));
    const Lock lock(m_mutex);
    m_wait_fd = -1;
    m_post_fd = -1;
    m_async_fns.reset();
    m_cv.notify_all();
}

void EventLoop::post(kj::Function<void()> fn)
{
    if (std::this_thread::get_id() == m_thread_id) {
        fn();
        return;
    }
    Lock lock(m_mutex);
    EventLoopRef ref(*this, &lock);
    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
    m_post_fn = &fn;
    int post_fd{m_post_fd};
    Unlock(lock, [&] {
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1));
    });
    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
}
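// Usage sketch: any thread can hand a callable to the event loop thread and
// block until it has run, e.g.
//
//     loop.post([&] { /* runs on the event loop thread */ });
//
// The handshake in post() above is: wait until the m_post_fn slot is free,
// publish the callable, wake loop() via the socketpair write, then wait until
// loop() has consumed and executed it.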
void EventLoop::startAsyncThread()
{
    assert(std::this_thread::get_id() == m_thread_id);
    if (m_async_thread.joinable()) {
        // Notify to wake up the async thread if it is already running.
        m_cv.notify_all();
    } else if (!m_async_fns->empty()) {
        m_async_thread = std::thread([this] {
            Lock lock(m_mutex);
            while (m_async_fns) {
                if (!m_async_fns->empty()) {
                    EventLoopRef ref{*this, &lock};
                    const std::function<void()> fn = std::move(m_async_fns->front());
                    m_async_fns->pop_front();
                    Unlock(lock, fn);
                    // Important to relock because of the wait() call below.
                    ref.reset(/*relock=*/true);
                    // Continue without waiting in case there are more async_fns.
                    continue;
                }
                m_cv.wait(lock.m_lock);
            }
        });
    }
}

bool EventLoop::done() const
{
    assert(m_num_clients >= 0);
    return m_num_clients == 0 && m_async_fns->empty();
}

std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
{
    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
    ConnThread thread;
    bool inserted;
    {
        const Lock lock(threads.mutex);
        std::tie(thread, inserted) = threads.ref.try_emplace(connection);
    }
    if (inserted) {
        thread->second.emplace(make_thread(), connection, /*destroy_connection=*/false);
        thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
            // Note: it is safe to use the `thread` iterator in this cleanup
            // function, because the iterator would only be invalid if the map
            // entry was removed, and if the map entry is removed, the
            // ProxyClient<Thread> destructor unregisters the cleanup.

            // The connection is being destroyed before the thread client is,
            // so reset the thread client's m_disconnect_cb member so the
            // thread client destructor does not try to unregister this
            // callback after the connection is destroyed.
            thread->second->m_disconnect_cb.reset();

            // Remove the connection pointer about to be destroyed from the map.
            const Lock lock(threads.mutex);
            threads.ref.erase(thread);
        });
    }
    return {thread, inserted};
}

ProxyClient<Thread>::~ProxyClient()
{
    // If the thread is being destroyed before the connection is destroyed,
    // remove the cleanup callback that was registered to handle the connection
    // being destroyed before the thread.
    if (m_disconnect_cb) {
        // Remove the disconnect callback on the event loop thread with
        // loop->sync(), so if the connection is broken there is not a race
        // between this thread trying to remove the callback and the disconnect
        // handler attempting to call it.
        m_context.loop->sync([&]() {
            if (m_disconnect_cb) {
                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
            }
        });
    }
}

ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
    : m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
{
    assert(m_thread_context.waiter.get() != nullptr);
}

ProxyServer<Thread>::~ProxyServer()
{
    if (!m_thread.joinable()) return;
    // Stop the async thread and wait for it to exit. Waiting is necessary
    // because the m_thread handle needs to outlive the thread to avoid a
    // "terminate called without an active exception" error. An alternative to
    // waiting would be to detach the thread, but this would introduce
    // nondeterminism that could make the code harder to debug or extend. (The
    // shutdown handshake is summarized after this destructor.)
    assert(m_thread_context.waiter.get());
    std::unique_ptr<Waiter> waiter;
    {
        const Lock lock(m_thread_context.waiter->m_mutex);
        //! Reset the thread context waiter pointer, as a shutdown signal for
        //! the done lambda passed as the waiter->wait() argument in the
        //! makeThread code below.
        waiter = std::move(m_thread_context.waiter);
        //! Assert the waiter is idle. This destructor shouldn't be getting
        //! called if it is busy.
        assert(!waiter->m_fn);
        // Clear the client maps now to avoid a deadlock in the m_thread.join()
        // call below. The maps contain Thread::Client objects that need to be
        // destroyed from the event loop thread (this thread), which can't
        // happen if this thread is busy calling join.
        m_thread_context.request_threads.clear();
        m_thread_context.callback_threads.clear();
        //! Ping the waiter.
        waiter->m_cv.notify_all();
    }
    m_thread.join();
}
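// Summary of the shutdown handshake between the destructor above and the
// thread created in makeThread() below:
//
//     1. The destructor moves m_thread_context.waiter out (leaving it null)
//        while holding the waiter mutex, then notifies the condition variable.
//     2. In the makeThread lambda, the wait predicate `!g_thread_context.waiter`
//        becomes true, so waiter->wait() returns and the thread exits.
//     3. m_thread.join() then completes promptly.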
kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
{
    context.getResults().setResult(m_thread_context.thread_name);
    return kj::READY_NOW;
}

ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}

kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
    const std::string from = context.getParams().getName();
    std::promise<ThreadContext*> thread_context;
    std::thread thread([&thread_context, from, this]() {
        g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
        g_thread_context.waiter = std::make_unique<Waiter>();
        thread_context.set_value(&g_thread_context);
        Lock lock(g_thread_context.waiter->m_mutex);
        // Wait for the shutdown signal from the ProxyServer<Thread> destructor
        // (the signal is just the waiter getting set to null).
        g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
    });
    auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
    auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
    context.getResults().setResult(kj::mv(thread_client));
    return kj::READY_NOW;
}

std::atomic<int> server_reqs{0};

std::string LongThreadName(const char* exe_name)
{
    return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

kj::StringPtr KJ_STRINGIFY(Log v)
{
    switch (v) {
    case Log::Trace: return "Trace";
    case Log::Debug: return "Debug";
    case Log::Info: return "Info";
    case Log::Warning: return "Warning";
    case Log::Error: return "Error";
    case Log::Raise: return "Raise";
    }
    return "<Log?>";
}
} // namespace mp
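// Client-side sketch of ThreadMap::makeThread() (illustrative only; the
// request/response accessor names are assumed from the usual Cap'n Proto
// codegen conventions for a `makeThread(name) -> (result)` method):
//
//     auto request = thread_map.makeThreadRequest();
//     request.setName("walletloader");
//     Thread::Client thread = request.send().wait(wait_scope).getResult();
//     // getName() on this capability then returns a name of the form
//     // "<exe thread name> (from walletloader)".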