/ python / arti_rpc / src / arti_rpc / rpc.py
rpc.py
  1  """
  2  Type-based wrappers around our FFI functions.
  3  
  4  These types are responsible for providing a python-like API
  5  to the Arti RPC library.
  6  
  7  TODO RPC: NOTE that these APIs are still in flux;
  8  we will break them a lot before we declare them stable.
  9  Don't use them in production.
 10  """
 11  
 12  # mypy: allow-redefinition
 13  
 14  from __future__ import annotations
 15  
 16  # Design notes:
 17  #
 18  # - Every object gets a reference to the ctypes library object
 19  #   from the `ffi` module.
 20  #   We do this to better support programs that want exact control
 21  #   over how the library is loaded.
 22  #
 23  # - Exported types start with "Arti", to make imports safer.
 24  
 25  import json
 26  import logging
 27  import os
 28  import socket
 29  import sys
 30  from ctypes import POINTER, byref, c_int, _Pointer as Ptr
 31  from enum import Enum
 32  import arti_rpc.ffi
 33  from arti_rpc.ffi import (
 34      ArtiRpcStr as FfiStr,
 35      ArtiRpcError as FfiError,
 36      ArtiRpcHandle as FfiHandle,
 37      ArtiRpcConn as FfiConn,
 38      ArtiRpcConnBuilder as FfiBuilder,
 39      _ArtiRpcStatus as FfiStatus,
 40  )
 41  from typing import (
 42      Optional,
 43      Tuple,
 44      Union,
 45  )  # needed for Python 3.9, which lacks some syntax.
 46  
 47  if os.name == "nt":
 48  
 49      def _socket_is_valid(sock):
 50          """Return true if `sock` is a valid SOCKET."""
 51          return sock != arti_rpc.ffi.INVALID_SOCKET
 52  
 53  else:
 54  
 55      def _socket_is_valid(sock):
 56          """Return true if `sock` is a valid fd."""
 57          return sock >= 0
 58  
 59  
 60  _logger = logging.getLogger(__name__)
 61  
 62  
 63  class _RpcBase:
 64      def __init__(self, rpc_lib):
 65          self._rpc = rpc_lib
 66  
 67      def _consume_rpc_str(self, s: Ptr[FfiStr]) -> str:
 68          """
 69          Consume an ffi.ArtiRpcStr and return a python string.
 70          """
 71          try:
 72              bs = self._rpc.arti_rpc_str_get(s)
 73              return bs.decode("utf-8")
 74          finally:
 75              self._rpc.arti_rpc_str_free(s)
 76  
 77      def _handle_error(self, rv: FfiStatus, error_ptr: Ptr[FfiError]) -> None:
 78          """
 79          If `(rv,error_ptr)` indicates an error, then raise that error.
 80          Otherwise do nothing.
 81  
 82          NOTE: Here we rely on the property that,
 83          when there is an error in a function,
 84          _only the error is actually set_.
 85          (No other object was constructed and needs to be freed.)
 86          """
 87          if rv != 0:
 88              raise ArtiRpcError(rv, error_ptr, self._rpc)
 89          elif error_ptr:
 90              # This should be impossible; it indicates misbehavior on arti's part.
 91              raise ArtiRpcError(rv, error_ptr, self._rpc)
 92  
 93  
 94  def _into_json_str(o: Union[str, dict]) -> str:
 95      """
 96      If 'o' is a dict, convert it into a json string.
 97  
 98      Otherwise return it as-is.
 99      """
100      if isinstance(o, dict):
101          return json.dumps(o)
102      else:
103          return o
104  
105  
106  class _BuildEntType(Enum):
107      """
108      Value to indicate the kind of an RPC connect point search path entry.
109  
110      Returned by ArtiRpcResponse.kind().
111      """
112  
113      LITERAL_CONNECT_POINT = 1
114      EXPANDABLE_PATH = 2
115      LITERAL_PATH = 3
116  
117  
118  class ArtiRpcConnBuilder(_RpcBase):
119      """
120      A builder object used to configure connections to Arti.
121      """
122  
123      _builder: Optional[Ptr[FfiBuilder]]
124  
125      def __init__(self, rpc_lib=None):
126          """
127          Return a new ArtiR
128          """
129          self._builder = None
130  
131          if rpc_lib is None:
132              rpc_lib = arti_rpc.ffi.get_library()
133  
134          _RpcBase.__init__(self, rpc_lib)
135  
136          builder = POINTER(arti_rpc.ffi.ArtiRpcConnBuilder)()
137          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
138          rv = self._rpc.arti_rpc_conn_builder_new(byref(builder), byref(error))
139          self._handle_error(rv, error)
140          assert builder
141          self._builder = builder
142  
143      def __del__(self):
144          if self._builder is not None:
145              self._rpc.arti_rpc_conn_builder_free(self._builder)
146              self._builder = None
147  
148      def _prepend_entry(self, entrykind: _BuildEntType, entry: str) -> None:
149          """
150          Helper: Prepend `entry` to the search path of this builder.
151          """
152          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
153          rv = self._rpc.arti_rpc_conn_builder_prepend_entry(
154              self._builder, entrykind.value, entry.encode("utf-8"), byref(error)
155          )
156          self._handle_error(rv, error)
157  
158      def prepend_literal_connect_point(self, connect_point: str) -> None:
159          """
160          Prepend `connect_point` to this builder's search path
161          as a literal connect point.
162          """
163          self._prepend_entry(_BuildEntType.LITERAL_CONNECT_POINT, connect_point)
164  
165      def prepend_expandable_path(self, path: str) -> None:
166          """
167          Prepend `path` to this builder's search path
168          as an expandable path (one to which Arti's variable substitution applies).
169          """
170          self._prepend_entry(_BuildEntType.EXPANDABLE_PATH, path)
171  
172      def prepend_literal_path(self, path: str) -> None:
173          """
174          Prepend `path` to this builder's search path
175          as a literal path (one to which Arti's variable substitution does not apply).
176          """
177          self._prepend_entry(_BuildEntType.LITERAL_PATH, path)
178  
179      def connect(self) -> ArtiRpcConn:
180          """
181          Use the settings in this builder to open a connection to Arti.
182          """
183          conn = self._connect_inner()
184  
185          return ArtiRpcConn(rpc_lib=self._rpc, _conn=conn)
186  
187      def _connect_inner(self) -> Ptr[FfiConn]:
188          """
189          Helper: Use the settings in this builder to open a connection to Arti,
190          and return a pointer to that connection.
191          """
192          conn = POINTER(arti_rpc.ffi.ArtiRpcConn)()
193          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
194          rv = self._rpc.arti_rpc_conn_builder_connect(
195              self._builder, byref(conn), byref(error)
196          )
197          self._handle_error(rv, error)
198          assert conn
199  
200          return conn
201  
202  
203  class ArtiRpcConn(_RpcBase):
204      """
205      An open connection to Arti.
206      """
207  
208      _conn: Optional[Ptr[FfiConn]]
209      _session: ArtiRpcObject
210  
211      def __init__(self, rpc_lib=None, _conn: Optional[Ptr[FfiConn]] = None):
212          """
213          Try to connect to Arti using default settings.
214  
215          If `rpc_lib` is specified, it must be a ctypes DLL,
216          constructed with `arti_rpc.ffi.get_library`.
217          If it's None, we use the default.
218          """
219          self._conn = None
220  
221          if rpc_lib is None:
222              rpc_lib = arti_rpc.ffi.get_library()
223  
224          _RpcBase.__init__(self, rpc_lib)
225  
226          if _conn is None:
227              _conn = ArtiRpcConnBuilder()._connect_inner()
228  
229          self._conn = _conn
230          s = self._rpc.arti_rpc_conn_get_session_id(self._conn).decode("utf-8")
231          self._session = self.make_object(s)
232  
233      def __del__(self):
234          if self._conn is not None:
235              # Note that if _conn is set, then _rpc is necessarily set.
236              self._rpc.arti_rpc_conn_free(self._conn)
237              self._conn = None
238  
239      def make_object(self, object_id: str) -> ArtiRpcObject:
240          """
241          Return an ArtiRpcObject for a given object ID on this connection.
242  
243          (The `ArtiRpcObject` API is a convenience wrapper that provides
244          a more ergonomic interface to `execute` and `execute_with_handle`.)
245          """
246          return ArtiRpcObject(object_id, self)
247  
248      def session(self) -> ArtiRpcObject:
249          """
250          Return an ArtiRpcObject for this connection's Session object.
251  
252          (The Session is the root object of any RPC session;
253          by invoking methods on the session,
254          you can get the IDs for other objects.)
255          """
256          return self._session
257  
258      def execute(self, request: Union[str, dict]) -> dict:
259          """
260          Run an RPC request on this connection.
261  
262          On success, return the "response" from the RPC reply.
263          Otherwise, raise an error.
264  
265          You may (and probably should) omit the `id` field from your request.
266          If you do, a new id will be automatically generated.
267  
268          The request may be a string, or a dict that will be encoded
269          as a json object.
270          """
271          msg = _into_json_str(request)
272          response = POINTER(arti_rpc.ffi.ArtiRpcStr)()
273          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
274          rv = self._rpc.arti_rpc_conn_execute(
275              self._conn, msg.encode("utf-8"), byref(response), byref(error)
276          )
277          self._handle_error(rv, error)
278          r = ArtiRpcResponse(self._consume_rpc_str(response))
279          assert r.kind() == ArtiRpcResponseKind.RESULT
280          return r["result"]
281  
282      def execute_with_handle(self, request: Union[str, dict]) -> ArtiRequestHandle:
283          """
284          Launch an RPC request on this connection, and return a ArtiRequestHandle
285          to the open request.
286  
287          This API is suitable for use when you want incremental updates
288          about the request status.
289          """
290          msg = _into_json_str(request)
291          handle = POINTER(arti_rpc.ffi.ArtiRpcHandle)()
292          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
293          rv = self._rpc.arti_rpc_conn_execute_with_handle(
294              self._conn, msg.encode("utf-8"), byref(handle), byref(error)
295          )
296          self._handle_error(rv, error)
297          return ArtiRequestHandle(handle, self._rpc)
298  
299      def open_stream(
300          self,
301          hostname: str,
302          port: int,
303          *,
304          on_object: Union[ArtiRpcObject, str, None] = None,
305          isolation: str = "",
306          want_stream_id: bool = False,
307      ) -> Tuple[socket.socket, Optional[ArtiRpcObject]]:
308          """
309          Open an anonymized data stream to `hostname`:`port` over Arti.
310  
311          If `on_object` if provided, is the client-like object which will
312          be told to open the connection.  Otherwise, the session
313          will be told to open the connection.
314  
315          If `isolation` is provided, the resulting stream will be configured
316          not to share a circuit with any other stream
317          having a different `isolation`.
318  
319          If `want_stream_id` is true, then we register the resulting data stream
320          as an RPC object, and return it along with the resulting socket.
321  
322          Caveats: TODO RPC.  Copy-paste the caveats from arti-rpc-client-core,
323          once they have stabilized.
324          """
325          hostname: bytes = hostname.encode("utf-8")
326          isolation: bytes = isolation.encode("utf-8")
327          on_object: Optional[bytes] = _opt_object_id_to_bytes(on_object)
328          if want_stream_id:
329              stream_id = POINTER(arti_rpc.ffi.ArtiRpcStr)()
330              stream_id_ptr = byref(stream_id)
331          else:
332              stream_id_ptr = None
333          sock_cint = c_int(arti_rpc.ffi.INVALID_SOCKET)
334          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
335  
336          rv = self._rpc.arti_rpc_conn_open_stream(
337              self._conn,
338              hostname,
339              port,
340              on_object,
341              isolation,
342              byref(sock_cint),
343              stream_id_ptr,
344              byref(error),
345          )
346          self._handle_error(rv, error)
347  
348          assert _socket_is_valid(sock_cint.value)
349          sock = socket.socket(fileno=sock_cint.value)
350  
351          if want_stream_id:
352              stream_id_obj = self.make_object(self._consume_rpc_str(stream_id))
353              return (sock, stream_id_obj)
354          else:
355              return (sock, None)
356  
357  
358  class ArtiRpcErrorStatus(Enum):
359      """
360      Value return to indicate the type of an error returned by the
361      RPC library.
362  
363      This may or may not correspond to an error from the RPC server.
364  
365      Returned by ArtiRpcError.status_code()
366  
367      See arti-rpc-client-core documentation for more information.
368      """
369  
370      SUCCESS = 0
371      INVALID_INPUT = 1
372      NOT_SUPPORTED = 2
373      CONNECT_IO = 3
374      BAD_AUTH = 4
375      PEER_PROTOCOL_VIOLATION = 5
376      SHUTDOWN = 6
377      INTERNAL = 7
378      REQUEST_FAILED = 8
379      REQUEST_COMPLETED = 9
380      PROXY_IO = 10
381      STREAM_FAILED = 11
382      NOT_AUTHENTICATED = 12
383      ALL_CONNECT_ATTEMPTS_FAILED = 13
384      CONNECT_POINT_NOT_USABLE = 14
385      BAD_CONNECT_POINT_PATH = 15
386  
387  
388  def _error_status_from_int(status: int) -> Union[ArtiRpcErrorStatus, int]:
389      """
390      If `status` is a recognized member of `ArtiRpcErrorStatus`,
391      return that member.
392      Otherwise, return `status`.
393      """
394      try:
395          return ArtiRpcErrorStatus(status)
396      except ValueError:
397          return status
398  
399  
400  class ArtiRpcError(Exception):
401      """
402      An error returned by the RPC library.
403      """
404  
405      _rv: FfiStatus
406      _err: Ptr[FfiError]
407  
408      def __init__(self, rv: FfiStatus, err: Ptr[FfiError], rpc):
409          self._rv = rv
410          self._err = err
411          self._rpc = rpc
412  
413      def __del__(self):
414          if self._err is not None:
415              self._rpc.arti_rpc_err_free(self._err)
416              self._err = None
417  
418      def __str__(self):
419          status = self._rpc.arti_rpc_status_to_str(
420              self._rpc.arti_rpc_err_status(self._err)
421          ).decode("utf-8")
422          msg = self._rpc.arti_rpc_err_message(self._err).decode("utf-8")
423          if status == msg:
424              return status
425          else:
426              return f"{status}: {msg}"
427  
428      def status_code(self) -> Union[ArtiRpcErrorStatus, int]:
429          """
430          Return the status code for this error.
431  
432          This code is generated by the underlying RPC library.
433          """
434          return _error_status_from_int(self._rpc.arti_rpc_err_status(self._err))
435  
436      def os_error_code(self) -> Optional[int]:
437          """
438          Return the OS error code (e.g., errno) associated with this error,
439          if there is one.
440          """
441          code = self._rpc.arti_rpc_err_os_code(self._rpc._err)
442          if code == 0:
443              return None
444          else:
445              return code
446  
447      def response_str(self) -> Optional[str]:
448          """
449          Return the RPC response string associated with this error,
450          if this error represents an error message from the RPC server.
451          """
452          response = self._rpc.arti_rpc_err_response(self._err)
453          if response is None:
454              return None
455          else:
456              return response.decode("utf-8")
457  
458      def response_obj(self) -> Optional[dict]:
459          """
460          Return the RPC error object associated with this error,
461          if this error represents an error message from the RPC server.
462          """
463          response = self.response_str()
464          if response is None:
465              return None
466          else:
467              return json.loads(response)["error"]
468  
469  
470  def _opt_object_id_to_bytes(
471      object_id: Union[ArtiRpcObject, str, None]
472  ) -> Optional[bytes]:
473      """
474      Convert `object_id` (if it is present) to a `bytes`.
475      """
476      if object_id is None:
477          return None
478      elif isinstance(object_id, ArtiRpcObject):
479          return object_id.id().encode("UTF-8")
480      else:
481          return object_id.encode("UTF-8")
482  
483  
484  class ArtiRpcObject(_RpcBase):
485      """
486      Wrapper around an object ID and an ArtiRpcConn;
487      used to launch RPC requests ergonomically.
488      """
489  
490      _id: str
491      _conn: ArtiRpcConn
492      _owned: bool
493      _meta: Optional[dict]
494  
495      def __init__(self, object_id: str, connection: ArtiRpcConn):
496          _RpcBase.__init__(self, connection._rpc)
497          self._id = object_id
498          self._conn = connection
499          self._owned = True
500          self._meta = None
501  
502      def id(self) -> str:
503          """
504          Return the ObjectId for this object.
505          """
506          return self._id
507  
508      def invoke(self, method: str, **params) -> dict:
509          """
510          Invoke a given RPC method with a given set of parameters,
511          wait for it to complete,
512          and return its result as a json object.
513          """
514          request = {"obj": self._id, "method": method, "params": params}
515          if self._meta is not None:
516              request["meta"] = self._meta
517          return self._conn.execute(json.dumps(request))
518  
519      def invoke_with_handle(self, method: str, **params):
520          """
521          Invoke a given RPC method with a given set of parameters,
522          and return an RpcHandle that can be used to check its progress.
523          """
524          request = {"obj": self._id, "method": method, "params": params}
525          if self._meta is not None:
526              request["meta"] = self._meta
527          return self._conn.execute_with_handle(json.dumps(request))
528  
529      def with_meta(self, **params) -> ArtiRpcObject:
530          """
531          Return a helper that can be used to set meta-parameters
532          on a request made with this object.
533  
534          The wrapper will support `invoke` and `invoke_with_handle`,
535          and will pass them any provided `params` given as an argument
536          to this function as meta-request parameters.
537  
538          The resulting object does not have ownership on the
539          underlying RPC object.
540          """
541          new_obj = ArtiRpcObject(self._id, self._conn)
542          new_obj._owned = False
543          if params:
544              new_obj._meta = params
545          else:
546              new_obj._meta = None
547          return new_obj
548  
549      def release_ownership(self):
550          """
551          Release ownership of the underlying RPC object.
552  
553          By default, when the last reference to an ArtiRpcObject is dropped,
554          we tell the RPC server to release the corresponding RPC ObjectID.
555          After that happens, nothing else can use that ObjectID
556          (and the object may get freed on the server side,
557          if nothing else refers to it.)
558  
559          Calling this method releases ownership, such that we will not
560          tell the RPC server to release the ObjectID when this object is dropped.
561          """
562          self._owned = False
563  
564      def __del__(self):
565          if self._owned and self._conn._conn is not None:
566              try:
567                  self.invoke("rpc:release")
568              except ArtiRpcError:
569                  _logger.warn("RPC error while deleting object", exc_info=sys.exc_info())
570  
571  
572  class ArtiRpcResponseKind(Enum):
573      """
574      Value to indicate the type of a response to an RPC request.
575  
576      Returned by ArtiRpcResponse.kind().
577      """
578  
579      RESULT = 1
580      UPDATE = 2
581      ERROR = 3
582  
583  
584  class ArtiRpcResponse:
585      """
586      A response from the RPC server.
587  
588      May be a successful result;
589      an incremental update;
590      or an error.
591      """
592  
593      _kind: ArtiRpcResponseKind
594      _response: str
595      _obj: dict
596  
597      def __init__(self, response: str):
598          self._response = response
599          self._obj = json.loads(response)
600  
601          have_result = "result" in self._obj
602          have_error = "error" in self._obj
603          have_update = "update" in self._obj
604  
605          # Here we (ab)use the property that the booleans True and False
606          # can also be used as the ints 1 and 0.
607          assert have_result + have_error + have_update == 1
608  
609          if have_result:
610              self._kind = ArtiRpcResponseKind.RESULT
611          elif have_error:
612              self._kind = ArtiRpcResponseKind.ERROR
613          elif have_update:
614              self._kind = ArtiRpcResponseKind.UPDATE
615          else:
616              # Unreachable.
617              assert False
618  
619      def __str__(self):
620          return self._response
621  
622      def __getitem__(self, key: str):
623          return self._obj[key]
624  
625      def kind(self) -> ArtiRpcResponseKind:
626          """Return the kind of response that this is."""
627          return self._kind
628  
629      def error(self) -> Optional[dict]:
630          """
631          If this is an error response, return its `error` member.
632          Otherwise return `None`.
633          """
634          return self._obj.get("error")
635  
636      def result(self) -> Optional[dict]:
637          """
638          If this is a successful result, return its 'result' member.
639          Otherwise return `None`.
640          """
641          return self._obj.get("result")
642  
643      def update(self) -> Optional[dict]:
644          """
645          If this is an incremental update, return its 'update' member.
646          Otherwise return `None`.
647          """
648          return self._obj.get("update")
649  
650  
651  class ArtiRequestHandle(_RpcBase):
652      """
653      Handle to a pending RPC request.
654      """
655  
656      _handle: Ptr[FfiHandle]
657  
658      def __init__(self, handle: Ptr[FfiHandle], rpc):
659          _RpcBase.__init__(self, rpc)
660          self._handle = handle
661  
662      def __del__(self):
663          if self._handle is not None:
664              self._rpc.arti_rpc_handle_free(self._handle)
665              self._handle = None
666  
667      def wait(self) -> ArtiRpcResponse:
668          """
669          Wait for a response (update, error, or final result)
670          on this handle.
671  
672          Return the response received.
673          """
674          response = POINTER(arti_rpc.ffi.ArtiRpcStr)()
675          responsetype = arti_rpc.ffi.ArtiRpcResponseType(0)
676          error = POINTER(arti_rpc.ffi.ArtiRpcError)()
677          rv = self._rpc.arti_rpc_handle_wait(
678              self._handle, byref(response), byref(responsetype), byref(error)
679          )
680          self._handle_error(rv, error)
681          response_obj = ArtiRpcResponse(self._consume_rpc_str(response))
682          expected_kind = ArtiRpcResponseKind(responsetype.value)
683          assert response_obj.kind() == expected_kind
684          return response_obj