/ Sources / FoundationNetworking / URLSession / BodySource.swift
BodySource.swift
  1  // Foundation/URLSession/BodySource.swift - URLSession & libcurl
  2  //
  3  // This source file is part of the Swift.org open source project
  4  //
  5  // Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
  6  // Licensed under Apache License v2.0 with Runtime Library Exception
  7  //
  8  // See http://swift.org/LICENSE.txt for license information
  9  // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
 10  //
 11  // -----------------------------------------------------------------------------
 12  ///
 13  /// These are libcurl helpers for the URLSession API code.
 14  /// - SeeAlso: https://curl.haxx.se/libcurl/c/
 15  /// - SeeAlso: URLSession.swift
 16  ///
 17  // -----------------------------------------------------------------------------
 18  
 19  #if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
 20  import SwiftFoundation
 21  #else
 22  import Foundation
 23  #endif
 24  
 25  @_implementationOnly import CoreFoundation
 26  @_implementationOnly import CFURLSessionInterface
 27  import Dispatch
 28  
 29  
 30  /// Turn `Data` into `DispatchData`
 31  internal func createDispatchData(_ data: Data) -> DispatchData {
 32      //TODO: Avoid copying data
 33      return data.withUnsafeBytes { DispatchData(bytes: $0) }
 34  }
 35  
 36  /// Copy data from `DispatchData` into memory pointed to by an `UnsafeMutableBufferPointer`.
 37  internal func copyDispatchData<T>(_ data: DispatchData, infoBuffer buffer: UnsafeMutableBufferPointer<T>) {
 38      precondition(data.count <= (buffer.count * MemoryLayout<T>.size))
 39      _ = data.copyBytes(to: buffer)
 40  }
 41  
 42  /// Split `DispatchData` into `(head, tail)` pair.
 43  internal func splitData(dispatchData data: DispatchData, atPosition position: Int) -> (DispatchData,DispatchData) {
 44      return (data.subdata(in: 0..<position), data.subdata(in: position..<data.count))
 45  }
 46  
/// A (non-blocking) source for body data.
internal protocol _BodySource: AnyObject {
    /// Get the next chunk of data.
    ///
    /// - Parameter length: the chunk length requested by the caller. The
    ///   conformances in this file return at most this many bytes per call.
    /// - Returns: `.data` until the source is exhausted, at which point it will
    /// return `.done`. Since this is non-blocking, it will return `.retryLater`
    /// if no data is available at this point, but will be available later.
    /// Conformances in this file return `.error` when reading fails.
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk
}
/// The result of asking a `_BodySource` for its next chunk.
internal enum _BodySourceDataChunk {
    /// A chunk of body data.
    case data(DispatchData)
    /// The source is depleted.
    case done
    /// Retry later to get more data.
    case retryLater
    /// The source failed while producing data.
    case error
}
 64  
 65  internal final class _BodyStreamSource {
 66      let inputStream: InputStream
 67      
 68      init(inputStream: InputStream) {
 69          assert(inputStream.streamStatus == .notOpen)
 70          inputStream.open()
 71          self.inputStream = inputStream
 72      }
 73  }
 74  
 75  extension _BodyStreamSource : _BodySource {
 76      func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
 77          guard inputStream.hasBytesAvailable else {
 78              return .done
 79          }
 80          
 81          let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout<UInt8>.alignment)
 82          
 83          guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
 84              buffer.deallocate()
 85              return .error
 86          }
 87          
 88          let readBytes = self.inputStream.read(pointer, maxLength: length)
 89          if readBytes > 0 {
 90              let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .custom(nil, { buffer.deallocate() }))
 91              return .data(dispatchData.subdata(in: 0 ..< readBytes))
 92          }
 93          else if readBytes == 0 {
 94              buffer.deallocate()
 95              return .done
 96          }
 97          else {
 98              buffer.deallocate()
 99              return .error
100          }
101      }
102  }
103  
/// A body data source that serves its bytes from an in-memory `DispatchData`.
internal final class _BodyDataSource {
    /// The bytes that have not been handed out yet.
    var data: DispatchData!

    /// - Parameter data: the complete body to serve.
    init(data initialData: DispatchData) {
        data = initialData
    }
}
111  
extension _BodyDataSource : _BodySource {
    enum _Error : Error {
        case unableToRewindData
    }

    /// Hand out the next piece of the in-memory body.
    ///
    /// - Parameter length: the maximum number of bytes to return.
    /// - Returns: `.done` once nothing is left; otherwise `.data` holding
    ///     either everything that remains (if it fits in `length`) or the
    ///     first `length` bytes, with the rest kept for the next call.
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
        let pending: DispatchData = data
        let pendingCount = pending.count

        guard pendingCount != 0 else {
            return .done
        }
        guard pendingCount > length else {
            // Everything left fits into this chunk; nothing remains afterwards.
            data = DispatchData.empty
            return .data(pending)
        }

        let (chunk, remainder) = splitData(dispatchData: pending, atPosition: length)
        data = remainder
        return .data(chunk)
    }
}
132  
133  
/// A HTTP body data source backed by a file.
///
/// File data is read through a `DispatchIO` channel, so reading does not
/// block the caller. The source keeps a local buffer that is refilled as
/// calls to `getNextChunk(withLength:)` drain it.
///
/// - Note: Calls to `getNextChunk(withLength:)` and callbacks from libdispatch
/// should all happen on the same (serial) queue, and hence this code doesn't
/// have to be thread safe.
internal final class _BodyFileSource {
    fileprivate let fileURL: URL
    fileprivate let channel: DispatchIO
    fileprivate let workQueue: DispatchQueue
    fileprivate let dataAvailableHandler: () -> Void
    /// `true` while a read enqueued on `channel` has not reported `done` yet.
    fileprivate var hasActiveReadHandler = false
    /// The current buffer / read state.
    fileprivate var availableChunk: _Chunk = .empty

    /// Create a new data source backed by a file.
    ///
    /// Traps if `fileURL` is not a file URL or if the `DispatchIO` channel
    /// cannot be created.
    ///
    /// - Parameter fileURL: the file to read from
    /// - Parameter workQueue: the queue that it's safe to call
    ///     `getNextChunk(withLength:)` on, and that the `dataAvailableHandler`
    ///     will be called on.
    /// - Parameter dataAvailableHandler: Will be called when data becomes
    ///     available. Reading data is done in a non-blocking way, such that
    ///     no data may be available even if there's more data in the file.
    ///     If `getNextChunk(withLength:)` returns `.retryLater`, this handler
    ///     will be called once data becomes available.
    init(fileURL: URL, workQueue: DispatchQueue, dataAvailableHandler: @escaping () -> Void) {
        if !fileURL.isFileURL {
            fatalError("The body data URL must be a file URL.")
        }

        // DispatchIO (dispatch_io_create_with_path) makes a copy of the path,
        // so the pointer only has to stay valid for the duration of the closure.
        let openedChannel: DispatchIO? = fileURL.withUnsafeFileSystemRepresentation { path in
            DispatchIO(type: .stream, path: path!,
                       oflag: O_RDONLY, mode: 0, queue: workQueue,
                       cleanupHandler: { _ in })
        }
        guard let readyChannel = openedChannel else {
            fatalError("Can't create DispatchIO channel")
        }

        self.fileURL = fileURL
        self.workQueue = workQueue
        self.dataAvailableHandler = dataAvailableHandler
        self.channel = readyChannel
        self.channel.setLimit(highWater: CFURLSessionMaxWriteSize)
    }

    fileprivate enum _Chunk {
        /// Nothing has been read, yet
        case empty
        /// An error has occurred while reading
        case errorDetected(Int)
        /// Data has been read
        case data(DispatchData)
        /// All data has been read from the file (EOF).
        case done(DispatchData?)
    }
}
193  
extension _BodyFileSource {
    /// The number of bytes this source tries to keep buffered.
    fileprivate var desiredBufferLength: Int { return 3 * CFURLSessionMaxWriteSize }

    /// Enqueue a dispatch I/O read to fill the buffer.
    ///
    /// When the read callback changes what `getNextChunk(withLength:)` would
    /// answer for a consumer that found the buffer empty — bytes arrived, the
    /// end of the file was reached, or an error was detected — the
    /// `dataAvailableHandler` is invoked so the consumer asks again.
    ///
    /// - Note: This is a no-op if the buffer is full, or if a read operation
    /// is already enqueued.
    fileprivate func readNextChunk() {
        // libcurl likes to use a buffer of size CFURLSessionMaxWriteSize, we'll
        // try to keep 3 x of that around in the `chunk` buffer.
        guard availableByteCount < desiredBufferLength else { return }
        guard !hasActiveReadHandler else { return } // We're already reading
        hasActiveReadHandler = true

        let lengthToRead = desiredBufferLength - availableByteCount
        channel.read(offset: 0, length: lengthToRead, queue: workQueue) { (done: Bool, data: DispatchData?, errno: Int32) in
            let wasEmpty = self.availableByteCount == 0
            self.hasActiveReadHandler = !done

            switch (done, data, errno) {
            case (true, _, let code) where code != 0:
                self.availableChunk = .errorDetected(Int(code))
            case (true, let d?, 0) where d.isEmpty:
                // A finished read that delivered no bytes marks end-of-file.
                self.append(data: d, endOfFile: true)
            case (true, let d?, 0):
                self.append(data: d, endOfFile: false)
            case (false, let d?, 0):
                self.append(data: d, endOfFile: false)
            default:
                fatalError("Invalid arguments to read(3) callback.")
            }

            // A consumer that found the buffer empty was told `.retryLater`
            // and is waiting for this handler. Wake it not only when bytes
            // arrived, but also when the read ended in EOF or an error:
            // otherwise it would never ask again and never observe
            // `.done` / `.error`.
            let hasBytes = 0 < self.availableByteCount
            if wasEmpty && (hasBytes || self.hasReachedEndOrFailed) {
                self.dataAvailableHandler()
            }
        }
    }

    /// `true` once end-of-file or a read error has been recorded, i.e. when
    /// no further reads will add bytes to the buffer.
    fileprivate var hasReachedEndOrFailed: Bool {
        switch availableChunk {
        case .errorDetected, .done: return true
        case .empty, .data: return false
        }
    }

    /// Add freshly read bytes to the buffer.
    ///
    /// - Parameter data: the bytes delivered by the read callback.
    /// - Parameter endOfFile: whether this delivery marks the end of the file.
    ///
    /// Data is dropped if an error has already been recorded; appending after
    /// end-of-file was recorded is a programming error and traps.
    fileprivate func append(data: DispatchData, endOfFile: Bool) {
        switch availableChunk {
        case .empty:
            availableChunk = endOfFile ? .done(data) : .data(data)
        case .errorDetected:
            break
        case .data(var oldData):
            oldData.append(data)
            availableChunk = endOfFile ? .done(oldData) : .data(oldData)
        case .done:
            fatalError("Trying to append data, but end-of-file was already detected.")
        }
    }

    /// The number of buffered bytes that can be handed out right now.
    fileprivate var availableByteCount: Int {
        switch availableChunk {
        case .empty: return 0
        case .errorDetected: return 0
        case .data(let d): return d.count
        case .done(let d?): return d.count
        case .done(nil): return 0
        }
    }
}
255  
extension _BodyFileSource : _BodySource {
    /// Hand out up to `length` buffered bytes and keep the buffer topped up.
    ///
    /// - Parameter length: the maximum number of bytes to return.
    /// - Returns: `.retryLater` while nothing is buffered yet (a read is
    ///     enqueued), `.data` with buffered bytes, `.done` once end-of-file was
    ///     reached and the buffer is drained, or `.error` if a read failed.
    func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
        switch availableChunk {
        case .errorDetected:
            return .error

        case .done(nil):
            return .done

        case .empty:
            readNextChunk()
            return .retryLater

        case .data(let buffered):
            let cut = min(length, buffered.count)
            let (head, tail) = splitData(dispatchData: buffered, atPosition: cut)

            // Keep what was not handed out and ask for a refill.
            availableChunk = tail.isEmpty ? .empty : .data(tail)
            readNextChunk()

            return head.isEmpty ? .retryLater : .data(head)

        case .done(let buffered?):
            let cut = min(length, buffered.count)
            let (head, tail) = splitData(dispatchData: buffered, atPosition: cut)

            // End-of-file was already seen, so no further read is enqueued.
            availableChunk = tail.isEmpty ? .done(nil) : .done(tail)

            return head.isEmpty ? .done : .data(head)
        }
    }
}