/ src / async-writer.cpp
async-writer.cpp
  1  #include <darlingserver/async-writer.hpp>
  2  #include <darlingserver/server.hpp>
  3  
  4  #include <sys/fcntl.h>
  5  
  6  //
  7  // writer
  8  //
  9  
 10  DarlingServer::AsyncWriter::~AsyncWriter() {
 11  	if (auto monitor = _monitor.lock()) {
 12  		Server::sharedInstance().removeMonitor(monitor);
 13  	}
 14  };
 15  
 16  void DarlingServer::AsyncWriter::init(std::shared_ptr<FD> fd) {
 17  	_fd = fd;
 18  
 19  	int flags = fcntl(_fd->fd(), F_GETFL);
 20  	if (flags < 0) {
 21  		throw std::system_error(errno, std::generic_category());
 22  	}
 23  
 24  	if (fcntl(_fd->fd(), F_SETFL, flags | O_NONBLOCK) < 0) {
 25  		throw std::system_error(errno, std::generic_category());
 26  	}
 27  
 28  	auto weakSelf = weak_from_this();
 29  	auto monitor = std::make_shared<Monitor>(_fd, Monitor::Event::Writable | Monitor::Event::HangUp, true, false, [weakSelf](std::shared_ptr<Monitor> monitor, Monitor::Event events) {
 30  		auto self = weakSelf.lock();
 31  		if (!self) {
 32  			return;
 33  		}
 34  
 35  		if (!!(events & Monitor::Event::HangUp)) {
 36  			Server::sharedInstance().removeMonitor(monitor);
 37  			self->_monitor.reset();
 38  			return;
 39  		}
 40  
 41  		if (!!(events & Monitor::Event::Writable)) {
 42  			std::unique_lock lock(self->_mutex);
 43  			self->_canSend = true;
 44  			self->_trySendLocked();
 45  		}
 46  	});
 47  	_monitor = monitor;
 48  
 49  	Server::sharedInstance().addMonitor(monitor);
 50  };
 51  
 52  void DarlingServer::AsyncWriter::_trySendLocked() {
 53  	do {
 54  		auto written = ::write(_fd->fd(), _buffer.data(), _buffer.size());
 55  		if (written < 0) {
 56  			if (errno == EINTR) {
 57  				// just try again
 58  				continue;
 59  			} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
 60  				// we can't write anymore for now
 61  				_canSend = false;
 62  			} else {
 63  				throw std::system_error(errno, std::generic_category());
 64  			}
 65  		} else {
 66  			_buffer.erase(_buffer.begin(), _buffer.begin() + written);
 67  
 68  			if (_buffer.empty()) {
 69  				// we've written all the data we had;
 70  				// we can now die if no one else is holding a reference to us
 71  				_keepMeAliveUntilEmpty = nullptr;
 72  				break;
 73  			}
 74  		}
 75  	} while (_canSend);
 76  };
 77  
 78  std::shared_ptr<DarlingServer::AsyncWriter> DarlingServer::AsyncWriter::make(std::shared_ptr<FD> fd) {
 79  	auto writer = std::make_shared<AsyncWriter>();
 80  	writer->init(fd);
 81  	return writer;
 82  };
 83  
 84  DarlingServer::AsyncWriter::Stream DarlingServer::AsyncWriter::stream() {
 85  	return AsyncWriter::Stream(shared_from_this());
 86  };
 87  
 88  void DarlingServer::AsyncWriter::write(const char* data, size_t length) {
 89  	if (length == 0) {
 90  		return;
 91  	}
 92  
 93  	std::unique_lock lock(_mutex);
 94  	_buffer.insert(_buffer.end(), data, data + length);
 95  
 96  	// okay, the buffer is now non-empty;
 97  	// let's ensure we stay alive at least until we finish writing all the data
 98  	_keepMeAliveUntilEmpty = shared_from_this();
 99  
100  	_trySendLocked();
101  };
102  
103  void DarlingServer::AsyncWriter::write(const std::string& data) {
104  	return write(data.data(), data.length());
105  };
106  
107  //
108  // stream
109  //
110  
111  DarlingServer::AsyncWriter::Stream::Stream(std::shared_ptr<AsyncWriter> writer):
112  	_writer(writer)
113  	{};
114  
115  DarlingServer::AsyncWriter::Stream::~Stream() {
116  	_writer->write(_stream.str());
117  };