rpc.py
1 """ 2 Type-based wrappers around our FFI functions. 3 4 These types are responsible for providing a python-like API 5 to the Arti RPC library. 6 7 TODO RPC: NOTE that these APIs are still in flux; 8 we will break them a lot before we declare them stable. 9 Don't use them in production. 10 """ 11 12 # mypy: allow-redefinition 13 14 from __future__ import annotations 15 16 # Design notes: 17 # 18 # - Every object gets a reference to the ctypes library object 19 # from the `ffi` module. 20 # We do this to better support programs that want exact control 21 # over how the library is loaded. 22 # 23 # - Exported types start with "Arti", to make imports safer. 24 25 import json 26 import logging 27 import os 28 import socket 29 import sys 30 from ctypes import POINTER, byref, c_int, _Pointer as Ptr 31 from enum import Enum 32 import arti_rpc.ffi 33 from arti_rpc.ffi import ( 34 ArtiRpcStr as FfiStr, 35 ArtiRpcError as FfiError, 36 ArtiRpcHandle as FfiHandle, 37 ArtiRpcConn as FfiConn, 38 ArtiRpcConnBuilder as FfiBuilder, 39 _ArtiRpcStatus as FfiStatus, 40 ) 41 from typing import ( 42 Optional, 43 Tuple, 44 Union, 45 ) # needed for Python 3.9, which lacks some syntax. 46 47 if os.name == "nt": 48 49 def _socket_is_valid(sock): 50 """Return true if `sock` is a valid SOCKET.""" 51 return sock != arti_rpc.ffi.INVALID_SOCKET 52 53 else: 54 55 def _socket_is_valid(sock): 56 """Return true if `sock` is a valid fd.""" 57 return sock >= 0 58 59 60 _logger = logging.getLogger(__name__) 61 62 63 class _RpcBase: 64 def __init__(self, rpc_lib): 65 self._rpc = rpc_lib 66 67 def _consume_rpc_str(self, s: Ptr[FfiStr]) -> str: 68 """ 69 Consume an ffi.ArtiRpcStr and return a python string. 70 """ 71 try: 72 bs = self._rpc.arti_rpc_str_get(s) 73 return bs.decode("utf-8") 74 finally: 75 self._rpc.arti_rpc_str_free(s) 76 77 def _handle_error(self, rv: FfiStatus, error_ptr: Ptr[FfiError]) -> None: 78 """ 79 If `(rv,error_ptr)` indicates an error, then raise that error. 80 Otherwise do nothing. 81 82 NOTE: Here we rely on the property that, 83 when there is an error in a function, 84 _only the error is actually set_. 85 (No other object was constructed and needs to be freed.) 86 """ 87 if rv != 0: 88 raise ArtiRpcError(rv, error_ptr, self._rpc) 89 elif error_ptr: 90 # This should be impossible; it indicates misbehavior on arti's part. 91 raise ArtiRpcError(rv, error_ptr, self._rpc) 92 93 94 def _into_json_str(o: Union[str, dict]) -> str: 95 """ 96 If 'o' is a dict, convert it into a json string. 97 98 Otherwise return it as-is. 99 """ 100 if isinstance(o, dict): 101 return json.dumps(o) 102 else: 103 return o 104 105 106 class _BuildEntType(Enum): 107 """ 108 Value to indicate the kind of an RPC connect point search path entry. 109 110 Returned by ArtiRpcResponse.kind(). 111 """ 112 113 LITERAL_CONNECT_POINT = 1 114 EXPANDABLE_PATH = 2 115 LITERAL_PATH = 3 116 117 118 class ArtiRpcConnBuilder(_RpcBase): 119 """ 120 A builder object used to configure connections to Arti. 121 """ 122 123 _builder: Optional[Ptr[FfiBuilder]] 124 125 def __init__(self, rpc_lib=None): 126 """ 127 Return a new ArtiR 128 """ 129 self._builder = None 130 131 if rpc_lib is None: 132 rpc_lib = arti_rpc.ffi.get_library() 133 134 _RpcBase.__init__(self, rpc_lib) 135 136 builder = POINTER(arti_rpc.ffi.ArtiRpcConnBuilder)() 137 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 138 rv = self._rpc.arti_rpc_conn_builder_new(byref(builder), byref(error)) 139 self._handle_error(rv, error) 140 assert builder 141 self._builder = builder 142 143 def __del__(self): 144 if self._builder is not None: 145 self._rpc.arti_rpc_conn_builder_free(self._builder) 146 self._builder = None 147 148 def _prepend_entry(self, entrykind: _BuildEntType, entry: str) -> None: 149 """ 150 Helper: Prepend `entry` to the search path of this builder. 151 """ 152 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 153 rv = self._rpc.arti_rpc_conn_builder_prepend_entry( 154 self._builder, entrykind.value, entry.encode("utf-8"), byref(error) 155 ) 156 self._handle_error(rv, error) 157 158 def prepend_literal_connect_point(self, connect_point: str) -> None: 159 """ 160 Prepend `connect_point` to this builder's search path 161 as a literal connect point. 162 """ 163 self._prepend_entry(_BuildEntType.LITERAL_CONNECT_POINT, connect_point) 164 165 def prepend_expandable_path(self, path: str) -> None: 166 """ 167 Prepend `path` to this builder's search path 168 as an expandable path (one to which Arti's variable substitution applies). 169 """ 170 self._prepend_entry(_BuildEntType.EXPANDABLE_PATH, path) 171 172 def prepend_literal_path(self, path: str) -> None: 173 """ 174 Prepend `path` to this builder's search path 175 as a literal path (one to which Arti's variable substitution does not apply). 176 """ 177 self._prepend_entry(_BuildEntType.LITERAL_PATH, path) 178 179 def connect(self) -> ArtiRpcConn: 180 """ 181 Use the settings in this builder to open a connection to Arti. 182 """ 183 conn = self._connect_inner() 184 185 return ArtiRpcConn(rpc_lib=self._rpc, _conn=conn) 186 187 def _connect_inner(self) -> Ptr[FfiConn]: 188 """ 189 Helper: Use the settings in this builder to open a connection to Arti, 190 and return a pointer to that connection. 191 """ 192 conn = POINTER(arti_rpc.ffi.ArtiRpcConn)() 193 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 194 rv = self._rpc.arti_rpc_conn_builder_connect( 195 self._builder, byref(conn), byref(error) 196 ) 197 self._handle_error(rv, error) 198 assert conn 199 200 return conn 201 202 203 class ArtiRpcConn(_RpcBase): 204 """ 205 An open connection to Arti. 206 """ 207 208 _conn: Optional[Ptr[FfiConn]] 209 _session: ArtiRpcObject 210 211 def __init__(self, rpc_lib=None, _conn: Optional[Ptr[FfiConn]] = None): 212 """ 213 Try to connect to Arti using default settings. 214 215 If `rpc_lib` is specified, it must be a ctypes DLL, 216 constructed with `arti_rpc.ffi.get_library`. 217 If it's None, we use the default. 218 """ 219 self._conn = None 220 221 if rpc_lib is None: 222 rpc_lib = arti_rpc.ffi.get_library() 223 224 _RpcBase.__init__(self, rpc_lib) 225 226 if _conn is None: 227 _conn = ArtiRpcConnBuilder()._connect_inner() 228 229 self._conn = _conn 230 s = self._rpc.arti_rpc_conn_get_session_id(self._conn).decode("utf-8") 231 self._session = self.make_object(s) 232 233 def __del__(self): 234 if self._conn is not None: 235 # Note that if _conn is set, then _rpc is necessarily set. 236 self._rpc.arti_rpc_conn_free(self._conn) 237 self._conn = None 238 239 def make_object(self, object_id: str) -> ArtiRpcObject: 240 """ 241 Return an ArtiRpcObject for a given object ID on this connection. 242 243 (The `ArtiRpcObject` API is a convenience wrapper that provides 244 a more ergonomic interface to `execute` and `execute_with_handle`.) 245 """ 246 return ArtiRpcObject(object_id, self) 247 248 def session(self) -> ArtiRpcObject: 249 """ 250 Return an ArtiRpcObject for this connection's Session object. 251 252 (The Session is the root object of any RPC session; 253 by invoking methods on the session, 254 you can get the IDs for other objects.) 255 """ 256 return self._session 257 258 def execute(self, request: Union[str, dict]) -> dict: 259 """ 260 Run an RPC request on this connection. 261 262 On success, return the "response" from the RPC reply. 263 Otherwise, raise an error. 264 265 You may (and probably should) omit the `id` field from your request. 266 If you do, a new id will be automatically generated. 267 268 The request may be a string, or a dict that will be encoded 269 as a json object. 270 """ 271 msg = _into_json_str(request) 272 response = POINTER(arti_rpc.ffi.ArtiRpcStr)() 273 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 274 rv = self._rpc.arti_rpc_conn_execute( 275 self._conn, msg.encode("utf-8"), byref(response), byref(error) 276 ) 277 self._handle_error(rv, error) 278 r = ArtiRpcResponse(self._consume_rpc_str(response)) 279 assert r.kind() == ArtiRpcResponseKind.RESULT 280 return r["result"] 281 282 def execute_with_handle(self, request: Union[str, dict]) -> ArtiRequestHandle: 283 """ 284 Launch an RPC request on this connection, and return a ArtiRequestHandle 285 to the open request. 286 287 This API is suitable for use when you want incremental updates 288 about the request status. 289 """ 290 msg = _into_json_str(request) 291 handle = POINTER(arti_rpc.ffi.ArtiRpcHandle)() 292 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 293 rv = self._rpc.arti_rpc_conn_execute_with_handle( 294 self._conn, msg.encode("utf-8"), byref(handle), byref(error) 295 ) 296 self._handle_error(rv, error) 297 return ArtiRequestHandle(handle, self._rpc) 298 299 def open_stream( 300 self, 301 hostname: str, 302 port: int, 303 *, 304 on_object: Union[ArtiRpcObject, str, None] = None, 305 isolation: str = "", 306 want_stream_id: bool = False, 307 ) -> Tuple[socket.socket, Optional[ArtiRpcObject]]: 308 """ 309 Open an anonymized data stream to `hostname`:`port` over Arti. 310 311 If `on_object` if provided, is the client-like object which will 312 be told to open the connection. Otherwise, the session 313 will be told to open the connection. 314 315 If `isolation` is provided, the resulting stream will be configured 316 not to share a circuit with any other stream 317 having a different `isolation`. 318 319 If `want_stream_id` is true, then we register the resulting data stream 320 as an RPC object, and return it along with the resulting socket. 321 322 Caveats: TODO RPC. Copy-paste the caveats from arti-rpc-client-core, 323 once they have stabilized. 324 """ 325 hostname: bytes = hostname.encode("utf-8") 326 isolation: bytes = isolation.encode("utf-8") 327 on_object: Optional[bytes] = _opt_object_id_to_bytes(on_object) 328 if want_stream_id: 329 stream_id = POINTER(arti_rpc.ffi.ArtiRpcStr)() 330 stream_id_ptr = byref(stream_id) 331 else: 332 stream_id_ptr = None 333 sock_cint = c_int(arti_rpc.ffi.INVALID_SOCKET) 334 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 335 336 rv = self._rpc.arti_rpc_conn_open_stream( 337 self._conn, 338 hostname, 339 port, 340 on_object, 341 isolation, 342 byref(sock_cint), 343 stream_id_ptr, 344 byref(error), 345 ) 346 self._handle_error(rv, error) 347 348 assert _socket_is_valid(sock_cint.value) 349 sock = socket.socket(fileno=sock_cint.value) 350 351 if want_stream_id: 352 stream_id_obj = self.make_object(self._consume_rpc_str(stream_id)) 353 return (sock, stream_id_obj) 354 else: 355 return (sock, None) 356 357 358 class ArtiRpcErrorStatus(Enum): 359 """ 360 Value return to indicate the type of an error returned by the 361 RPC library. 362 363 This may or may not correspond to an error from the RPC server. 364 365 Returned by ArtiRpcError.status_code() 366 367 See arti-rpc-client-core documentation for more information. 368 """ 369 370 SUCCESS = 0 371 INVALID_INPUT = 1 372 NOT_SUPPORTED = 2 373 CONNECT_IO = 3 374 BAD_AUTH = 4 375 PEER_PROTOCOL_VIOLATION = 5 376 SHUTDOWN = 6 377 INTERNAL = 7 378 REQUEST_FAILED = 8 379 REQUEST_COMPLETED = 9 380 PROXY_IO = 10 381 STREAM_FAILED = 11 382 NOT_AUTHENTICATED = 12 383 ALL_CONNECT_ATTEMPTS_FAILED = 13 384 CONNECT_POINT_NOT_USABLE = 14 385 BAD_CONNECT_POINT_PATH = 15 386 387 388 def _error_status_from_int(status: int) -> Union[ArtiRpcErrorStatus, int]: 389 """ 390 If `status` is a recognized member of `ArtiRpcErrorStatus`, 391 return that member. 392 Otherwise, return `status`. 393 """ 394 try: 395 return ArtiRpcErrorStatus(status) 396 except ValueError: 397 return status 398 399 400 class ArtiRpcError(Exception): 401 """ 402 An error returned by the RPC library. 403 """ 404 405 _rv: FfiStatus 406 _err: Ptr[FfiError] 407 408 def __init__(self, rv: FfiStatus, err: Ptr[FfiError], rpc): 409 self._rv = rv 410 self._err = err 411 self._rpc = rpc 412 413 def __del__(self): 414 if self._err is not None: 415 self._rpc.arti_rpc_err_free(self._err) 416 self._err = None 417 418 def __str__(self): 419 status = self._rpc.arti_rpc_status_to_str( 420 self._rpc.arti_rpc_err_status(self._err) 421 ).decode("utf-8") 422 msg = self._rpc.arti_rpc_err_message(self._err).decode("utf-8") 423 if status == msg: 424 return status 425 else: 426 return f"{status}: {msg}" 427 428 def status_code(self) -> Union[ArtiRpcErrorStatus, int]: 429 """ 430 Return the status code for this error. 431 432 This code is generated by the underlying RPC library. 433 """ 434 return _error_status_from_int(self._rpc.arti_rpc_err_status(self._err)) 435 436 def os_error_code(self) -> Optional[int]: 437 """ 438 Return the OS error code (e.g., errno) associated with this error, 439 if there is one. 440 """ 441 code = self._rpc.arti_rpc_err_os_code(self._rpc._err) 442 if code == 0: 443 return None 444 else: 445 return code 446 447 def response_str(self) -> Optional[str]: 448 """ 449 Return the RPC response string associated with this error, 450 if this error represents an error message from the RPC server. 451 """ 452 response = self._rpc.arti_rpc_err_response(self._err) 453 if response is None: 454 return None 455 else: 456 return response.decode("utf-8") 457 458 def response_obj(self) -> Optional[dict]: 459 """ 460 Return the RPC error object associated with this error, 461 if this error represents an error message from the RPC server. 462 """ 463 response = self.response_str() 464 if response is None: 465 return None 466 else: 467 return json.loads(response)["error"] 468 469 470 def _opt_object_id_to_bytes( 471 object_id: Union[ArtiRpcObject, str, None] 472 ) -> Optional[bytes]: 473 """ 474 Convert `object_id` (if it is present) to a `bytes`. 475 """ 476 if object_id is None: 477 return None 478 elif isinstance(object_id, ArtiRpcObject): 479 return object_id.id().encode("UTF-8") 480 else: 481 return object_id.encode("UTF-8") 482 483 484 class ArtiRpcObject(_RpcBase): 485 """ 486 Wrapper around an object ID and an ArtiRpcConn; 487 used to launch RPC requests ergonomically. 488 """ 489 490 _id: str 491 _conn: ArtiRpcConn 492 _owned: bool 493 _meta: Optional[dict] 494 495 def __init__(self, object_id: str, connection: ArtiRpcConn): 496 _RpcBase.__init__(self, connection._rpc) 497 self._id = object_id 498 self._conn = connection 499 self._owned = True 500 self._meta = None 501 502 def id(self) -> str: 503 """ 504 Return the ObjectId for this object. 505 """ 506 return self._id 507 508 def invoke(self, method: str, **params) -> dict: 509 """ 510 Invoke a given RPC method with a given set of parameters, 511 wait for it to complete, 512 and return its result as a json object. 513 """ 514 request = {"obj": self._id, "method": method, "params": params} 515 if self._meta is not None: 516 request["meta"] = self._meta 517 return self._conn.execute(json.dumps(request)) 518 519 def invoke_with_handle(self, method: str, **params): 520 """ 521 Invoke a given RPC method with a given set of parameters, 522 and return an RpcHandle that can be used to check its progress. 523 """ 524 request = {"obj": self._id, "method": method, "params": params} 525 if self._meta is not None: 526 request["meta"] = self._meta 527 return self._conn.execute_with_handle(json.dumps(request)) 528 529 def with_meta(self, **params) -> ArtiRpcObject: 530 """ 531 Return a helper that can be used to set meta-parameters 532 on a request made with this object. 533 534 The wrapper will support `invoke` and `invoke_with_handle`, 535 and will pass them any provided `params` given as an argument 536 to this function as meta-request parameters. 537 538 The resulting object does not have ownership on the 539 underlying RPC object. 540 """ 541 new_obj = ArtiRpcObject(self._id, self._conn) 542 new_obj._owned = False 543 if params: 544 new_obj._meta = params 545 else: 546 new_obj._meta = None 547 return new_obj 548 549 def release_ownership(self): 550 """ 551 Release ownership of the underlying RPC object. 552 553 By default, when the last reference to an ArtiRpcObject is dropped, 554 we tell the RPC server to release the corresponding RPC ObjectID. 555 After that happens, nothing else can use that ObjectID 556 (and the object may get freed on the server side, 557 if nothing else refers to it.) 558 559 Calling this method releases ownership, such that we will not 560 tell the RPC server to release the ObjectID when this object is dropped. 561 """ 562 self._owned = False 563 564 def __del__(self): 565 if self._owned and self._conn._conn is not None: 566 try: 567 self.invoke("rpc:release") 568 except ArtiRpcError: 569 _logger.warn("RPC error while deleting object", exc_info=sys.exc_info()) 570 571 572 class ArtiRpcResponseKind(Enum): 573 """ 574 Value to indicate the type of a response to an RPC request. 575 576 Returned by ArtiRpcResponse.kind(). 577 """ 578 579 RESULT = 1 580 UPDATE = 2 581 ERROR = 3 582 583 584 class ArtiRpcResponse: 585 """ 586 A response from the RPC server. 587 588 May be a successful result; 589 an incremental update; 590 or an error. 591 """ 592 593 _kind: ArtiRpcResponseKind 594 _response: str 595 _obj: dict 596 597 def __init__(self, response: str): 598 self._response = response 599 self._obj = json.loads(response) 600 601 have_result = "result" in self._obj 602 have_error = "error" in self._obj 603 have_update = "update" in self._obj 604 605 # Here we (ab)use the property that the booleans True and False 606 # can also be used as the ints 1 and 0. 607 assert have_result + have_error + have_update == 1 608 609 if have_result: 610 self._kind = ArtiRpcResponseKind.RESULT 611 elif have_error: 612 self._kind = ArtiRpcResponseKind.ERROR 613 elif have_update: 614 self._kind = ArtiRpcResponseKind.UPDATE 615 else: 616 # Unreachable. 617 assert False 618 619 def __str__(self): 620 return self._response 621 622 def __getitem__(self, key: str): 623 return self._obj[key] 624 625 def kind(self) -> ArtiRpcResponseKind: 626 """Return the kind of response that this is.""" 627 return self._kind 628 629 def error(self) -> Optional[dict]: 630 """ 631 If this is an error response, return its `error` member. 632 Otherwise return `None`. 633 """ 634 return self._obj.get("error") 635 636 def result(self) -> Optional[dict]: 637 """ 638 If this is a successful result, return its 'result' member. 639 Otherwise return `None`. 640 """ 641 return self._obj.get("result") 642 643 def update(self) -> Optional[dict]: 644 """ 645 If this is an incremental update, return its 'update' member. 646 Otherwise return `None`. 647 """ 648 return self._obj.get("update") 649 650 651 class ArtiRequestHandle(_RpcBase): 652 """ 653 Handle to a pending RPC request. 654 """ 655 656 _handle: Ptr[FfiHandle] 657 658 def __init__(self, handle: Ptr[FfiHandle], rpc): 659 _RpcBase.__init__(self, rpc) 660 self._handle = handle 661 662 def __del__(self): 663 if self._handle is not None: 664 self._rpc.arti_rpc_handle_free(self._handle) 665 self._handle = None 666 667 def wait(self) -> ArtiRpcResponse: 668 """ 669 Wait for a response (update, error, or final result) 670 on this handle. 671 672 Return the response received. 673 """ 674 response = POINTER(arti_rpc.ffi.ArtiRpcStr)() 675 responsetype = arti_rpc.ffi.ArtiRpcResponseType(0) 676 error = POINTER(arti_rpc.ffi.ArtiRpcError)() 677 rv = self._rpc.arti_rpc_handle_wait( 678 self._handle, byref(response), byref(responsetype), byref(error) 679 ) 680 self._handle_error(rv, error) 681 response_obj = ArtiRpcResponse(self._consume_rpc_str(response)) 682 expected_kind = ArtiRpcResponseKind(responsetype.value) 683 assert response_obj.kind() == expected_kind 684 return response_obj