thread.cpp
1 /** 2 * This file is part of Darling. 3 * 4 * Copyright (C) 2021 Darling developers 5 * 6 * Darling is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * Darling is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with Darling. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include "darlingserver/registry.hpp" 21 #include <darlingserver/thread.hpp> 22 #include <darlingserver/process.hpp> 23 #include <darlingserver/call.hpp> 24 #include <darlingserver/server.hpp> 25 #include <darlingserver/logging.hpp> 26 #include <filesystem> 27 #include <fstream> 28 29 #include <sys/mman.h> 30 #include <signal.h> 31 32 #include <darlingserver/duct-tape.h> 33 #include <atomic> 34 35 #include <sys/syscall.h> 36 37 #if DSERVER_ASAN 38 #include <sanitizer/asan_interface.h> 39 #endif 40 41 #include <rtsig.h> 42 43 #include <assert.h> 44 45 #include <limits> 46 #include <sys/ptrace.h> 47 #include <sys/user.h> 48 #include <sys/wait.h> 49 #include <vector> 50 51 // 64KiB should be enough for us 52 #define THREAD_STACK_SIZE (64 * 1024ULL) 53 #define USE_THREAD_GUARD_PAGES 1 54 #define IDLE_THREAD_STACK_COUNT 8 55 56 static thread_local std::shared_ptr<DarlingServer::Thread> currentThreadVar = nullptr; 57 static thread_local bool returningToThreadTop = false; 58 static thread_local ucontext_t backToThreadTopContext; 59 static thread_local libsimple_lock_t* unlockMeWhenSuspending = nullptr; 60 static thread_local std::function<void()> currentContinuation = nullptr; 61 62 /** 63 * Our microthreads use cooperative 
multitasking, so we don't really use interrupts per-se. 64 * Rather, this is an indication to our cooperative scheduler that the microthread is doing something and 65 * expects to continue to have control of the executing thread. If it calls a function/method that 66 * would cause it to relinquish control of the thread, this should be considered an error. 67 * 68 * This is primarily of use for debugging duct-tape code and ensuring certain assumptions made in the duct-tape code hold true. 69 */ 70 static thread_local uint64_t interruptDisableCount = 0; 71 72 #if DSERVER_ASAN 73 static thread_local void* asanOldFakeStack = nullptr; 74 static thread_local const void* asanOldStackBottom = nullptr; 75 static thread_local size_t asanOldStackSize = 0; 76 #endif 77 78 static DarlingServer::Log threadLog("thread"); 79 80 DarlingServer::StackPool DarlingServer::Thread::stackPool(IDLE_THREAD_STACK_COUNT, THREAD_STACK_SIZE, USE_THREAD_GUARD_PAGES); 81 82 DarlingServer::Thread::Thread(std::shared_ptr<Process> process, NSID nsid, void* stackHint): 83 _nstid(nsid), 84 _process(process) 85 { 86 _tid = -1; 87 88 for (const auto& entry: std::filesystem::directory_iterator("/proc/" + std::to_string(process->id()) + "/task")) { 89 std::ifstream statusFile(entry.path() / "status"); 90 std::string line; 91 92 while (std::getline(statusFile, line)) { 93 if (line.substr(0, sizeof("NSpid") - 1) == "NSpid") { 94 auto pos = line.find_last_of('\t'); 95 std::string id; 96 97 if (pos != line.npos) { 98 id = line.substr(pos + 1); 99 } 100 101 if (id.empty()) { 102 throw std::runtime_error("Failed to parse thread ID"); 103 } 104 105 if (std::stoi(id) != _nstid) { 106 continue; 107 } 108 109 _tid = std::stoi(entry.path().filename().string()); 110 111 break; 112 } 113 } 114 } 115 116 // if we can't determine the thread id from procfs, try some other more costly methods. 
117 if (_tid == -1) { 118 std::vector<pid_t> ids; 119 auto& registry = threadRegistry(); 120 for (const auto& entry: std::filesystem::directory_iterator("/proc/" + std::to_string(process->id()) + "/task")) { 121 pid_t currentId = std::stoi(entry.path().filename().string()); 122 // Skip threads that are already registered, as we're sure they're not the ones we want. 123 if (registry.lookupEntryByID(currentId).has_value()) { 124 continue; 125 } 126 ids.push_back(currentId); 127 } 128 129 // we're sure this is the thread we want as this is the only unregistered thread. 130 if (ids.size() == 1) { 131 _tid = ids[0]; 132 } else if (stackHint != nullptr) { 133 pid_t chosenId = -1; 134 intptr_t nearest = std::numeric_limits<intptr_t>::max(); 135 136 for (auto id : ids) { 137 if (ptrace(PTRACE_ATTACH, id, 0, 0) == -1) { 138 continue; 139 } 140 141 int status; 142 int waitStatus = waitpid(id, &status, 0); 143 144 if (waitStatus < 0) { 145 continue; 146 } 147 148 struct user_regs_struct regs; 149 if (ptrace(PTRACE_GETREGS, id, 0, ®s) == -1) { 150 continue; 151 } 152 153 #ifdef __x86_64__ 154 intptr_t stackDiff = (intptr_t)stackHint - (intptr_t)regs.rsp; 155 if (stackDiff >= 0 && stackDiff < nearest) { 156 #else 157 #warning Unsupported architecture 158 if (true) { 159 #endif 160 chosenId = id; 161 nearest = stackDiff; 162 } 163 164 // this is critical: we're tracing a process but cannot detach from it, and it'll not run normally. 
165 if (ptrace(PTRACE_DETACH, id, 0, 0) == -1) { 166 throw std::system_error(errno, std::generic_category(), "Failed to detach from process."); 167 } 168 } 169 170 _tid = chosenId; 171 } 172 } 173 174 if (_tid == -1) { 175 throw std::system_error(ESRCH, std::generic_category(), "Failed to find thread ID within darlingserver's namespace"); 176 } 177 178 // NOTE: it's okay to use raw `this` without a shared pointer because the duct-taped thread will always live for less time than this Thread instance 179 _dtapeThread = dtape_thread_create(process->_dtapeTask, _nstid, this); 180 _s2cPerformSempahore = dtape_semaphore_create(process->_dtapeTask, 1); 181 _s2cReplySempahore = dtape_semaphore_create(process->_dtapeTask, 0); 182 _s2cInterruptEnterSemaphore = dtape_semaphore_create(process->_dtapeTask, 0); 183 _s2cInterruptExitSemaphore = dtape_semaphore_create(process->_dtapeTask, 0); 184 185 threadLog.info() << "New thread created with ID " << _tid << " and NSID " << _nstid << " for process with ID " << (process ? process->id() : -1) << " and NSID " << (process ? 
process->nsid() : -1); 186 }; 187 188 DarlingServer::Thread::Thread(KernelThreadConstructorTag tag): 189 _tid(-1), 190 _process(Process::kernelProcess()) 191 { 192 static uint64_t kernelThreadIDCounter = DTAPE_KERNEL_THREAD_ID_THRESHOLD; 193 static std::mutex kernelThreadIDCounterLock; 194 195 std::unique_lock idLock(kernelThreadIDCounterLock); 196 _nstid = kernelThreadIDCounter++; 197 if (kernelThreadIDCounter == 0) { 198 kernelThreadIDCounter = DTAPE_KERNEL_THREAD_ID_THRESHOLD; 199 } 200 idLock.unlock(); 201 202 _dtapeThread = dtape_thread_create(Process::kernelProcess()->_dtapeTask, _nstid, this); 203 }; 204 205 void DarlingServer::Thread::registerWithProcess() { 206 std::unique_lock lock(_process->_rwlock); 207 _process->_threads[_nstid] = shared_from_this(); 208 }; 209 210 DarlingServer::Thread::~Thread() noexcept(false) { 211 threadLog.info() << *this << ": thread being destroyed" << threadLog.endLog; 212 213 if (_stack.isValid()) { 214 stackPool.free(_stack); 215 } 216 217 if (!_process) { 218 return; 219 } 220 221 std::unique_lock lock(_process->_rwlock); 222 auto it = _process->_threads.begin(); 223 while (it != _process->_threads.end()) { 224 if (it->first == _nstid) { 225 break; 226 } 227 ++it; 228 } 229 if (it == _process->_threads.end()) { 230 throw std::runtime_error("Thread was not registered with Process"); 231 } 232 _process->_threads.erase(it); 233 234 if (_process->_threads.empty()) { 235 // if this was the last thread in the process, it has died, so unregister it. 236 // this should already be handled by the process' pidfd monitor, but just in case, we also handle it here. 
237 lock.unlock(); 238 _process->notifyDead(); 239 } 240 }; 241 242 DarlingServer::Thread::ID DarlingServer::Thread::id() const { 243 return _tid; 244 }; 245 246 DarlingServer::Thread::NSID DarlingServer::Thread::nsid() const { 247 return _nstid; 248 }; 249 250 DarlingServer::EternalID DarlingServer::Thread::eternalID() const { 251 return _eid; 252 }; 253 254 void DarlingServer::Thread::_setEternalID(EternalID eid) { 255 _eid = eid; 256 }; 257 258 std::shared_ptr<DarlingServer::Process> DarlingServer::Thread::process() const { 259 return _process; 260 }; 261 262 std::shared_ptr<DarlingServer::Call> DarlingServer::Thread::pendingCall() const { 263 std::shared_lock lock(_rwlock); 264 return _pendingCall; 265 }; 266 267 void DarlingServer::Thread::setPendingCall(std::shared_ptr<Call> newPendingCall) { 268 std::unique_lock lock(_rwlock); 269 if (newPendingCall && _pendingCall) { 270 if (newPendingCall->number() == Call::Number::InterruptEnter) { 271 // InterruptEnter calls can occur after we receive a call but before we start processing it, 272 // so we need to handle this case gracefully. we do so by saving the interrupt and scheduling 273 // it to be processed once the pending call becomes active and suspends or exits. 274 _pendingInterrupts.push(newPendingCall); 275 return; 276 } else { 277 throw std::runtime_error("Thread's pending call overwritten while active"); 278 } 279 } 280 _pendingCall = newPendingCall; 281 }; 282 283 std::shared_ptr<DarlingServer::Call> DarlingServer::Thread::activeCall() const { 284 std::shared_lock lock(_rwlock); 285 return _activeCall; 286 }; 287 288 void DarlingServer::Thread::makePendingCallActive() { 289 std::unique_lock lock(_rwlock); 290 _activeCall = _pendingCall; 291 _pendingCall = nullptr; 292 }; 293 294 void DarlingServer::Thread::_deactivateCallLocked(std::shared_ptr<Call> expectedCall) { 295 if ((_interruptedForSignal ? 
_interrupts.top().interruptedCall : _activeCall).get() != expectedCall.get()) { 296 throw std::runtime_error("Upon deactivating the active call found active/interrupted call != expected call"); 297 } 298 (_interruptedForSignal ? _interrupts.top().interruptedCall : _activeCall) = nullptr; 299 }; 300 301 void DarlingServer::Thread::deactivateCall(std::shared_ptr<Call> expectedCall) { 302 std::unique_lock lock(_rwlock); 303 _deactivateCallLocked(expectedCall); 304 }; 305 306 DarlingServer::Address DarlingServer::Thread::address() const { 307 std::shared_lock lock(_rwlock); 308 return _address; 309 }; 310 311 void DarlingServer::Thread::setAddress(Address address) { 312 std::unique_lock lock(_rwlock); 313 _address = address; 314 }; 315 316 void DarlingServer::Thread::setThreadHandles(uintptr_t pthreadHandle, uintptr_t dispatchQueueAddress) { 317 dtape_thread_set_handles(_dtapeThread, pthreadHandle, dispatchQueueAddress); 318 }; 319 320 bool DarlingServer::Thread::waitingForReply() const { 321 std::shared_lock lock(_rwlock); 322 return !!_activeCall; 323 }; 324 325 /* 326 * IMPORTANT 327 * === 328 * 329 * The way that microthread handling/switching is done here is... not pretty, to say the least. 330 * The problem is that, in order to use XNU's Mach IPC code, we need a way to interrupt execution of a "thread" and then resume from the same point. 331 * Actual threads are too heavyweight, so instead we use our own form of microthreads with cooperative multitasking. 332 * Using actual threads for each managed thread would be much simpler and far less hacky, but far more resource-intensive. 333 */ 334 335 static const auto microthreadLog = DarlingServer::Log("microthread"); 336 337 // this runs in the context of the microthread (i.e. 
with the microthread's stack active) 338 void DarlingServer::Thread::microthreadWorker() { 339 #if DSERVER_ASAN 340 __sanitizer_finish_switch_fiber(asanOldFakeStack, &asanOldStackBottom, &asanOldStackSize); 341 asanOldFakeStack = nullptr; 342 #endif 343 344 currentContinuation = nullptr; 345 currentThreadVar->makePendingCallActive(); 346 currentThreadVar->_activeCall->processCall(); 347 348 if (currentThreadVar->_handlingInterruptedCall) { 349 currentThreadVar->_didSyscallReturnDuringInterrupt = true; 350 #if DSERVER_ASAN 351 __sanitizer_start_switch_fiber(NULL, currentThreadVar->_stack.base, currentThreadVar->_stack.size); 352 #endif 353 setcontext(¤tThreadVar->_syscallReturnHereDuringInterrupt); 354 } else { 355 #if DSERVER_ASAN 356 // we're exiting normally, so we might not re-enter this microthread; tell ASAN to drop the fake stack 357 __sanitizer_start_switch_fiber(NULL, asanOldStackBottom, asanOldStackSize); 358 #endif 359 360 setcontext(&backToThreadTopContext); 361 } 362 __builtin_unreachable(); 363 }; 364 365 void DarlingServer::Thread::microthreadContinuation() { 366 #if DSERVER_ASAN 367 __sanitizer_finish_switch_fiber(asanOldFakeStack, &asanOldStackBottom, &asanOldStackSize); 368 asanOldFakeStack = nullptr; 369 #endif 370 371 currentContinuation = currentThreadVar->_continuationCallback; 372 // FIXME: we probably should never see `currentContinuation == nullptr` 373 if (currentContinuation) { 374 currentThreadVar->_continuationCallback = nullptr; 375 currentContinuation(); 376 currentContinuation = nullptr; 377 } 378 379 if (currentThreadVar->_handlingInterruptedCall) { 380 currentThreadVar->_didSyscallReturnDuringInterrupt = true; 381 #if DSERVER_ASAN 382 __sanitizer_start_switch_fiber(NULL, currentThreadVar->_stack.base, currentThreadVar->_stack.size); 383 #endif 384 setcontext(¤tThreadVar->_syscallReturnHereDuringInterrupt); 385 } else { 386 #if DSERVER_ASAN 387 // see microthreadWorker() 388 __sanitizer_start_switch_fiber(NULL, asanOldStackBottom, 
asanOldStackSize); 389 #endif 390 setcontext(&backToThreadTopContext); 391 } 392 __builtin_unreachable(); 393 }; 394 395 void DarlingServer::Thread::doWork() { 396 // NOTE: this method MUST NOT use any local variables that require destructors. 397 // this method is actually major UB because the compiler is free to do whatever it likes with the stack, 398 // but we know what reasonable compilers (i.e. GCC and Clang) do with it and we're specifically targeting Clang, so it's okay for us. 399 400 _rwlock.lock(); 401 402 if (_deferralState != DeferralState::NotDeferred) { 403 microthreadLog.debug() << _tid << "(" << _nstid << "): execution was deferred" << microthreadLog.endLog; 404 _deferralState = DeferralState::DeferredPending; 405 _rwlock.unlock(); 406 return; 407 } 408 409 if (_running) { 410 // this is probably an error 411 microthreadLog.warning() << _tid << "(" << _nstid << "): attempt to re-run already running microthread on another thread" << microthreadLog.endLog; 412 _rwlock.unlock(); 413 return; 414 } 415 416 if (_terminating) { 417 goto doneWorking; 418 } 419 420 if (_dead && !_activeCall) { 421 // should be impossible, since this should be handled in `notifyDead`, but just in case 422 _terminating = true; 423 goto doneWorking; 424 } 425 426 _running = true; 427 currentThreadVar = shared_from_this(); 428 dtape_thread_entering(_dtapeThread); 429 430 returningToThreadTop = false; 431 _rwlock.unlock(); 432 433 _runningCondvar.notify_all(); 434 435 getcontext(&backToThreadTopContext); 436 437 if (returningToThreadTop) { 438 // someone jumped back to the top of the microthread 439 // (that means either the microthread has been suspended or it has finished) 440 441 #if DSERVER_ASAN 442 __sanitizer_finish_switch_fiber(asanOldFakeStack, &asanOldStackBottom, &asanOldStackSize); 443 asanOldFakeStack = nullptr; 444 #endif 445 446 _rwlock.lock(); 447 448 if (!_suspended || _continuationCallback) { 449 // we discard the old stack when either: 450 // * we exit normally 
(i.e. without suspending); this includes syscall returns. 451 // * or when we suspend with a continuation callback. 452 stackPool.free(_stack); 453 } 454 455 //microthreadLog.debug() << _tid << "(" << _nstid << "): microthread returned to top" << microthreadLog.endLog; 456 goto doneWorking; 457 } else { 458 returningToThreadTop = true; 459 460 _rwlock.lock(); 461 462 if (!_pendingCallOverride && _pendingCall && _pendingCall->number() == Call::Number::InterruptEnter) { 463 _interrupts.emplace(); 464 _interrupts.top().savedStack = _stack; 465 _stack = StackPool::Stack(); 466 _interruptedContinuation = _continuationCallback; 467 _continuationCallback = nullptr; 468 _interrupts.top().interruptedCall = _activeCall; 469 _activeCall = nullptr; 470 } 471 472 if (_continuationCallback && _pendingCall) { 473 // we can only have one of the two 474 throw std::runtime_error("Thread has both a pending call and a pending continuation"); 475 } 476 477 if (_suspended && (_pendingCallOverride || !_pendingCall)) { 478 if (_pendingCallOverride) { 479 microthreadLog.info() << _tid << "(" << _nstid << "): thread was suspended with a pending call override and is now resuming with a pending call" << microthreadLog.endLog; 480 } 481 // we were in the middle of processing a call and we need to resume now 482 _suspended = false; 483 _resumeContext.uc_link = &backToThreadTopContext; 484 _rwlock.unlock(); 485 486 if (_continuationCallback) { 487 // for continuations, we discard the old stack and start with a new one 488 assert(!_stack.isValid()); 489 stackPool.allocate(_stack); 490 491 // we also ahve to set up the resume context properly with the new stack 492 _resumeContext.uc_stack.ss_sp = _stack.base; 493 _resumeContext.uc_stack.ss_size = _stack.size; 494 _resumeContext.uc_stack.ss_flags = 0; 495 _resumeContext.uc_link = &backToThreadTopContext; 496 makecontext(&_resumeContext, microthreadContinuation, 0); 497 } else { 498 // otherwise, we expect to have a valid stack to continue where we 
left off 499 assert(_stack.isValid()); 500 } 501 502 #if DSERVER_ASAN 503 __sanitizer_start_switch_fiber(&asanOldFakeStack, _stack.base, _stack.size); 504 #endif 505 506 setcontext(&_resumeContext); 507 } else { 508 if (!_pendingCall) { 509 // if we don't actually have a pending call, we have nothing to do 510 goto doneWorking; 511 } 512 _suspended = false; 513 _rwlock.unlock(); 514 515 // we might've had a valid stack if we're overwriting a previous suspension, so handle that. 516 if (_stack.isValid()) { 517 stackPool.free(_stack); 518 } 519 520 stackPool.allocate(_stack); 521 522 ucontext_t newContext; 523 getcontext(&newContext); 524 newContext.uc_stack.ss_sp = _stack.base; 525 newContext.uc_stack.ss_size = _stack.size; 526 newContext.uc_stack.ss_flags = 0; 527 newContext.uc_link = &backToThreadTopContext; 528 makecontext(&newContext, microthreadWorker, 0); 529 530 #if DSERVER_ASAN 531 __sanitizer_start_switch_fiber(&asanOldFakeStack, _stack.base, _stack.size); 532 #endif 533 534 setcontext(&newContext); 535 } 536 537 // inform the compiler that it shouldn't do anything that would live past the setcontext calls 538 __builtin_unreachable(); 539 } 540 541 doneWorking: 542 // we must be holding `_rwlock` when we get here 543 if (_running) { 544 dtape_thread_exiting(_dtapeThread); 545 currentThreadVar = nullptr; 546 _running = false; 547 } 548 bool canRelease = false; 549 if (_dead) { 550 threadLog.debug() << *this << ": dead thread returning. active call? " << (!!_activeCall ? "true" : "false") << " terminating? " << (_terminating ? 
"true" : "false") << threadLog.endLog; 551 } 552 if (_dead && !_activeCall && !_terminating) { 553 // this is the case when `notifyDead` notified us we were dead 554 // but we had an active call and had to finish it first 555 _terminating = true; 556 canRelease = true; 557 } 558 if (_terminating && !_dead) { 559 // this will not destroy our thread immediately; 560 // the worker thread invoker still holds a reference on us 561 _rwlock.unlock(); 562 notifyDead(); 563 } else { 564 // if we have any pending interrupts, schedule them to be processed now 565 // (we've just finished or suspended a call, so now's the time to handle interrupts) 566 if (!_terminating && !_dead && !_pendingInterrupts.empty()) { 567 if (_pendingCall) { 568 throw std::runtime_error("Need to schedule interrupt for processing, but thread has pending call"); 569 } 570 571 _pendingCall = _pendingInterrupts.front(); 572 _pendingInterrupts.pop(); 573 574 Server::sharedInstance().scheduleThread(shared_from_this()); 575 } 576 577 _rwlock.unlock(); 578 } 579 if (unlockMeWhenSuspending) { 580 libsimple_lock_unlock(unlockMeWhenSuspending); 581 unlockMeWhenSuspending = nullptr; 582 } 583 _runningCondvar.notify_all(); 584 if (canRelease) { 585 _scheduleRelease(); 586 } 587 return; 588 }; 589 590 void DarlingServer::Thread::suspend(std::function<void()> continuationCallback, libsimple_lock_t* unlockMe) { 591 if (this != currentThreadVar.get()) { 592 throw std::runtime_error("Attempt to suspend thread other than current thread"); 593 } 594 595 if (interruptDisableCount > 0) { 596 throw std::runtime_error("Attempt to suspend thread while interrupts disabled"); 597 } 598 599 _rwlock.lock(); 600 _suspended = true; 601 _rwlock.unlock(); 602 603 unlockMeWhenSuspending = unlockMe; 604 605 getcontext(&_resumeContext); 606 607 _rwlock.lock(); 608 if (_suspended) { 609 if (continuationCallback) { 610 // when suspendeding with a continuation, the current continuation and call are discarded (since they can no longer be 
safely returned to) 611 currentContinuation = nullptr; 612 613 _continuationCallback = continuationCallback; 614 } 615 // jump back to the top of the microthread 616 _rwlock.unlock(); 617 618 #if DSERVER_ASAN 619 // if we have a continuation, we don't expect to come back here 620 __sanitizer_start_switch_fiber((continuationCallback) ? nullptr : &asanOldFakeStack, asanOldStackBottom, asanOldStackSize); 621 #endif 622 623 setcontext(&backToThreadTopContext); 624 __builtin_unreachable(); 625 } else { 626 // we've been resumed 627 628 // make sure we don't have a continuation when we get here; 629 // if we do, that means that doWork() failed to do its job for the continuation case 630 assert(!_continuationCallback); 631 632 _rwlock.unlock(); 633 634 #if DSERVER_ASAN 635 __sanitizer_finish_switch_fiber(asanOldFakeStack, &asanOldStackBottom, &asanOldStackSize); 636 asanOldFakeStack = nullptr; 637 #endif 638 } 639 }; 640 641 void DarlingServer::Thread::resume() { 642 { 643 std::shared_lock lock(_rwlock); 644 if (!_suspended) { 645 // maybe we should throw an error here? 
646 return; 647 } 648 } 649 650 Server::sharedInstance().scheduleThread(shared_from_this()); 651 }; 652 653 void DarlingServer::Thread::terminate() { 654 if (_process) { 655 if (_process.get() != Process::kernelProcess().get()) { 656 throw std::runtime_error("terminate() called on non-kernel thread"); 657 } 658 } else { 659 throw std::runtime_error("terminate() called on non-kernel thread"); 660 } 661 662 _rwlock.lock(); 663 _terminating = true; 664 665 if (currentThreadVar.get() == this) { 666 // if it's the current thread, just suspend it; 667 // when we return to the "top" of the microthread, 668 // doWork() will see that it's terminating and clean up 669 _rwlock.unlock(); 670 suspend(); 671 throw std::runtime_error("terminate() on current kernel thread returned"); 672 } else { 673 // if it's not the current thread and it's not currently running, just tell it died; 674 // it should die once the caller releases their reference(s) on us 675 if (!_running) { 676 _rwlock.unlock(); 677 notifyDead(); 678 } else { 679 // otherwise, if it IS running, once it returns to the microthread "top" and sees `_terminating = true`, it'll unregister itself 680 _rwlock.unlock(); 681 } 682 } 683 }; 684 685 std::shared_ptr<DarlingServer::Thread> DarlingServer::Thread::currentThread() { 686 return currentThreadVar; 687 }; 688 689 void DarlingServer::Thread::setupKernelThread(std::function<void()> startupCallback) { 690 std::unique_lock lock(_rwlock); 691 _continuationCallback = startupCallback; 692 _suspended = true; 693 getcontext(&_resumeContext); 694 }; 695 696 void DarlingServer::Thread::startKernelThread(std::function<void()> startupCallback) { 697 setupKernelThread(startupCallback); 698 resume(); 699 }; 700 701 void DarlingServer::Thread::impersonate(std::shared_ptr<Thread> thread) { 702 std::shared_ptr<Thread> oldThread; 703 704 if (thread) { 705 // prevent the thread from running while we're impersonating it 706 // FIXME: this may lead to blocking natively while we're on a 
microthread. 707 // we would prefer to block using duct-taped facilities instead. 708 { 709 std::unique_lock lock(thread->_rwlock); 710 thread->_deferLocked(true, lock); 711 thread->_running = true; 712 } 713 thread->_runningCondvar.notify_all(); 714 } 715 716 { 717 std::unique_lock lock(_rwlock); 718 oldThread = _impersonating; 719 _impersonating = thread; 720 } 721 722 if (oldThread) { 723 { 724 std::unique_lock lock(oldThread->_rwlock); 725 oldThread->_running = false; 726 oldThread->_undeferLocked(lock); 727 } 728 oldThread->_runningCondvar.notify_all(); 729 } 730 }; 731 732 std::shared_ptr<DarlingServer::Thread> DarlingServer::Thread::impersonatingThread() const { 733 std::shared_lock lock(_rwlock); 734 return _impersonating; 735 }; 736 737 void DarlingServer::Thread::interruptDisable() { 738 ++interruptDisableCount; 739 }; 740 741 void DarlingServer::Thread::interruptEnable() { 742 if (interruptDisableCount-- == 0) { 743 throw std::runtime_error("interruptEnable() called when already enabled"); 744 } 745 }; 746 747 void DarlingServer::Thread::syscallReturn(int resultCode) { 748 if (!currentThreadVar) { 749 throw std::runtime_error("syscallReturn() called with no current thread"); 750 } 751 752 { 753 auto call = (currentThreadVar->_interruptedForSignal) ? 
currentThreadVar->_interrupts.top().interruptedCall : currentThreadVar->_activeCall; 754 if (!call || !call->isXNUTrap()) { 755 throw std::runtime_error("Attempt to return from syscall on thread with no active syscall"); 756 } 757 if (call->isBSDTrap()) { 758 call->sendBSDReply(resultCode, currentThreadVar->_bsdReturnValue); 759 } else { 760 call->sendBasicReply(resultCode); 761 } 762 } 763 764 if (currentThreadVar->_interruptedForSignal) { 765 currentThreadVar->_didSyscallReturnDuringInterrupt = true; 766 #if DSERVER_ASAN 767 if (currentThreadVar->_handlingInterruptedCall) { 768 __sanitizer_start_switch_fiber(nullptr, currentThreadVar->_stack.base, currentThreadVar->_stack.size); 769 } 770 #endif 771 setcontext(¤tThreadVar->_syscallReturnHereDuringInterrupt); 772 __builtin_unreachable(); 773 } 774 775 // jump back to the top of the thread 776 #if DSERVER_ASAN 777 __sanitizer_start_switch_fiber(nullptr, asanOldStackBottom, asanOldStackSize); 778 #endif 779 setcontext(&backToThreadTopContext); 780 __builtin_unreachable(); 781 }; 782 783 static std::queue<std::function<void()>> kernelAsyncRunnerQueue; 784 785 // we have to use a regular lock here because it needs to be lockable from both a microthread and normal thread context. 786 // additionally, it's only locked for brief periods. 787 // 788 // we use libsimple_lock_t so we can pass it to `suspend` to unlock it after suspending. 789 // XXX: we could use a std::mutex if we add an overload to `suspend` for it. 790 static libsimple_lock_t kernelAsyncRunnerQueueLock; 791 static dtape_semaphore_t* kernelAsyncRunnerQueueSempahore = nullptr; 792 static uint64_t kernelAsyncRunnersAvailable = 0; 793 static std::vector<std::shared_ptr<DarlingServer::Thread>> permanentKernelAsyncRunners; 794 795 #define MAX_PERMANENT_KERNEL_RUNNERS 10 796 797 static void kernelAsyncRunnerThreadWorker(bool permanent, std::shared_ptr<DarlingServer::Thread> self) { 798 do { 799 // we're going to wait for work; we're available now. 
800 libsimple_lock_lock(&kernelAsyncRunnerQueueLock); 801 ++kernelAsyncRunnersAvailable; 802 libsimple_lock_unlock(&kernelAsyncRunnerQueueLock); 803 804 if (!dtape_semaphore_down_simple(kernelAsyncRunnerQueueSempahore)) { 805 // we were interrupted. go again if we're permanent; otherwise, die. 806 libsimple_lock_lock(&kernelAsyncRunnerQueueLock); 807 --kernelAsyncRunnersAvailable; 808 libsimple_lock_unlock(&kernelAsyncRunnerQueueLock); 809 810 if (permanent) { 811 continue; 812 } else { 813 break; 814 } 815 } 816 817 libsimple_lock_lock(&kernelAsyncRunnerQueueLock); 818 819 if (kernelAsyncRunnerQueue.empty()) { 820 // we didn't find any work (we were probably awoken spuriously). 821 // go again if we're permanent; otherwise, die. 822 --kernelAsyncRunnersAvailable; 823 libsimple_lock_unlock(&kernelAsyncRunnerQueueLock); 824 825 if (permanent) { 826 continue; 827 } else { 828 break; 829 } 830 } 831 832 // we're going to perform some work; we're no longer available 833 --kernelAsyncRunnersAvailable; 834 835 auto func = kernelAsyncRunnerQueue.front(); 836 kernelAsyncRunnerQueue.pop(); 837 838 libsimple_lock_unlock(&kernelAsyncRunnerQueueLock); 839 840 // perform the work 841 func(); 842 } while (permanent); 843 844 self = nullptr; 845 DarlingServer::Thread::currentThread()->terminate(); 846 __builtin_unreachable(); 847 }; 848 849 void DarlingServer::Thread::kernelAsync(std::function<void()> fn) { 850 static bool inited = []() { 851 kernelAsyncRunnerQueueSempahore = dtape_semaphore_create(Process::kernelProcess()->_dtapeTask, 0); 852 return true; 853 }(); 854 855 libsimple_lock_lock(&kernelAsyncRunnerQueueLock); 856 kernelAsyncRunnerQueue.push(fn); 857 if (kernelAsyncRunnersAvailable == 0) { 858 // we need to get some work done, but there are no workers available. 859 // if we have less workers than the max permanent number of workers, 860 // let's spawn a permanent worker. otherwise, just spawn a temporary worker. 
		// (tail of the kernel-async-runner spawning logic; the function header lies
		// above this chunk — reproduced unchanged)
		auto thread = std::make_shared<Thread>(KernelThreadConstructorTag());
		auto permanent = permanentKernelAsyncRunners.size() < MAX_PERMANENT_KERNEL_RUNNERS;
		thread->startKernelThread(std::bind(kernelAsyncRunnerThreadWorker, permanent, thread));
		if (permanent) {
			permanentKernelAsyncRunners.push_back(std::move(thread));
		}
	}
	libsimple_lock_unlock(&kernelAsyncRunnerQueueLock);

	// increment the semaphore to let workers know there's work available.
	dtape_semaphore_up(kernelAsyncRunnerQueueSempahore);
};

// Runs `fn` on a kernel microthread (via kernelAsync) and blocks the calling
// host thread until it has finished executing.
void DarlingServer::Thread::kernelSync(std::function<void()> fn) {
	std::mutex mutex;
	std::condition_variable condvar;
	bool done = false;

	kernelAsync([&]() {
		fn();

		{
			// set the flag under the mutex so the waiter can't miss the update
			std::unique_lock lock2(mutex);
			done = true;
		}

		// notify all, but there should only be one thread waiting
		condvar.notify_all();
	});

	{
		std::unique_lock lock(mutex);
		condvar.wait(lock, [&]() {
			return done;
		});
	}
};

// Looks up the Thread associated with the given Mach thread port.
// Returns nullptr if the port doesn't name a duct-taped thread or that
// thread has no Thread context attached.
std::shared_ptr<DarlingServer::Thread> DarlingServer::Thread::threadForPort(uint32_t thread_port) {
	// prevent the target thread from dying by taking the global thread registry lock
	auto registryLock = threadRegistry().scopedLock();

	dtape_thread_t* thread_handle = dtape_thread_for_port(thread_port);
	if (!thread_handle) {
		return nullptr;
	}

	Thread* thread = static_cast<Thread*>(dtape_thread_context(thread_handle));
	if (!thread) {
		return nullptr;
	}

	return thread->shared_from_this();
};

// Loads the thread's register/float state from the given userspace addresses
// into the duct-taped thread. Throws std::system_error on failure (the
// duct-tape call returns a negative errno).
void DarlingServer::Thread::loadStateFromUser(uint64_t threadState, uint64_t floatState) {
	int ret = dtape_thread_load_state_from_user(_dtapeThread, threadState, floatState);
	if (ret != 0) {
		throw std::system_error(-ret, std::generic_category());
	}
};

// Writes the duct-taped thread's register/float state back out to the given
// userspace addresses. Throws std::system_error on failure.
void DarlingServer::Thread::saveStateToUser(uint64_t threadState, uint64_t floatState) {
	int ret = dtape_thread_save_state_to_user(_dtapeThread, threadState, floatState);
	if (ret != 0) {
		throw std::system_error(-ret, std::generic_category());
	}
};

// Returns the signal pending on the innermost active interrupt,
// or 0 when no interrupt is active.
int DarlingServer::Thread::pendingSignal() const {
	std::shared_lock lock(_rwlock);
	return (_interrupts.empty()) ? 0 : _interrupts.top().signal;
};

// Replaces the pending signal on the innermost active interrupt and returns
// the previous value. Throws std::runtime_error if no interrupt is active.
int DarlingServer::Thread::setPendingSignal(int signal) {
	std::unique_lock lock(_rwlock);
	int pendingSignal;
	if (_interrupts.empty()) {
		throw std::runtime_error("Can't set pending signal with no active interrupts");
	} else {
		pendingSignal = _interrupts.top().signal;
		_interrupts.top().signal = signal;
	}
	return pendingSignal;
};

// Delivers a signal to the duct-taped (Mach) side of this thread: loads the
// user thread state, lets duct-tape process the signal, waits out any
// debugger-requested suspension, and saves the (possibly modified) state back.
void DarlingServer::Thread::processSignal(int bsdSignalNumber, int linuxSignalNumber, int code, uintptr_t signalAddress, uintptr_t threadStateAddress, uintptr_t floatStateAddress) {
	loadStateFromUser(threadStateAddress, floatStateAddress);

	{
		std::unique_lock lock(_rwlock);
		// clear any previously pending signal; duct-tape may set a new one
		_interrupts.top().signal = 0;
		_processingSignal = true;
	}

	dtape_thread_process_signal(_dtapeThread, bsdSignalNumber, linuxSignalNumber, code, signalAddress);

	// LLDB commonly suspends the thread upon reception of an exception and assumes
	// that the thread will stay suspended after replying to the exception message,
	// until thread_resume() is called.
	dtape_thread_wait_while_user_suspended(_dtapeThread);

	{
		std::unique_lock lock(_rwlock);
		_processingSignal = false;
	}

	saveStateToUser(threadStateAddress, floatStateAddress);
};

// Records a signal on the innermost interrupt; only valid while the thread is
// inside processSignal() (i.e. _processingSignal is set).
void DarlingServer::Thread::handleSignal(int signal) {
	std::unique_lock lock(_rwlock);
	if (_processingSignal) {
		_interrupts.top().signal = signal;
	} else {
		throw std::runtime_error("Attempt to handle signal while not processing signal");
	}
};

// Toggles the pending-call override flag (used while replaying an
// interrupted call).
void DarlingServer::Thread::setPendingCallOverride(bool pendingCallOverride) {
	std::unique_lock lock(_rwlock);
	_pendingCallOverride = pendingCallOverride;
};

/*
 * server-to-client (S2C) calls are used by darlingserver to invoke certain functions within managed processes
 * for which there is no in-server alternative.
 *
 * for example, memory allocation can only be done by the managed process itself; there is no Linux syscall to allocate memory in another process.
 * therefore, we have to ask the process to do it for us.
 *
 * an alternative to this system is ptrace. we can attach to the managed process and execute any function we like.
 * this is made even easier by the fact that we have our own code in the managed process, meaning we can help out the server by
 * providing thunks for it to execute that already include a debug trap.
 * the problem with this alternative is that there's no good way to tell when the child is done executing the function:
 *   * we could block with waitpid, but then that would block the worker thread for an indeterminate amount of time (and we want to avoid that).
 *   * we could have the main event loop poll periodically, but polling is undesirable.
 * additionally, if someone else is already ptracing that process, we lose the ability to execute code with this approach.
 * therefore, we have this RPC-based system instead.
1000 * 1001 * in order to perform an S2C call, however, the target thread MUST be waiting for a message from the server. 1002 * thus, when we want to perform an S2C call on a thread that isn't waiting for a message, we send it a real-time signal 1003 * to ask it to execute the S2C call. the signal is handled with the normal wrappers (interrupt_enter and interrupt_exit) 1004 * to properly handle the case when we may be accidentally interrupting an ongoing call in the thread (since we may have raced 1005 * with thread trying to perform a server call). 1006 */ 1007 1008 static DarlingServer::Log s2cLog("s2c"); 1009 1010 std::optional<DarlingServer::Message> DarlingServer::Thread::_s2cPerform(Message&& call, dserver_s2c_msgnum_t expectedReplyNumber, size_t expectedReplySize) { 1011 std::optional<Message> reply = std::nullopt; 1012 bool usingInterrupt = false; 1013 1014 // make sure we're the only one performing an S2C call on this thread 1015 if (!dtape_semaphore_down_simple(_s2cPerformSempahore)) { 1016 // got interrupted while waiting 1017 return std::nullopt; 1018 } 1019 1020 s2cLog.debug() << *this << ": Going to perform S2C call" << s2cLog.endLog; 1021 1022 { 1023 std::unique_lock lock(_rwlock); 1024 1025 if (!_activeCall) { 1026 // signal the thread that we want to perform an S2C call and wait for it to give us the green light 1027 lock.unlock(); 1028 s2cLog.debug() << *this << ": Sending S2C signal" << s2cLog.endLog; 1029 usingInterrupt = true; 1030 sendSignal(LINUX_SIGRTMIN + 1); 1031 if (!dtape_semaphore_down_simple(_s2cInterruptEnterSemaphore)) { 1032 // got interrupted while waiting 1033 dtape_semaphore_up(_s2cPerformSempahore); 1034 return std::nullopt; 1035 } 1036 s2cLog.debug() << *this << ": Got green light to perform S2C call" << s2cLog.endLog; 1037 lock.lock(); 1038 } else if (currentThread().get() != this) { 1039 // we have an active call, so the client is waiting for a reply and is able to perform an S2C call, 1040 // but we're not the active thread. 
thus, in order to guarantee the client doesn't receive a reply 1041 // and stop waiting before we get a chance to perform our S2C call, let's make sure replies are deferred. 1042 _deferReplyForS2C = true; 1043 } 1044 1045 call.setAddress(_address); 1046 } 1047 1048 // at least for now, in order to wait for the S2C reply, we need the calling thread to be a microthread, 1049 // so that waiting on the duct-taped semaphore will work 1050 if (!currentThread()) { 1051 dtape_semaphore_up(_s2cPerformSempahore); 1052 throw std::runtime_error("Must be in a microthread (any microthread) to wait for S2C reply"); 1053 } 1054 1055 s2cLog.debug() << *this << ": Going to send S2C message" << s2cLog.endLog; 1056 1057 // send the call 1058 Server::sharedInstance().sendMessage(std::move(call)); 1059 1060 // now let's wait for the reply 1061 if (!dtape_semaphore_down_simple(_s2cReplySempahore)) { 1062 // got interrupted while waiting 1063 dtape_semaphore_up(_s2cPerformSempahore); 1064 return std::nullopt; 1065 } 1066 1067 s2cLog.debug() << *this << ": Received S2C reply" << s2cLog.endLog; 1068 1069 // extract the reply 1070 { 1071 std::unique_lock lock(_rwlock); 1072 1073 if (!_s2cReply) { 1074 // impossible, but just in case 1075 dtape_semaphore_up(_s2cPerformSempahore); 1076 throw std::runtime_error("S2C reply semaphore incremented, but no reply present"); 1077 } 1078 1079 reply = std::move(_s2cReply); 1080 _s2cReply = std::nullopt; 1081 1082 // if we had replies deferred, now's the time to send them 1083 if (_deferReplyForS2C) { 1084 _deferReplyForS2C = false; 1085 if (_deferredReply) { 1086 Server::sharedInstance().sendMessage(std::move(*_deferredReply)); 1087 _deferredReply = std::nullopt; 1088 } 1089 } 1090 } 1091 1092 s2cLog.debug() << *this << ": Done performing S2C call" << s2cLog.endLog; 1093 1094 // we're done performing the call; allow others to have a chance at performing an S2C call on this thread 1095 dtape_semaphore_up(_s2cPerformSempahore); 1096 1097 if 
(usingInterrupt) { 1098 // if we used the S2C signal to perform the call, then the s2c_perform call is currently waiting for us to finish; 1099 // let it know that we're done 1100 s2cLog.debug() << *this << ": Allowing thread to resume from S2C interrupt" << s2cLog.endLog; 1101 dtape_semaphore_up(_s2cInterruptExitSemaphore); 1102 } 1103 1104 // partially validate the reply 1105 1106 if (reply->data().size() != expectedReplySize) { 1107 throw std::runtime_error("Invalid S2C reply: unxpected size"); 1108 } 1109 1110 auto replyHeader = reinterpret_cast<dserver_s2c_replyhdr_t*>(reply->data().data()); 1111 if (replyHeader->s2c_number != expectedReplyNumber) { 1112 throw std::runtime_error("Invalid S2C reply: unexpected S2C reply number"); 1113 } 1114 1115 return std::move(*reply); 1116 }; 1117 1118 uintptr_t DarlingServer::Thread::_mmap(uintptr_t address, size_t length, int protection, int flags, int fd, off_t offset, int& outErrno) { 1119 // XXX: not sure if we want to force all allocations in 32-bit processes to be in the 32-bit address space. 1120 // for now, we leave it up to the caller. 1121 #if 0 1122 auto process = _process.lock(); 1123 1124 if (!process) { 1125 throw std::runtime_error("Cannot perform mmap without valid process"); 1126 } 1127 1128 if (process->architecture() == Process::Architecture::i386 || process->architecture() == Process::Architecture::ARM32) { 1129 flags |= MAP_32BIT; 1130 } 1131 #endif 1132 1133 Message callMessage(sizeof(dserver_s2c_call_mmap_t), (fd < 0) ? 0 : 1); 1134 auto call = reinterpret_cast<dserver_s2c_call_mmap_t*>(callMessage.data().data()); 1135 1136 call->header.call_number = dserver_callnum_s2c; 1137 call->header.s2c_number = dserver_s2c_msgnum_mmap; 1138 call->address = address; 1139 call->length = length; 1140 call->protection = protection; 1141 call->flags = flags; 1142 call->fd = (fd < 0) ? 
-1 : 0; 1143 call->offset = offset; 1144 1145 if (fd >= 0) { 1146 auto dupfd = dup(fd); 1147 if (dupfd < 0) { 1148 outErrno = errno; 1149 return (uintptr_t)MAP_FAILED; 1150 } 1151 callMessage.pushDescriptor(dupfd); 1152 } 1153 1154 s2cLog.debug() << "Performing _mmap with address=" << call->address << ", length=" << call->length << ", protection=" << call->protection << ", flags=" << call->flags << ", fd=" << call->fd << " (" << fd << ")" << ", offset=" << call->offset << s2cLog.endLog; 1155 1156 auto maybeReplyMessage = _s2cPerform(std::move(callMessage), dserver_s2c_msgnum_mmap, sizeof(dserver_s2c_reply_mmap_t)); 1157 if (!maybeReplyMessage) { 1158 s2cLog.debug() << "_mmap call interrupted" << s2cLog.endLog; 1159 outErrno = EINTR; 1160 return (uintptr_t)MAP_FAILED; 1161 } 1162 1163 auto replyMessage = std::move(*maybeReplyMessage); 1164 auto reply = reinterpret_cast<dserver_s2c_reply_mmap_t*>(replyMessage.data().data()); 1165 1166 s2cLog.debug() << "_mmap returned address=" << reply->address << ", errno_result=" << reply->errno_result << s2cLog.endLog; 1167 1168 outErrno = reply->errno_result; 1169 return reply->address; 1170 }; 1171 1172 int DarlingServer::Thread::_munmap(uintptr_t address, size_t length, int& outErrno) { 1173 Message callMessage(sizeof(dserver_s2c_call_munmap_t), 0); 1174 auto call = reinterpret_cast<dserver_s2c_call_munmap_t*>(callMessage.data().data()); 1175 1176 call->header.call_number = dserver_callnum_s2c; 1177 call->header.s2c_number = dserver_s2c_msgnum_munmap; 1178 call->address = address; 1179 call->length = length; 1180 1181 s2cLog.debug() << "Performing _munmap with address=" << call->address << ", length=" << call->length << s2cLog.endLog; 1182 1183 auto maybeReplyMessage = _s2cPerform(std::move(callMessage), dserver_s2c_msgnum_munmap, sizeof(dserver_s2c_reply_munmap_t)); 1184 if (!maybeReplyMessage) { 1185 s2cLog.debug() << "_munmap call interrupted" << s2cLog.endLog; 1186 outErrno = EINTR; 1187 return -1; 1188 } 1189 1190 auto 
replyMessage = std::move(*maybeReplyMessage); 1191 auto reply = reinterpret_cast<dserver_s2c_reply_munmap_t*>(replyMessage.data().data()); 1192 1193 s2cLog.debug() << "_munmap returned return_value=" << reply->return_value << ", errno_result=" << reply->errno_result << s2cLog.endLog; 1194 1195 outErrno = reply->errno_result; 1196 return reply->return_value; 1197 }; 1198 1199 int DarlingServer::Thread::_mprotect(uintptr_t address, size_t length, int protection, int& outErrno) { 1200 Message callMessage(sizeof(dserver_s2c_call_mprotect_t), 0); 1201 auto call = reinterpret_cast<dserver_s2c_call_mprotect_t*>(callMessage.data().data()); 1202 1203 call->header.call_number = dserver_callnum_s2c; 1204 call->header.s2c_number = dserver_s2c_msgnum_mprotect; 1205 call->address = address; 1206 call->length = length; 1207 call->protection = protection; 1208 1209 s2cLog.debug() << "Performing _mprotect with address=" << call->address << ", length=" << call->length << ", protection=" << call->protection << s2cLog.endLog; 1210 1211 auto maybeReplyMessage = _s2cPerform(std::move(callMessage), dserver_s2c_msgnum_mprotect, sizeof(dserver_s2c_reply_mprotect_t)); 1212 if (!maybeReplyMessage) { 1213 s2cLog.debug() << "_mprotect call interrupted" << s2cLog.endLog; 1214 outErrno = EINTR; 1215 return -1; 1216 } 1217 1218 auto replyMessage = std::move(*maybeReplyMessage); 1219 auto reply = reinterpret_cast<dserver_s2c_reply_mprotect_t*>(replyMessage.data().data()); 1220 1221 s2cLog.debug() << "_mprotect returned return_value=" << reply->return_value << ", errno_result=" << reply->errno_result << s2cLog.endLog; 1222 1223 outErrno = reply->errno_result; 1224 return reply->return_value; 1225 }; 1226 1227 int DarlingServer::Thread::_msync(uintptr_t address, size_t size, int sync_flags, int& outErrno) { 1228 Message callMessage(sizeof(dserver_s2c_call_msync_t), 0); 1229 auto call = reinterpret_cast<dserver_s2c_call_msync_t*>(callMessage.data().data()); 1230 1231 call->header.call_number = 
dserver_callnum_s2c; 1232 call->header.s2c_number = dserver_s2c_msgnum_msync; 1233 call->address = address; 1234 call->size = size; 1235 call->sync_flags = sync_flags; 1236 1237 s2cLog.debug() << "Performing _msync with address=" << call->address << ", size=" << call->size << ", sync_flags=" << call->sync_flags << s2cLog.endLog; 1238 1239 auto maybeReplyMessage = _s2cPerform(std::move(callMessage), dserver_s2c_msgnum_msync, sizeof(dserver_s2c_reply_msync_t)); 1240 if (!maybeReplyMessage) { 1241 s2cLog.debug() << "_msync call interrupted" << s2cLog.endLog; 1242 outErrno = EINTR; 1243 return -1; 1244 } 1245 1246 auto replyMessage = std::move(*maybeReplyMessage); 1247 auto reply = reinterpret_cast<dserver_s2c_reply_msync_t*>(replyMessage.data().data()); 1248 1249 s2cLog.debug() << "_msync returned return_value=" << reply->return_value << ", errno_result=" << reply->errno_result << s2cLog.endLog; 1250 1251 outErrno = reply->errno_result; 1252 return reply->return_value; 1253 }; 1254 1255 uintptr_t DarlingServer::Thread::allocatePages(size_t pageCount, int protection, uintptr_t addressHint, bool fixed, bool overwrite) { 1256 int err = 0; 1257 int flags = MAP_PRIVATE | MAP_ANONYMOUS; 1258 if (fixed && overwrite) { 1259 flags |= MAP_FIXED; 1260 } else if (fixed) { 1261 flags |= MAP_FIXED_NOREPLACE; 1262 } 1263 auto result = _mmap(addressHint, pageCount * sysconf(_SC_PAGESIZE), protection, flags, -1, 0, err); 1264 if (result == (uintptr_t)MAP_FAILED) { 1265 throw std::system_error(err, std::generic_category(), "S2C mmap call failed"); 1266 } 1267 return result; 1268 }; 1269 1270 void DarlingServer::Thread::freePages(uintptr_t address, size_t pageCount) { 1271 int err = 0; 1272 if (_munmap(address, pageCount * sysconf(_SC_PAGESIZE), err) < 0) { 1273 throw std::system_error(err, std::generic_category(), "S2C munmap call failed"); 1274 } 1275 }; 1276 1277 uintptr_t DarlingServer::Thread::mapFile(int fd, size_t pageCount, int protection, uintptr_t addressHint, size_t 
pageOffset, bool fixed, bool overwrite) { 1278 int err = 0; 1279 int flags = MAP_SHARED; 1280 if (fixed && overwrite) { 1281 flags |= MAP_FIXED; 1282 } else if (fixed) { 1283 flags |= MAP_FIXED_NOREPLACE; 1284 } 1285 auto result = _mmap(addressHint, pageCount * sysconf(_SC_PAGESIZE), protection, flags, fd, pageOffset * sysconf(_SC_PAGESIZE), err); 1286 if (result == (uintptr_t)MAP_FAILED) { 1287 throw std::system_error(err, std::generic_category(), "S2C mmap call failed"); 1288 } 1289 return result; 1290 }; 1291 1292 void DarlingServer::Thread::changeProtection(uintptr_t address, size_t pageCount, int protection) { 1293 int err = 0; 1294 if (_mprotect(address, pageCount * sysconf(_SC_PAGESIZE), protection, err) < 0) { 1295 throw std::system_error(err, std::generic_category(), "S2C mprotect call failed"); 1296 } 1297 }; 1298 1299 void DarlingServer::Thread::syncMemory(uintptr_t address, size_t size, int sync_flags) { 1300 int err = 0; 1301 if (_msync(address, size, sync_flags, err) < 0) { 1302 throw std::system_error(err, std::generic_category(), "S2C msync call failed"); 1303 } 1304 }; 1305 1306 void DarlingServer::Thread::waitUntilRunning() { 1307 std::shared_lock lock(_rwlock); 1308 _runningCondvar.wait(lock, [&]() { 1309 return _running; 1310 }); 1311 }; 1312 1313 void DarlingServer::Thread::waitUntilNotRunning() { 1314 std::shared_lock lock(_rwlock); 1315 _runningCondvar.wait(lock, [&]() { 1316 return !_running; 1317 }); 1318 }; 1319 1320 void DarlingServer::Thread::_deferLocked(bool wait, std::unique_lock<std::shared_mutex>& lock) { 1321 if (_deferralState == DeferralState::NotDeferred) { 1322 _deferralState = DeferralState::DeferredNotPending; 1323 } 1324 1325 if (wait) { 1326 _runningCondvar.wait(lock, [&]() { 1327 return !_running; 1328 }); 1329 } 1330 }; 1331 1332 void DarlingServer::Thread::defer(bool wait) { 1333 std::unique_lock lock(_rwlock); 1334 _deferLocked(wait, lock); 1335 }; 1336 1337 void 
DarlingServer::Thread::_undeferLocked(std::unique_lock<std::shared_mutex>& lock) { 1338 DeferralState previousDeferralState; 1339 1340 previousDeferralState = _deferralState; 1341 _deferralState = DeferralState::NotDeferred; 1342 1343 if (previousDeferralState == DeferralState::DeferredPending) { 1344 Server::sharedInstance().scheduleThread(shared_from_this()); 1345 } 1346 }; 1347 1348 void DarlingServer::Thread::undefer() { 1349 std::unique_lock lock(_rwlock); 1350 _undeferLocked(lock); 1351 }; 1352 1353 uint32_t* DarlingServer::Thread::bsdReturnValuePointer() { 1354 return &_bsdReturnValue; 1355 }; 1356 1357 void DarlingServer::Thread::logToStream(Log::Stream& stream) const { 1358 stream << "[T:" << _tid << "(" << _nstid << ")]"; 1359 }; 1360 1361 void DarlingServer::Thread::pushCallReply(std::shared_ptr<Call> expectedCall, Message&& reply) { 1362 std::unique_lock lock(_rwlock); 1363 1364 if (expectedCall) { 1365 _deactivateCallLocked(expectedCall); 1366 } 1367 1368 if (_interruptedForSignal) { 1369 if (_interrupts.top().savedReply) { 1370 throw std::runtime_error("New reply would overwrite existing saved reply"); 1371 } 1372 1373 _interrupts.top().savedReply = std::move(reply); 1374 } else if (_deferReplyForS2C) { 1375 _deferredReply = std::move(reply); 1376 } else if (!_dead) { 1377 Server::sharedInstance().sendMessage(std::move(reply)); 1378 } 1379 }; 1380 1381 DarlingServer::Thread::RunState DarlingServer::Thread::getRunState() const { 1382 auto process = this->process(); 1383 if (!process || isDead()) { 1384 return RunState::Dead; 1385 } 1386 1387 std::ifstream file("/proc/" + std::to_string(process->id()) + "/task/" + std::to_string(id()) + "/stat"); 1388 std::string line; 1389 if (!std::getline(file, line)) { 1390 return RunState::Dead; 1391 } 1392 1393 auto endOfComm = line.find(')'); 1394 if (endOfComm == std::string::npos) { 1395 return RunState::Dead; 1396 } 1397 1398 if (line.size() <= endOfComm + 2) { 1399 return RunState::Dead; 1400 } 1401 1402 
switch (line[endOfComm + 2]) { 1403 case 'R': 1404 return RunState::Running; 1405 case 'S': 1406 return RunState::Interruptible; 1407 case 'D': 1408 return RunState::Uninterruptible; 1409 case 'T': 1410 return RunState::Stopped; 1411 default: 1412 return RunState::Dead; 1413 } 1414 }; 1415 1416 void DarlingServer::Thread::waitWhileUserSuspended(uintptr_t threadStateAddress, uintptr_t floatStateAddress) { 1417 loadStateFromUser(threadStateAddress, floatStateAddress); 1418 dtape_thread_wait_while_user_suspended(_dtapeThread); 1419 try { 1420 saveStateToUser(threadStateAddress, floatStateAddress); 1421 } catch (std::system_error e) { 1422 // if we fail to save the state back to the process, that likely means the process died or was killed while waiting. 1423 // it's nothing to worry about. just log it and move on. 1424 threadLog.warning() << *this << ": failed to save state back to user in waitWhileUserSuspended: " << e.code() << " (" << e.what() << ")" << threadLog.endLog; 1425 } 1426 }; 1427 1428 void DarlingServer::Thread::sendSignal(int signal) const { 1429 if (isDead()) { 1430 return; 1431 } 1432 if (_process) { 1433 if (syscall(SYS_tgkill, _process->id(), id(), signal) < 0) { 1434 int code = errno; 1435 throw std::system_error(code, std::generic_category()); 1436 } 1437 } else { 1438 throw std::system_error(ESRCH, std::generic_category()); 1439 } 1440 }; 1441 1442 void DarlingServer::Thread::jumpToResume(void* stack, size_t stackSize) { 1443 #if DSERVER_ASAN 1444 __sanitizer_start_switch_fiber(&asanOldFakeStack, stack, stackSize); 1445 #endif 1446 setcontext(&_resumeContext); 1447 __builtin_unreachable(); 1448 }; 1449 1450 void DarlingServer::Thread::notifyDead() { 1451 bool canRelease = false; 1452 1453 { 1454 std::unique_lock lock(_rwlock); 1455 if (_dead) { 1456 return; 1457 } 1458 1459 threadLog.info() << *this << ": thread dying" << threadLog.endLog; 1460 _dead = true; 1461 1462 if (!_activeCall) { 1463 // if we have no active call, we won't ever need to 
run again, 1464 // so set `_terminating` to make sure that doesn't happen 1465 _terminating = true; 1466 canRelease = true; 1467 } 1468 } 1469 1470 // keep ourselves alive until the duct-taped context is done 1471 _selfReference = shared_from_this(); 1472 1473 dtape_thread_dying(_dtapeThread); 1474 1475 if (canRelease) { 1476 _scheduleRelease(); 1477 } else { 1478 resume(); 1479 } 1480 1481 threadRegistry().unregisterEntry(shared_from_this()); 1482 }; 1483 1484 bool DarlingServer::Thread::isDead() const { 1485 std::shared_lock lock(_rwlock); 1486 return _dead; 1487 }; 1488 1489 void DarlingServer::Thread::_dispose() { 1490 threadLog.debug() << *this << ": dispose thread context" << threadLog.endLog; 1491 _selfReference = nullptr; 1492 }; 1493 1494 void DarlingServer::Thread::_scheduleRelease() { 1495 // schedule the duct-taped thread to be released 1496 // dtape_thread_release needs a microthread context, so we call it within a kernel microthread 1497 threadLog.debug() << *this << ": scheduling release" << threadLog.endLog; 1498 kernelAsync([self = shared_from_this()]() { 1499 if (self->_s2cPerformSempahore) { 1500 dtape_semaphore_destroy(self->_s2cPerformSempahore); 1501 self->_s2cPerformSempahore = nullptr; 1502 } 1503 if (self->_s2cReplySempahore) { 1504 dtape_semaphore_destroy(self->_s2cReplySempahore); 1505 self->_s2cReplySempahore = nullptr; 1506 } 1507 if (self->_s2cInterruptEnterSemaphore) { 1508 dtape_semaphore_destroy(self->_s2cInterruptEnterSemaphore); 1509 self->_s2cInterruptEnterSemaphore = nullptr; 1510 } 1511 if (self->_s2cInterruptExitSemaphore) { 1512 dtape_semaphore_destroy(self->_s2cInterruptExitSemaphore); 1513 self->_s2cInterruptExitSemaphore = nullptr; 1514 } 1515 dtape_thread_release(self->_dtapeThread); 1516 self->_dtapeThread = nullptr; 1517 }); 1518 }; 1519 1520 static thread_local std::function<void()> interruptedContinuation = nullptr; 1521 1522 void DarlingServer::Thread::_handleInterruptEnterForCurrentThread() { 1523 // FIXME: this 
currently does not work properly if the thread was suspended waiting for a lock 1524 1525 { 1526 std::unique_lock lock(currentThreadVar->_rwlock); 1527 1528 if (currentThreadVar->_pendingSavedReply) { 1529 if (currentThreadVar->_interrupts.top().savedReply) { 1530 throw std::runtime_error("Pending saved reply would overwrite saved reply"); 1531 } 1532 1533 currentThreadVar->_interrupts.top().savedReply = std::move(*currentThreadVar->_pendingSavedReply); 1534 currentThreadVar->_pendingSavedReply = std::nullopt; 1535 } 1536 1537 currentThreadVar->_interruptedForSignal = true; 1538 1539 interruptedContinuation = currentThreadVar->_interruptedContinuation; 1540 currentThreadVar->_interruptedContinuation = nullptr; 1541 } 1542 1543 dtape_thread_sigexc_enter(currentThreadVar->_dtapeThread); 1544 1545 currentThreadVar->_didSyscallReturnDuringInterrupt = false; 1546 getcontext(¤tThreadVar->_syscallReturnHereDuringInterrupt); 1547 1548 if (!currentThreadVar->_didSyscallReturnDuringInterrupt) { 1549 if (interruptedContinuation) { 1550 interruptedContinuation(); 1551 } else if (currentThreadVar->_interrupts.top().interruptedCall) { 1552 currentThreadVar->_handlingInterruptedCall = true; 1553 currentThreadVar->_pendingCallOverride = true; 1554 currentThreadVar->jumpToResume(currentThreadVar->_interrupts.top().savedStack.base, currentThreadVar->_interrupts.top().savedStack.size); 1555 } 1556 } else if (currentThreadVar->_handlingInterruptedCall) { 1557 #if DSERVER_ASAN 1558 const void* dummy; 1559 size_t dummy2; 1560 __sanitizer_finish_switch_fiber(nullptr, &dummy, &dummy2); 1561 #endif 1562 1563 currentThreadVar->_handlingInterruptedCall = false; 1564 currentThreadVar->_pendingCallOverride = false; 1565 } 1566 1567 interruptedContinuation = nullptr; 1568 1569 { 1570 std::unique_lock lock(currentThreadVar->_rwlock); 1571 1572 if (currentThreadVar->_interrupts.top().savedStack.isValid()) { 1573 stackPool.free(currentThreadVar->_interrupts.top().savedStack); 1574 } 1575 1576 
currentThreadVar->_interruptedForSignal = false; 1577 currentThreadVar->_interrupts.top().interruptedCall = nullptr; 1578 } 1579 1580 dtape_thread_sigexc_enter2(currentThreadVar->_dtapeThread); 1581 };