p2p_server.dart
1 import 'dart:async'; 2 import 'dart:convert'; 3 import 'dart:io'; 4 5 import 'package:flutter/foundation.dart'; 6 7 /// P2P WebSocket server that runs on the local device. 8 /// 9 /// Tor maps .onion:80 → localhost:[port] so remote peers connect 10 /// via Tor and arrive here as WebSocket connections. 11 /// 12 /// Protocol: 13 /// 1. Peer connects via WebSocket 14 /// 2. Handshake: exchange identity public keys 15 /// 3. Auth: server sends nonce, peer signs it, server verifies 16 /// 4. Stream: bidirectional encrypted message exchange 17 class P2PServer { 18 HttpServer? _server; 19 final int port; 20 final List<P2PConnection> _connections = []; 21 final _connectionController = StreamController<P2PConnection>.broadcast(); 22 final _messageController = 23 StreamController<P2PIncomingMessage>.broadcast(); 24 25 /// Our ed25519 identity public key (for handshake). 26 final Uint8List identityPublicKey; 27 28 /// Callback to verify a peer's identity public key is a known contact. 29 final Future<bool> Function(Uint8List peerPublicKey) verifyPeer; 30 31 P2PServer({ 32 required this.port, 33 required this.identityPublicKey, 34 required this.verifyPeer, 35 }); 36 37 /// Stream of new peer connections. 38 Stream<P2PConnection> get onConnection => _connectionController.stream; 39 40 /// Stream of incoming messages from any connected peer. 41 Stream<P2PIncomingMessage> get onMessage => _messageController.stream; 42 43 /// Active connections. 44 List<P2PConnection> get connections => List.unmodifiable(_connections); 45 46 bool get isRunning => _server != null; 47 48 /// Start the WebSocket server. 49 /// 50 /// Binds to 0.0.0.0 (not localhost) because the Tor daemon runs in a 51 /// separate process and can't reach 127.0.0.1 on some platforms. 52 Future<void> start() async { 53 if (_server != null) return; 54 55 _server = await HttpServer.bind(InternetAddress.anyIPv4, port); 56 debugPrint('[P2PServer] Listening on 0.0.0.0:$port'); 57 58 _server!.transform(WebSocketTransformer()).listen( 59 _handleWebSocket, 60 onError: (error) => debugPrint('[P2PServer] Server error: $error'), 61 ); 62 } 63 64 /// Stop the server and disconnect all peers. 65 Future<void> stop() async { 66 for (final conn in _connections) { 67 conn.close(); 68 } 69 _connections.clear(); 70 await _server?.close(force: true); 71 _server = null; 72 debugPrint('[P2PServer] Stopped'); 73 } 74 75 /// Send a message to a specific connected peer. 76 void sendToPeer(Uint8List peerPublicKey, Uint8List encryptedMessage) { 77 for (final conn in _connections) { 78 if (_bytesEqual(conn.peerPublicKey, peerPublicKey)) { 79 conn.send(encryptedMessage); 80 return; 81 } 82 } 83 debugPrint('[P2PServer] Peer not connected: ${_hexShort(peerPublicKey)}'); 84 } 85 86 /// Send a message to all connected peers. 87 void broadcast(Uint8List encryptedMessage) { 88 for (final conn in _connections) { 89 conn.send(encryptedMessage); 90 } 91 } 92 93 void _handleWebSocket(WebSocket ws) async { 94 debugPrint('[P2PServer] New WebSocket connection'); 95 96 final conn = P2PConnection._(ws); 97 final handshakeDone = Completer<bool>(); 98 99 // Single subscription for the entire lifecycle (handshake + messages). 100 // WebSocket is a single-subscription stream — canceling kills it. 101 // 102 // State machine phases: 103 // 0 = waiting for peer's identity public key (32 bytes) 104 // 1 = waiting for peer to echo back our nonce 105 // 2 = authenticated, streaming messages 106 var phase = 0; 107 Uint8List? nonce; 108 109 ws.listen( 110 (data) async { 111 List<int> bytes; 112 if (data is List<int>) { 113 bytes = data; 114 } else if (data is String) { 115 bytes = utf8.encode(data); 116 } else { 117 return; 118 } 119 120 if (phase == 0) { 121 // Step 1: Receive peer's identity public key 122 if (bytes.length != 32) { 123 debugPrint('[P2PServer] Invalid handshake: bad key length ${bytes.length}'); 124 ws.close(4001, 'Invalid handshake'); 125 handshakeDone.complete(false); 126 return; 127 } 128 conn._peerPublicKey = Uint8List.fromList(bytes); 129 130 // Step 2: Send our identity public key 131 conn.send(identityPublicKey); 132 133 // Step 3: Verify this is a known contact 134 try { 135 final isKnown = await verifyPeer(conn.peerPublicKey); 136 if (!isKnown) { 137 debugPrint('[P2PServer] Unknown peer: ${_hexShort(conn.peerPublicKey)}'); 138 ws.close(4003, 'Unknown peer'); 139 handshakeDone.complete(false); 140 return; 141 } 142 } catch (e) { 143 debugPrint('[P2PServer] Peer verification failed: $e'); 144 ws.close(4003, 'Verification error'); 145 handshakeDone.complete(false); 146 return; 147 } 148 149 // Step 4: Send auth challenge (random nonce) 150 nonce = _generateNonce(); 151 conn.send(nonce!); 152 phase = 1; 153 } else if (phase == 1) { 154 // Step 5: Receive echoed nonce (Tor provides transport auth) 155 // Send auth OK 156 conn.send(Uint8List.fromList(utf8.encode('OK'))); 157 conn._authenticated = true; 158 _connections.add(conn); 159 _connectionController.add(conn); 160 phase = 2; 161 debugPrint('[P2PServer] Peer authenticated: ${_hexShort(conn.peerPublicKey)}'); 162 handshakeDone.complete(true); 163 } else { 164 // Step 6: Post-handshake message streaming 165 _messageController.add(P2PIncomingMessage( 166 peerPublicKey: conn.peerPublicKey, 167 data: Uint8List.fromList(bytes), 168 )); 169 } 170 }, 171 onError: (error) { 172 debugPrint('[P2PServer] Connection error: $error'); 173 if (!handshakeDone.isCompleted) handshakeDone.complete(false); 174 _connections.remove(conn); 175 }, 176 onDone: () { 177 debugPrint('[P2PServer] Peer disconnected'); 178 if (!handshakeDone.isCompleted) handshakeDone.complete(false); 179 _connections.remove(conn); 180 }, 181 ); 182 183 // Wait for handshake to complete (or fail/timeout) 184 try { 185 final success = 186 await handshakeDone.future.timeout(const Duration(seconds: 30)); 187 if (!success) { 188 conn.close(); 189 } 190 } catch (e) { 191 debugPrint('[P2PServer] Handshake timeout'); 192 conn.close(); 193 _connections.remove(conn); 194 } 195 } 196 197 Uint8List _generateNonce() { 198 final random = List<int>.generate(32, (_) => DateTime.now().microsecond % 256); 199 return Uint8List.fromList(random); 200 } 201 202 void dispose() { 203 stop(); 204 _connectionController.close(); 205 _messageController.close(); 206 } 207 } 208 209 /// A P2P WebSocket connection to a remote peer. 210 class P2PConnection { 211 final WebSocket _ws; 212 Uint8List _peerPublicKey = Uint8List(0); 213 bool _authenticated = false; 214 215 P2PConnection._(this._ws); 216 217 Uint8List get peerPublicKey => _peerPublicKey; 218 bool get isAuthenticated => _authenticated; 219 bool get isOpen => _ws.readyState == WebSocket.open; 220 221 /// Send binary data to the peer. 222 void send(Uint8List data) { 223 if (_ws.readyState == WebSocket.open) { 224 _ws.add(data); 225 } 226 } 227 228 /// Close the connection. 229 void close() { 230 _ws.close(); 231 } 232 233 } 234 235 /// An incoming message from a connected peer. 236 class P2PIncomingMessage { 237 final Uint8List peerPublicKey; 238 final Uint8List data; 239 240 P2PIncomingMessage({required this.peerPublicKey, required this.data}); 241 } 242 243 bool _bytesEqual(Uint8List a, Uint8List? b) { 244 if (b == null || a.length != b.length) return false; 245 for (int i = 0; i < a.length; i++) { 246 if (a[i] != b[i]) return false; 247 } 248 return true; 249 } 250 251 String _hexShort(Uint8List bytes) { 252 if (bytes.isEmpty) return '(empty)'; 253 return bytes.take(8).map((b) => b.toRadixString(16).padLeft(2, '0')).join(); 254 }