async-writer.cpp
1 #include <darlingserver/async-writer.hpp> 2 #include <darlingserver/server.hpp> 3 4 #include <sys/fcntl.h> 5 6 // 7 // writer 8 // 9 10 DarlingServer::AsyncWriter::~AsyncWriter() { 11 if (auto monitor = _monitor.lock()) { 12 Server::sharedInstance().removeMonitor(monitor); 13 } 14 }; 15 16 void DarlingServer::AsyncWriter::init(std::shared_ptr<FD> fd) { 17 _fd = fd; 18 19 int flags = fcntl(_fd->fd(), F_GETFL); 20 if (flags < 0) { 21 throw std::system_error(errno, std::generic_category()); 22 } 23 24 if (fcntl(_fd->fd(), F_SETFL, flags | O_NONBLOCK) < 0) { 25 throw std::system_error(errno, std::generic_category()); 26 } 27 28 auto weakSelf = weak_from_this(); 29 auto monitor = std::make_shared<Monitor>(_fd, Monitor::Event::Writable | Monitor::Event::HangUp, true, false, [weakSelf](std::shared_ptr<Monitor> monitor, Monitor::Event events) { 30 auto self = weakSelf.lock(); 31 if (!self) { 32 return; 33 } 34 35 if (!!(events & Monitor::Event::HangUp)) { 36 Server::sharedInstance().removeMonitor(monitor); 37 self->_monitor.reset(); 38 return; 39 } 40 41 if (!!(events & Monitor::Event::Writable)) { 42 std::unique_lock lock(self->_mutex); 43 self->_canSend = true; 44 self->_trySendLocked(); 45 } 46 }); 47 _monitor = monitor; 48 49 Server::sharedInstance().addMonitor(monitor); 50 }; 51 52 void DarlingServer::AsyncWriter::_trySendLocked() { 53 do { 54 auto written = ::write(_fd->fd(), _buffer.data(), _buffer.size()); 55 if (written < 0) { 56 if (errno == EINTR) { 57 // just try again 58 continue; 59 } else if (errno == EAGAIN || errno == EWOULDBLOCK) { 60 // we can't write anymore for now 61 _canSend = false; 62 } else { 63 throw std::system_error(errno, std::generic_category()); 64 } 65 } else { 66 _buffer.erase(_buffer.begin(), _buffer.begin() + written); 67 68 if (_buffer.empty()) { 69 // we've written all the data we had; 70 // we can now die if no one else is holding a reference to us 71 _keepMeAliveUntilEmpty = nullptr; 72 break; 73 } 74 } 75 } while (_canSend); 76 }; 77 78 std::shared_ptr<DarlingServer::AsyncWriter> DarlingServer::AsyncWriter::make(std::shared_ptr<FD> fd) { 79 auto writer = std::make_shared<AsyncWriter>(); 80 writer->init(fd); 81 return writer; 82 }; 83 84 DarlingServer::AsyncWriter::Stream DarlingServer::AsyncWriter::stream() { 85 return AsyncWriter::Stream(shared_from_this()); 86 }; 87 88 void DarlingServer::AsyncWriter::write(const char* data, size_t length) { 89 if (length == 0) { 90 return; 91 } 92 93 std::unique_lock lock(_mutex); 94 _buffer.insert(_buffer.end(), data, data + length); 95 96 // okay, the buffer is now non-empty; 97 // let's ensure we stay alive at least until we finish writing all the data 98 _keepMeAliveUntilEmpty = shared_from_this(); 99 100 _trySendLocked(); 101 }; 102 103 void DarlingServer::AsyncWriter::write(const std::string& data) { 104 return write(data.data(), data.length()); 105 }; 106 107 // 108 // stream 109 // 110 111 DarlingServer::AsyncWriter::Stream::Stream(std::shared_ptr<AsyncWriter> writer): 112 _writer(writer) 113 {}; 114 115 DarlingServer::AsyncWriter::Stream::~Stream() { 116 _writer->write(_stream.str()); 117 };