/ RNS / vendor / i2plib / sam.py
sam.py
  1  from base64 import b64decode, b64encode, b32encode
  2  from hashlib import sha256
  3  import struct
  4  import re
  5  
  6  
  7  I2P_B64_CHARS = "-~"
  8  
  9  def i2p_b64encode(x):
 10      """Encode I2P destination"""
 11      return b64encode(x, altchars=I2P_B64_CHARS.encode()).decode() 
 12  
 13  def i2p_b64decode(x):
 14      """Decode I2P destination"""
 15      return b64decode(x, altchars=I2P_B64_CHARS, validate=True)
 16  
 17  SAM_BUFSIZE = 4096
 18  DEFAULT_ADDRESS = ("127.0.0.1", 7656)
 19  DEFAULT_MIN_VER = "3.1"
 20  DEFAULT_MAX_VER = "3.1"
 21  TRANSIENT_DESTINATION = "TRANSIENT"
 22  
 23  VALID_BASE32_ADDRESS = re.compile(r"^([a-zA-Z0-9]{52}).b32.i2p$")
 24  VALID_BASE64_ADDRESS = re.compile(r"^([a-zA-Z0-9-~=]{516,528})$")
 25  
 26  class Message(object):
 27      """Parse SAM message to an object"""
 28      def __init__(self, s):
 29          self.opts = {}
 30          if type(s) != str:
 31              self._reply_string = s.decode().strip()
 32          else:
 33              self._reply_string = s
 34  
 35          self.cmd, self.action, opts = self._reply_string.split(" ", 2)
 36          for v in opts.split(" "):
 37              data = v.split("=", 1) if "=" in v else (v, True)
 38              self.opts[data[0]] = data[1]
 39  
 40      def __getitem__(self, key):
 41          return self.opts[key]
 42  
 43      @property
 44      def ok(self):
 45          return self["RESULT"] == "OK"
 46  
 47      def __repr__(self):
 48          return self._reply_string
 49  
 50  
 51  # SAM request messages
 52  
 53  def hello(min_version, max_version):
 54      return "HELLO VERSION MIN={} MAX={}\n".format(min_version, 
 55              max_version).encode()
 56  
 57  def session_create(style, session_id, destination, options=""):
 58      return "SESSION CREATE STYLE={} ID={} DESTINATION={} {}\n".format(
 59              style, session_id, destination, options).encode()
 60  
 61  
 62  def stream_connect(session_id, destination, silent="false"):
 63      return "STREAM CONNECT ID={} DESTINATION={} SILENT={}\n".format(
 64              session_id, destination, silent).encode()
 65  
 66  def stream_accept(session_id, silent="false"):
 67      return "STREAM ACCEPT ID={} SILENT={}\n".format(session_id, silent).encode()
 68  
 69  def stream_forward(session_id, port, options=""):
 70      return "STREAM FORWARD ID={} PORT={} {}\n".format(
 71              session_id, port, options).encode()
 72  
 73  
 74  
 75  def naming_lookup(name):
 76      return "NAMING LOOKUP NAME={}\n".format(name).encode()
 77  
 78  def dest_generate(signature_type):
 79      return "DEST GENERATE SIGNATURE_TYPE={}\n".format(signature_type).encode()
 80  
 81  class Destination(object):
 82      """I2P destination
 83  
 84      https://geti2p.net/spec/common-structures#destination
 85  
 86      :param data: (optional) Base64 encoded data or binary data 
 87      :param path: (optional) A path to a file with binary data 
 88      :param has_private_key: (optional) Does data have a private key? 
 89      """
 90  
 91      ECDSA_SHA256_P256 = 1
 92      ECDSA_SHA384_P384 = 2
 93      ECDSA_SHA512_P521 = 3
 94      EdDSA_SHA512_Ed25519 = 7
 95  
 96      default_sig_type = EdDSA_SHA512_Ed25519
 97  
 98      _pubkey_size = 256
 99      _signkey_size = 128
100      _min_cert_size = 3
101  
102      def __init__(self, data=None, path=None, has_private_key=False):
103          #: Binary destination
104          self.data = bytes() 
105          #: Base64 encoded destination
106          self.base64 = ""    
107          #: :class:`RNS.vendor.i2plib.PrivateKey` instance or None
108          self.private_key = None    
109          
110          if path:
111              with open(path, "rb") as f: data = f.read()
112  
113          if data and has_private_key:
114              self.private_key = PrivateKey(data)
115  
116              cert_len = struct.unpack("!H", self.private_key.data[385:387])[0]
117              data = self.private_key.data[:387+cert_len]
118  
119          if not data:
120              raise Exception("Can't create a destination with no data")
121  
122          self.data = data if type(data) == bytes else i2p_b64decode(data)
123          self.base64 = data if type(data) == str else i2p_b64encode(data)
124  
125      def __repr__(self):
126          return "<Destination: {}>".format(self.base32)
127  
128      @property
129      def base32(self):
130          """Base32 destination hash of this destination"""
131          desthash = sha256(self.data).digest()
132          return b32encode(desthash).decode()[:52].lower()
133      
134  class PrivateKey(object):
135      """I2P private key
136  
137      https://geti2p.net/spec/common-structures#keysandcert
138  
139      :param data: Base64 encoded data or binary data 
140      """
141  
142      def __init__(self, data):
143          #: Binary private key
144          self.data = data if type(data) == bytes else i2p_b64decode(data)
145          #: Base64 encoded private key
146          self.base64 = data if type(data) == str else i2p_b64encode(data)
147