/ RNS / vendor / i2plib / tunnel.py
tunnel.py
  1  import logging
  2  import asyncio
  3  import argparse
  4  
  5  from . import sam 
  6  from . import aiosam
  7  from . import utils
  8  from .log import logger
  9  
 10  BUFFER_SIZE = 65536
 11  
 12  async def proxy_data(reader, writer):
 13      """Proxy data from reader to writer"""
 14      try:
 15          while True:
 16              data = await reader.read(BUFFER_SIZE)
 17              if not data:
 18                  break
 19              writer.write(data)
 20      except Exception as e:
 21          logger.debug('proxy_data_task exception {}'.format(e))
 22      finally:
 23          try:
 24              writer.close()
 25          except RuntimeError:
 26              pass
 27          logger.debug('close connection')
 28  
 29  class I2PTunnel(object):
 30      """Base I2P Tunnel object, not to be used directly
 31  
 32      :param local_address: A local address to use for a tunnel. 
 33                            E.g. ("127.0.0.1", 6668)
 34      :param destination: (optional) Destination to use for this tunnel. Can be 
 35                          a base64 encoded string, :class:`Destination`
 36                          instance or None. A new destination is created when it
 37                          is None.
 38      :param session_name: (optional) Session nick name. A new session nickname is
 39                          generated if not specified.
 40      :param options: (optional) A dict object with i2cp options
 41      :param loop: (optional) Event loop instance
 42      :param sam_address: (optional) SAM API address
 43      """
 44  
 45      def __init__(self, local_address, destination=None, session_name=None, 
 46                   options={}, loop=None, sam_address=sam.DEFAULT_ADDRESS):
 47          self.local_address = local_address
 48          self.destination = destination
 49          self.session_name = session_name or utils.generate_session_id()
 50          self.options = options
 51          self.loop = loop
 52          self.sam_address = sam_address
 53  
 54      async def _pre_run(self):
 55          if not self.destination:
 56              self.destination = await aiosam.new_destination(
 57                  sam_address=self.sam_address, loop=self.loop)
 58          _, self.session_writer = await aiosam.create_session(
 59                  self.session_name, style=self.style, options=self.options,
 60                  sam_address=self.sam_address, 
 61                  loop=self.loop, destination=self.destination)
 62  
 63      def stop(self):
 64          """Stop the tunnel"""
 65          self.session_writer.close()
 66  
 67  class ClientTunnel(I2PTunnel):
 68      """Client tunnel, a subclass of tunnel.I2PTunnel
 69  
 70      If you run a client tunnel with a local address ("127.0.0.1", 6668) and
 71      a remote destination "irc.echelon.i2p", all connections to 127.0.0.1:6668 
 72      will be proxied to irc.echelon.i2p.
 73  
 74      :param remote_destination: Remote I2P destination, can be either .i2p 
 75                          domain, .b32.i2p address, base64 destination or 
 76                          :class:`Destination` instance
 77      """
 78  
 79      def __init__(self, remote_destination, *args, **kwargs):
 80          super().__init__(*args, **kwargs)
 81          self.style = "STREAM"
 82          self.remote_destination = remote_destination
 83  
 84      async def run(self):
 85          """A coroutine used to run the tunnel"""
 86          await self._pre_run()
 87  
 88          self.status = { "setup_ran": False, "setup_failed": False, "exception": None, "connect_tasks": [] }
 89          async def handle_client(client_reader, client_writer):
 90              """Handle local client connection"""
 91              try:
 92                  sc_task = aiosam.stream_connect(
 93                          self.session_name, self.remote_destination, 
 94                          sam_address=self.sam_address, loop=self.loop)
 95                  self.status["connect_tasks"].append(sc_task)
 96                  
 97                  remote_reader, remote_writer = await sc_task
 98                  asyncio.ensure_future(proxy_data(remote_reader, client_writer), 
 99                                        loop=self.loop)
100                  asyncio.ensure_future(proxy_data(client_reader, remote_writer),
101                                        loop=self.loop)
102  
103              except Exception as e:
104                  self.status["setup_ran"] = True
105                  self.status["setup_failed"] = True
106                  self.status["exception"] = e
107  
108  
109          try:
110              self.server = await asyncio.start_server(handle_client, *self.local_address)
111              self.status["setup_ran"] = True
112  
113          except Exception as e:
114              self.status["setup_ran"] = True
115              self.status["setup_failed"] = True
116              self.status["exception"] = e
117  
118      def stop(self):
119          super().stop()
120          self.server.close()
121  
122  class ServerTunnel(I2PTunnel):
123      """Server tunnel, a subclass of tunnel.I2PTunnel
124  
125      If you want to expose a local service 127.0.0.1:80 to the I2P network, run
126      a server tunnel with a local address ("127.0.0.1", 80). If you don't 
127      provide a private key or a session name, it will use a TRANSIENT 
128      destination.
129      """
130      def __init__(self, *args, **kwargs):
131          super().__init__(*args, **kwargs)
132          self.style = "STREAM"
133  
134      async def run(self):
135          """A coroutine used to run the tunnel"""
136          await self._pre_run()
137  
138          self.status = { "setup_ran": False, "setup_failed": False, "exception": None, "connect_tasks": [] }
139          async def handle_client(incoming, client_reader, client_writer):
140              try:
141                  # data and dest may come in one chunk
142                  dest, data = incoming.split(b"\n", 1) 
143                  remote_destination = sam.Destination(dest.decode())
144                  logger.debug("{} client connected: {}.b32.i2p".format(
145                      self.session_name, remote_destination.base32))
146              
147              except Exception as e:
148                  self.status["exception"] = e
149                  self.status["setup_failed"] = True
150                  data = None
151  
152              try:
153                  sc_task = asyncio.wait_for(
154                          asyncio.open_connection(
155                             host=self.local_address[0], 
156                             port=self.local_address[1]),
157                          timeout=5)
158                  self.status["connect_tasks"].append(sc_task)
159  
160                  remote_reader, remote_writer = await sc_task
161                  if data: remote_writer.write(data)
162                  asyncio.ensure_future(proxy_data(remote_reader, client_writer),
163                                        loop=self.loop)
164                  asyncio.ensure_future(proxy_data(client_reader, remote_writer),
165                                        loop=self.loop)
166  
167              except ConnectionRefusedError:
168                  client_writer.close()
169                  self.status["exception"] = e
170                  self.status["setup_failed"] = True
171  
172          async def server_loop():
173              try:
174                  while True:
175                      client_reader, client_writer = await aiosam.stream_accept(
176                              self.session_name, sam_address=self.sam_address, 
177                              loop=self.loop)
178                      incoming = await client_reader.read(BUFFER_SIZE)
179                      asyncio.ensure_future(handle_client(
180                          incoming, client_reader, client_writer), loop=self.loop)
181              except asyncio.CancelledError:
182                  pass
183  
184          self.server_loop = asyncio.ensure_future(server_loop(), loop=self.loop)
185          self.status["setup_ran"] = True
186  
187      def stop(self):
188          super().stop()
189          self.server_loop.cancel()
190  
191  
192  if __name__ == '__main__':
193      parser = argparse.ArgumentParser()
194      parser.add_argument('type', metavar="TYPE", choices=('server', 'client'),
195                          help="Tunnel type (server or client)")
196      parser.add_argument('address', metavar="ADDRESS", 
197                          help="Local address (e.g. 127.0.0.1:8000)")
198      parser.add_argument('--debug', '-d', action='store_true',
199                          help='Debugging')
200      parser.add_argument('--key', '-k', default='', metavar='PRIVATE_KEY',
201                          help='Path to private key file')
202      parser.add_argument('--destination', '-D', default='', 
203                          metavar='DESTINATION', help='Remote destination')
204      args = parser.parse_args()
205  
206      SAM_ADDRESS = utils.get_sam_address()
207  
208      logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
209      loop = asyncio.get_event_loop()
210      loop.set_debug(args.debug)
211  
212      if args.key:
213          destination = sam.Destination(path=args.key, has_private_key=True)
214      else:
215          destination = None
216  
217      local_address = utils.address_from_string(args.address)
218  
219      if args.type == "client":
220          tunnel = ClientTunnel(args.destination, local_address, loop=loop, 
221                  destination=destination, sam_address=SAM_ADDRESS)
222      elif args.type == "server":
223          tunnel = ServerTunnel(local_address, loop=loop, destination=destination, 
224                  sam_address=SAM_ADDRESS)
225  
226      asyncio.ensure_future(tunnel.run(), loop=loop)
227  
228      try:
229          loop.run_forever()
230      except KeyboardInterrupt:
231          tunnel.stop()
232      finally:
233          loop.stop()
234          loop.close()