kqchan.cpp
1 #include <darlingserver/kqchan.hpp> 2 #include <darlingserver/server.hpp> 3 #include <darlingserver/rpc-supplement.h> 4 #include <darlingserver/thread.hpp> 5 #include <darlingserver/logging.hpp> 6 7 #include <sys/socket.h> 8 #include <fcntl.h> 9 #include <atomic> 10 11 static DarlingServer::Log kqchanLog("kqchan"); 12 static DarlingServer::Log kqchanMachPortLog("kqchan:mach_port"); 13 static DarlingServer::Log kqchanProcLog("kqchan:proc"); 14 static std::atomic_uint64_t kqchanDebugIDCounter = 0; 15 16 // 17 // base class 18 // 19 20 DarlingServer::Kqchan::Kqchan(std::shared_ptr<DarlingServer::Process> process): 21 _debugID(kqchanDebugIDCounter++), 22 _process(process) 23 { 24 kqchanLog.debug() << *this << ": Constructing kqchan" << kqchanLog.endLog; 25 }; 26 27 DarlingServer::Kqchan::~Kqchan() { 28 kqchanLog.debug() << *this << ": Destroying kqchan" << kqchanLog.endLog; 29 }; 30 31 uintptr_t DarlingServer::Kqchan::_idForProcess() const { 32 throw std::runtime_error("must be overridden in derived class"); 33 }; 34 35 void DarlingServer::Kqchan::_processMessages() { 36 throw std::runtime_error("must be overridden in derived class"); 37 }; 38 39 std::shared_ptr<DarlingServer::Kqchan> DarlingServer::Kqchan::sharedFromRoot() { 40 throw std::runtime_error("must be overridden in derived class"); 41 }; 42 43 int DarlingServer::Kqchan::setup() { 44 int fds[2]; 45 46 kqchanLog.debug() << *this << ": Setting up kqchan" << kqchanLog.endLog; 47 48 if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds) < 0) { 49 int ret = errno; 50 kqchanLog.debug() << "Failed to create socket pair" << kqchanLog.endLog; 51 throw std::system_error(ret, std::generic_category()); 52 } 53 54 // we'll keep fds[0] and give fds[1] away 55 _socket = std::make_shared<FD>(fds[0]); 56 57 kqchanLog.debug() << *this << ": Keeping socket " << fds[0] << " and giving away " << fds[1] << kqchanLog.endLog; 58 59 // set O_NONBLOCK on our socket 60 int flags = fcntl(_socket->fd(), F_GETFL); 61 if (flags < 0) { 62 int ret = errno; 63 throw std::system_error(ret, std::generic_category()); 64 } 65 if (fcntl(_socket->fd(), F_SETFL, flags | O_NONBLOCK) < 0) { 66 int ret = errno; 67 throw std::system_error(ret, std::generic_category()); 68 } 69 70 std::weak_ptr<Kqchan> weakThis = sharedFromRoot(); 71 72 _outbox.setMessageArrivalNotificationCallback([weakThis]() { 73 auto self = weakThis.lock(); 74 75 if (!self) { 76 return; 77 } 78 79 std::unique_lock lock(self->_sendingMutex); 80 81 kqchanLog.debug() << *self << ": Got messages to send, attempting to send them" << kqchanLog.endLog; 82 83 // we probably won't be sending very many messages at once; 84 // we can send all the messages in the same context that they were pushed 85 do { 86 self->_canSend = self->_outbox.sendMany(self->_socket->fd()); 87 } while (self->_canSend && !self->_outbox.empty()); 88 }); 89 90 _monitor = std::make_shared<Monitor>(_socket, Monitor::Event::Readable | Monitor::Event::Writable | Monitor::Event::HangUp, true, false, [weakThis](std::shared_ptr<Monitor> monitor, Monitor::Event event) { 91 auto self = weakThis.lock(); 92 93 if (!self) { 94 return; 95 } 96 97 kqchanLog.debug() << *self << ": Got event(s) on socket: " << static_cast<uint64_t>(event) << kqchanLog.endLog; 98 99 if (static_cast<uint64_t>(event & Monitor::Event::HangUp) != 0) { 100 // socket hangup (peer closed their socket) 101 102 kqchanLog.debug() << *self << ": Peer hung up their socket; cleaning up monitor and kqchan" << kqchanLog.endLog; 103 104 // stop monitoring the socket (we're not gonna get any more events out of it) 105 Server::sharedInstance().removeMonitor(monitor); 106 107 // and unregister ourselves from the process (if it still exists) 108 // (which means the instance should be freed once we return) 109 if (auto process = self->_process.lock()) { 110 process->unregisterKqchan(self); 111 } 112 113 // no need to process any other events that may have occurred 114 return; 115 } 116 117 if (static_cast<uint64_t>(event & Monitor::Event::Readable) != 0) { 118 // incoming messages 119 120 kqchanLog.debug() << *self << ": socket has pending incoming messages" << kqchanLog.endLog; 121 122 // receive them all 123 while (self->_inbox.receiveMany(self->_socket->fd())); 124 125 // process the messages 126 self->_processMessages(); 127 } 128 129 if (static_cast<uint64_t>(event & Monitor::Event::Writable) != 0) { 130 // we can now send messages again; 131 // send as many messages as we can 132 133 std::unique_lock lock(self->_sendingMutex); 134 135 kqchanLog.debug() << *self << ": socket is now writable; sending all pending outgoing messages" << kqchanLog.endLog; 136 do { 137 self->_canSend = self->_outbox.sendMany(self->_socket->fd()); 138 } while (self->_canSend && !self->_outbox.empty()); 139 } 140 }); 141 142 DarlingServer::Server::sharedInstance().addMonitor(_monitor); 143 144 return fds[1]; 145 }; 146 147 void DarlingServer::Kqchan::_sendNotification() { 148 std::unique_lock lock(_notificationMutex); 149 150 kqchanLog.debug() << *this << ": received request to send notification" << kqchanLog.endLog; 151 152 if (!_canSendNotification) { 153 // we've already sent our peer a notification that they haven't acknowledged yet; 154 // let's not send another and needlessly clog up the socket 155 kqchanLog.debug() << *this << ": earlier notification has not yet been acknowledged; not sending another notification" << kqchanLog.endLog; 156 return; 157 } 158 159 kqchanLog.debug() << *this << ": sending notification " << _notificationCount << kqchanLog.endLog; 160 161 // now that we're sending the notification, we shouldn't send another one until our peer acknowledges this one 162 _canSendNotification = false; 163 164 Message msg(sizeof(dserver_kqchan_call_notification_t), 0); 165 166 auto notification = reinterpret_cast<dserver_kqchan_call_notification_t*>(msg.data().data()); 167 notification->header.number = dserver_kqchan_msgnum_notification; 168 notification->header.pid = 0; 169 notification->header.tid = 0; 170 171 if (_deferNotification) { 172 _deferredNotification = std::move(msg); 173 } else { 174 lock.unlock(); // the outbox has its own lock 175 _outbox.push(std::move(msg)); 176 } 177 }; 178 179 void DarlingServer::Kqchan::_sendDeferredNotification() { 180 std::unique_lock lock(_notificationMutex); 181 182 _deferNotification = false; 183 184 if (_deferredNotification) { 185 Message notification(std::move(*_deferredNotification)); 186 _deferredNotification = std::nullopt; 187 188 lock.unlock(); // the outbox has its own lock 189 _outbox.push(std::move(notification)); 190 } 191 }; 192 193 void DarlingServer::Kqchan::logToStream(Log::Stream& stream) const { 194 auto proc = _process.lock(); 195 stream << "[KQ:" << _debugID << ":"; 196 if (proc) { 197 stream << *proc; 198 } else { 199 stream << "<null>"; 200 } 201 stream << "]"; 202 }; 203 204 // 205 // mach port 206 // 207 208 DarlingServer::Kqchan::MachPort::MachPort(std::shared_ptr<DarlingServer::Process> process, uint32_t port, uint64_t receiveBuffer, uint64_t receiveBufferSize, uint64_t savedFilterFlags): 209 Kqchan(process), 210 _port(port), 211 _receiveBuffer(receiveBuffer), 212 _receiveBufferSize(receiveBufferSize), 213 _savedFilterFlags(savedFilterFlags) 214 { 215 kqchanMachPortLog.debug() << *this << ": Constructing Mach port kqchan" << kqchanMachPortLog.endLog; 216 }; 217 218 DarlingServer::Kqchan::MachPort::~MachPort() { 219 kqchanMachPortLog.debug() << *this << ": Destroying Mach port kqchan" << kqchanMachPortLog.endLog; 220 221 if (_dtapeKqchan) { 222 auto kqchan = _dtapeKqchan; 223 224 // disable notifications so that `this` won't be used after we're destroyed 225 dtape_kqchan_mach_port_disable_notifications(kqchan); 226 227 // and schedule the duct-taped kqchan to be destroyed on a microthread 228 auto debugID = _debugID; 229 Thread::kernelAsync([=]() { 230 kqchanMachPortLog.debug() << "Destroying duct-taped Mach port kqchan with ID " << debugID << kqchanMachPortLog.endLog; 231 dtape_kqchan_mach_port_destroy(kqchan); 232 }); 233 } 234 }; 235 236 uintptr_t DarlingServer::Kqchan::MachPort::_idForProcess() const { 237 return reinterpret_cast<uintptr_t>(this); 238 }; 239 240 std::shared_ptr<DarlingServer::Kqchan> DarlingServer::Kqchan::MachPort::sharedFromRoot() { 241 return shared_from_this(); 242 }; 243 244 int DarlingServer::Kqchan::MachPort::setup() { 245 auto proc = _process.lock(); 246 247 if (!proc) { 248 throw std::system_error(ESRCH, std::generic_category()); 249 } 250 251 kqchanMachPortLog.debug() << *this << ": Setting up Mach port kqchan" << kqchanMachPortLog.endLog; 252 253 // NOTE: the duct-taped kqchan will never notify us after we die 254 // since we disable notifications upon destruction, 255 // so using `this` here is safe 256 _dtapeKqchan = dtape_kqchan_mach_port_create(proc->_dtapeTask, _port, _receiveBuffer, _receiveBufferSize, _savedFilterFlags, [](void* context) { 257 auto self = reinterpret_cast<MachPort*>(context); 258 self->_notify(); 259 }, this); 260 if (!_dtapeKqchan) { 261 kqchanMachPortLog.debug() << *this << ": Failed to create duct-taped Mach port kqchan for port " << _port << kqchanMachPortLog.endLog; 262 throw std::system_error(ESRCH, std::generic_category()); 263 } 264 265 int fd = Kqchan::setup(); 266 267 if (dtape_kqchan_mach_port_has_events(_dtapeKqchan)) { 268 // if we already have an event, notify the kqchan; 269 // this will simply enqueue a message to be sent to the peer. 270 // since our libkqueue filter uses level-triggered epoll, 271 // our peer will immediately see there's an event available when it starts waiting. 272 _notify(); 273 } 274 275 return fd; 276 }; 277 278 void DarlingServer::Kqchan::MachPort::_processMessages() { 279 while (auto msg = _inbox.pop()) { 280 if (msg->data().size() < sizeof(dserver_kqchan_callhdr_t)) { 281 throw std::invalid_argument("Message buffer was too small for kqchan call header"); 282 } 283 284 auto callhdr = reinterpret_cast<dserver_kqchan_callhdr_t*>(msg->data().data()); 285 286 switch (callhdr->number) { 287 case dserver_kqchan_msgnum_mach_port_modify: { 288 if (msg->data().size() < sizeof(dserver_kqchan_call_mach_port_modify_t)) { 289 throw std::invalid_argument("Message buffer was too small for dserver_kqchan_msgnum_mach_port_modify"); 290 } 291 292 auto modify = reinterpret_cast<dserver_kqchan_call_mach_port_modify_t*>(callhdr); 293 294 _modify(modify->receive_buffer, modify->receive_buffer_size, modify->saved_filter_flags, modify->header.tid); 295 } break; 296 297 case dserver_kqchan_msgnum_mach_port_read: { 298 if (msg->data().size() < sizeof(dserver_kqchan_call_mach_port_read_t)) { 299 throw std::invalid_argument("Message buffer was too small for dserver_kqchan_call_mach_port_read"); 300 } 301 302 auto read = reinterpret_cast<dserver_kqchan_call_mach_port_read_t*>(callhdr); 303 304 _read(read->default_buffer, read->default_buffer_size, read->header.tid); 305 } break; 306 307 default: 308 throw std::invalid_argument("Unknown/invalid kqchan msgnum"); 309 } 310 } 311 }; 312 313 void DarlingServer::Kqchan::MachPort::_modify(uint64_t receiveBuffer, uint64_t receiveBufferSize, uint64_t savedFilterFlags, pid_t nstid) { 314 kqchanMachPortLog.debug() << *this << ": Received modification request with {receiveBuffer=" << receiveBuffer << ",receiveBufferSize=" << receiveBufferSize << ",savedFilterFlags=" << savedFilterFlags << "}" << kqchanMachPortLog.endLog; 315 316 auto maybeThread = threadRegistry().lookupEntryByNSID(nstid); 317 318 if (!maybeThread) { 319 throw std::runtime_error("No thread for Mach port kqchan modification?"); 320 } 321 322 auto thread = *maybeThread; 323 324 auto self = shared_from_this(); 325 Thread::kernelAsync([self, thread, receiveBuffer, receiveBufferSize, savedFilterFlags]() { 326 kqchanMachPortLog.debug() << *self << ": Handling modification request in microthread" << kqchanMachPortLog.endLog; 327 328 Thread::currentThread()->impersonate(thread); 329 dtape_kqchan_mach_port_modify(self->_dtapeKqchan, receiveBuffer, receiveBufferSize, savedFilterFlags); 330 Thread::currentThread()->impersonate(nullptr); 331 332 Message msg(sizeof(dserver_kqchan_reply_mach_port_modify_t), 0, self->_checkForEventsAsyncFactory()); 333 334 auto reply = reinterpret_cast<dserver_kqchan_reply_mach_port_modify_t*>(msg.data().data()); 335 336 reply->header.number = dserver_kqchan_msgnum_mach_port_modify; 337 reply->header.code = 0; 338 339 kqchanMachPortLog.debug() << *self << ": Sending modification reply/acknowledgement" << kqchanMachPortLog.endLog; 340 341 self->_outbox.push(std::move(msg)); 342 }); 343 }; 344 345 void DarlingServer::Kqchan::MachPort::_read(uint64_t defaultBuffer, uint64_t defaultBufferSize, pid_t nstid) { 346 kqchanMachPortLog.debug() << *this << ": received read request with {defaultBuffer=" << defaultBuffer << ",defaultBufferSize=" << defaultBufferSize << "}" << kqchanMachPortLog.endLog; 347 348 { 349 // our peer has acknowledged our notification by asking for the pending messages; 350 // we can now send a notification again if we receive more data 351 std::unique_lock lock(_notificationMutex); 352 kqchanMachPortLog.debug() << *this << ": received acknowledgement (implicitly via read) for notification " << _notificationCount++ << "; notifications may now be sent" << kqchanMachPortLog.endLog; 353 _canSendNotification = true; 354 355 // defer future notifications until we send our reply. 356 // we do it this way because: 357 // 1) if we don't defer them and just send them right now, 358 // a notification may be sent before we send our reply, 359 // leading to out-of-order messages in the channel (which 360 // causes an abort on the client side). 361 // 2) if we instead move the `_canSendNotification` update to after 362 // we send the reply, we may miss a notification for an event 363 // that occurred right after we generated the reply but before 364 // we updated `_canSendNotification`. 365 // with this approach (notification deferral), channel messages are kept in-order 366 // and we don't miss any notifications. worst case scenario, we might send 367 // a duplicate notification for an event that occurred right after this update 368 // but before we generate the reply; in that case, the client will simply try to read 369 // the duplicate event but we won't have anything and we'll tell it to drop the event. 370 _deferNotification = true; 371 } 372 373 auto maybeThread = threadRegistry().lookupEntryByNSID(nstid); 374 375 if (!maybeThread) { 376 throw std::runtime_error("No thread for Mach port kqchan read?"); 377 } 378 379 auto thread = *maybeThread; 380 381 auto self = shared_from_this(); 382 Thread::kernelAsync([self, thread, defaultBuffer, defaultBufferSize]() { 383 Message msg(sizeof(dserver_kqchan_reply_mach_port_read_t), 0, self->_checkForEventsAsyncFactory()); 384 385 kqchanMachPortLog.debug() << *self << ": handling read request in microthread" << kqchanMachPortLog.endLog; 386 387 auto reply = reinterpret_cast<dserver_kqchan_reply_mach_port_read_t*>(msg.data().data()); 388 389 reply->header.code = 0; 390 reply->header.number = dserver_kqchan_msgnum_mach_port_read; 391 392 Thread::currentThread()->impersonate(thread); 393 if (!dtape_kqchan_mach_port_fill(self->_dtapeKqchan, reply, defaultBuffer, defaultBufferSize)) { 394 kqchanMachPortLog.debug() << *self << ": no events to read" << kqchanMachPortLog.endLog; 395 reply->header.code = 0xdead; 396 } 397 Thread::currentThread()->impersonate(nullptr); 398 399 self->_outbox.push(std::move(msg)); 400 401 // now let's send any deferred notifications we might have 402 self->_sendDeferredNotification(); 403 }); 404 }; 405 406 void DarlingServer::Kqchan::MachPort::_notify() { 407 _sendNotification(); 408 }; 409 410 void DarlingServer::Kqchan::MachPort::_checkForEventsAsync() { 411 Thread::kernelAsync([weakSelf = weak_from_this()]() { 412 auto self = weakSelf.lock(); 413 if (!self) { 414 return; 415 } 416 // if we have more events ready, notify our peer 417 if (dtape_kqchan_mach_port_has_events(self->_dtapeKqchan)) { 418 self->_notify(); 419 } 420 }); 421 }; 422 423 std::function<void()> DarlingServer::Kqchan::MachPort::_checkForEventsAsyncFactory() { 424 return [weakSelf = weak_from_this()]() { 425 auto self = weakSelf.lock(); 426 if (!self) { 427 return; 428 } 429 self->_checkForEventsAsync(); 430 }; 431 }; 432 433 // 434 // process 435 // 436 // TODO: NOTE_REAP and NOTE_SIGNAL 437 // 438 439 DarlingServer::Kqchan::Process::Process(std::shared_ptr<DarlingServer::Process> process, pid_t nspid, uint32_t flags): 440 Kqchan(process), 441 _nspid(nspid), 442 _flags(flags) 443 { 444 kqchanProcLog.debug() << *this << ": Constructing process kqchan" << kqchanProcLog.endLog; 445 }; 446 447 DarlingServer::Kqchan::Process::~Process() { 448 kqchanProcLog.debug() << *this << ": Destroying process kqchan" << kqchanProcLog.endLog; 449 450 if (auto targetProcess = _targetProcess.lock()) { 451 targetProcess->unregisterListeningKqchan(_idForProcess()); 452 } 453 }; 454 455 uintptr_t DarlingServer::Kqchan::Process::_idForProcess() const { 456 return reinterpret_cast<uintptr_t>(this); 457 }; 458 459 std::shared_ptr<DarlingServer::Kqchan> DarlingServer::Kqchan::Process::sharedFromRoot() { 460 return shared_from_this(); 461 }; 462 463 int DarlingServer::Kqchan::Process::setup() { 464 kqchanProcLog.debug() << *this << ": Setting up process kqchan" << kqchanProcLog.endLog; 465 466 auto maybeTargetProcess = processRegistry().lookupEntryByNSID(_nspid); 467 if (!maybeTargetProcess) { 468 kqchanProcLog.debug() << *this << ": Failed to create process kqchan for PID " << _nspid << kqchanProcLog.endLog; 469 throw std::system_error(ESRCH, std::generic_category()); 470 } 471 472 auto targetProcess = *maybeTargetProcess; 473 474 _targetProcess = targetProcess; 475 476 int fd = Kqchan::setup(); 477 478 { 479 std::unique_lock lock(_mutex); 480 if (!_attached) { 481 targetProcess->registerListeningKqchan(shared_from_this()); 482 _attached = true; 483 } 484 if (!_events.empty()) { 485 _sendNotification(); 486 } 487 } 488 489 return fd; 490 }; 491 492 void DarlingServer::Kqchan::Process::_processMessages() { 493 while (auto msg = _inbox.pop()) { 494 if (msg->data().size() < sizeof(dserver_kqchan_callhdr_t)) { 495 throw std::invalid_argument("Message buffer was too small for kqchan call header"); 496 } 497 498 auto callhdr = reinterpret_cast<dserver_kqchan_callhdr_t*>(msg->data().data()); 499 500 switch (callhdr->number) { 501 case dserver_kqchan_msgnum_proc_modify: { 502 if (msg->data().size() < sizeof(dserver_kqchan_call_proc_modify_t)) { 503 throw std::invalid_argument("Message buffer was too small for dserver_kqchan_call_proc_modify_"); 504 } 505 506 auto modify = reinterpret_cast<dserver_kqchan_call_proc_modify_t*>(callhdr); 507 508 _modify(modify->flags); 509 } break; 510 511 case dserver_kqchan_msgnum_proc_read: { 512 if (msg->data().size() < sizeof(dserver_kqchan_call_proc_read_t)) { 513 throw std::invalid_argument("Message buffer was too small for dserver_kqchan_call_proc_read"); 514 } 515 516 auto read = reinterpret_cast<dserver_kqchan_call_proc_read_t*>(callhdr); 517 518 _read(); 519 } break; 520 521 default: 522 throw std::invalid_argument("Unknown/invalid kqchan msgnum"); 523 } 524 } 525 }; 526 527 void DarlingServer::Kqchan::Process::_modify(uint32_t flags) { 528 std::unique_lock lock(_mutex); 529 530 _flags = flags; 531 532 Message msg(sizeof(dserver_kqchan_reply_proc_modify_t), 0, _checkForEventsAsyncFactory()); 533 534 auto reply = reinterpret_cast<dserver_kqchan_reply_proc_modify_t*>(msg.data().data()); 535 536 reply->header.number = dserver_kqchan_msgnum_proc_modify; 537 reply->header.code = 0; 538 539 kqchanProcLog.debug() << *this << ": Sending modification reply/acknowledgement" << kqchanProcLog.endLog; 540 541 lock.unlock(); // the outbox has its own lock 542 _outbox.push(std::move(msg)); 543 }; 544 545 void DarlingServer::Kqchan::Process::_read() { 546 auto listeningProcess = _process.lock(); 547 548 if (!listeningProcess) { 549 // if the listening process is dead, log it and ignore the request (no one's listening for the reply anyways) 550 kqchanProcLog.warning() << *this << ": received read request after listening process died" << kqchanProcLog.endLog; 551 return; 552 } 553 554 kqchanProcLog.debug() << *this << ": received read request" << kqchanProcLog.endLog; 555 556 { 557 // our peer has acknowledged our notification by asking for the pending messages; 558 // we can now send a notification again if we receive more data 559 std::unique_lock lock(_notificationMutex); 560 kqchanProcLog.debug() << *this << ": received acknowledgement (implicitly via read) for notification " << _notificationCount++ << "; notifications may now be sent" << kqchanProcLog.endLog; 561 _canSendNotification = true; 562 563 // see MachPort::_read() for why we defer notifications 564 _deferNotification = true; 565 } 566 567 std::unique_lock lock(_mutex, std::defer_lock); 568 Message msg(sizeof(dserver_kqchan_reply_proc_read_t), 0, _checkForEventsAsyncFactory()); 569 auto reply = reinterpret_cast<dserver_kqchan_reply_proc_read_t*>(msg.data().data()); 570 571 reply->header.number = dserver_kqchan_msgnum_proc_read; 572 reply->header.code = 0; 573 reply->data = 0; 574 reply->fflags = 0; 575 576 while (true) { 577 lock.lock(); 578 579 if (_events.empty()) { 580 lock.unlock(); 581 582 // if we don't have any events, tell our peer 583 kqchanProcLog.debug() << *this << ": no events to read" << kqchanProcLog.endLog; 584 585 reply->header.code = 0xdead; 586 break; 587 } else { 588 auto event = std::move(_events.front()); 589 _events.pop_front(); 590 591 auto savedFlags = _flags; 592 593 // drop the lock; we don't need it to set up the new kqchan or to discard it, nor to push the message to the outbox. 594 // additionally, we don't want to hold it if we decide to drop the new kqchan, since that takes its own set of locks 595 // when dying (and the fewer active locks we hold concurrently, the better) 596 lock.unlock(); 597 598 reply->data = event.data; 599 reply->fflags = event.events & savedFlags; 600 601 if (reply->fflags == 0) { 602 // if this event contains no events that the user is interested in, drop it 603 kqchanProcLog.debug() << *this << ": event does not contain any events the user is interested in; dropping event" << kqchanProcLog.endLog; 604 continue; 605 } 606 607 if (savedFlags & NOTE_TRACK) { 608 if (event.newKqchan) { 609 try { 610 FD newKqchanSocket(event.newKqchan->setup()); 611 612 // no errors SHOULD be thrown from this point forward 613 614 // give the new kqchan the most recent flags we have 615 { 616 std::unique_lock lock(event.newKqchan->_mutex); 617 event.newKqchan->_flags = savedFlags; 618 } 619 620 listeningProcess->registerKqchan(event.newKqchan); 621 msg.pushDescriptor(newKqchanSocket.extract()); 622 623 kqchanProcLog.debug() << *this << ": new process kqchan setup for child process and being returned" << kqchanProcLog.endLog; 624 } catch (...) { 625 kqchanProcLog.error() << *this << ": failed to setup new kqchan for child process" << kqchanProcLog.endLog; 626 reply->fflags |= NOTE_TRACKERR; 627 } 628 } else if (event.events & NOTE_FORK) { 629 kqchanProcLog.error() << *this << ": read NOTE_FORK event and user has requested NOTE_TRACK, but no new kqchan was associated with event" << kqchanProcLog.endLog; 630 reply->fflags |= NOTE_TRACKERR; 631 } 632 } else if (event.newKqchan) { 633 kqchanProcLog.info() << *this << ": event contains new kqchan, but user has not requested NOTE_TRACK; dropping new kqchan" << kqchanProcLog.endLog; 634 } 635 636 break; 637 } 638 } 639 640 _outbox.push(std::move(msg)); 641 642 // now let's send any deferred notifications we might have 643 _sendDeferredNotification(); 644 }; 645 646 void DarlingServer::Kqchan::Process::_notify(uint32_t event, int64_t data) { 647 Event newEvent; 648 649 kqchanProcLog.debug() << *this << ": notified with {event=" << event << ",data=" << data << "}" << kqchanProcLog.endLog; 650 651 newEvent.data = data; 652 newEvent.events = event; 653 newEvent.newKqchan = nullptr; 654 655 if (event == NOTE_FORK) { 656 // NOTE: we always setup a new kqchan regardless of whether or not the user has currently requested NOTE_TRACK or not, 657 // in case the user does have NOTE_TRACK when reading the event. 658 659 auto listeningProcess = _process.lock(); 660 661 if (!listeningProcess) { 662 return; 663 } 664 665 auto maybeChild = processRegistry().lookupEntryByNSID(data & NOTE_PDATAMASK); 666 667 if (!maybeChild) { 668 kqchanProcLog.debug() << *this << ": notified with NOTE_FORK (and wanted NOTE_TRACK), but couldn't find child" << kqchanProcLog.endLog; 669 } else { 670 auto child = *maybeChild; 671 672 auto newKqchan = std::make_shared<Process>(listeningProcess, data & NOTE_PDATAMASK, _flags); 673 674 child->registerListeningKqchan(newKqchan); 675 { 676 std::unique_lock newLock(newKqchan->_mutex); 677 newKqchan->_targetProcess = child; 678 newKqchan->_attached = true; 679 } 680 681 auto childParent = child->parentProcess(); 682 683 newKqchan->_notify(NOTE_CHILD, (childParent ? childParent->nsid() : 0) & NOTE_PDATAMASK); 684 685 newEvent.newKqchan = newKqchan; 686 } 687 } 688 689 { 690 std::unique_lock lock(_mutex); 691 _events.push_back(std::move(newEvent)); 692 if (_socket) { 693 _sendNotification(); 694 } 695 } 696 }; 697 698 void DarlingServer::Kqchan::Process::_checkForEventsAsync() { 699 Thread::kernelAsync([weakSelf = weak_from_this()]() { 700 auto self = weakSelf.lock(); 701 if (!self) { 702 return; 703 } 704 // if we have more events ready, tell our peer 705 if (!self->_events.empty()) { 706 self->_sendNotification(); 707 } 708 }); 709 }; 710 711 std::function<void()> DarlingServer::Kqchan::Process::_checkForEventsAsyncFactory() { 712 return [weakSelf = weak_from_this()]() { 713 auto self = weakSelf.lock(); 714 if (!self) { 715 return; 716 } 717 self->_checkForEventsAsync(); 718 }; 719 }; 720 721 void DarlingServer::Kqchan::Process::logToStream(Log::Stream& stream) const { 722 auto proc = _targetProcess.lock(); 723 724 Kqchan::logToStream(stream); 725 726 stream << ":"; 727 if (proc) { 728 stream << *proc; 729 } else { 730 stream << "<null>"; 731 } 732 };