BodySource.swift
// Foundation/URLSession/BodySource.swift - URLSession & libcurl
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
// -----------------------------------------------------------------------------
///
/// These are libcurl helpers for the URLSession API code.
/// - SeeAlso: https://curl.haxx.se/libcurl/c/
/// - SeeAlso: URLSession.swift
///
// -----------------------------------------------------------------------------

#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
import SwiftFoundation
#else
import Foundation
#endif

@_implementationOnly import CoreFoundation
@_implementationOnly import CFURLSessionInterface
import Dispatch


/// Turn `Data` into `DispatchData`.
///
/// The bytes are copied into the newly created `DispatchData`.
internal func createDispatchData(_ data: Data) -> DispatchData {
    //TODO: Avoid copying data
    return data.withUnsafeBytes { DispatchData(bytes: $0) }
}

/// Copy data from `DispatchData` into memory pointed to by an `UnsafeMutableBufferPointer`.
///
/// - Precondition: `buffer` must provide at least `data.count` bytes. Capacity is
///   measured with `MemoryLayout<T>.stride`, which is the byte distance between
///   consecutive elements of a buffer (and so the bytes each element provides).
internal func copyDispatchData<T>(_ data: DispatchData, infoBuffer buffer: UnsafeMutableBufferPointer<T>) {
    precondition(data.count <= (buffer.count * MemoryLayout<T>.stride))
    _ = data.copyBytes(to: buffer)
}

/// Split `DispatchData` into a `(head, tail)` pair.
///
/// `head` holds the bytes in `0..<position`; `tail` holds `position..<data.count`.
internal func splitData(dispatchData data: DispatchData, atPosition position: Int) -> (DispatchData, DispatchData) {
    return (data.subdata(in: 0..<position), data.subdata(in: position..<data.count))
}

/// A (non-blocking) source for body data.
internal protocol _BodySource: AnyObject {
    /// Get the next chunk of data.
    ///
    /// - Parameter length: The maximum number of bytes to return.
    /// - Returns: `.data` until the source is exhausted, at which point it will
    ///   return `.done`. Since this is non-blocking, it will return `.retryLater`
    ///   if no data is available at this point, but will be available later.
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk
}

/// The outcome of asking a `_BodySource` for its next chunk.
internal enum _BodySourceDataChunk {
    /// A chunk of body data.
    case data(DispatchData)
    /// The source is depleted.
    case done
    /// Retry later to get more data.
    case retryLater
    /// Reading from the source failed.
    case error
}

/// A body data source backed by an `InputStream`.
internal final class _BodyStreamSource {
    let inputStream: InputStream

    /// - Parameter inputStream: A stream that has not been opened yet; it is
    ///   opened by this initializer.
    init(inputStream: InputStream) {
        assert(inputStream.streamStatus == .notOpen)
        inputStream.open()
        self.inputStream = inputStream
    }
}

extension _BodyStreamSource : _BodySource {
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
        // A stream that reports no available bytes is treated as exhausted.
        guard inputStream.hasBytesAvailable else {
            return .done
        }

        let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout<UInt8>.alignment)

        guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
            buffer.deallocate()
            return .error
        }

        let readBytes = self.inputStream.read(pointer, maxLength: length)
        if readBytes > 0 {
            // Wrap `buffer` without copying; the custom deallocator frees it
            // when dispatch no longer needs the bytes.
            let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .custom(nil, { buffer.deallocate() }))
            return .data(dispatchData.subdata(in: 0 ..< readBytes))
        } else if readBytes == 0 {
            buffer.deallocate()
            return .done
        } else {
            buffer.deallocate()
            return .error
        }
    }
}

/// A body data source backed by `DispatchData`.
internal final class _BodyDataSource {
    var data: DispatchData!
    init(data: DispatchData) {
        self.data = data
    }
}

extension _BodyDataSource : _BodySource {
    enum _Error : Error {
        case unableToRewindData
    }

    /// Hands out the remaining bytes in chunks of at most `length` bytes,
    /// returning `.done` once everything has been handed out.
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
        let remaining = data.count
        if remaining == 0 {
            return .done
        } else if remaining <= length {
            // Everything left fits: hand it all out and leave the source empty.
            let everything: DispatchData = data
            data = DispatchData.empty
            return .data(everything)
        } else {
            let (chunk, remainder) = splitData(dispatchData: data, atPosition: length)
            data = remainder
            return .data(chunk)
        }
    }
}


/// A HTTP body data source backed by a file.
///
/// This allows non-blocking streaming of file data to the remote server.
///
/// The source reads data using a `DispatchIO` channel, and hence reading
/// file data is non-blocking. It has a local buffer that it fills as calls
/// to `getNextChunk(withLength:)` drain it.
///
/// - Note: Calls to `getNextChunk(withLength:)` and callbacks from libdispatch
///   should all happen on the same (serial) queue, and hence this code doesn't
///   have to be thread safe.
internal final class _BodyFileSource {
    fileprivate let fileURL: URL
    fileprivate let channel: DispatchIO
    fileprivate let workQueue: DispatchQueue
    fileprivate let dataAvailableHandler: () -> Void
    fileprivate var hasActiveReadHandler = false
    fileprivate var availableChunk: _Chunk = .empty

    /// Create a new data source backed by a file.
    ///
    /// - Parameter fileURL: the file to read from
    /// - Parameter workQueue: the queue that it's safe to call
    ///   `getNextChunk(withLength:)` on, and that the `dataAvailableHandler`
    ///   will be called on.
    /// - Parameter dataAvailableHandler: Will be called when data becomes
    ///   available. Reading data is done in a non-blocking way, such that
    ///   no data may be available even if there's more data in the file.
    ///   If `getNextChunk(withLength:)` returns `.retryLater`, this handler
    ///   will be called once data becomes available, or once end-of-file or a
    ///   read error is detected while the local buffer is empty.
    init(fileURL: URL, workQueue: DispatchQueue, dataAvailableHandler: @escaping () -> Void) {
        guard fileURL.isFileURL else { fatalError("The body data URL must be a file URL.") }
        self.fileURL = fileURL
        self.workQueue = workQueue
        self.dataAvailableHandler = dataAvailableHandler

        guard let channel = fileURL.withUnsafeFileSystemRepresentation({ path -> DispatchIO? in
            // No file-system path means no channel; reported by the fatalError below.
            guard let path = path else { return nil }
            // DispatchIO (dispatch_io_create_with_path) makes a copy of the path
            return DispatchIO(type: .stream, path: path,
                              oflag: O_RDONLY, mode: 0, queue: workQueue,
                              cleanupHandler: { _ in })
        }) else {
            fatalError("Can't create DispatchIO channel")
        }
        self.channel = channel
        self.channel.setLimit(highWater: CFURLSessionMaxWriteSize)
    }

    fileprivate enum _Chunk {
        /// Nothing has been read, yet
        case empty
        /// An error has occurred while reading
        case errorDetected(Int)
        /// Data has been read
        case data(DispatchData)
        /// All data has been read from the file (EOF).
        case done(DispatchData?)
    }
}

extension _BodyFileSource {
    fileprivate var desiredBufferLength: Int { return 3 * CFURLSessionMaxWriteSize }

    /// Enqueue a dispatch I/O read to fill the buffer.
    ///
    /// - Note: This is a no-op if the buffer is full, or if a read operation
    ///   is already enqueued.
    fileprivate func readNextChunk() {
        // libcurl likes to use a buffer of size CFURLSessionMaxWriteSize, we'll
        // try to keep 3 x of that around in the `chunk` buffer.
        guard availableByteCount < desiredBufferLength else { return }
        guard !hasActiveReadHandler else { return } // We're already reading
        hasActiveReadHandler = true

        let lengthToRead = desiredBufferLength - availableByteCount
        channel.read(offset: 0, length: lengthToRead, queue: workQueue) { (done: Bool, data: DispatchData?, errorCode: Int32) in
            let wasEmpty = self.availableByteCount == 0
            self.hasActiveReadHandler = !done

            switch (done, data, errorCode) {
            case (true, _, let code) where code != 0:
                self.availableChunk = .errorDetected(Int(code))
            case (true, let d?, 0) where d.isEmpty:
                // A finished read that delivered no bytes marks end-of-file.
                self.append(data: d, endOfFile: true)
            case (true, let d?, 0):
                self.append(data: d, endOfFile: false)
            case (false, let d?, 0):
                self.append(data: d, endOfFile: false)
            default:
                fatalError("Invalid arguments to read(3) callback.")
            }

            // A caller that was told `.retryLater` relies on this handler to
            // know when to call `getNextChunk(withLength:)` again. Notify it when
            // bytes arrived, and also when EOF or an error was reached with
            // nothing buffered; otherwise that caller would wait forever.
            if wasEmpty && (0 < self.availableByteCount || self.hasReachedFinalState) {
                self.dataAvailableHandler()
            }
        }
    }

    /// Append freshly read bytes to the local buffer, optionally marking EOF.
    fileprivate func append(data: DispatchData, endOfFile: Bool) {
        switch availableChunk {
        case .empty:
            availableChunk = endOfFile ? .done(data) : .data(data)
        case .errorDetected:
            break
        case .data(var oldData):
            oldData.append(data)
            availableChunk = endOfFile ? .done(oldData) : .data(oldData)
        case .done:
            fatalError("Trying to append data, but end-of-file was already detected.")
        }
    }

    /// Number of buffered bytes not yet handed out by `getNextChunk(withLength:)`.
    fileprivate var availableByteCount: Int {
        switch availableChunk {
        case .empty: return 0
        case .errorDetected: return 0
        case .data(let d): return d.count
        case .done(let d?): return d.count
        case .done(nil): return 0
        }
    }

    /// Whether end-of-file or a read error has been recorded. In these states
    /// `getNextChunk(withLength:)` no longer enqueues reads.
    fileprivate var hasReachedFinalState: Bool {
        switch availableChunk {
        case .done, .errorDetected: return true
        case .empty, .data: return false
        }
    }
}

extension _BodyFileSource : _BodySource {
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
        switch availableChunk {
        case .empty:
            readNextChunk()
            return .retryLater
        case .errorDetected:
            return .error
        case .data(let data):
            let l = min(length, data.count)
            let (head, tail) = splitData(dispatchData: data, atPosition: l)

            availableChunk = tail.isEmpty ? .empty : .data(tail)
            // Top the buffer back up while the caller consumes `head`.
            readNextChunk()

            if head.isEmpty {
                return .retryLater
            } else {
                return .data(head)
            }
        case .done(let data?):
            let l = min(length, data.count)
            let (head, tail) = splitData(dispatchData: data, atPosition: l)
            availableChunk = tail.isEmpty ? .done(nil) : .done(tail)
            if head.isEmpty {
                return .done
            } else {
                return .data(head)
            }
        case .done(nil):
            return .done
        }
    }
}