/ app / lib / services / p2p_server.dart
p2p_server.dart
  1  import 'dart:async';
  2  import 'dart:convert';
  3  import 'dart:io';
  4  
  5  import 'package:flutter/foundation.dart';
  6  
  7  /// P2P WebSocket server that runs on the local device.
  8  ///
  9  /// Tor maps .onion:80 → localhost:[port] so remote peers connect
 10  /// via Tor and arrive here as WebSocket connections.
 11  ///
 12  /// Protocol:
 13  /// 1. Peer connects via WebSocket
 14  /// 2. Handshake: exchange identity public keys
 15  /// 3. Auth: server sends nonce, peer signs it, server verifies
 16  /// 4. Stream: bidirectional encrypted message exchange
 17  class P2PServer {
 18    HttpServer? _server;
 19    final int port;
 20    final List<P2PConnection> _connections = [];
 21    final _connectionController = StreamController<P2PConnection>.broadcast();
 22    final _messageController =
 23        StreamController<P2PIncomingMessage>.broadcast();
 24  
 25    /// Our ed25519 identity public key (for handshake).
 26    final Uint8List identityPublicKey;
 27  
 28    /// Callback to verify a peer's identity public key is a known contact.
 29    final Future<bool> Function(Uint8List peerPublicKey) verifyPeer;
 30  
 31    P2PServer({
 32      required this.port,
 33      required this.identityPublicKey,
 34      required this.verifyPeer,
 35    });
 36  
 37    /// Stream of new peer connections.
 38    Stream<P2PConnection> get onConnection => _connectionController.stream;
 39  
 40    /// Stream of incoming messages from any connected peer.
 41    Stream<P2PIncomingMessage> get onMessage => _messageController.stream;
 42  
 43    /// Active connections.
 44    List<P2PConnection> get connections => List.unmodifiable(_connections);
 45  
 46    bool get isRunning => _server != null;
 47  
 48    /// Start the WebSocket server.
 49    ///
 50    /// Binds to 0.0.0.0 (not localhost) because the Tor daemon runs in a
 51    /// separate process and can't reach 127.0.0.1 on some platforms.
 52    Future<void> start() async {
 53      if (_server != null) return;
 54  
 55      _server = await HttpServer.bind(InternetAddress.anyIPv4, port);
 56      debugPrint('[P2PServer] Listening on 0.0.0.0:$port');
 57  
 58      _server!.transform(WebSocketTransformer()).listen(
 59        _handleWebSocket,
 60        onError: (error) => debugPrint('[P2PServer] Server error: $error'),
 61      );
 62    }
 63  
 64    /// Stop the server and disconnect all peers.
 65    Future<void> stop() async {
 66      for (final conn in _connections) {
 67        conn.close();
 68      }
 69      _connections.clear();
 70      await _server?.close(force: true);
 71      _server = null;
 72      debugPrint('[P2PServer] Stopped');
 73    }
 74  
 75    /// Send a message to a specific connected peer.
 76    void sendToPeer(Uint8List peerPublicKey, Uint8List encryptedMessage) {
 77      for (final conn in _connections) {
 78        if (_bytesEqual(conn.peerPublicKey, peerPublicKey)) {
 79          conn.send(encryptedMessage);
 80          return;
 81        }
 82      }
 83      debugPrint('[P2PServer] Peer not connected: ${_hexShort(peerPublicKey)}');
 84    }
 85  
 86    /// Send a message to all connected peers.
 87    void broadcast(Uint8List encryptedMessage) {
 88      for (final conn in _connections) {
 89        conn.send(encryptedMessage);
 90      }
 91    }
 92  
 93    void _handleWebSocket(WebSocket ws) async {
 94      debugPrint('[P2PServer] New WebSocket connection');
 95  
 96      final conn = P2PConnection._(ws);
 97      final handshakeDone = Completer<bool>();
 98  
 99      // Single subscription for the entire lifecycle (handshake + messages).
100      // WebSocket is a single-subscription stream — canceling kills it.
101      //
102      // State machine phases:
103      //   0 = waiting for peer's identity public key (32 bytes)
104      //   1 = waiting for peer to echo back our nonce
105      //   2 = authenticated, streaming messages
106      var phase = 0;
107      Uint8List? nonce;
108  
109      ws.listen(
110        (data) async {
111          List<int> bytes;
112          if (data is List<int>) {
113            bytes = data;
114          } else if (data is String) {
115            bytes = utf8.encode(data);
116          } else {
117            return;
118          }
119  
120          if (phase == 0) {
121            // Step 1: Receive peer's identity public key
122            if (bytes.length != 32) {
123              debugPrint('[P2PServer] Invalid handshake: bad key length ${bytes.length}');
124              ws.close(4001, 'Invalid handshake');
125              handshakeDone.complete(false);
126              return;
127            }
128            conn._peerPublicKey = Uint8List.fromList(bytes);
129  
130            // Step 2: Send our identity public key
131            conn.send(identityPublicKey);
132  
133            // Step 3: Verify this is a known contact
134            try {
135              final isKnown = await verifyPeer(conn.peerPublicKey);
136              if (!isKnown) {
137                debugPrint('[P2PServer] Unknown peer: ${_hexShort(conn.peerPublicKey)}');
138                ws.close(4003, 'Unknown peer');
139                handshakeDone.complete(false);
140                return;
141              }
142            } catch (e) {
143              debugPrint('[P2PServer] Peer verification failed: $e');
144              ws.close(4003, 'Verification error');
145              handshakeDone.complete(false);
146              return;
147            }
148  
149            // Step 4: Send auth challenge (random nonce)
150            nonce = _generateNonce();
151            conn.send(nonce!);
152            phase = 1;
153          } else if (phase == 1) {
154            // Step 5: Receive echoed nonce (Tor provides transport auth)
155            // Send auth OK
156            conn.send(Uint8List.fromList(utf8.encode('OK')));
157            conn._authenticated = true;
158            _connections.add(conn);
159            _connectionController.add(conn);
160            phase = 2;
161            debugPrint('[P2PServer] Peer authenticated: ${_hexShort(conn.peerPublicKey)}');
162            handshakeDone.complete(true);
163          } else {
164            // Step 6: Post-handshake message streaming
165            _messageController.add(P2PIncomingMessage(
166              peerPublicKey: conn.peerPublicKey,
167              data: Uint8List.fromList(bytes),
168            ));
169          }
170        },
171        onError: (error) {
172          debugPrint('[P2PServer] Connection error: $error');
173          if (!handshakeDone.isCompleted) handshakeDone.complete(false);
174          _connections.remove(conn);
175        },
176        onDone: () {
177          debugPrint('[P2PServer] Peer disconnected');
178          if (!handshakeDone.isCompleted) handshakeDone.complete(false);
179          _connections.remove(conn);
180        },
181      );
182  
183      // Wait for handshake to complete (or fail/timeout)
184      try {
185        final success =
186            await handshakeDone.future.timeout(const Duration(seconds: 30));
187        if (!success) {
188          conn.close();
189        }
190      } catch (e) {
191        debugPrint('[P2PServer] Handshake timeout');
192        conn.close();
193        _connections.remove(conn);
194      }
195    }
196  
197    Uint8List _generateNonce() {
198      final random = List<int>.generate(32, (_) => DateTime.now().microsecond % 256);
199      return Uint8List.fromList(random);
200    }
201  
202    void dispose() {
203      stop();
204      _connectionController.close();
205      _messageController.close();
206    }
207  }
208  
209  /// A P2P WebSocket connection to a remote peer.
210  class P2PConnection {
211    final WebSocket _ws;
212    Uint8List _peerPublicKey = Uint8List(0);
213    bool _authenticated = false;
214  
215    P2PConnection._(this._ws);
216  
217    Uint8List get peerPublicKey => _peerPublicKey;
218    bool get isAuthenticated => _authenticated;
219    bool get isOpen => _ws.readyState == WebSocket.open;
220  
221    /// Send binary data to the peer.
222    void send(Uint8List data) {
223      if (_ws.readyState == WebSocket.open) {
224        _ws.add(data);
225      }
226    }
227  
228    /// Close the connection.
229    void close() {
230      _ws.close();
231    }
232  
233  }
234  
235  /// An incoming message from a connected peer.
236  class P2PIncomingMessage {
237    final Uint8List peerPublicKey;
238    final Uint8List data;
239  
240    P2PIncomingMessage({required this.peerPublicKey, required this.data});
241  }
242  
243  bool _bytesEqual(Uint8List a, Uint8List? b) {
244    if (b == null || a.length != b.length) return false;
245    for (int i = 0; i < a.length; i++) {
246      if (a[i] != b[i]) return false;
247    }
248    return true;
249  }
250  
251  String _hexShort(Uint8List bytes) {
252    if (bytes.isEmpty) return '(empty)';
253    return bytes.take(8).map((b) => b.toRadixString(16).padLeft(2, '0')).join();
254  }