// src/kqchan.cpp
  1  #include <darlingserver/kqchan.hpp>
  2  #include <darlingserver/server.hpp>
  3  #include <darlingserver/rpc-supplement.h>
  4  #include <darlingserver/thread.hpp>
  5  #include <darlingserver/logging.hpp>
  6  
  7  #include <sys/socket.h>
  8  #include <fcntl.h>
  9  #include <atomic>
 10  
// Per-subsystem loggers; the constructor argument is the log channel name.
static DarlingServer::Log kqchanLog("kqchan");
static DarlingServer::Log kqchanMachPortLog("kqchan:mach_port");
static DarlingServer::Log kqchanProcLog("kqchan:proc");
// Monotonic counter used to hand each kqchan a unique debug ID (see Kqchan's constructor).
static std::atomic_uint64_t kqchanDebugIDCounter = 0;
 15  
 16  //
 17  // base class
 18  //
 19  
// Base-class constructor: assigns a unique debug ID (for log correlation)
// and remembers the process that owns this channel.
DarlingServer::Kqchan::Kqchan(std::shared_ptr<DarlingServer::Process> process):
	_debugID(kqchanDebugIDCounter++),
	_process(process)
{
	kqchanLog.debug() << *this << ": Constructing kqchan" << kqchanLog.endLog;
};
 26  
// Base-class destructor; only logs — members (socket, monitor, boxes) clean themselves up.
DarlingServer::Kqchan::~Kqchan() {
	kqchanLog.debug() << *this << ": Destroying kqchan" << kqchanLog.endLog;
};
 30  
 31  uintptr_t DarlingServer::Kqchan::_idForProcess() const {
 32  	throw std::runtime_error("must be overridden in derived class");
 33  };
 34  
 35  void DarlingServer::Kqchan::_processMessages() {
 36  	throw std::runtime_error("must be overridden in derived class");
 37  };
 38  
 39  std::shared_ptr<DarlingServer::Kqchan> DarlingServer::Kqchan::sharedFromRoot() {
 40  	throw std::runtime_error("must be overridden in derived class");
 41  };
 42  
 43  int DarlingServer::Kqchan::setup() {
 44  	int fds[2];
 45  
 46  	kqchanLog.debug() << *this << ": Setting up kqchan" << kqchanLog.endLog;
 47  
 48  	if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds) < 0) {
 49  		int ret = errno;
 50  		kqchanLog.debug() << "Failed to create socket pair" << kqchanLog.endLog;
 51  		throw std::system_error(ret, std::generic_category());
 52  	}
 53  
 54  	// we'll keep fds[0] and give fds[1] away
 55  	_socket = std::make_shared<FD>(fds[0]);
 56  
 57  	kqchanLog.debug() << *this << ": Keeping socket " << fds[0] << " and giving away " << fds[1] << kqchanLog.endLog;
 58  
 59  	// set O_NONBLOCK on our socket
 60  	int flags = fcntl(_socket->fd(), F_GETFL);
 61  	if (flags < 0) {
 62  		int ret = errno;
 63  		throw std::system_error(ret, std::generic_category());
 64  	}
 65  	if (fcntl(_socket->fd(), F_SETFL, flags | O_NONBLOCK) < 0) {
 66  		int ret = errno;
 67  		throw std::system_error(ret, std::generic_category());
 68  	}
 69  
 70  	std::weak_ptr<Kqchan> weakThis = sharedFromRoot();
 71  
 72  	_outbox.setMessageArrivalNotificationCallback([weakThis]() {
 73  		auto self = weakThis.lock();
 74  
 75  		if (!self) {
 76  			return;
 77  		}
 78  
 79  		std::unique_lock lock(self->_sendingMutex);
 80  
 81  		kqchanLog.debug() << *self << ": Got messages to send, attempting to send them" << kqchanLog.endLog;
 82  
 83  		// we probably won't be sending very many messages at once;
 84  		// we can send all the messages in the same context that they were pushed
 85  		do {
 86  			self->_canSend = self->_outbox.sendMany(self->_socket->fd());
 87  		} while (self->_canSend && !self->_outbox.empty());
 88  	});
 89  
 90  	_monitor = std::make_shared<Monitor>(_socket, Monitor::Event::Readable | Monitor::Event::Writable | Monitor::Event::HangUp, true, false, [weakThis](std::shared_ptr<Monitor> monitor, Monitor::Event event) {
 91  		auto self = weakThis.lock();
 92  
 93  		if (!self) {
 94  			return;
 95  		}
 96  
 97  		kqchanLog.debug() << *self << ": Got event(s) on socket: " << static_cast<uint64_t>(event) << kqchanLog.endLog;
 98  
 99  		if (static_cast<uint64_t>(event & Monitor::Event::HangUp) != 0) {
100  			// socket hangup (peer closed their socket)
101  
102  			kqchanLog.debug() << *self << ": Peer hung up their socket; cleaning up monitor and kqchan" << kqchanLog.endLog;
103  
104  			// stop monitoring the socket (we're not gonna get any more events out of it)
105  			Server::sharedInstance().removeMonitor(monitor);
106  
107  			// and unregister ourselves from the process (if it still exists)
108  			// (which means the instance should be freed once we return)
109  			if (auto process = self->_process.lock()) {
110  				process->unregisterKqchan(self);
111  			}
112  
113  			// no need to process any other events that may have occurred
114  			return;
115  		}
116  
117  		if (static_cast<uint64_t>(event & Monitor::Event::Readable) != 0) {
118  			// incoming messages
119  
120  			kqchanLog.debug() << *self << ": socket has pending incoming messages" << kqchanLog.endLog;
121  
122  			// receive them all
123  			while (self->_inbox.receiveMany(self->_socket->fd()));
124  
125  			// process the messages
126  			self->_processMessages();
127  		}
128  
129  		if (static_cast<uint64_t>(event & Monitor::Event::Writable) != 0) {
130  			// we can now send messages again;
131  			// send as many messages as we can
132  
133  			std::unique_lock lock(self->_sendingMutex);
134  
135  			kqchanLog.debug() << *self << ": socket is now writable; sending all pending outgoing messages" << kqchanLog.endLog;
136  			do {
137  				self->_canSend = self->_outbox.sendMany(self->_socket->fd());
138  			} while (self->_canSend  && !self->_outbox.empty());
139  		}
140  	});
141  
142  	DarlingServer::Server::sharedInstance().addMonitor(_monitor);
143  
144  	return fds[1];
145  };
146  
// Enqueues a "notification" message telling our peer that this channel has
// events ready to read.
//
// Notifications are rate-limited: once one is sent, `_canSendNotification`
// stays false until the peer implicitly acknowledges it by issuing a read
// (the derived classes' `_read()` methods reset the flag). While
// `_deferNotification` is set (between a read request and its reply), the
// message is parked in `_deferredNotification` instead of being pushed, so
// channel messages stay in-order; see the explanation in MachPort::_read().
void DarlingServer::Kqchan::_sendNotification() {
	std::unique_lock lock(_notificationMutex);

	kqchanLog.debug() << *this << ": received request to send notification" << kqchanLog.endLog;

	if (!_canSendNotification) {
		// we've already sent our peer a notification that they haven't acknowledged yet;
		// let's not send another and needlessly clog up the socket
		kqchanLog.debug() << *this << ": earlier notification has not yet been acknowledged; not sending another notification" << kqchanLog.endLog;
		return;
	}

	kqchanLog.debug() << *this << ": sending notification " << _notificationCount << kqchanLog.endLog;

	// now that we're sending the notification, we shouldn't send another one until our peer acknowledges this one
	_canSendNotification = false;

	// the notification carries only a header; pid and tid are zeroed
	Message msg(sizeof(dserver_kqchan_call_notification_t), 0);

	auto notification = reinterpret_cast<dserver_kqchan_call_notification_t*>(msg.data().data());
	notification->header.number = dserver_kqchan_msgnum_notification;
	notification->header.pid = 0;
	notification->header.tid = 0;

	if (_deferNotification) {
		_deferredNotification = std::move(msg);
	} else {
		lock.unlock(); // the outbox has its own lock
		_outbox.push(std::move(msg));
	}
};
178  
179  void DarlingServer::Kqchan::_sendDeferredNotification() {
180  	std::unique_lock lock(_notificationMutex);
181  
182  	_deferNotification = false;
183  
184  	if (_deferredNotification) {
185  		Message notification(std::move(*_deferredNotification));
186  		_deferredNotification = std::nullopt;
187  
188  		lock.unlock(); // the outbox has its own lock
189  		_outbox.push(std::move(notification));
190  	}
191  };
192  
193  void DarlingServer::Kqchan::logToStream(Log::Stream& stream) const {
194  	auto proc = _process.lock();
195  	stream << "[KQ:" << _debugID << ":";
196  	if (proc) {
197  		stream << *proc;
198  	} else {
199  		stream << "<null>";
200  	}
201  	stream << "]";
202  };
203  
204  //
205  // mach port
206  //
207  
208  DarlingServer::Kqchan::MachPort::MachPort(std::shared_ptr<DarlingServer::Process> process, uint32_t port, uint64_t receiveBuffer, uint64_t receiveBufferSize, uint64_t savedFilterFlags):
209  	Kqchan(process),
210  	_port(port),
211  	_receiveBuffer(receiveBuffer),
212  	_receiveBufferSize(receiveBufferSize),
213  	_savedFilterFlags(savedFilterFlags)
214  {
215  	kqchanMachPortLog.debug() << *this << ": Constructing Mach port kqchan" << kqchanMachPortLog.endLog;
216  };
217  
// Mach-port kqchan destructor: detaches from the duct-taped kqchan and
// schedules its destruction asynchronously on a microthread. The lambda
// captures only copies (the raw pointer and the debug ID) — never `this`.
DarlingServer::Kqchan::MachPort::~MachPort() {
	kqchanMachPortLog.debug() << *this << ": Destroying Mach port kqchan" << kqchanMachPortLog.endLog;

	if (_dtapeKqchan) {
		// copy the pointer so the lambda below doesn't need `this`
		auto kqchan = _dtapeKqchan;

		// disable notifications so that `this` won't be used after we're destroyed
		dtape_kqchan_mach_port_disable_notifications(kqchan);

		// and schedule the duct-taped kqchan to be destroyed on a microthread
		auto debugID = _debugID;
		Thread::kernelAsync([=]() {
			kqchanMachPortLog.debug() << "Destroying duct-taped Mach port kqchan with ID " << debugID << kqchanMachPortLog.endLog;
			dtape_kqchan_mach_port_destroy(kqchan);
		});
	}
};
235  
// Registration key used with Process::(un)registerKqchan and friends;
// the object's own address is unique for its lifetime.
uintptr_t DarlingServer::Kqchan::MachPort::_idForProcess() const {
	return reinterpret_cast<uintptr_t>(this);
};
239  
// Returns a shared_ptr to this instance, typed as the root Kqchan class
// (used by Kqchan::setup() to build weak callbacks).
std::shared_ptr<DarlingServer::Kqchan> DarlingServer::Kqchan::MachPort::sharedFromRoot() {
	return shared_from_this();
};
243  
244  int DarlingServer::Kqchan::MachPort::setup() {
245  	auto proc = _process.lock();
246  
247  	if (!proc) {
248  		throw std::system_error(ESRCH, std::generic_category());
249  	}
250  
251  	kqchanMachPortLog.debug() << *this << ": Setting up Mach port kqchan" << kqchanMachPortLog.endLog;
252  
253  	// NOTE: the duct-taped kqchan will never notify us after we die
254  	//       since we disable notifications upon destruction,
255  	//       so using `this` here is safe
256  	_dtapeKqchan = dtape_kqchan_mach_port_create(proc->_dtapeTask, _port, _receiveBuffer, _receiveBufferSize, _savedFilterFlags, [](void* context) {
257  		auto self = reinterpret_cast<MachPort*>(context);
258  		self->_notify();
259  	}, this);
260  	if (!_dtapeKqchan) {
261  		kqchanMachPortLog.debug() << *this << ": Failed to create duct-taped Mach port kqchan for port " << _port << kqchanMachPortLog.endLog;
262  		throw std::system_error(ESRCH, std::generic_category());
263  	}
264  
265  	int fd = Kqchan::setup();
266  
267  	if (dtape_kqchan_mach_port_has_events(_dtapeKqchan)) {
268  		// if we already have an event, notify the kqchan;
269  		// this will simply enqueue a message to be sent to the peer.
270  		// since our libkqueue filter uses level-triggered epoll,
271  		// our peer will immediately see there's an event available when it starts waiting.
272  		_notify();
273  	}
274  
275  	return fd;
276  };
277  
278  void DarlingServer::Kqchan::MachPort::_processMessages() {
279  	while (auto msg = _inbox.pop()) {
280  		if (msg->data().size() < sizeof(dserver_kqchan_callhdr_t)) {
281  			throw std::invalid_argument("Message buffer was too small for kqchan call header");
282  		}
283  
284  		auto callhdr = reinterpret_cast<dserver_kqchan_callhdr_t*>(msg->data().data());
285  
286  		switch (callhdr->number) {
287  			case dserver_kqchan_msgnum_mach_port_modify: {
288  				if (msg->data().size() < sizeof(dserver_kqchan_call_mach_port_modify_t)) {
289  					throw std::invalid_argument("Message buffer was too small for dserver_kqchan_msgnum_mach_port_modify");
290  				}
291  
292  				auto modify = reinterpret_cast<dserver_kqchan_call_mach_port_modify_t*>(callhdr);
293  
294  				_modify(modify->receive_buffer, modify->receive_buffer_size, modify->saved_filter_flags, modify->header.tid);
295  			} break;
296  
297  			case dserver_kqchan_msgnum_mach_port_read: {
298  				if (msg->data().size() < sizeof(dserver_kqchan_call_mach_port_read_t)) {
299  					throw std::invalid_argument("Message buffer was too small for dserver_kqchan_call_mach_port_read");
300  				}
301  
302  				auto read = reinterpret_cast<dserver_kqchan_call_mach_port_read_t*>(callhdr);
303  
304  				_read(read->default_buffer, read->default_buffer_size, read->header.tid);
305  			} break;
306  
307  			default:
308  				throw std::invalid_argument("Unknown/invalid kqchan msgnum");
309  		}
310  	}
311  };
312  
313  void DarlingServer::Kqchan::MachPort::_modify(uint64_t receiveBuffer, uint64_t receiveBufferSize, uint64_t savedFilterFlags, pid_t nstid) {
314  	kqchanMachPortLog.debug() << *this << ": Received modification request with {receiveBuffer=" << receiveBuffer << ",receiveBufferSize=" << receiveBufferSize << ",savedFilterFlags=" << savedFilterFlags << "}" << kqchanMachPortLog.endLog;
315  
316  	auto maybeThread = threadRegistry().lookupEntryByNSID(nstid);
317  
318  	if (!maybeThread) {
319  		throw std::runtime_error("No thread for Mach port kqchan modification?");
320  	}
321  
322  	auto thread = *maybeThread;
323  
324  	auto self = shared_from_this();
325  	Thread::kernelAsync([self, thread, receiveBuffer, receiveBufferSize, savedFilterFlags]() {
326  		kqchanMachPortLog.debug() << *self << ": Handling modification request in microthread" << kqchanMachPortLog.endLog;
327  
328  		Thread::currentThread()->impersonate(thread);
329  		dtape_kqchan_mach_port_modify(self->_dtapeKqchan, receiveBuffer, receiveBufferSize, savedFilterFlags);
330  		Thread::currentThread()->impersonate(nullptr);
331  
332  		Message msg(sizeof(dserver_kqchan_reply_mach_port_modify_t), 0, self->_checkForEventsAsyncFactory());
333  
334  		auto reply = reinterpret_cast<dserver_kqchan_reply_mach_port_modify_t*>(msg.data().data());
335  
336  		reply->header.number = dserver_kqchan_msgnum_mach_port_modify;
337  		reply->header.code = 0;
338  
339  		kqchanMachPortLog.debug() << *self << ": Sending modification reply/acknowledgement" << kqchanMachPortLog.endLog;
340  
341  		self->_outbox.push(std::move(msg));
342  	});
343  };
344  
// Handles a read request from our peer: acknowledges the outstanding
// notification, then (on a microthread, impersonating the requesting thread)
// fills the reply from the duct-taped kqchan. If no event is pending, the
// reply's header code is set to 0xdead, telling the client to drop the event.
void DarlingServer::Kqchan::MachPort::_read(uint64_t defaultBuffer, uint64_t defaultBufferSize, pid_t nstid) {
	kqchanMachPortLog.debug() << *this << ": received read request with {defaultBuffer=" << defaultBuffer << ",defaultBufferSize=" << defaultBufferSize << "}" << kqchanMachPortLog.endLog;

	{
		// our peer has acknowledged our notification by asking for the pending messages;
		// we can now send a notification again if we receive more data
		std::unique_lock lock(_notificationMutex);
		kqchanMachPortLog.debug() << *this << ": received acknowledgement (implicitly via read) for notification " << _notificationCount++ << "; notifications may now be sent" << kqchanMachPortLog.endLog;
		_canSendNotification = true;

		// defer future notifications until we send our reply.
		// we do it this way because:
		// 1) if we don't defer them and just send them right now,
		//    a notification may be sent before we send our reply,
		//    leading to out-of-order messages in the channel (which
		//    causes an abort on the client side).
		// 2) if we instead move the `_canSendNotification` update to after
		//    we send the reply, we may miss a notification for an event
		//    that occurred right after we generated the reply but before
		//    we updated `_canSendNotification`.
		// with this approach (notification deferral), channel messages are kept in-order
		// and we don't miss any notifications. worst case scenario, we might send
		// a duplicate notification for an event that occurred right after this update
		// but before we generate the reply; in that case, the client will simply try to read
		// the duplicate event but we won't have anything and we'll tell it to drop the event.
		_deferNotification = true;
	}

	auto maybeThread = threadRegistry().lookupEntryByNSID(nstid);

	if (!maybeThread) {
		throw std::runtime_error("No thread for Mach port kqchan read?");
	}

	auto thread = *maybeThread;

	// the actual fill happens asynchronously on a microthread; `self` keeps
	// this kqchan alive until the reply has been pushed
	auto self = shared_from_this();
	Thread::kernelAsync([self, thread, defaultBuffer, defaultBufferSize]() {
		Message msg(sizeof(dserver_kqchan_reply_mach_port_read_t), 0, self->_checkForEventsAsyncFactory());

		kqchanMachPortLog.debug() << *self << ": handling read request in microthread" << kqchanMachPortLog.endLog;

		auto reply = reinterpret_cast<dserver_kqchan_reply_mach_port_read_t*>(msg.data().data());

		reply->header.code = 0;
		reply->header.number = dserver_kqchan_msgnum_mach_port_read;

		Thread::currentThread()->impersonate(thread);
		if (!dtape_kqchan_mach_port_fill(self->_dtapeKqchan, reply, defaultBuffer, defaultBufferSize)) {
			kqchanMachPortLog.debug() << *self << ": no events to read" << kqchanMachPortLog.endLog;
			reply->header.code = 0xdead;
		}
		Thread::currentThread()->impersonate(nullptr);

		self->_outbox.push(std::move(msg));

		// now let's send any deferred notifications we might have
		self->_sendDeferredNotification();
	});
};
405  
// Callback invoked when the duct-taped kqchan reports events; forwards to
// the base class's rate-limited notification machinery.
void DarlingServer::Kqchan::MachPort::_notify() {
	_sendNotification();
};
409  
410  void DarlingServer::Kqchan::MachPort::_checkForEventsAsync() {
411  	Thread::kernelAsync([weakSelf = weak_from_this()]() {
412  		auto self = weakSelf.lock();
413  		if (!self) {
414  			return;
415  		}
416  		// if we have more events ready, notify our peer
417  		if (dtape_kqchan_mach_port_has_events(self->_dtapeKqchan)) {
418  			self->_notify();
419  		}
420  	});
421  };
422  
423  std::function<void()> DarlingServer::Kqchan::MachPort::_checkForEventsAsyncFactory() {
424  	return [weakSelf = weak_from_this()]() {
425  		auto self = weakSelf.lock();
426  		if (!self) {
427  			return;
428  		}
429  		self->_checkForEventsAsync();
430  	};
431  };
432  
433  //
434  // process
435  //
436  // TODO: NOTE_REAP and NOTE_SIGNAL
437  //
438  
439  DarlingServer::Kqchan::Process::Process(std::shared_ptr<DarlingServer::Process> process, pid_t nspid, uint32_t flags):
440  	Kqchan(process),
441  	_nspid(nspid),
442  	_flags(flags)
443  {
444  	kqchanProcLog.debug() << *this << ": Constructing process kqchan" << kqchanProcLog.endLog;
445  };
446  
447  DarlingServer::Kqchan::Process::~Process() {
448  	kqchanProcLog.debug() << *this << ": Destroying process kqchan" << kqchanProcLog.endLog;
449  
450  	if (auto targetProcess = _targetProcess.lock()) {
451  		targetProcess->unregisterListeningKqchan(_idForProcess());
452  	}
453  };
454  
// Registration key used with registerListeningKqchan/unregisterListeningKqchan;
// the object's own address is unique for its lifetime.
uintptr_t DarlingServer::Kqchan::Process::_idForProcess() const {
	return reinterpret_cast<uintptr_t>(this);
};
458  
// Returns a shared_ptr to this instance, typed as the root Kqchan class
// (used by Kqchan::setup() to build weak callbacks).
std::shared_ptr<DarlingServer::Kqchan> DarlingServer::Kqchan::Process::sharedFromRoot() {
	return shared_from_this();
};
462  
463  int DarlingServer::Kqchan::Process::setup() {
464  	kqchanProcLog.debug() << *this << ": Setting up process kqchan" << kqchanProcLog.endLog;
465  
466  	auto maybeTargetProcess = processRegistry().lookupEntryByNSID(_nspid);
467  	if (!maybeTargetProcess) {
468  		kqchanProcLog.debug() << *this << ": Failed to create process kqchan for PID " << _nspid << kqchanProcLog.endLog;
469  		throw std::system_error(ESRCH, std::generic_category());
470  	}
471  
472  	auto targetProcess = *maybeTargetProcess;
473  
474  	_targetProcess = targetProcess;
475  
476  	int fd = Kqchan::setup();
477  
478  	{
479  		std::unique_lock lock(_mutex);
480  		if (!_attached) {
481  			targetProcess->registerListeningKqchan(shared_from_this());
482  			_attached = true;
483  		}
484  		if (!_events.empty()) {
485  			_sendNotification();
486  		}
487  	}
488  
489  	return fd;
490  };
491  
492  void DarlingServer::Kqchan::Process::_processMessages() {
493  	while (auto msg = _inbox.pop()) {
494  		if (msg->data().size() < sizeof(dserver_kqchan_callhdr_t)) {
495  			throw std::invalid_argument("Message buffer was too small for kqchan call header");
496  		}
497  
498  		auto callhdr = reinterpret_cast<dserver_kqchan_callhdr_t*>(msg->data().data());
499  
500  		switch (callhdr->number) {
501  			case dserver_kqchan_msgnum_proc_modify: {
502  				if (msg->data().size() < sizeof(dserver_kqchan_call_proc_modify_t)) {
503  					throw std::invalid_argument("Message buffer was too small for dserver_kqchan_call_proc_modify_");
504  				}
505  
506  				auto modify = reinterpret_cast<dserver_kqchan_call_proc_modify_t*>(callhdr);
507  
508  				_modify(modify->flags);
509  			} break;
510  
511  			case dserver_kqchan_msgnum_proc_read: {
512  				if (msg->data().size() < sizeof(dserver_kqchan_call_proc_read_t)) {
513  					throw std::invalid_argument("Message buffer was too small for dserver_kqchan_call_proc_read");
514  				}
515  
516  				auto read = reinterpret_cast<dserver_kqchan_call_proc_read_t*>(callhdr);
517  
518  				_read();
519  			} break;
520  
521  			default:
522  				throw std::invalid_argument("Unknown/invalid kqchan msgnum");
523  		}
524  	}
525  };
526  
527  void DarlingServer::Kqchan::Process::_modify(uint32_t flags) {
528  	std::unique_lock lock(_mutex);
529  
530  	_flags = flags;
531  
532  	Message msg(sizeof(dserver_kqchan_reply_proc_modify_t), 0, _checkForEventsAsyncFactory());
533  
534  	auto reply = reinterpret_cast<dserver_kqchan_reply_proc_modify_t*>(msg.data().data());
535  
536  	reply->header.number = dserver_kqchan_msgnum_proc_modify;
537  	reply->header.code = 0;
538  
539  	kqchanProcLog.debug() << *this << ": Sending modification reply/acknowledgement" << kqchanProcLog.endLog;
540  
541  	lock.unlock(); // the outbox has its own lock
542  	_outbox.push(std::move(msg));
543  };
544  
// Handles a read request from our peer: pops the next queued event whose
// flag bits intersect the user's current `_flags` and replies with its
// data/fflags (or with header code 0xdead when no matching event is queued).
// When NOTE_TRACK is set and the event carries a child kqchan (queued by
// _notify() on NOTE_FORK), the child channel is set up and its socket is
// attached to the reply as a descriptor; failures set NOTE_TRACKERR instead.
void DarlingServer::Kqchan::Process::_read() {
	auto listeningProcess = _process.lock();

	if (!listeningProcess) {
		// if the listening process is dead, log it and ignore the request (no one's listening for the reply anyways)
		kqchanProcLog.warning() << *this << ": received read request after listening process died" << kqchanProcLog.endLog;
		return;
	}

	kqchanProcLog.debug() << *this << ": received read request" << kqchanProcLog.endLog;

	{
		// our peer has acknowledged our notification by asking for the pending messages;
		// we can now send a notification again if we receive more data
		std::unique_lock lock(_notificationMutex);
		kqchanProcLog.debug() << *this << ": received acknowledgement (implicitly via read) for notification " << _notificationCount++ << "; notifications may now be sent" << kqchanProcLog.endLog;
		_canSendNotification = true;

		// see MachPort::_read() for why we defer notifications
		_deferNotification = true;
	}

	std::unique_lock lock(_mutex, std::defer_lock);
	Message msg(sizeof(dserver_kqchan_reply_proc_read_t), 0, _checkForEventsAsyncFactory());
	auto reply = reinterpret_cast<dserver_kqchan_reply_proc_read_t*>(msg.data().data());

	reply->header.number = dserver_kqchan_msgnum_proc_read;
	reply->header.code = 0;
	reply->data = 0;
	reply->fflags = 0;

	// loop until we find an event the user cares about, or run out of events
	while (true) {
		lock.lock();

		if (_events.empty()) {
			lock.unlock();

			// if we don't have any events, tell our peer
			kqchanProcLog.debug() << *this << ": no events to read" << kqchanProcLog.endLog;

			reply->header.code = 0xdead;
			break;
		} else {
			auto event = std::move(_events.front());
			_events.pop_front();

			// snapshot the flags so we use a consistent value after unlocking
			auto savedFlags = _flags;

			// drop the lock; we don't need it to set up the new kqchan or to discard it, nor to push the message to the outbox.
			// additionally, we don't want to hold it if we decide to drop the new kqchan, since that takes its own set of locks
			// when dying (and the fewer active locks we hold concurrently, the better)
			lock.unlock();

			reply->data = event.data;
			reply->fflags = event.events & savedFlags;

			if (reply->fflags == 0) {
				// if this event contains no events that the user is interested in, drop it
				kqchanProcLog.debug() << *this << ": event does not contain any events the user is interested in; dropping event" << kqchanProcLog.endLog;
				continue;
			}

			if (savedFlags & NOTE_TRACK) {
				if (event.newKqchan) {
					try {
						FD newKqchanSocket(event.newKqchan->setup());

						// no errors SHOULD be thrown from this point forward

						// give the new kqchan the most recent flags we have
						{
							std::unique_lock lock(event.newKqchan->_mutex);
							event.newKqchan->_flags = savedFlags;
						}

						listeningProcess->registerKqchan(event.newKqchan);
						msg.pushDescriptor(newKqchanSocket.extract());

						kqchanProcLog.debug() << *this << ": new process kqchan setup for child process and being returned" << kqchanProcLog.endLog;
					} catch (...) {
						kqchanProcLog.error() << *this << ": failed to setup new kqchan for child process" << kqchanProcLog.endLog;
						reply->fflags |= NOTE_TRACKERR;
					}
				} else if (event.events & NOTE_FORK) {
					kqchanProcLog.error() << *this << ": read NOTE_FORK event and user has requested NOTE_TRACK, but no new kqchan was associated with event" << kqchanProcLog.endLog;
					reply->fflags |= NOTE_TRACKERR;
				}
			} else if (event.newKqchan) {
				kqchanProcLog.info() << *this << ": event contains new kqchan, but user has not requested NOTE_TRACK; dropping new kqchan" << kqchanProcLog.endLog;
			}

			break;
		}
	}

	_outbox.push(std::move(msg));

	// now let's send any deferred notifications we might have
	_sendDeferredNotification();
};
645  
// Queues a new event on this kqchan and pokes the peer (once our socket is
// up). For NOTE_FORK, a kqchan for the new child process is created eagerly —
// regardless of whether NOTE_TRACK is currently set — so that it is available
// in case the user has NOTE_TRACK enabled by the time the event is read.
void DarlingServer::Kqchan::Process::_notify(uint32_t event, int64_t data) {
	Event newEvent;

	kqchanProcLog.debug() << *this << ": notified with {event=" << event << ",data=" << data << "}" << kqchanProcLog.endLog;

	newEvent.data = data;
	newEvent.events = event;
	newEvent.newKqchan = nullptr;

	if (event == NOTE_FORK) {
		// NOTE: we always setup a new kqchan regardless of whether or not the user has currently requested NOTE_TRACK or not,
		//       in case the user does have NOTE_TRACK when reading the event.

		auto listeningProcess = _process.lock();

		if (!listeningProcess) {
			return;
		}

		// the low bits of `data` (NOTE_PDATAMASK) hold the child's namespaced PID
		auto maybeChild = processRegistry().lookupEntryByNSID(data & NOTE_PDATAMASK);

		if (!maybeChild) {
			kqchanProcLog.debug() << *this << ": notified with NOTE_FORK (and wanted NOTE_TRACK), but couldn't find child" << kqchanProcLog.endLog;
		} else {
			auto child = *maybeChild;

			auto newKqchan = std::make_shared<Process>(listeningProcess, data & NOTE_PDATAMASK, _flags);

			// attach the new kqchan to the child before marking it as attached,
			// so its own setup() won't re-register it
			child->registerListeningKqchan(newKqchan);
			{
				std::unique_lock newLock(newKqchan->_mutex);
				newKqchan->_targetProcess = child;
				newKqchan->_attached = true;
			}

			auto childParent = child->parentProcess();

			// pre-seed the child kqchan with a NOTE_CHILD event carrying the parent's PID
			newKqchan->_notify(NOTE_CHILD, (childParent ? childParent->nsid() : 0) & NOTE_PDATAMASK);

			newEvent.newKqchan = newKqchan;
		}
	}

	{
		std::unique_lock lock(_mutex);
		_events.push_back(std::move(newEvent));
		if (_socket) {
			_sendNotification();
		}
	}
};
697  
698  void DarlingServer::Kqchan::Process::_checkForEventsAsync() {
699  	Thread::kernelAsync([weakSelf = weak_from_this()]() {
700  		auto self = weakSelf.lock();
701  		if (!self) {
702  			return;
703  		}
704  		// if we have more events ready, tell our peer
705  		if (!self->_events.empty()) {
706  			self->_sendNotification();
707  		}
708  	});
709  };
710  
711  std::function<void()> DarlingServer::Kqchan::Process::_checkForEventsAsyncFactory() {
712  	return [weakSelf = weak_from_this()]() {
713  		auto self = weakSelf.lock();
714  		if (!self) {
715  			return;
716  		}
717  		self->_checkForEventsAsync();
718  	};
719  };
720  
721  void DarlingServer::Kqchan::Process::logToStream(Log::Stream& stream) const {
722  	auto proc = _targetProcess.lock();
723  
724  	Kqchan::logToStream(stream);
725  
726  	stream << ":";
727  	if (proc) {
728  		stream << *proc;
729  	} else {
730  		stream << "<null>";
731  	}
732  };