MultiHandle.swift
1 // 2 // This source file is part of the Swift.org open source project 3 // 4 // Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors 5 // Licensed under Apache License v2.0 with Runtime Library Exception 6 // 7 // See http://swift.org/LICENSE.txt for license information 8 // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors 9 // 10 // ----------------------------------------------------------------------------- 11 /// 12 /// libcurl *multi handle* wrapper. 13 /// These are libcurl helpers for the URLSession API code. 14 /// - SeeAlso: https://curl.haxx.se/libcurl/c/ 15 /// - SeeAlso: URLSession.swift 16 /// 17 // ----------------------------------------------------------------------------- 18 19 #if os(macOS) || os(iOS) || os(watchOS) || os(tvOS) 20 import SwiftFoundation 21 #else 22 import Foundation 23 #endif 24 25 @_implementationOnly import CoreFoundation 26 @_implementationOnly import CFURLSessionInterface 27 import Dispatch 28 29 30 31 extension URLSession { 32 /// Minimal wrapper around [curl multi interface](https://curl.haxx.se/libcurl/c/libcurl-multi.html). 33 /// 34 /// The the *multi handle* manages the sockets for easy handles 35 /// (`_EasyHandle`), and this implementation uses 36 /// libdispatch to listen for sockets being read / write ready. 37 /// 38 /// Using `DispatchSource` allows this implementation to be 39 /// non-blocking and all code to run on the same thread / 40 /// `DispatchQueue` -- thus keeping is simple. 41 /// 42 /// - SeeAlso: _EasyHandle 43 internal final class _MultiHandle { 44 let rawHandle = CFURLSessionMultiHandleInit() 45 let queue: DispatchQueue 46 let group = DispatchGroup() 47 fileprivate var easyHandles: [_EasyHandle] = [] 48 fileprivate var timeoutSource: _TimeoutSource? = nil 49 private var reentrantInUpdateTimeoutTimer = false 50 51 init(configuration: URLSession._Configuration, workQueue: DispatchQueue) { 52 queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue) 53 setupCallbacks() 54 configure(with: configuration) 55 } 56 deinit { 57 // C.f.: <https://curl.haxx.se/libcurl/c/curl_multi_cleanup.html> 58 easyHandles.forEach { 59 try! CFURLSessionMultiHandleRemoveHandle(rawHandle, $0.rawHandle).asError() 60 } 61 try! CFURLSessionMultiHandleDeinit(rawHandle).asError() 62 } 63 } 64 } 65 66 extension URLSession._MultiHandle { 67 func configure(with configuration: URLSession._Configuration) { 68 #if !NS_CURL_MISSING_MAX_HOST_CONNECTIONS 69 try! CFURLSession_multi_setopt_l(rawHandle, CFURLSessionMultiOptionMAX_HOST_CONNECTIONS, numericCast(configuration.httpMaximumConnectionsPerHost)).asError() 70 #endif 71 72 try! CFURLSession_multi_setopt_l(rawHandle, CFURLSessionMultiOptionPIPELINING, configuration.httpShouldUsePipelining ? 3 : 2).asError() 73 //TODO: We may want to set 74 // CFURLSessionMultiOptionMAXCONNECTS 75 // CFURLSessionMultiOptionMAX_TOTAL_CONNECTIONS 76 } 77 } 78 79 fileprivate extension URLSession._MultiHandle { 80 static func from(callbackUserData userdata: UnsafeMutableRawPointer?) -> URLSession._MultiHandle? { 81 guard let userdata = userdata else { return nil } 82 return Unmanaged<URLSession._MultiHandle>.fromOpaque(userdata).takeUnretainedValue() 83 } 84 } 85 86 fileprivate extension URLSession._MultiHandle { 87 /// Forward the libcurl callbacks into Swift methods 88 func setupCallbacks() { 89 // Socket 90 try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionSOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError() 91 try! CFURLSession_multi_setopt_sf(rawHandle, CFURLSessionMultiOptionSOCKETFUNCTION) { (easyHandle: CFURLSessionEasyHandle, socket: CFURLSession_socket_t, what: Int32, userdata: UnsafeMutableRawPointer?, socketptr: UnsafeMutableRawPointer?) -> Int32 in 92 guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() } 93 return handle.register(socket: socket, for: easyHandle, what: what, socketSourcePtr: socketptr) 94 }.asError() 95 // Timeout: 96 try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionTIMERDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError() 97 #if os(Windows) && (arch(arm64) || arch(x86_64)) 98 typealias CFURLSessionMultiOption = Int32 99 #else 100 typealias CFURLSessionMultiOption = Int 101 #endif 102 try! CFURLSession_multi_setopt_tf(rawHandle, CFURLSessionMultiOptionTIMERFUNCTION) { (_, timeout: CFURLSessionMultiOption, userdata: UnsafeMutableRawPointer?) -> Int32 in 103 guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() } 104 handle.updateTimeoutTimer(to: numericCast(timeout)) 105 return 0 106 }.asError() 107 } 108 /// <https://curl.haxx.se/libcurl/c/CURLMOPT_SOCKETFUNCTION.html> and 109 /// <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html> 110 func register(socket: CFURLSession_socket_t, for easyHandle: CFURLSessionEasyHandle, what: Int32, socketSourcePtr: UnsafeMutableRawPointer?) -> Int32 { 111 // We get this callback whenever we need to register or unregister a 112 // given socket with libdispatch. 113 // The `action` / `what` defines if we should register or unregister 114 // that we're interested in read and/or write readiness. We will do so 115 // through libdispatch (DispatchSource) and store the source(s) inside 116 // a `SocketSources` which we in turn store inside libcurl's multi handle 117 // by means of curl_multi_assign() -- we retain the object fist. 118 let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what)) 119 var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr) 120 if socketSources == nil && action.needsSource { 121 let s = _SocketSources() 122 let p = Unmanaged.passRetained(s).toOpaque() 123 CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p)) 124 socketSources = s 125 } else if socketSources != nil && action == .unregister { 126 // We need to release the stored pointer: 127 if let opaque = socketSourcePtr { 128 Unmanaged<_SocketSources>.fromOpaque(opaque).release() 129 } 130 socketSources = nil 131 } 132 if let ss = socketSources { 133 let handler = DispatchWorkItem { [weak self] in 134 self?.performAction(for: socket) 135 } 136 ss.createSources(with: action, socket: socket, queue: queue, handler: handler) 137 } 138 return 0 139 } 140 141 /// What read / write ready event to register / unregister. 142 /// 143 /// This re-maps `CFURLSessionPoll` / `CURL_POLL`. 144 enum _SocketRegisterAction { 145 case none 146 case registerRead 147 case registerWrite 148 case registerReadAndWrite 149 case unregister 150 } 151 } 152 153 extension Collection where Element == _EasyHandle { 154 internal func firstIndex(of element: Element) -> Index? { 155 var i = self.startIndex 156 while i != self.endIndex { 157 if self[i] == element { return i } 158 self.formIndex(after: &i) 159 } 160 return nil 161 } 162 } 163 164 internal extension URLSession._MultiHandle { 165 /// Add an easy handle -- start its transfer. 166 func add(_ handle: _EasyHandle) { 167 // If this is the first handle being added, we need to `kick` the 168 // underlying multi handle by calling `timeoutTimerFired` as 169 // described in 170 // <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>. 171 // That will initiate the registration for timeout timer and socket 172 // readiness. 173 let needsTimeout = self.easyHandles.isEmpty 174 self.easyHandles.append(handle) 175 try! CFURLSessionMultiHandleAddHandle(self.rawHandle, handle.rawHandle).asError() 176 if needsTimeout { 177 self.timeoutTimerFired() 178 } 179 } 180 /// Remove an easy handle -- stop its transfer. 181 func remove(_ handle: _EasyHandle) { 182 guard let idx = self.easyHandles.firstIndex(of: handle) else { 183 fatalError("Handle not in list.") 184 } 185 self.easyHandles.remove(at: idx) 186 try! CFURLSessionMultiHandleRemoveHandle(self.rawHandle, handle.rawHandle).asError() 187 } 188 } 189 190 fileprivate extension URLSession._MultiHandle { 191 /// This gets called when we should ask curl to perform action on a socket. 192 func performAction(for socket: CFURLSession_socket_t) { 193 try! readAndWriteAvailableData(on: socket) 194 } 195 /// This gets called when our timeout timer fires. 196 /// 197 /// libcurl relies on us calling curl_multi_socket_action() every now and then. 198 func timeoutTimerFired() { 199 try! readAndWriteAvailableData(on: CFURLSessionSocketTimeout) 200 } 201 /// reads/writes available data given an action 202 func readAndWriteAvailableData(on socket: CFURLSession_socket_t) throws { 203 var runningHandlesCount = Int32(0) 204 try CFURLSessionMultiHandleAction(rawHandle, socket, 0, &runningHandlesCount).asError() 205 //TODO: Do we remove the timeout timer here if / when runningHandles == 0 ? 206 readMessages() 207 } 208 209 /// Check the status of all individual transfers. 210 /// 211 /// libcurl refers to this as “read multi stack informationals”. 212 /// Check for transfers that completed. 213 func readMessages() { 214 // We pop the messages one by one in a loop: 215 repeat { 216 // count will contain the messages left in the queue 217 var count = Int32(0) 218 let info = CFURLSessionMultiHandleInfoRead(rawHandle, &count) 219 guard let handle = info.easyHandle else { break } 220 let code = info.resultCode 221 completedTransfer(forEasyHandle: handle, easyCode: code) 222 } while true 223 } 224 /// Transfer completed. 225 func completedTransfer(forEasyHandle handle: CFURLSessionEasyHandle, easyCode: CFURLSessionEasyCode) { 226 // Look up the matching wrapper: 227 guard let idx = easyHandles.firstIndex(where: { $0.rawHandle == handle }) else { 228 fatalError("Transfer completed for easy handle, but it is not in the list of added handles.") 229 } 230 let easyHandle = easyHandles[idx] 231 // Find the NSURLError code 232 var error: NSError? 233 if let errorCode = easyHandle.urlErrorCode(for: easyCode) { 234 var errorDescription: String = "" 235 if easyHandle.errorBuffer[0] == 0 { 236 let description = CFURLSessionEasyCodeDescription(easyCode)! 237 errorDescription = NSString(bytes: UnsafeMutableRawPointer(mutating: description), length: strlen(description), encoding: String.Encoding.utf8.rawValue)! as String 238 } else { 239 errorDescription = String(cString: easyHandle.errorBuffer) 240 } 241 242 error = NSError(domain: NSURLErrorDomain, code: errorCode, userInfo: [ 243 NSLocalizedDescriptionKey: errorDescription 244 ]) 245 } 246 completedTransfer(forEasyHandle: easyHandle, error: error) 247 } 248 /// Transfer completed. 249 func completedTransfer(forEasyHandle handle: _EasyHandle, error: NSError?) { 250 handle.completedTransfer(withError: error) 251 } 252 } 253 254 fileprivate extension _EasyHandle { 255 /// An error code within the `NSURLErrorDomain` based on the error of the 256 /// easy handle. 257 /// - Note: The error value is set only on failure. You can't use it to 258 /// determine *if* something failed or not, only *why* it failed. 259 func urlErrorCode(for easyCode: CFURLSessionEasyCode) -> Int? { 260 switch (easyCode, CInt(connectFailureErrno)) { 261 case (CFURLSessionEasyCodeOK, _): 262 return nil 263 case (_, ECONNREFUSED): 264 return NSURLErrorCannotConnectToHost 265 case (CFURLSessionEasyCodeUNSUPPORTED_PROTOCOL, _): 266 return NSURLErrorUnsupportedURL 267 case (CFURLSessionEasyCodeURL_MALFORMAT, _): 268 return NSURLErrorBadURL 269 case (CFURLSessionEasyCodeCOULDNT_RESOLVE_HOST, _): 270 // Oddly, this appears to happen for malformed URLs, too. 271 return NSURLErrorCannotFindHost 272 case (CFURLSessionEasyCodeRECV_ERROR, ECONNRESET): 273 return NSURLErrorNetworkConnectionLost 274 case (CFURLSessionEasyCodeSEND_ERROR, ECONNRESET): 275 return NSURLErrorNetworkConnectionLost 276 case (CFURLSessionEasyCodeGOT_NOTHING, _): 277 return NSURLErrorBadServerResponse 278 case (CFURLSessionEasyCodeABORTED_BY_CALLBACK, _): 279 return NSURLErrorUnknown // Or NSURLErrorCancelled if we're in such a state 280 case (CFURLSessionEasyCodeCOULDNT_CONNECT, ETIMEDOUT): 281 return NSURLErrorTimedOut 282 case (CFURLSessionEasyCodeOPERATION_TIMEDOUT, _): 283 return NSURLErrorTimedOut 284 default: 285 //TODO: Need to map to one of the NSURLError... constants 286 return NSURLErrorUnknown 287 } 288 } 289 } 290 291 internal func ==(lhs: CFURLSessionPoll, rhs: CFURLSessionPoll) -> Bool { 292 return lhs.value == rhs.value 293 } 294 internal func ~=(lhs: CFURLSessionPoll, rhs: CFURLSessionPoll) -> Bool { 295 return lhs == rhs 296 } 297 298 fileprivate extension URLSession._MultiHandle._SocketRegisterAction { 299 init(rawValue: CFURLSessionPoll) { 300 switch rawValue { 301 case CFURLSessionPollNone: 302 self = .none 303 case CFURLSessionPollIn: 304 self = .registerRead 305 case CFURLSessionPollOut: 306 self = .registerWrite 307 case CFURLSessionPollInOut: 308 self = .registerReadAndWrite 309 case CFURLSessionPollRemove: 310 self = .unregister 311 default: 312 fatalError("Invalid CFURLSessionPoll value.") 313 } 314 } 315 } 316 317 fileprivate extension URLSession._MultiHandle._SocketRegisterAction { 318 /// Should a libdispatch source be registered for **read** readiness? 319 var needsReadSource: Bool { 320 switch self { 321 case .none: return false 322 case .registerRead: return true 323 case .registerWrite: return false 324 case .registerReadAndWrite: return true 325 case .unregister: return false 326 } 327 } 328 /// Should a libdispatch source be registered for **write** readiness? 329 var needsWriteSource: Bool { 330 switch self { 331 case .none: return false 332 case .registerRead: return false 333 case .registerWrite: return true 334 case .registerReadAndWrite: return true 335 case .unregister: return false 336 } 337 } 338 /// Should either a **read** or a **write** readiness libdispatch source be 339 /// registered? 340 var needsSource: Bool { 341 return needsReadSource || needsWriteSource 342 } 343 } 344 345 /// A helper class that wraps a libdispatch timer. 346 /// 347 /// Used to implement the timeout of `URLSession.MultiHandle` and `URLSession.EasyHandle` 348 class _TimeoutSource { 349 let rawSource: DispatchSource 350 let milliseconds: Int 351 let queue: DispatchQueue //needed to restart the timer for EasyHandles 352 let handler: DispatchWorkItem //needed to restart the timer for EasyHandles 353 init(queue: DispatchQueue, milliseconds: Int, handler: DispatchWorkItem) { 354 self.queue = queue 355 self.handler = handler 356 self.milliseconds = milliseconds 357 self.rawSource = DispatchSource.makeTimerSource(queue: queue) as! DispatchSource 358 359 let delay = UInt64(max(1, milliseconds - 1)) 360 let start = DispatchTime.now() + DispatchTimeInterval.milliseconds(Int(delay)) 361 362 rawSource.schedule(deadline: start, repeating: .milliseconds(Int(delay)), leeway: (milliseconds == 1) ? .microseconds(Int(1)) : .milliseconds(Int(1))) 363 rawSource.setEventHandler(handler: handler) 364 rawSource.resume() 365 } 366 deinit { 367 rawSource.cancel() 368 } 369 } 370 371 fileprivate extension URLSession._MultiHandle { 372 373 /// <https://curl.haxx.se/libcurl/c/CURLMOPT_TIMERFUNCTION.html> 374 func updateTimeoutTimer(to value: Int) { 375 updateTimeoutTimer(to: _Timeout(timeout: value)) 376 } 377 378 func updateTimeoutTimer(to timeout: _Timeout) { 379 // Set up a timeout timer based on the given value: 380 switch timeout { 381 case .none: 382 timeoutSource = nil 383 case .immediate: 384 timeoutSource = nil 385 queue.async { self.timeoutTimerFired() } 386 case .milliseconds(let milliseconds): 387 if (timeoutSource == nil) || timeoutSource!.milliseconds != milliseconds { 388 //TODO: Could simply change the existing timer by using DispatchSourceTimer again. 389 let block = DispatchWorkItem { [weak self] in 390 self?.timeoutTimerFired() 391 } 392 timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block) 393 } 394 } 395 } 396 enum _Timeout { 397 case milliseconds(Int) 398 case none 399 case immediate 400 } 401 } 402 403 fileprivate extension URLSession._MultiHandle._Timeout { 404 init(timeout: Int) { 405 switch timeout { 406 case -1: 407 self = .none 408 case 0: 409 self = .immediate 410 default: 411 self = .milliseconds(timeout) 412 } 413 } 414 } 415 416 417 /// Read and write libdispatch sources for a specific socket. 418 /// 419 /// A simple helper that combines two sources -- both being optional. 420 /// 421 /// This info is stored into the socket using `curl_multi_assign()`. 422 /// 423 /// - SeeAlso: URLSession.MultiHandle.SocketRegisterAction 424 fileprivate class _SocketSources { 425 var readSource: DispatchSource? 426 var writeSource: DispatchSource? 427 428 func createReadSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { 429 guard readSource == nil else { return } 430 #if os(Windows) 431 let s = DispatchSource.makeReadSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue) 432 #else 433 let s = DispatchSource.makeReadSource(fileDescriptor: socket, queue: queue) 434 #endif 435 s.setEventHandler(handler: handler) 436 readSource = s as? DispatchSource 437 s.resume() 438 } 439 440 func createWriteSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { 441 guard writeSource == nil else { return } 442 #if os(Windows) 443 let s = DispatchSource.makeWriteSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue) 444 #else 445 let s = DispatchSource.makeWriteSource(fileDescriptor: socket, queue: queue) 446 #endif 447 s.setEventHandler(handler: handler) 448 writeSource = s as? DispatchSource 449 s.resume() 450 } 451 452 func tearDown() { 453 if let s = readSource { 454 s.cancel() 455 } 456 readSource = nil 457 if let s = writeSource { 458 s.cancel() 459 } 460 writeSource = nil 461 } 462 } 463 extension _SocketSources { 464 /// Create a read and/or write source as specified by the action. 465 func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { 466 if action.needsReadSource { 467 createReadSource(socket: socket, queue: queue, handler: handler) 468 } 469 if action.needsWriteSource { 470 createWriteSource(socket: socket, queue: queue, handler: handler) 471 } 472 } 473 } 474 extension _SocketSources { 475 /// Unwraps the `SocketSources` 476 /// 477 /// A `SocketSources` is stored into the multi handle's socket using 478 /// `curl_multi_assign()`. This helper unwraps it from the returned 479 /// `UnsafeMutablePointer<Void>`. 480 static func from(socketSourcePtr ptr: UnsafeMutableRawPointer?) -> _SocketSources? { 481 guard let ptr = ptr else { return nil } 482 return Unmanaged<_SocketSources>.fromOpaque(ptr).takeUnretainedValue() 483 } 484 } 485 486 487 internal func ==(lhs: CFURLSessionMultiCode, rhs: CFURLSessionMultiCode) -> Bool { 488 return lhs.value == rhs.value 489 } 490 internal func ~=(lhs: CFURLSessionMultiCode, rhs: CFURLSessionMultiCode) -> Bool { 491 return lhs == rhs 492 } 493 494 extension CFURLSessionMultiCode { 495 internal func asError() throws { 496 if self == CFURLSessionMultiCodeOK { return } 497 throw NSError(domain: "libcurl.multi", code: Int(self.value)) 498 } 499 }