# tunnel.py
"""Client and server I2P tunnels built on the SAM API helpers of this package."""
import logging
import asyncio
import argparse

from . import sam
from . import aiosam
from . import utils
from .log import logger

# Maximum number of bytes requested from a stream reader in one read() call.
BUFFER_SIZE = 65536


async def proxy_data(reader, writer):
    """Proxy data from reader to writer

    Copies chunks of up to ``BUFFER_SIZE`` bytes until the reader reports
    EOF (an empty read) or an exception occurs. The writer is always closed
    on exit; exceptions are logged at debug level and not re-raised.

    :param reader: Stream reader to read from
    :param writer: Stream writer to write to; closed when proxying ends
    """
    try:
        while True:
            data = await reader.read(BUFFER_SIZE)
            if not data:
                break
            writer.write(data)
            # Wait for the write buffer to flush so a fast reader paired
            # with a slow writer cannot grow the buffer without bound.
            await writer.drain()
    except Exception as e:
        logger.debug('proxy_data_task exception {}'.format(e))
    finally:
        try:
            writer.close()
        except RuntimeError:
            # Closing can fail when the event loop is already closed.
            pass
        logger.debug('close connection')


class I2PTunnel(object):
    """Base I2P Tunnel object, not to be used directly

    Subclasses are expected to set ``self.style`` before ``_pre_run`` is
    awaited, because it is passed to the SAM session creation call.

    :param local_address: A local address to use for a tunnel.
                          E.g. ("127.0.0.1", 6668)
    :param destination: (optional) Destination to use for this tunnel. Can be
                        a base64 encoded string, :class:`Destination`
                        instance or None. A new destination is created when it
                        is None.
    :param session_name: (optional) Session nick name. A new session nickname is
                        generated if not specified.
    :param options: (optional) A dict object with i2cp options
    :param loop: (optional) Event loop instance
    :param sam_address: (optional) SAM API address
    """

    def __init__(self, local_address, destination=None, session_name=None,
                 options=None, loop=None, sam_address=sam.DEFAULT_ADDRESS):
        self.local_address = local_address
        self.destination = destination
        self.session_name = session_name or utils.generate_session_id()
        # A fresh dict per instance: a `{}` default in the signature would be
        # shared (and mutable) across every tunnel created without options.
        self.options = {} if options is None else options
        self.loop = loop
        self.sam_address = sam_address
        # Set by _pre_run(); kept as None until then so stop() is safe to
        # call on a tunnel whose session was never created.
        self.session_writer = None

    async def _pre_run(self):
        """Create a destination if none was given, then open the SAM session."""
        if not self.destination:
            self.destination = await aiosam.new_destination(
                sam_address=self.sam_address, loop=self.loop)
        _, self.session_writer = await aiosam.create_session(
            self.session_name, style=self.style, options=self.options,
            sam_address=self.sam_address,
            loop=self.loop, destination=self.destination)

    def stop(self):
        """Stop the tunnel"""
        if self.session_writer is not None:
            self.session_writer.close()


class ClientTunnel(I2PTunnel):
    """Client tunnel, a subclass of tunnel.I2PTunnel

    If you run a client tunnel with a local address ("127.0.0.1", 6668) and
    a remote destination "irc.echelon.i2p", all connections to 127.0.0.1:6668
    will be proxied to irc.echelon.i2p.

    :param remote_destination: Remote I2P destination, can be either .i2p
                               domain, .b32.i2p address, base64 destination or
                               :class:`Destination` instance
    """

    def __init__(self, remote_destination, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.style = "STREAM"
        self.remote_destination = remote_destination
        # Set by run() once the local listener is up.
        self.server = None

    async def run(self):
        """A coroutine used to run the tunnel

        Opens the SAM session and starts a local listener on
        ``local_address``. Failures are not raised; they are recorded in
        ``self.status`` ("setup_failed" and "exception").
        """
        await self._pre_run()

        self.status = {"setup_ran": False, "setup_failed": False,
                       "exception": None, "connect_tasks": []}

        async def handle_client(client_reader, client_writer):
            """Handle local client connection"""
            try:
                sc_task = aiosam.stream_connect(
                    self.session_name, self.remote_destination,
                    sam_address=self.sam_address, loop=self.loop)
                self.status["connect_tasks"].append(sc_task)

                remote_reader, remote_writer = await sc_task
                # One pump per direction; each closes its writer when done.
                asyncio.ensure_future(proxy_data(remote_reader, client_writer),
                                      loop=self.loop)
                asyncio.ensure_future(proxy_data(client_reader, remote_writer),
                                      loop=self.loop)

            except Exception as e:
                self.status["setup_ran"] = True
                self.status["setup_failed"] = True
                self.status["exception"] = e
                # No proxy task owns the local connection at this point, so
                # close it here instead of leaving the client hanging.
                client_writer.close()

        try:
            self.server = await asyncio.start_server(
                handle_client, *self.local_address)
            self.status["setup_ran"] = True

        except Exception as e:
            self.status["setup_ran"] = True
            self.status["setup_failed"] = True
            self.status["exception"] = e

    def stop(self):
        """Stop the tunnel: close the SAM session and the local listener"""
        super().stop()
        # The listener does not exist if run() failed or was never awaited.
        if self.server is not None:
            self.server.close()


class ServerTunnel(I2PTunnel):
    """Server tunnel, a subclass of tunnel.I2PTunnel

    If you want to expose a local service 127.0.0.1:80 to the I2P network, run
    a server tunnel with a local address ("127.0.0.1", 80). If you don't
    provide a private key or a session name, it will use a TRANSIENT
    destination.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.style = "STREAM"
        # Set by run(); the future driving the accept loop.
        self.server_loop = None

    async def run(self):
        """A coroutine used to run the tunnel

        Opens the SAM session and schedules a loop that accepts incoming
        I2P streams and proxies each one to ``local_address``. Per-connection
        failures are recorded in ``self.status`` instead of being raised.
        """
        await self._pre_run()

        self.status = {"setup_ran": False, "setup_failed": False,
                       "exception": None, "connect_tasks": []}

        async def handle_client(incoming, client_reader, client_writer):
            """Proxy one accepted I2P stream to the local service"""
            try:
                # data and dest may come in one chunk
                dest, data = incoming.split(b"\n", 1)
                remote_destination = sam.Destination(dest.decode())
                logger.debug("{} client connected: {}.b32.i2p".format(
                    self.session_name, remote_destination.base32))

            except Exception as e:
                self.status["exception"] = e
                self.status["setup_failed"] = True
                data = None

            try:
                sc_task = asyncio.wait_for(
                    asyncio.open_connection(
                        host=self.local_address[0],
                        port=self.local_address[1]),
                    timeout=5)
                self.status["connect_tasks"].append(sc_task)

                remote_reader, remote_writer = await sc_task
                if data:
                    # Forward payload that arrived together with the
                    # destination line.
                    remote_writer.write(data)
                asyncio.ensure_future(proxy_data(remote_reader, client_writer),
                                      loop=self.loop)
                asyncio.ensure_future(proxy_data(client_reader, remote_writer),
                                      loop=self.loop)

            except (OSError, asyncio.TimeoutError) as e:
                # Covers a refused connection as well as other socket errors
                # and the 5 second connect timeout. The exception is bound
                # here: the earlier handler's `e` is out of scope by now.
                client_writer.close()
                self.status["exception"] = e
                self.status["setup_failed"] = True

        async def server_loop():
            """Accept I2P streams until the loop task is cancelled"""
            try:
                while True:
                    client_reader, client_writer = await aiosam.stream_accept(
                        self.session_name, sam_address=self.sam_address,
                        loop=self.loop)
                    incoming = await client_reader.read(BUFFER_SIZE)
                    asyncio.ensure_future(handle_client(
                        incoming, client_reader, client_writer), loop=self.loop)
            except asyncio.CancelledError:
                pass

        self.server_loop = asyncio.ensure_future(server_loop(), loop=self.loop)
        self.status["setup_ran"] = True

    def stop(self):
        """Stop the tunnel: close the SAM session and cancel the accept loop"""
        super().stop()
        # The accept loop does not exist if run() failed or was never awaited.
        if self.server_loop is not None:
            self.server_loop.cancel()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('type', metavar="TYPE", choices=('server', 'client'),
                        help="Tunnel type (server or client)")
    parser.add_argument('address', metavar="ADDRESS",
                        help="Local address (e.g. 127.0.0.1:8000)")
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Debugging')
    parser.add_argument('--key', '-k', default='', metavar='PRIVATE_KEY',
                        help='Path to private key file')
    parser.add_argument('--destination', '-D', default='',
                        metavar='DESTINATION', help='Remote destination')
    args = parser.parse_args()

    # A client tunnel with an empty remote destination can never connect,
    # so reject it up front rather than failing on every connection.
    if args.type == "client" and not args.destination:
        parser.error("--destination is required for a client tunnel")

    SAM_ADDRESS = utils.get_sam_address()

    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
    loop = asyncio.get_event_loop()
    loop.set_debug(args.debug)

    if args.key:
        destination = sam.Destination(path=args.key, has_private_key=True)
    else:
        destination = None

    local_address = utils.address_from_string(args.address)

    if args.type == "client":
        tunnel = ClientTunnel(args.destination, local_address, loop=loop,
                              destination=destination, sam_address=SAM_ADDRESS)
    elif args.type == "server":
        tunnel = ServerTunnel(local_address, loop=loop, destination=destination,
                              sam_address=SAM_ADDRESS)

    asyncio.ensure_future(tunnel.run(), loop=loop)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        tunnel.stop()
    finally:
        loop.stop()
        loop.close()