/ Sources / FoundationNetworking / URLSession / libcurl / MultiHandle.swift
MultiHandle.swift
  1  //
  2  // This source file is part of the Swift.org open source project
  3  //
  4  // Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
  5  // Licensed under Apache License v2.0 with Runtime Library Exception
  6  //
  7  // See http://swift.org/LICENSE.txt for license information
  8  // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
  9  //
 10  // -----------------------------------------------------------------------------
 11  ///
 12  /// libcurl *multi handle* wrapper.
 13  /// These are libcurl helpers for the URLSession API code.
 14  /// - SeeAlso: https://curl.haxx.se/libcurl/c/
 15  /// - SeeAlso: URLSession.swift
 16  ///
 17  // -----------------------------------------------------------------------------
 18  
 19  #if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
 20  import SwiftFoundation
 21  #else
 22  import Foundation
 23  #endif
 24  
 25  @_implementationOnly import CoreFoundation
 26  @_implementationOnly import CFURLSessionInterface
 27  import Dispatch
 28  
 29  
 30  
 31  extension URLSession {
 32      /// Minimal wrapper around [curl multi interface](https://curl.haxx.se/libcurl/c/libcurl-multi.html).
 33      ///
 34      /// The the *multi handle* manages the sockets for easy handles
 35      /// (`_EasyHandle`), and this implementation uses
 36      /// libdispatch to listen for sockets being read / write ready.
 37      ///
 38      /// Using `DispatchSource` allows this implementation to be
 39      /// non-blocking and all code to run on the same thread /
 40      /// `DispatchQueue` -- thus keeping is simple.
 41      ///
 42      /// - SeeAlso: _EasyHandle
 43      internal final class _MultiHandle {
 44          let rawHandle = CFURLSessionMultiHandleInit()
 45          let queue: DispatchQueue
 46          let group = DispatchGroup()
 47          fileprivate var easyHandles: [_EasyHandle] = []
 48          fileprivate var timeoutSource: _TimeoutSource? = nil
 49          private var reentrantInUpdateTimeoutTimer = false
 50          
 51          init(configuration: URLSession._Configuration, workQueue: DispatchQueue) {
 52              queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue)
 53              setupCallbacks()
 54              configure(with: configuration)
 55          }
 56          deinit {
 57              // C.f.: <https://curl.haxx.se/libcurl/c/curl_multi_cleanup.html>
 58              easyHandles.forEach {
 59                  try! CFURLSessionMultiHandleRemoveHandle(rawHandle, $0.rawHandle).asError()
 60              }
 61              try! CFURLSessionMultiHandleDeinit(rawHandle).asError()
 62          }
 63      }
 64  }
 65  
 66  extension URLSession._MultiHandle {
 67      func configure(with configuration: URLSession._Configuration) {
 68          #if !NS_CURL_MISSING_MAX_HOST_CONNECTIONS
 69          try! CFURLSession_multi_setopt_l(rawHandle, CFURLSessionMultiOptionMAX_HOST_CONNECTIONS, numericCast(configuration.httpMaximumConnectionsPerHost)).asError()
 70          #endif
 71          
 72          try! CFURLSession_multi_setopt_l(rawHandle, CFURLSessionMultiOptionPIPELINING, configuration.httpShouldUsePipelining ? 3 : 2).asError()
 73          //TODO: We may want to set
 74          //    CFURLSessionMultiOptionMAXCONNECTS
 75          //    CFURLSessionMultiOptionMAX_TOTAL_CONNECTIONS
 76      }
 77  }
 78  
 79  fileprivate extension URLSession._MultiHandle {
 80      static func from(callbackUserData userdata: UnsafeMutableRawPointer?) -> URLSession._MultiHandle? {
 81          guard let userdata = userdata else { return nil }
 82          return Unmanaged<URLSession._MultiHandle>.fromOpaque(userdata).takeUnretainedValue()
 83      }
 84  }
 85  
 86  fileprivate extension URLSession._MultiHandle {
 87      /// Forward the libcurl callbacks into Swift methods
 88      func setupCallbacks() {
 89          // Socket
 90          try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionSOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
 91          try! CFURLSession_multi_setopt_sf(rawHandle, CFURLSessionMultiOptionSOCKETFUNCTION) { (easyHandle: CFURLSessionEasyHandle, socket: CFURLSession_socket_t, what: Int32, userdata: UnsafeMutableRawPointer?, socketptr: UnsafeMutableRawPointer?) -> Int32 in
 92              guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() }
 93              return handle.register(socket: socket, for: easyHandle, what: what, socketSourcePtr: socketptr)
 94              }.asError()
 95          // Timeout:
 96          try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionTIMERDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
 97  #if os(Windows) && (arch(arm64) || arch(x86_64))
 98          typealias CFURLSessionMultiOption = Int32
 99  #else
100          typealias CFURLSessionMultiOption = Int
101  #endif
102          try! CFURLSession_multi_setopt_tf(rawHandle, CFURLSessionMultiOptionTIMERFUNCTION) { (_, timeout: CFURLSessionMultiOption, userdata: UnsafeMutableRawPointer?) -> Int32 in
103              guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() }
104              handle.updateTimeoutTimer(to: numericCast(timeout))
105              return 0
106              }.asError()
107      }
108      /// <https://curl.haxx.se/libcurl/c/CURLMOPT_SOCKETFUNCTION.html> and
109      /// <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>
110      func register(socket: CFURLSession_socket_t, for easyHandle: CFURLSessionEasyHandle, what: Int32, socketSourcePtr: UnsafeMutableRawPointer?) -> Int32 {
111          // We get this callback whenever we need to register or unregister a
112          // given socket with libdispatch.
113          // The `action` / `what` defines if we should register or unregister
114          // that we're interested in read and/or write readiness. We will do so
115          // through libdispatch (DispatchSource) and store the source(s) inside
116          // a `SocketSources` which we in turn store inside libcurl's multi handle
117          // by means of curl_multi_assign() -- we retain the object fist.
118          let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
119          var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
120          if socketSources == nil && action.needsSource {
121              let s = _SocketSources()
122              let p = Unmanaged.passRetained(s).toOpaque()
123              CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p))
124              socketSources = s
125          } else if socketSources != nil && action == .unregister {
126              // We need to release the stored pointer:
127              if let opaque = socketSourcePtr {
128                  Unmanaged<_SocketSources>.fromOpaque(opaque).release()
129              }
130              socketSources = nil
131          }
132          if let ss = socketSources {
133              let handler = DispatchWorkItem { [weak self] in
134                  self?.performAction(for: socket)
135              }
136              ss.createSources(with: action, socket: socket, queue: queue, handler: handler)
137          }
138          return 0
139      }
140  
141      /// What read / write ready event to register / unregister.
142      ///
143      /// This re-maps `CFURLSessionPoll` / `CURL_POLL`.
144      enum _SocketRegisterAction {
145          case none
146          case registerRead
147          case registerWrite
148          case registerReadAndWrite
149          case unregister
150      }
151  }
152  
153  extension Collection where Element == _EasyHandle {
154    internal func firstIndex(of element: Element) -> Index? {
155      var i = self.startIndex
156      while i != self.endIndex {
157        if self[i] == element { return i }
158        self.formIndex(after: &i)
159      }
160      return nil
161    }
162  }
163  
164  internal extension URLSession._MultiHandle {
165      /// Add an easy handle -- start its transfer.
166      func add(_ handle: _EasyHandle) {
167          // If this is the first handle being added, we need to `kick` the
168          // underlying multi handle by calling `timeoutTimerFired` as
169          // described in
170          // <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>.
171          // That will initiate the registration for timeout timer and socket
172          // readiness.
173          let needsTimeout = self.easyHandles.isEmpty
174          self.easyHandles.append(handle)
175          try! CFURLSessionMultiHandleAddHandle(self.rawHandle, handle.rawHandle).asError()
176          if needsTimeout {
177              self.timeoutTimerFired()
178          }
179      }
180      /// Remove an easy handle -- stop its transfer.
181      func remove(_ handle: _EasyHandle) {
182          guard let idx = self.easyHandles.firstIndex(of: handle) else {
183              fatalError("Handle not in list.")
184          }
185          self.easyHandles.remove(at: idx)
186          try! CFURLSessionMultiHandleRemoveHandle(self.rawHandle, handle.rawHandle).asError()
187      }
188  }
189  
190  fileprivate extension URLSession._MultiHandle {
191      /// This gets called when we should ask curl to perform action on a socket.
192      func performAction(for socket: CFURLSession_socket_t) {
193          try! readAndWriteAvailableData(on: socket)
194      }
195      /// This gets called when our timeout timer fires.
196      ///
197      /// libcurl relies on us calling curl_multi_socket_action() every now and then.
198      func timeoutTimerFired() {
199          try! readAndWriteAvailableData(on: CFURLSessionSocketTimeout)
200      }
201      /// reads/writes available data given an action
202      func readAndWriteAvailableData(on socket: CFURLSession_socket_t) throws {
203          var runningHandlesCount = Int32(0)
204          try CFURLSessionMultiHandleAction(rawHandle, socket, 0, &runningHandlesCount).asError()
205          //TODO: Do we remove the timeout timer here if / when runningHandles == 0 ?
206          readMessages()
207      }
208      
209      /// Check the status of all individual transfers.
210      ///
211      /// libcurl refers to this as “read multi stack informationals”.
212      /// Check for transfers that completed.
213      func readMessages() {
214          // We pop the messages one by one in a loop:
215          repeat {
216              // count will contain the messages left in the queue
217              var count = Int32(0)
218              let info = CFURLSessionMultiHandleInfoRead(rawHandle, &count)
219              guard let handle = info.easyHandle else { break }
220              let code = info.resultCode
221              completedTransfer(forEasyHandle: handle, easyCode: code)
222          } while true
223      }
224      /// Transfer completed.
225      func completedTransfer(forEasyHandle handle: CFURLSessionEasyHandle, easyCode: CFURLSessionEasyCode) {
226          // Look up the matching wrapper:
227          guard let idx = easyHandles.firstIndex(where: { $0.rawHandle == handle }) else {
228              fatalError("Transfer completed for easy handle, but it is not in the list of added handles.")
229          }
230          let easyHandle = easyHandles[idx]
231          // Find the NSURLError code
232          var error: NSError?
233          if let errorCode = easyHandle.urlErrorCode(for: easyCode) {
234              var errorDescription: String = ""
235              if easyHandle.errorBuffer[0] == 0 {
236                let description = CFURLSessionEasyCodeDescription(easyCode)!
237                errorDescription = NSString(bytes: UnsafeMutableRawPointer(mutating: description), length: strlen(description), encoding: String.Encoding.utf8.rawValue)! as String
238              } else {
239                errorDescription = String(cString: easyHandle.errorBuffer)
240              }
241  
242              error = NSError(domain: NSURLErrorDomain, code: errorCode, userInfo: [
243                  NSLocalizedDescriptionKey: errorDescription
244              ])
245          }
246          completedTransfer(forEasyHandle: easyHandle, error: error)
247      }
248      /// Transfer completed.
249      func completedTransfer(forEasyHandle handle: _EasyHandle, error: NSError?) {
250          handle.completedTransfer(withError: error)
251      }
252  }
253  
254  fileprivate extension _EasyHandle {
255      /// An error code within the `NSURLErrorDomain` based on the error of the
256      /// easy handle.
257      /// - Note: The error value is set only on failure. You can't use it to
258      ///   determine *if* something failed or not, only *why* it failed.
259      func urlErrorCode(for easyCode: CFURLSessionEasyCode) -> Int? {
260          switch (easyCode, CInt(connectFailureErrno)) {
261          case (CFURLSessionEasyCodeOK, _):
262              return nil
263          case (_, ECONNREFUSED):
264              return NSURLErrorCannotConnectToHost
265          case (CFURLSessionEasyCodeUNSUPPORTED_PROTOCOL, _):
266              return NSURLErrorUnsupportedURL
267          case (CFURLSessionEasyCodeURL_MALFORMAT, _):
268              return NSURLErrorBadURL
269          case (CFURLSessionEasyCodeCOULDNT_RESOLVE_HOST, _):
270              // Oddly, this appears to happen for malformed URLs, too.
271              return NSURLErrorCannotFindHost
272          case (CFURLSessionEasyCodeRECV_ERROR, ECONNRESET):
273              return NSURLErrorNetworkConnectionLost
274          case (CFURLSessionEasyCodeSEND_ERROR, ECONNRESET):
275              return NSURLErrorNetworkConnectionLost
276          case (CFURLSessionEasyCodeGOT_NOTHING, _):
277              return NSURLErrorBadServerResponse
278          case (CFURLSessionEasyCodeABORTED_BY_CALLBACK, _):
279              return NSURLErrorUnknown // Or NSURLErrorCancelled if we're in such a state
280          case (CFURLSessionEasyCodeCOULDNT_CONNECT, ETIMEDOUT):
281              return NSURLErrorTimedOut
282          case (CFURLSessionEasyCodeOPERATION_TIMEDOUT, _):
283              return NSURLErrorTimedOut
284          default:
285              //TODO: Need to map to one of the NSURLError... constants
286              return NSURLErrorUnknown
287          }
288      }
289  }
290  
291  internal func ==(lhs: CFURLSessionPoll, rhs: CFURLSessionPoll) -> Bool {
292      return lhs.value == rhs.value
293  }
294  internal func ~=(lhs: CFURLSessionPoll, rhs: CFURLSessionPoll) -> Bool {
295      return lhs == rhs
296  }
297  
298  fileprivate extension URLSession._MultiHandle._SocketRegisterAction {
299      init(rawValue: CFURLSessionPoll) {
300          switch rawValue {
301          case CFURLSessionPollNone:
302              self = .none
303          case CFURLSessionPollIn:
304              self = .registerRead
305          case CFURLSessionPollOut:
306              self = .registerWrite
307          case CFURLSessionPollInOut:
308              self = .registerReadAndWrite
309          case CFURLSessionPollRemove:
310              self = .unregister
311          default:
312              fatalError("Invalid CFURLSessionPoll value.")
313          }
314      }
315  }
316  
317  fileprivate extension URLSession._MultiHandle._SocketRegisterAction {
318      /// Should a libdispatch source be registered for **read** readiness?
319      var needsReadSource: Bool {
320          switch self {
321          case .none: return false
322          case .registerRead: return true
323          case .registerWrite: return false
324          case .registerReadAndWrite: return true
325          case .unregister: return false
326          }
327      }
328      /// Should a libdispatch source be registered for **write** readiness?
329      var needsWriteSource: Bool {
330          switch self {
331          case .none: return false
332          case .registerRead: return false
333          case .registerWrite: return true
334          case .registerReadAndWrite: return true
335          case .unregister: return false
336          }
337      }
338      /// Should either a **read** or a **write** readiness libdispatch source be
339      /// registered?
340      var needsSource: Bool {
341          return needsReadSource || needsWriteSource
342      }
343  }
344  
345  /// A helper class that wraps a libdispatch timer.
346  ///
347  /// Used to implement the timeout of `URLSession.MultiHandle` and `URLSession.EasyHandle`
348  class _TimeoutSource {
349      let rawSource: DispatchSource 
350      let milliseconds: Int
351      let queue: DispatchQueue        //needed to restart the timer for EasyHandles
352      let handler: DispatchWorkItem   //needed to restart the timer for EasyHandles
353      init(queue: DispatchQueue, milliseconds: Int, handler: DispatchWorkItem) {
354          self.queue = queue
355          self.handler = handler
356          self.milliseconds = milliseconds
357          self.rawSource = DispatchSource.makeTimerSource(queue: queue) as! DispatchSource
358          
359          let delay = UInt64(max(1, milliseconds - 1)) 
360          let start = DispatchTime.now() + DispatchTimeInterval.milliseconds(Int(delay))
361          
362          rawSource.schedule(deadline: start, repeating: .milliseconds(Int(delay)), leeway: (milliseconds == 1) ? .microseconds(Int(1)) : .milliseconds(Int(1)))
363          rawSource.setEventHandler(handler: handler)
364          rawSource.resume() 
365      }
366      deinit {
367          rawSource.cancel()
368      }
369  }
370  
371  fileprivate extension URLSession._MultiHandle {
372  
373      /// <https://curl.haxx.se/libcurl/c/CURLMOPT_TIMERFUNCTION.html>
374      func updateTimeoutTimer(to value: Int) {
375          updateTimeoutTimer(to: _Timeout(timeout: value))
376      }
377      
378      func updateTimeoutTimer(to timeout: _Timeout) {
379          // Set up a timeout timer based on the given value:
380          switch timeout {
381          case .none:
382              timeoutSource = nil
383          case .immediate:
384              timeoutSource = nil
385              queue.async { self.timeoutTimerFired() }
386          case .milliseconds(let milliseconds):
387              if (timeoutSource == nil) || timeoutSource!.milliseconds != milliseconds {
388                  //TODO: Could simply change the existing timer by using DispatchSourceTimer again.
389                  let block = DispatchWorkItem { [weak self] in
390                      self?.timeoutTimerFired()
391                  }
392                  timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block)
393              }
394          }
395      }
396      enum _Timeout {
397          case milliseconds(Int)
398          case none
399          case immediate
400      }
401  }
402  
403  fileprivate extension URLSession._MultiHandle._Timeout {
404      init(timeout: Int) {
405          switch timeout {
406          case -1:
407              self = .none
408          case 0:
409              self = .immediate
410          default:
411              self = .milliseconds(timeout)
412          }
413      }
414  }
415  
416  
417  /// Read and write libdispatch sources for a specific socket.
418  ///
419  /// A simple helper that combines two sources -- both being optional.
420  ///
421  /// This info is stored into the socket using `curl_multi_assign()`.
422  ///
423  /// - SeeAlso: URLSession.MultiHandle.SocketRegisterAction
424  fileprivate class _SocketSources {
425      var readSource: DispatchSource?
426      var writeSource: DispatchSource?
427  
428      func createReadSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
429          guard readSource == nil else { return }
430  #if os(Windows)
431          let s = DispatchSource.makeReadSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue)
432  #else
433          let s = DispatchSource.makeReadSource(fileDescriptor: socket, queue: queue)
434  #endif
435          s.setEventHandler(handler: handler)
436          readSource = s as? DispatchSource
437          s.resume()
438      }
439  
440      func createWriteSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
441          guard writeSource == nil else { return }
442  #if os(Windows)
443          let s = DispatchSource.makeWriteSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue)
444  #else
445          let s = DispatchSource.makeWriteSource(fileDescriptor: socket, queue: queue)
446  #endif
447          s.setEventHandler(handler: handler)
448          writeSource = s as? DispatchSource
449          s.resume()
450      }
451  
452      func tearDown() {
453          if let s = readSource {
454              s.cancel()
455          }
456          readSource = nil
457          if let s = writeSource {
458              s.cancel()
459          }
460          writeSource = nil
461      }
462  }
463  extension _SocketSources {
464      /// Create a read and/or write source as specified by the action.
465      func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
466          if action.needsReadSource {
467              createReadSource(socket: socket, queue: queue, handler: handler)
468          }
469          if action.needsWriteSource {
470              createWriteSource(socket: socket, queue: queue, handler: handler)
471          }
472      }
473  }
474  extension _SocketSources {
475      /// Unwraps the `SocketSources`
476      ///
477      /// A `SocketSources` is stored into the multi handle's socket using
478      /// `curl_multi_assign()`. This helper unwraps it from the returned
479      /// `UnsafeMutablePointer<Void>`.
480      static func from(socketSourcePtr ptr: UnsafeMutableRawPointer?) -> _SocketSources? {
481          guard let ptr = ptr else { return nil }
482          return Unmanaged<_SocketSources>.fromOpaque(ptr).takeUnretainedValue()
483      }
484  }
485  
486  
487  internal func ==(lhs: CFURLSessionMultiCode, rhs: CFURLSessionMultiCode) -> Bool {
488      return lhs.value == rhs.value
489  }
490  internal func ~=(lhs: CFURLSessionMultiCode, rhs: CFURLSessionMultiCode) -> Bool {
491      return lhs == rhs
492  }
493  
494  extension CFURLSessionMultiCode {
495      internal func asError() throws {
496          if self == CFURLSessionMultiCodeOK { return }
497          throw NSError(domain: "libcurl.multi", code: Int(self.value))
498      }
499  }