/ core / src / storage / messages.rs
messages.rs
   1  //! Message storage for outbound queue and inbound messages.
   2  //!
   3  //! This module provides storage operations for both outbound (queued for
   4  //! delivery) and inbound (received) messages. Messages are stored with
   5  //! their encrypted form and plaintext content for display.
   6  //!
   7  //! # Outbound Messages
   8  //!
   9  //! Outbound messages go through the following lifecycle:
  10  //! - `Queued` → `Transmitting` → `Delivered` → `Acknowledged`
  11  //! - Messages can expire if not delivered within their TTL
  12  //!
  13  //! # Inbound Messages
  14  //!
  15  //! Inbound messages go through:
  16  //! - `Received` → `Read` → auto-deleted after TTL
  17  
  18  use chrono::{DateTime, Duration, TimeZone, Utc};
  19  use rusqlite::params;
  20  
  21  use crate::error::{DeadDropError, Result};
  22  use crate::protocol::generate_message_id;
  23  use crate::protocol::messages::{
  24      ContactId, ContentType, EncryptedMessage, InboundMessage, InboundState, MessageId,
  25      OutboundMessage, OutboundState, DEFAULT_AUTO_DELETE_DAYS,
  26  };
  27  use crate::storage::Database;
  28  
  29  impl Database {
  30      // =========================================================================
  31      // OUTBOUND MESSAGE OPERATIONS
  32      // =========================================================================
  33  
  34      /// Queue a new outbound message for delivery.
  35      ///
  36      /// The message should already be encrypted before calling this method.
  37      ///
  38      /// # Arguments
  39      ///
  40      /// * `message` - The outbound message to queue
  41      ///
  42      /// # Errors
  43      ///
  44      /// - `AlreadyExists` if a message with the same ID exists
  45      /// - `Database` if storage fails
  46      pub fn queue_message(&self, message: &OutboundMessage) -> Result<()> {
  47          // Serialize the encrypted message
  48          let encrypted_bytes = message.encrypted.to_bytes()?;
  49  
  50          self.connection().execute(
  51              "INSERT INTO outbound_messages (
  52                  message_id, recipient_id, content_type, plaintext,
  53                  encrypted, state, created_at, expires_at, attempts, filename, document_id, group_id
  54              ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
  55              params![
  56                  message.message_id.as_slice(),
  57                  message.recipient_id.as_slice(),
  58                  message.content_type.to_byte() as i32,
  59                  message.plaintext.as_slice(),
  60                  encrypted_bytes.as_slice(),
  61                  message.state as i32,
  62                  message.created_at.timestamp(),
  63                  message.expires_at.timestamp(),
  64                  message.attempts as i32,
  65                  message.filename.as_deref(),
  66                  message.document_id.as_ref().map(|id| id.as_slice()),
  67                  message.group_id.as_ref().map(|id| id.as_slice()),
  68              ],
  69          )?;
  70  
  71          Ok(())
  72      }
  73  
  74      /// Get an outbound message by ID.
  75      pub fn get_outbound_message(&self, message_id: &MessageId) -> Result<Option<OutboundMessage>> {
  76          let result = self.connection().query_row(
  77              "SELECT message_id, recipient_id, content_type, plaintext,
  78                      encrypted, state, created_at, expires_at, attempts, filename, document_id
  79               FROM outbound_messages WHERE message_id = ?",
  80              [message_id.as_slice()],
  81              |row| Self::outbound_from_row(row),
  82          );
  83  
  84          match result {
  85              Ok(msg) => Ok(Some(msg)),
  86              Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
  87              Err(e) => Err(DeadDropError::Database(e.to_string())),
  88          }
  89      }
  90  
  91      /// Get all queued messages for a specific contact.
  92      ///
  93      /// Returns messages that are ready for delivery (state = Queued).
  94      pub fn get_queued_for_contact(&self, contact_id: &ContactId) -> Result<Vec<OutboundMessage>> {
  95          let mut stmt = self.connection().prepare(
  96              "SELECT message_id, recipient_id, content_type, plaintext,
  97                      encrypted, state, created_at, expires_at, attempts, filename, document_id
  98               FROM outbound_messages
  99               WHERE recipient_id = ? AND state IN (?, ?)
 100               ORDER BY created_at ASC",
 101          )?;
 102  
 103          let messages = stmt
 104              .query_map(
 105                  params![contact_id.as_slice(), OutboundState::Queued as i32, OutboundState::PendingRelay as i32],
 106                  |row| Self::outbound_from_row(row),
 107              )?
 108              .collect::<std::result::Result<Vec<_>, _>>()?;
 109  
 110          Ok(messages)
 111      }
 112  
 113      /// Get all queued messages (for any contact).
 114      pub fn get_all_queued(&self) -> Result<Vec<OutboundMessage>> {
 115          let mut stmt = self.connection().prepare(
 116              "SELECT message_id, recipient_id, content_type, plaintext,
 117                      encrypted, state, created_at, expires_at, attempts, filename, document_id
 118               FROM outbound_messages
 119               WHERE state IN (?, ?)
 120               ORDER BY created_at ASC",
 121          )?;
 122  
 123          let messages = stmt
 124              .query_map(params![OutboundState::Queued as i32, OutboundState::PendingRelay as i32], |row| {
 125                  Self::outbound_from_row(row)
 126              })?
 127              .collect::<std::result::Result<Vec<_>, _>>()?;
 128  
 129          Ok(messages)
 130      }
 131  
 132      /// Get all outbound messages for a contact (any state).
 133      pub fn get_outbound_for_contact(&self, contact_id: &ContactId) -> Result<Vec<OutboundMessage>> {
 134          let mut stmt = self.connection().prepare(
 135              "SELECT message_id, recipient_id, content_type, plaintext,
 136                      encrypted, state, created_at, expires_at, attempts, filename, document_id
 137               FROM outbound_messages
 138               WHERE recipient_id = ?
 139               ORDER BY created_at DESC",
 140          )?;
 141  
 142          let messages = stmt
 143              .query_map([contact_id.as_slice()], |row| Self::outbound_from_row(row))?
 144              .collect::<std::result::Result<Vec<_>, _>>()?;
 145  
 146          Ok(messages)
 147      }
 148  
 149      /// Update the state of an outbound message.
 150      pub fn update_outbound_state(&self, message_id: &MessageId, state: OutboundState) -> Result<()> {
 151          let rows = self.connection().execute(
 152              "UPDATE outbound_messages SET state = ? WHERE message_id = ?",
 153              params![state as i32, message_id.as_slice()],
 154          )?;
 155  
 156          if rows == 0 {
 157              return Err(DeadDropError::NotFound("Message not found".to_string()));
 158          }
 159  
 160          Ok(())
 161      }
 162  
 163      /// Increment the delivery attempt counter.
 164      pub fn increment_outbound_attempts(&self, message_id: &MessageId) -> Result<u32> {
 165          self.connection().execute(
 166              "UPDATE outbound_messages SET attempts = attempts + 1 WHERE message_id = ?",
 167              [message_id.as_slice()],
 168          )?;
 169  
 170          let attempts: i32 = self.connection().query_row(
 171              "SELECT attempts FROM outbound_messages WHERE message_id = ?",
 172              [message_id.as_slice()],
 173              |row| row.get(0),
 174          )?;
 175  
 176          Ok(attempts as u32)
 177      }
 178  
 179      /// Delete expired outbound messages.
 180      ///
 181      /// Returns the number of messages deleted.
 182      pub fn delete_expired_outbound(&self) -> Result<u32> {
 183          let now = Utc::now().timestamp();
 184  
 185          let count = self.connection().execute(
 186              "DELETE FROM outbound_messages WHERE expires_at < ?",
 187              [now],
 188          )?;
 189  
 190          Ok(count as u32)
 191      }
 192  
 193      /// Delete an outbound message.
 194      pub fn delete_outbound_message(&self, message_id: &MessageId) -> Result<()> {
 195          let rows = self.connection().execute(
 196              "DELETE FROM outbound_messages WHERE message_id = ?",
 197              [message_id.as_slice()],
 198          )?;
 199  
 200          if rows == 0 {
 201              return Err(DeadDropError::NotFound("Message not found".to_string()));
 202          }
 203  
 204          Ok(())
 205      }
 206  
 207      /// Count outbound messages by state.
 208      pub fn count_outbound(&self) -> Result<OutboundCounts> {
 209          let queued: u32 = self.connection().query_row(
 210              "SELECT COUNT(*) FROM outbound_messages WHERE state = ?",
 211              [OutboundState::Queued as i32],
 212              |row| row.get(0),
 213          )?;
 214  
 215          let transmitting: u32 = self.connection().query_row(
 216              "SELECT COUNT(*) FROM outbound_messages WHERE state = ?",
 217              [OutboundState::Transmitting as i32],
 218              |row| row.get(0),
 219          )?;
 220  
 221          let delivered: u32 = self.connection().query_row(
 222              "SELECT COUNT(*) FROM outbound_messages WHERE state = ?",
 223              [OutboundState::Delivered as i32],
 224              |row| row.get(0),
 225          )?;
 226  
 227          let failed: u32 = self.connection().query_row(
 228              "SELECT COUNT(*) FROM outbound_messages WHERE state = ?",
 229              [OutboundState::Failed as i32],
 230              |row| row.get(0),
 231          )?;
 232  
 233          Ok(OutboundCounts {
 234              queued,
 235              transmitting,
 236              delivered,
 237              failed,
 238          })
 239      }
 240  
 241      /// Update delivery status of an outbound message.
 242      ///
 243      /// Records the transport used, delivery timestamp, and any error.
 244      pub fn update_delivery_status(
 245          &self,
 246          message_id: &MessageId,
 247          state: OutboundState,
 248          transport: Option<DeliveryTransport>,
 249          error: Option<&str>,
 250      ) -> Result<()> {
 251          let now = Utc::now().timestamp();
 252          let delivered_at = if state == OutboundState::Delivered || state == OutboundState::Acknowledged {
 253              Some(now)
 254          } else {
 255              None
 256          };
 257  
 258          // Only upgrade state, never downgrade (e.g. don't overwrite Acknowledged with Delivered).
 259          // Exception: PendingRelay (7) can be overridden by Delivered (3) or Acknowledged (4)
 260          // since PendingRelay is numerically higher but logically lower priority.
 261          let rows = self.connection().execute(
 262              "UPDATE outbound_messages
 263               SET state = ?, delivery_transport = COALESCE(?, delivery_transport),
 264                   delivered_at = COALESCE(delivered_at, ?), last_error = ?
 265               WHERE message_id = ? AND (state < ? OR state = ?)",
 266              params![
 267                  state as i32,
 268                  transport.map(|t| t as i32),
 269                  delivered_at,
 270                  error,
 271                  message_id.as_slice(),
 272                  state as i32,
 273                  OutboundState::PendingRelay as i32,
 274              ],
 275          )?;
 276  
 277          // rows == 0 is OK if state was already >= target (not a downgrade error)
 278          Ok(())
 279      }
 280  
 281      /// Set the delivery transport for an outbound message (only if not already set).
 282      pub fn set_outbound_transport(&self, message_id: &MessageId, transport: DeliveryTransport) -> Result<()> {
 283          self.connection().execute(
 284              "UPDATE outbound_messages SET delivery_transport = ? WHERE message_id = ? AND delivery_transport IS NULL",
 285              params![transport as i32, message_id.as_slice()],
 286          )?;
 287          Ok(())
 288      }
 289  
 290      /// Get all failed messages for retry.
 291      pub fn get_failed_messages(&self) -> Result<Vec<OutboundMessage>> {
 292          let mut stmt = self.connection().prepare(
 293              "SELECT message_id, recipient_id, content_type, plaintext,
 294                      encrypted, state, created_at, expires_at, attempts, filename, document_id
 295               FROM outbound_messages
 296               WHERE state = ?
 297               ORDER BY created_at ASC",
 298          )?;
 299  
 300          let messages = stmt
 301              .query_map([OutboundState::Failed as i32], |row| {
 302                  Self::outbound_from_row(row)
 303              })?
 304              .collect::<std::result::Result<Vec<_>, _>>()?;
 305  
 306          Ok(messages)
 307      }
 308  
 309      /// Retry a failed message by resetting its state to Queued.
 310      pub fn retry_failed_message(&self, message_id: &MessageId) -> Result<()> {
 311          let rows = self.connection().execute(
 312              "UPDATE outbound_messages
 313               SET state = ?, last_error = NULL
 314               WHERE message_id = ? AND state = ?",
 315              params![
 316                  OutboundState::Queued as i32,
 317                  message_id.as_slice(),
 318                  OutboundState::Failed as i32,
 319              ],
 320          )?;
 321  
 322          if rows == 0 {
 323              return Err(DeadDropError::NotFound(
 324                  "Message not found or not in failed state".to_string(),
 325              ));
 326          }
 327  
 328          Ok(())
 329      }
 330  
 331      /// Retry messages stuck in Transmitting state by resetting to Queued.
 332      ///
 333      /// Messages that have been in Transmitting for longer than `timeout_secs`
 334      /// are reset to Queued so another transport can pick them up.
 335      /// Max 5 retry attempts to prevent infinite loops.
 336      /// Returns the number of messages retried.
 337      pub fn retry_stuck_transmitting(&self, timeout_secs: i64) -> Result<i32> {
 338          let cutoff = Utc::now().timestamp() - timeout_secs;
 339  
 340          let rows = self.connection().execute(
 341              "UPDATE outbound_messages
 342               SET state = ?, attempts = attempts + 1, last_error = 'Retried: stuck in transmitting'
 343               WHERE state = ? AND created_at < ? AND attempts < 5",
 344              params![
 345                  OutboundState::Queued as i32,
 346                  OutboundState::Transmitting as i32,
 347                  cutoff,
 348              ],
 349          )?;
 350  
 351          Ok(rows as i32)
 352      }
 353  
 354      /// Promote outbound messages stuck in Queued to gossip relay.
 355      ///
 356      /// Finds messages that have been Queued for longer than `min_age_secs` and:
 357      /// 1. Transitions to PendingRelay if already in forwarding store
 358      /// 2. Re-stores into forwarding store + transitions if missing
 359      ///
 360      /// Returns the number of messages promoted.
 361      pub fn promote_to_gossip_relay(&self, min_age_secs: i64) -> Result<u32> {
 362          let cutoff = Utc::now().timestamp() - min_age_secs;
 363  
 364          // Phase 1: Messages already in forwarding store → PendingRelay
 365          let already_stored = self.connection().execute(
 366              "UPDATE outbound_messages
 367               SET state = ?, last_error = 'Waiting for mesh relay'
 368               WHERE state = ? AND created_at < ?
 369               AND message_id IN (SELECT message_id FROM forwarded_messages)",
 370              params![
 371                  OutboundState::PendingRelay as i32,
 372                  OutboundState::Queued as i32,
 373                  cutoff,
 374              ],
 375          )?;
 376  
 377          // Phase 2: Queued messages NOT in forwarding store → re-store + promote
 378          let mut stmt = self.connection().prepare(
 379              "SELECT o.message_id, o.encrypted, c.exchange_public
 380               FROM outbound_messages o
 381               JOIN contacts c ON o.recipient_id = c.contact_id
 382               WHERE o.state = ? AND o.created_at < ?
 383               AND o.message_id NOT IN (SELECT message_id FROM forwarded_messages)",
 384          )?;
 385  
 386          let missing: Vec<([u8; 16], Vec<u8>, [u8; 32])> = stmt
 387              .query_map(
 388                  params![OutboundState::Queued as i32, cutoff],
 389                  |row| {
 390                      let msg_id_vec: Vec<u8> = row.get(0)?;
 391                      let encrypted_bytes: Vec<u8> = row.get(1)?;
 392                      let exchange_key_vec: Vec<u8> = row.get(2)?;
 393  
 394                      let mut msg_id = [0u8; 16];
 395                      if msg_id_vec.len() == 16 {
 396                          msg_id.copy_from_slice(&msg_id_vec);
 397                      }
 398                      let mut exchange_key = [0u8; 32];
 399                      if exchange_key_vec.len() == 32 {
 400                          exchange_key.copy_from_slice(&exchange_key_vec);
 401                      }
 402  
 403                      Ok((msg_id, encrypted_bytes, exchange_key))
 404                  },
 405              )?
 406              .collect::<std::result::Result<Vec<_>, _>>()?;
 407  
 408          let mut re_stored = 0u32;
 409          for (message_id, encrypted_bytes, exchange_key) in &missing {
 410              // The encrypted column already contains bincode-serialized EncryptedMessage,
 411              // which is exactly the payload format forwarded_messages expects.
 412              let forwarded = crate::storage::forwarding::ForwardedMessage::new(
 413                  *message_id,
 414                  exchange_key,
 415                  encrypted_bytes.clone(),
 416                  None,
 417              );
 418  
 419              let stored = self.store_for_forwarding(&forwarded).unwrap_or(false);
 420              if stored {
 421                  re_stored += 1;
 422              }
 423  
 424              // Transition to PendingRelay regardless (message exists or store is full)
 425              let _ = self.connection().execute(
 426                  "UPDATE outbound_messages
 427                   SET state = ?, last_error = 'Waiting for mesh relay'
 428                   WHERE message_id = ? AND state = ?",
 429                  params![
 430                      OutboundState::PendingRelay as i32,
 431                      message_id.as_slice(),
 432                      OutboundState::Queued as i32,
 433                  ],
 434              );
 435          }
 436  
 437          Ok(already_stored as u32 + missing.len() as u32)
 438      }
 439  
 440      /// Expire stale outbound messages stuck in Queued or Transmitting state.
 441      ///
 442      /// Messages older than `max_age_secs` are marked as Failed with a timeout error.
 443      /// Returns the number of expired messages.
 444      pub fn expire_stale_outbound(&self, max_age_secs: i64) -> Result<i32> {
 445          let cutoff = Utc::now().timestamp() - max_age_secs;
 446  
 447          // Phase 1: Messages with a copy in forwarded_messages → PendingRelay
 448          // (still potentially deliverable via BLE gossip mesh)
 449          let pending_relay_count = self.connection().execute(
 450              "UPDATE outbound_messages
 451               SET state = ?, last_error = 'Waiting for mesh relay'
 452               WHERE state IN (?, ?) AND created_at < ?
 453               AND message_id IN (SELECT message_id FROM forwarded_messages)",
 454              params![
 455                  OutboundState::PendingRelay as i32,
 456                  OutboundState::Queued as i32,
 457                  OutboundState::Transmitting as i32,
 458                  cutoff,
 459              ],
 460          )?;
 461  
 462          // Phase 2: Remaining stale messages → Failed (truly undeliverable)
 463          let failed_count = self.connection().execute(
 464              "UPDATE outbound_messages
 465               SET state = ?, last_error = 'Delivery timed out'
 466               WHERE state IN (?, ?) AND created_at < ?",
 467              params![
 468                  OutboundState::Failed as i32,
 469                  OutboundState::Queued as i32,
 470                  OutboundState::Transmitting as i32,
 471                  cutoff,
 472              ],
 473          )?;
 474  
 475          Ok((pending_relay_count + failed_count) as i32)
 476      }
 477  
 478      /// Expire PendingRelay messages older than `max_age_secs`.
 479      ///
 480      /// These have exceeded the forwarding store TTL window and will never
 481      /// be delivered via gossip. Returns the number of messages marked Failed.
 482      pub fn expire_stale_pending_relay(&self, max_age_secs: i64) -> Result<i32> {
 483          let cutoff = Utc::now().timestamp() - max_age_secs;
 484          let rows = self.connection().execute(
 485              "UPDATE outbound_messages
 486               SET state = ?, last_error = 'Gossip relay timed out'
 487               WHERE state = ? AND created_at < ?",
 488              params![
 489                  OutboundState::Failed as i32,
 490                  OutboundState::PendingRelay as i32,
 491                  cutoff,
 492              ],
 493          )?;
 494          Ok(rows as i32)
 495      }
 496  
 497      /// Get delivery info for a message.
 498      pub fn get_delivery_info(
 499          &self,
 500          message_id: &MessageId,
 501      ) -> Result<Option<(Option<DeliveryTransport>, Option<DateTime<Utc>>, Option<String>)>> {
 502          let result = self.connection().query_row(
 503              "SELECT delivery_transport, delivered_at, last_error
 504               FROM outbound_messages WHERE message_id = ?",
 505              [message_id.as_slice()],
 506              |row| {
 507                  let transport_int: Option<i32> = row.get(0)?;
 508                  let delivered_at_ts: Option<i64> = row.get(1)?;
 509                  let last_error: Option<String> = row.get(2)?;
 510  
 511                  let transport = transport_int.and_then(|t| DeliveryTransport::from_byte(t as u8));
 512                  let delivered_at = delivered_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single());
 513  
 514                  Ok((transport, delivered_at, last_error))
 515              },
 516          );
 517  
 518          match result {
 519              Ok(info) => Ok(Some(info)),
 520              Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
 521              Err(e) => Err(DeadDropError::Database(e.to_string())),
 522          }
 523      }
 524  
 525      /// Count messages by transport type.
 526      pub fn count_by_transport(&self) -> Result<TransportStats> {
 527          let ble: u32 = self.connection().query_row(
 528              "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?",
 529              [DeliveryTransport::Ble as i32],
 530              |row| row.get(0),
 531          ).unwrap_or(0);
 532  
 533          let dht: u32 = self.connection().query_row(
 534              "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?",
 535              [DeliveryTransport::Dht as i32],
 536              |row| row.get(0),
 537          ).unwrap_or(0);
 538  
 539          let relay: u32 = self.connection().query_row(
 540              "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?",
 541              [DeliveryTransport::Relay as i32],
 542              |row| row.get(0),
 543          ).unwrap_or(0);
 544  
 545          let iroh: u32 = self.connection().query_row(
 546              "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?",
 547              [DeliveryTransport::IrohBlobs as i32],
 548              |row| row.get(0),
 549          ).unwrap_or(0);
 550  
 551          Ok(TransportStats { ble, dht, relay, iroh })
 552      }
 553  
 554      /// Helper to construct OutboundMessage from a row.
 555      fn outbound_from_row(row: &rusqlite::Row) -> rusqlite::Result<OutboundMessage> {
 556          let message_id_vec: Vec<u8> = row.get(0)?;
 557          let message_id: MessageId = message_id_vec
 558              .try_into()
 559              .map_err(|_| rusqlite::Error::InvalidColumnType(0, "message_id".to_string(), rusqlite::types::Type::Blob))?;
 560  
 561          let recipient_id_vec: Vec<u8> = row.get(1)?;
 562          let recipient_id: ContactId = recipient_id_vec
 563              .try_into()
 564              .map_err(|_| rusqlite::Error::InvalidColumnType(1, "recipient_id".to_string(), rusqlite::types::Type::Blob))?;
 565  
 566          let content_type_int: i32 = row.get(2)?;
 567          let content_type = ContentType::from_byte(content_type_int as u8)
 568              .ok_or_else(|| rusqlite::Error::InvalidColumnType(2, "content_type".to_string(), rusqlite::types::Type::Integer))?;
 569  
 570          let plaintext: Vec<u8> = row.get(3)?;
 571          let encrypted_bytes: Vec<u8> = row.get(4)?;
 572  
 573          let encrypted = EncryptedMessage::from_bytes(&encrypted_bytes)
 574              .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Blob, Box::new(e)))?;
 575  
 576          let state_int: i32 = row.get(5)?;
 577          let state = OutboundState::from_byte(state_int as u8)
 578              .ok_or_else(|| rusqlite::Error::InvalidColumnType(5, "state".to_string(), rusqlite::types::Type::Integer))?;
 579  
 580          let created_at_ts: i64 = row.get(6)?;
 581          let created_at = Utc.timestamp_opt(created_at_ts, 0).single().unwrap_or_else(Utc::now);
 582  
 583          let expires_at_ts: i64 = row.get(7)?;
 584          let expires_at = Utc.timestamp_opt(expires_at_ts, 0).single().unwrap_or_else(Utc::now);
 585  
 586          let attempts: i32 = row.get(8)?;
 587          let filename: Option<String> = row.get(9)?;
 588          let document_id_vec: Option<Vec<u8>> = row.get(10)?;
 589          let document_id: Option<MessageId> = document_id_vec.and_then(|v| v.try_into().ok());
 590  
 591          Ok(OutboundMessage {
 592              message_id,
 593              recipient_id,
 594              content_type,
 595              plaintext,
 596              filename,
 597              encrypted,
 598              state,
 599              created_at,
 600              expires_at,
 601              attempts: attempts as u32,
 602              document_id,
 603              group_id: None, // Populated by caller when needed
 604          })
 605      }
 606  
 607      // =========================================================================
 608      // INBOUND MESSAGE OPERATIONS
 609      // =========================================================================
 610  
 611      /// Store a received inbound message.
 612      ///
 613      /// The message should already be decrypted before calling this method.
 614      pub fn store_inbound(&self, message: &InboundMessage) -> Result<()> {
 615          self.connection().execute(
 616              "INSERT INTO inbound_messages (
 617                  message_id, sender_id, content_type, plaintext, filename,
 618                  state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at, group_id
 619              ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
 620              params![
 621                  message.message_id.as_slice(),
 622                  message.sender_id.as_slice(),
 623                  message.content_type.to_byte() as i32,
 624                  message.plaintext.as_slice(),
 625                  message.filename,
 626                  message.state as i32,
 627                  message.received_at.timestamp(),
 628                  message.read_at.map(|t| t.timestamp()),
 629                  message.delete_at.timestamp(),
 630                  message.receive_transport.map(|t| t as i32),
 631                  message.disappearing_duration as i64,
 632                  message.sent_at.map(|t| t.timestamp()),
 633                  message.group_id.as_ref().map(|id| id.as_slice()),
 634              ],
 635          )?;
 636  
 637          Ok(())
 638      }
 639  
 640      /// Get an inbound message by ID.
 641      pub fn get_inbound_message(&self, message_id: &MessageId) -> Result<Option<InboundMessage>> {
 642          let result = self.connection().query_row(
 643              "SELECT message_id, sender_id, content_type, plaintext, filename,
 644                      state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at
 645               FROM inbound_messages WHERE message_id = ?",
 646              [message_id.as_slice()],
 647              |row| Self::inbound_from_row(row),
 648          );
 649  
 650          match result {
 651              Ok(msg) => Ok(Some(msg)),
 652              Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
 653              Err(e) => Err(DeadDropError::Database(e.to_string())),
 654          }
 655      }
 656  
 657      /// Get all messages from a specific contact.
 658      pub fn get_messages_from(&self, contact_id: &ContactId) -> Result<Vec<InboundMessage>> {
 659          let mut stmt = self.connection().prepare(
 660              "SELECT message_id, sender_id, content_type, plaintext, filename,
 661                      state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at
 662               FROM inbound_messages
 663               WHERE sender_id = ? AND state != ? AND group_id IS NULL
 664               ORDER BY received_at DESC",
 665          )?;
 666  
 667          let messages = stmt
 668              .query_map(
 669                  params![contact_id.as_slice(), InboundState::Deleted as i32],
 670                  |row| Self::inbound_from_row(row),
 671              )?
 672              .collect::<std::result::Result<Vec<_>, _>>()?;
 673  
 674          Ok(messages)
 675      }
 676  
 677      /// Get all unread messages.
 678      pub fn get_unread(&self) -> Result<Vec<InboundMessage>> {
 679          let mut stmt = self.connection().prepare(
 680              "SELECT message_id, sender_id, content_type, plaintext, filename,
 681                      state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at
 682               FROM inbound_messages
 683               WHERE state = ?
 684               ORDER BY received_at DESC",
 685          )?;
 686  
 687          let messages = stmt
 688              .query_map([InboundState::Received as i32], |row| {
 689                  Self::inbound_from_row(row)
 690              })?
 691              .collect::<std::result::Result<Vec<_>, _>>()?;
 692  
 693          Ok(messages)
 694      }
 695  
 696      /// Mark a message as read.
 697      ///
 698      /// If the message has a disappearing_duration > 0, sets delete_at to
 699      /// read_at + duration. Otherwise uses the default 7-day auto-delete.
 700      pub fn mark_message_read(&self, message_id: &MessageId) -> Result<()> {
 701          let now = Utc::now().timestamp();
 702  
 703          // Check if this message has a disappearing duration
 704          let disappearing_duration: i64 = self.connection().query_row(
 705              "SELECT COALESCE(disappearing_duration, 0) FROM inbound_messages WHERE message_id = ?",
 706              [message_id.as_slice()],
 707              |row| row.get(0),
 708          ).unwrap_or(0);
 709  
 710          let delete_at = if disappearing_duration > 0 {
 711              now + disappearing_duration
 712          } else {
 713              (Utc::now() + Duration::days(DEFAULT_AUTO_DELETE_DAYS)).timestamp()
 714          };
 715  
 716          let rows = self.connection().execute(
 717              "UPDATE inbound_messages
 718               SET state = ?, read_at = ?, delete_at = ?
 719               WHERE message_id = ? AND state = ?",
 720              params![
 721                  InboundState::Read as i32,
 722                  now,
 723                  delete_at,
 724                  message_id.as_slice(),
 725                  InboundState::Received as i32,
 726              ],
 727          )?;
 728  
 729          if rows == 0 {
 730              // Check if message exists but is already read
 731              let exists: u32 = self.connection().query_row(
 732                  "SELECT COUNT(*) FROM inbound_messages WHERE message_id = ?",
 733                  [message_id.as_slice()],
 734                  |row| row.get(0),
 735              )?;
 736  
 737              if exists == 0 {
 738                  return Err(DeadDropError::NotFound("Message not found".to_string()));
 739              }
 740              // Message exists but was already read - that's fine
 741          }
 742  
 743          Ok(())
 744      }
 745  
 746      /// Delete an inbound message (mark as deleted).
 747      pub fn delete_inbound_message(&self, message_id: &MessageId) -> Result<()> {
 748          let rows = self.connection().execute(
 749              "UPDATE inbound_messages SET state = ? WHERE message_id = ?",
 750              params![InboundState::Deleted as i32, message_id.as_slice()],
 751          )?;
 752  
 753          if rows == 0 {
 754              return Err(DeadDropError::NotFound("Message not found".to_string()));
 755          }
 756  
 757          Ok(())
 758      }
 759  
 760      /// Permanently delete an inbound message from the database.
 761      pub fn purge_inbound_message(&self, message_id: &MessageId) -> Result<()> {
 762          let rows = self.connection().execute(
 763              "DELETE FROM inbound_messages WHERE message_id = ?",
 764              [message_id.as_slice()],
 765          )?;
 766  
 767          if rows == 0 {
 768              return Err(DeadDropError::NotFound("Message not found".to_string()));
 769          }
 770  
 771          Ok(())
 772      }
 773  
 774      /// Clear all messages for a contact (hard delete).
 775      pub fn clear_conversation(&self, contact_id: &ContactId) -> Result<u32> {
 776          let conn = self.connection();
 777          let outbound = conn.execute(
 778              "DELETE FROM outbound_messages WHERE recipient_id = ?",
 779              [contact_id.as_slice()],
 780          )?;
 781          let inbound = conn.execute(
 782              "DELETE FROM inbound_messages WHERE sender_id = ?",
 783              [contact_id.as_slice()],
 784          )?;
 785          let _ = conn.execute(
 786              "DELETE FROM document_chunks WHERE contact_id = ?",
 787              [contact_id.as_slice()],
 788          );
 789          Ok((outbound + inbound) as u32)
 790      }
 791  
 792      /// Delete messages past their auto-delete time.
 793      ///
 794      /// Returns the number of messages deleted.
 795      pub fn delete_expired_inbound(&self) -> Result<u32> {
 796          let now = Utc::now().timestamp();
 797  
 798          let count = self.connection().execute(
 799              "DELETE FROM inbound_messages WHERE delete_at < ?",
 800              [now],
 801          )?;
 802  
 803          Ok(count as u32)
 804      }
 805  
 806      /// Count unread messages.
 807      pub fn count_unread(&self) -> Result<u32> {
 808          let count: u32 = self.connection().query_row(
 809              "SELECT COUNT(*) FROM inbound_messages WHERE state = ? AND group_id IS NULL",
 810              [InboundState::Received as i32],
 811              |row| row.get(0),
 812          )?;
 813          Ok(count)
 814      }
 815  
 816      /// Count unread messages from a specific contact.
 817      pub fn count_unread_from(&self, contact_id: &ContactId) -> Result<u32> {
 818          let count: u32 = self.connection().query_row(
 819              "SELECT COUNT(*) FROM inbound_messages WHERE sender_id = ? AND state = ? AND group_id IS NULL",
 820              params![contact_id.as_slice(), InboundState::Received as i32],
 821              |row| row.get(0),
 822          )?;
 823          Ok(count)
 824      }
 825  
 826      /// Helper to construct InboundMessage from a row.
 827      /// Expects columns: message_id, sender_id, content_type, plaintext, filename,
 828      ///                  state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at
 829      fn inbound_from_row(row: &rusqlite::Row) -> rusqlite::Result<InboundMessage> {
 830          let message_id_vec: Vec<u8> = row.get(0)?;
 831          let message_id: MessageId = message_id_vec
 832              .try_into()
 833              .map_err(|_| rusqlite::Error::InvalidColumnType(0, "message_id".to_string(), rusqlite::types::Type::Blob))?;
 834  
 835          let sender_id_vec: Vec<u8> = row.get(1)?;
 836          let sender_id: ContactId = sender_id_vec
 837              .try_into()
 838              .map_err(|_| rusqlite::Error::InvalidColumnType(1, "sender_id".to_string(), rusqlite::types::Type::Blob))?;
 839  
 840          let content_type_int: i32 = row.get(2)?;
 841          let content_type = ContentType::from_byte(content_type_int as u8)
 842              .ok_or_else(|| rusqlite::Error::InvalidColumnType(2, "content_type".to_string(), rusqlite::types::Type::Integer))?;
 843  
 844          let plaintext: Vec<u8> = row.get(3)?;
 845          let filename: Option<String> = row.get(4)?;
 846  
 847          let state_int: i32 = row.get(5)?;
 848          let state = InboundState::from_byte(state_int as u8)
 849              .ok_or_else(|| rusqlite::Error::InvalidColumnType(5, "state".to_string(), rusqlite::types::Type::Integer))?;
 850  
 851          let received_at_ts: i64 = row.get(6)?;
 852          let received_at = Utc.timestamp_opt(received_at_ts, 0).single().unwrap_or_else(Utc::now);
 853  
 854          let read_at_ts: Option<i64> = row.get(7)?;
 855          let read_at = read_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single());
 856  
 857          let delete_at_ts: i64 = row.get(8)?;
 858          let delete_at = Utc.timestamp_opt(delete_at_ts, 0).single().unwrap_or_else(Utc::now);
 859  
 860          let receive_transport: Option<i32> = row.get(9)?;
 861          let disappearing_duration: i64 = row.get::<_, Option<i64>>(10)?.unwrap_or(0);
 862          let sent_at_ts: Option<i64> = row.get(11)?;
 863          let sent_at = sent_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single());
 864  
 865          Ok(InboundMessage {
 866              message_id,
 867              sender_id,
 868              content_type,
 869              plaintext,
 870              filename,
 871              state,
 872              received_at,
 873              read_at,
 874              delete_at,
 875              receive_transport: receive_transport.map(|t| t as u8),
 876              disappearing_duration: disappearing_duration as u64,
 877              sent_at,
 878              group_id: None, // Populated by caller when needed
 879          })
 880      }
 881  
 882      // =========================================================================
 883      // CONVERSATION VIEW
 884      // =========================================================================
 885  
 886      /// Get a conversation with a contact (both inbound and outbound).
 887      ///
 888      /// Returns messages ordered by timestamp (newest first).
 889      pub fn get_conversation(&self, contact_id: &ContactId) -> Result<Vec<ConversationMessage>> {
 890          let mut messages = Vec::new();
 891  
 892          // Get outbound messages with delivery info (exclude group messages)
 893          let mut stmt = self.connection().prepare(
 894              "SELECT message_id, recipient_id, content_type, plaintext,
 895                      encrypted, state, created_at, expires_at, attempts, filename, document_id,
 896                      delivery_transport, delivered_at, last_error
 897               FROM outbound_messages
 898               WHERE recipient_id = ? AND group_id IS NULL
 899               ORDER BY created_at DESC",
 900          )?;
 901  
 902          let outbound_messages = stmt
 903              .query_map([contact_id.as_slice()], |row| {
 904                  let msg = Self::outbound_from_row(row)?;
 905  
 906                  // Get delivery info from additional columns (indices 11, 12, 13)
 907                  let transport_int: Option<i32> = row.get(11)?;
 908                  let delivered_at_ts: Option<i64> = row.get(12)?;
 909                  let last_error: Option<String> = row.get(13)?;
 910  
 911                  let transport = transport_int.and_then(|t| DeliveryTransport::from_byte(t as u8));
 912                  let delivered_at = delivered_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single());
 913  
 914                  Ok((msg, transport, delivered_at, last_error))
 915              })?
 916              .collect::<std::result::Result<Vec<_>, _>>()?;
 917  
 918          for (msg, transport, delivered_at, last_error) in outbound_messages {
 919              // Derive disappearing_duration from the message's own expires_at.
 920              // Messages sent with disappearing enabled have a short TTL (seconds/hours),
 921              // while normal messages have DEFAULT_MESSAGE_EXPIRY_DAYS (30 days).
 922              let ttl_secs = (msg.expires_at - msg.created_at).num_seconds().max(0) as u64;
 923              let is_disappearing = ttl_secs < 86400 * 2; // Less than 2 days = disappearing
 924              messages.push(ConversationMessage {
 925                  message_id: msg.message_id,
 926                  is_outbound: true,
 927                  content_type: msg.content_type,
 928                  plaintext: msg.plaintext,
 929                  filename: msg.filename,
 930                  timestamp: msg.created_at,
 931                  outbound_state: Some(msg.state),
 932                  inbound_state: None,
 933                  delivery_transport: transport,
 934                  delivered_at,
 935                  last_error,
 936                  disappearing_duration: if is_disappearing { ttl_secs } else { 0 },
 937                  sender_id: None,
 938                  group_id: None,
 939              });
 940          }
 941  
 942          // Get inbound messages
 943          let inbound = self.get_messages_from(contact_id)?;
 944          for msg in inbound {
 945              // Convert receive_transport to DeliveryTransport for display
 946              let transport = msg.receive_transport.and_then(DeliveryTransport::from_byte);
 947  
 948              messages.push(ConversationMessage {
 949                  message_id: msg.message_id,
 950                  is_outbound: false,
 951                  content_type: msg.content_type,
 952                  plaintext: msg.plaintext.clone(),
 953                  filename: msg.filename.clone(),
 954                  timestamp: msg.sent_at.unwrap_or(msg.received_at),
 955                  outbound_state: None,
 956                  inbound_state: Some(msg.state),
 957                  delivery_transport: transport, // Transport used to receive the message
 958                  delivered_at: None,
 959                  disappearing_duration: msg.disappearing_duration,
 960                  last_error: None,
 961                  sender_id: Some(msg.sender_id),
 962                  group_id: None,
 963              });
 964          }
 965  
 966          // Sort by timestamp (newest first)
 967          messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
 968  
 969          Ok(messages)
 970      }
 971  }
 972  
 973  /// Outbound message counts by state.
 974  #[derive(Debug, Clone)]
 975  pub struct OutboundCounts {
 976      /// Messages waiting for delivery.
 977      pub queued: u32,
 978      /// Messages currently being transmitted.
 979      pub transmitting: u32,
 980      /// Messages delivered but not acknowledged.
 981      pub delivered: u32,
 982      /// Messages that failed to deliver.
 983      pub failed: u32,
 984  }
 985  
 986  /// Transport type used for message delivery.
 987  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 988  #[repr(u8)]
 989  pub enum DeliveryTransport {
 990      /// Bluetooth Low Energy.
 991      Ble = 0,
 992      /// DHT (Distributed Hash Table).
 993      Dht = 1,
 994      /// Relay server.
 995      Relay = 2,
 996      /// Tor P2P (direct .onion connection).
 997      Tor = 3,
 998      /// Iroh (QUIC-based P2P with NAT traversal).
 999      IrohBlobs = 4,
1000      /// BLE Gossip (store-and-forward mesh relay).
1001      BleGossip = 5,
1002  }
1003  
1004  impl DeliveryTransport {
1005      /// Convert from byte.
1006      pub fn from_byte(byte: u8) -> Option<Self> {
1007          match byte {
1008              0 => Some(DeliveryTransport::Ble),
1009              1 => Some(DeliveryTransport::Dht),
1010              2 => Some(DeliveryTransport::Relay),
1011              3 => Some(DeliveryTransport::Tor),
1012              4 => Some(DeliveryTransport::IrohBlobs),
1013              5 => Some(DeliveryTransport::BleGossip),
1014              _ => None,
1015          }
1016      }
1017  
1018      /// Convert to string for display.
1019      pub fn as_str(&self) -> &'static str {
1020          match self {
1021              DeliveryTransport::Ble => "ble",
1022              DeliveryTransport::Dht => "dht",
1023              DeliveryTransport::Relay => "relay",
1024              DeliveryTransport::Tor => "tor",
1025              DeliveryTransport::IrohBlobs => "iroh",
1026              DeliveryTransport::BleGossip => "ble-gossip",
1027          }
1028      }
1029  }
1030  
1031  /// Statistics for message delivery by transport.
1032  #[derive(Debug, Clone, Default)]
1033  pub struct TransportStats {
1034      /// Messages delivered via BLE.
1035      pub ble: u32,
1036      /// Messages delivered via DHT.
1037      pub dht: u32,
1038      /// Messages delivered via Relay.
1039      pub relay: u32,
1040      /// Messages delivered via Iroh.
1041      pub iroh: u32,
1042  }
1043  
1044  /// A message in a conversation (either inbound or outbound).
1045  #[derive(Debug, Clone)]
1046  pub struct ConversationMessage {
1047      /// Unique message ID.
1048      pub message_id: MessageId,
1049      /// True if this is an outbound message.
1050      pub is_outbound: bool,
1051      /// Content type.
1052      pub content_type: ContentType,
1053      /// Message content.
1054      pub plaintext: Vec<u8>,
1055      /// Filename (for documents).
1056      pub filename: Option<String>,
1057      /// Message timestamp.
1058      pub timestamp: DateTime<Utc>,
1059      /// Outbound state (if outbound).
1060      pub outbound_state: Option<OutboundState>,
1061      /// Inbound state (if inbound).
1062      pub inbound_state: Option<InboundState>,
1063      /// Transport used to deliver (if delivered).
1064      pub delivery_transport: Option<DeliveryTransport>,
1065      /// Delivery timestamp (if delivered).
1066      pub delivered_at: Option<DateTime<Utc>>,
1067      /// Last error message (if failed).
1068      pub last_error: Option<String>,
1069      /// Disappearing message duration in seconds (0 = standard).
1070      pub disappearing_duration: u64,
1071      /// Sender contact ID (for group inbound messages).
1072      pub sender_id: Option<ContactId>,
1073      /// Group ID (if this is a group message).
1074      pub group_id: Option<[u8; 16]>,
1075  }
1076  
1077  impl ConversationMessage {
1078      /// Get content as text (if text type).
1079      pub fn as_text(&self) -> Option<String> {
1080          if self.content_type == ContentType::Text {
1081              String::from_utf8(self.plaintext.clone()).ok()
1082          } else {
1083              None
1084          }
1085      }
1086  }
1087  
1088  // =========================================================================
1089  // MESSAGE REACTIONS
1090  // =========================================================================
1091  
1092  /// A reaction to a message.
1093  #[derive(Debug, Clone)]
1094  pub struct MessageReaction {
1095      /// Unique reaction ID.
1096      pub reaction_id: [u8; 16],
1097      /// ID of the message being reacted to.
1098      pub message_id: MessageId,
1099      /// Contact who added the reaction (all zeros for self).
1100      pub contact_id: ContactId,
1101      /// The emoji reaction.
1102      pub emoji: String,
1103      /// When the reaction was added.
1104      pub created_at: DateTime<Utc>,
1105  }
1106  
1107  /// Summary of reactions for a message (grouped by emoji).
1108  #[derive(Debug, Clone)]
1109  pub struct ReactionSummary {
1110      /// The emoji.
1111      pub emoji: String,
1112      /// Number of this reaction.
1113      pub count: u32,
1114      /// Whether the current user has added this reaction.
1115      pub includes_self: bool,
1116  }
1117  
1118  impl Database {
1119      /// Add a reaction to a message.
1120      ///
1121      /// If the same contact already has this emoji on this message,
1122      /// it will be replaced (upsert behavior via UNIQUE constraint).
1123      pub fn add_reaction(
1124          &self,
1125          message_id: &MessageId,
1126          contact_id: &ContactId,
1127          emoji: &str,
1128      ) -> Result<[u8; 16]> {
1129          let reaction_id = generate_message_id();
1130          let now = Utc::now().timestamp();
1131  
1132          self.connection().execute(
1133              "INSERT OR REPLACE INTO message_reactions
1134               (reaction_id, message_id, contact_id, emoji, created_at)
1135               VALUES (?, ?, ?, ?, ?)",
1136              params![
1137                  reaction_id.as_slice(),
1138                  message_id.as_slice(),
1139                  contact_id.as_slice(),
1140                  emoji,
1141                  now,
1142              ],
1143          )?;
1144  
1145          Ok(reaction_id)
1146      }
1147  
1148      /// Remove a reaction from a message.
1149      pub fn remove_reaction(
1150          &self,
1151          message_id: &MessageId,
1152          contact_id: &ContactId,
1153          emoji: &str,
1154      ) -> Result<()> {
1155          self.connection().execute(
1156              "DELETE FROM message_reactions
1157               WHERE message_id = ? AND contact_id = ? AND emoji = ?",
1158              params![
1159                  message_id.as_slice(),
1160                  contact_id.as_slice(),
1161                  emoji,
1162              ],
1163          )?;
1164          Ok(())
1165      }
1166  
1167      /// Get all reactions for a message.
1168      pub fn get_reactions(&self, message_id: &MessageId) -> Result<Vec<MessageReaction>> {
1169          let mut stmt = self.connection().prepare(
1170              "SELECT reaction_id, message_id, contact_id, emoji, created_at
1171               FROM message_reactions WHERE message_id = ?",
1172          )?;
1173  
1174          let reactions = stmt
1175              .query_map([message_id.as_slice()], |row| {
1176                  let reaction_id: Vec<u8> = row.get(0)?;
1177                  let msg_id: Vec<u8> = row.get(1)?;
1178                  let contact: Vec<u8> = row.get(2)?;
1179                  let emoji: String = row.get(3)?;
1180                  let created_at: i64 = row.get(4)?;
1181  
1182                  Ok(MessageReaction {
1183                      reaction_id: reaction_id.try_into().unwrap_or([0u8; 16]),
1184                      message_id: msg_id.try_into().unwrap_or([0u8; 16]),
1185                      contact_id: contact.try_into().unwrap_or([0u8; 16]),
1186                      emoji,
1187                      created_at: Utc.timestamp_opt(created_at, 0).single().unwrap_or_else(Utc::now),
1188                  })
1189              })?
1190              .collect::<std::result::Result<Vec<_>, _>>()?;
1191  
1192          Ok(reactions)
1193      }
1194  
1195      /// Get reaction summaries for a message (grouped by emoji).
1196      pub fn get_reaction_summaries(&self, message_id: &MessageId) -> Result<Vec<ReactionSummary>> {
1197          let self_contact_id: ContactId = [0u8; 16];
1198  
1199          let mut stmt = self.connection().prepare(
1200              "SELECT emoji, COUNT(*) as count,
1201                      MAX(CASE WHEN contact_id = ? THEN 1 ELSE 0 END) as includes_self
1202               FROM message_reactions
1203               WHERE message_id = ?
1204               GROUP BY emoji
1205               ORDER BY MIN(created_at)",
1206          )?;
1207  
1208          let summaries = stmt
1209              .query_map(params![self_contact_id.as_slice(), message_id.as_slice()], |row| {
1210                  Ok(ReactionSummary {
1211                      emoji: row.get(0)?,
1212                      count: row.get(1)?,
1213                      includes_self: row.get::<_, i32>(2)? == 1,
1214                  })
1215              })?
1216              .collect::<std::result::Result<Vec<_>, _>>()?;
1217  
1218          Ok(summaries)
1219      }
1220  
1221      // =========================================================================
1222      // DOCUMENT CHUNKS
1223      // =========================================================================
1224  
1225      /// Store a document chunk.
1226      pub fn store_chunk(&self, chunk: &DocumentChunk) -> Result<()> {
1227          let now = Utc::now().timestamp();
1228  
1229          self.connection().execute(
1230              "INSERT OR REPLACE INTO document_chunks
1231               (document_id, chunk_index, total_chunks, filename, total_size, data, contact_id, received_at)
1232               VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
1233              params![
1234                  chunk.document_id.as_slice(),
1235                  chunk.chunk_index as i32,
1236                  chunk.total_chunks as i32,
1237                  &chunk.filename,
1238                  chunk.total_size as i64,
1239                  chunk.data.as_slice(),
1240                  chunk.contact_id.as_slice(),
1241                  now,
1242              ],
1243          )?;
1244  
1245          Ok(())
1246      }
1247  
1248      /// Get all chunks for a document, ordered by chunk index.
1249      pub fn get_chunks(&self, document_id: &[u8; 16]) -> Result<Vec<DocumentChunk>> {
1250          let mut stmt = self.connection().prepare(
1251              "SELECT document_id, chunk_index, total_chunks, filename, total_size, data, contact_id
1252               FROM document_chunks
1253               WHERE document_id = ?
1254               ORDER BY chunk_index ASC",
1255          )?;
1256  
1257          let chunks = stmt
1258              .query_map([document_id.as_slice()], |row| {
1259                  let doc_id: Vec<u8> = row.get(0)?;
1260                  let chunk_index: i32 = row.get(1)?;
1261                  let total_chunks: i32 = row.get(2)?;
1262                  let filename: Option<String> = row.get(3)?;
1263                  let total_size: i64 = row.get(4)?;
1264                  let data: Vec<u8> = row.get(5)?;
1265                  let contact: Vec<u8> = row.get(6)?;
1266  
1267                  Ok(DocumentChunk {
1268                      document_id: doc_id.try_into().unwrap_or([0u8; 16]),
1269                      chunk_index: chunk_index as u16,
1270                      total_chunks: total_chunks as u16,
1271                      filename: filename.unwrap_or_default(),
1272                      total_size: total_size as u64,
1273                      data,
1274                      contact_id: contact.try_into().unwrap_or([0u8; 16]),
1275                  })
1276              })?
1277              .collect::<std::result::Result<Vec<_>, _>>()?;
1278  
1279          Ok(chunks)
1280      }
1281  
1282      /// Check if all chunks have been received for a document.
1283      pub fn is_document_complete(&self, document_id: &[u8; 16]) -> Result<bool> {
1284          let result: Option<(i32, i32)> = self.connection().query_row(
1285              "SELECT total_chunks, COUNT(*) as received
1286               FROM document_chunks
1287               WHERE document_id = ?
1288               GROUP BY document_id",
1289              [document_id.as_slice()],
1290              |row| Ok((row.get(0)?, row.get(1)?)),
1291          ).ok();
1292  
1293          match result {
1294              Some((total, received)) => Ok(total == received),
1295              None => Ok(false),
1296          }
1297      }
1298  
1299      /// Get document info (filename and total size) from the first chunk.
1300      pub fn get_document_info(&self, document_id: &[u8; 16]) -> Result<Option<(String, u64)>> {
1301          let result = self.connection().query_row(
1302              "SELECT filename, total_size FROM document_chunks WHERE document_id = ? AND chunk_index = 0",
1303              [document_id.as_slice()],
1304              |row| {
1305                  let filename: Option<String> = row.get(0)?;
1306                  let total_size: i64 = row.get(1)?;
1307                  Ok((filename.unwrap_or_default(), total_size as u64))
1308              },
1309          );
1310  
1311          match result {
1312              Ok(info) => Ok(Some(info)),
1313              Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1314              Err(e) => Err(DeadDropError::Database(e.to_string())),
1315          }
1316      }
1317  
1318      /// Delete all chunks for a document (cleanup after reassembly).
1319      pub fn delete_chunks(&self, document_id: &[u8; 16]) -> Result<u32> {
1320          let count = self.connection().execute(
1321              "DELETE FROM document_chunks WHERE document_id = ?",
1322              [document_id.as_slice()],
1323          )?;
1324  
1325          Ok(count as u32)
1326      }
1327  
1328      /// Delete old chunks that haven't been completed (cleanup).
1329      ///
1330      /// Returns the number of chunks deleted.
1331      pub fn delete_stale_chunks(&self, max_age_secs: i64) -> Result<u32> {
1332          let cutoff = Utc::now().timestamp() - max_age_secs;
1333  
1334          let count = self.connection().execute(
1335              "DELETE FROM document_chunks WHERE received_at < ?",
1336              [cutoff],
1337          )?;
1338  
1339          Ok(count as u32)
1340      }
1341  
1342      // =========================================================================
1343      // DOCUMENT TRANSFER PROGRESS
1344      // =========================================================================
1345  
1346      /// Get send progress for a chunked document.
1347      /// Returns (delivered_count, total_chunks) or None if not a chunked document.
1348      pub fn get_send_progress(&self, document_id: &[u8; 16]) -> Result<Option<(u32, u32)>> {
1349          let result = self.connection().query_row(
1350              "SELECT COUNT(*),
1351                      SUM(CASE WHEN state >= 2 THEN 1 ELSE 0 END)
1352               FROM outbound_messages
1353               WHERE document_id = ? AND content_type = ?",
1354              params![document_id.as_slice(), ContentType::DocumentChunk.to_byte() as i32],
1355              |row| {
1356                  let total: i32 = row.get(0)?;
1357                  let delivered: i32 = row.get::<_, Option<i32>>(1)?.unwrap_or(0);
1358                  Ok((total, delivered))
1359              },
1360          );
1361          match result {
1362              Ok((total, delivered)) if total > 0 => Ok(Some((delivered as u32, total as u32))),
1363              Ok(_) => Ok(None),
1364              Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1365              Err(e) => Err(DeadDropError::Database(e.to_string())),
1366          }
1367      }
1368  
1369      /// Acknowledge all outbound chunks belonging to a document.
1370      /// Called when a delivery receipt is received for the parent document message,
1371      /// so that `get_send_progress` reflects completion.
1372      pub fn acknowledge_document_chunks(&self, document_id: &[u8; 16]) -> Result<usize> {
1373          let count = self.connection().execute(
1374              "UPDATE outbound_messages SET state = ?
1375               WHERE document_id = ? AND content_type = ? AND (state < ? OR state = ?)",
1376              params![
1377                  OutboundState::Acknowledged as i32,
1378                  document_id.as_slice(),
1379                  ContentType::DocumentChunk.to_byte() as i32,
1380                  OutboundState::Acknowledged as i32,
1381                  OutboundState::PendingRelay as i32,
1382              ],
1383          )?;
1384          Ok(count)
1385      }
1386  
1387      /// Get receive progress for a chunked document.
1388      /// Returns (received_count, total_chunks) or None if no chunks found.
1389      pub fn get_receive_progress(&self, document_id: &[u8; 16]) -> Result<Option<(u32, u32)>> {
1390          let result = self.connection().query_row(
1391              "SELECT COUNT(*), MAX(total_chunks)
1392               FROM document_chunks
1393               WHERE document_id = ?",
1394              [document_id.as_slice()],
1395              |row| {
1396                  let received: i32 = row.get(0)?;
1397                  let total: Option<i32> = row.get(1)?;
1398                  Ok((received, total.unwrap_or(0)))
1399              },
1400          );
1401          match result {
1402              Ok((received, total)) if received > 0 => Ok(Some((received as u32, total as u32))),
1403              Ok(_) => Ok(None),
1404              Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1405              Err(e) => Err(DeadDropError::Database(e.to_string())),
1406          }
1407      }
1408  
1409      // =========================================================================
1410      // MESSAGE SEARCH
1411      // =========================================================================
1412  
1413      /// Search messages within a single conversation.
1414      /// Searches text content and document filenames, case-insensitive.
1415      pub fn search_conversation(
1416          &self,
1417          contact_id: &ContactId,
1418          query: &str,
1419          limit: u32,
1420      ) -> Result<Vec<SearchResult>> {
1421          let like_pattern = format!("%{}%", query);
1422          let mut results = Vec::new();
1423  
1424          // Search outbound text messages and document filenames
1425          {
1426              let mut stmt = self.connection().prepare(
1427                  "SELECT message_id, recipient_id, content_type, plaintext, filename, created_at
1428                   FROM outbound_messages
1429                   WHERE recipient_id = ?
1430                     AND content_type IN (0, 1)
1431                     AND (
1432                         (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE)
1433                         OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE)
1434                     )
1435                   ORDER BY created_at DESC
1436                   LIMIT ?",
1437              )?;
1438  
1439              let rows = stmt.query_map(
1440                  params![contact_id.as_slice(), &like_pattern, limit as i64],
1441                  |row| Self::search_result_from_row(row, true),
1442              )?.collect::<std::result::Result<Vec<_>, _>>()?;
1443              results.extend(rows);
1444          }
1445  
1446          // Search inbound text messages and document filenames
1447          {
1448              let mut stmt = self.connection().prepare(
1449                  "SELECT message_id, sender_id, content_type, plaintext, filename, received_at
1450                   FROM inbound_messages
1451                   WHERE sender_id = ?
1452                     AND content_type IN (0, 1)
1453                     AND (
1454                         (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE)
1455                         OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE)
1456                     )
1457                   ORDER BY received_at DESC
1458                   LIMIT ?",
1459              )?;
1460  
1461              let rows = stmt.query_map(
1462                  params![contact_id.as_slice(), &like_pattern, limit as i64],
1463                  |row| Self::search_result_from_row(row, false),
1464              )?.collect::<std::result::Result<Vec<_>, _>>()?;
1465              results.extend(rows);
1466          }
1467  
1468          // Sort combined by timestamp, newest first
1469          results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1470          results.truncate(limit as usize);
1471          Ok(results)
1472      }
1473  
1474      /// Search across all conversations.
1475      pub fn search_all_messages(
1476          &self,
1477          query: &str,
1478          limit: u32,
1479      ) -> Result<Vec<SearchResult>> {
1480          let like_pattern = format!("%{}%", query);
1481          let mut results = Vec::new();
1482  
1483          // Search outbound
1484          {
1485              let mut stmt = self.connection().prepare(
1486                  "SELECT message_id, recipient_id, content_type, plaintext, filename, created_at
1487                   FROM outbound_messages
1488                   WHERE content_type IN (0, 1)
1489                     AND (
1490                         (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE)
1491                         OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE)
1492                     )
1493                   ORDER BY created_at DESC
1494                   LIMIT ?",
1495              )?;
1496  
1497              let rows = stmt.query_map(
1498                  params![&like_pattern, limit as i64],
1499                  |row| Self::search_result_from_row(row, true),
1500              )?.collect::<std::result::Result<Vec<_>, _>>()?;
1501              results.extend(rows);
1502          }
1503  
1504          // Search inbound
1505          {
1506              let mut stmt = self.connection().prepare(
1507                  "SELECT message_id, sender_id, content_type, plaintext, filename, received_at
1508                   FROM inbound_messages
1509                   WHERE content_type IN (0, 1)
1510                     AND (
1511                         (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE)
1512                         OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE)
1513                     )
1514                   ORDER BY received_at DESC
1515                   LIMIT ?",
1516              )?;
1517  
1518              let rows = stmt.query_map(
1519                  params![&like_pattern, limit as i64],
1520                  |row| Self::search_result_from_row(row, false),
1521              )?.collect::<std::result::Result<Vec<_>, _>>()?;
1522              results.extend(rows);
1523          }
1524  
1525          results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1526          results.truncate(limit as usize);
1527          Ok(results)
1528      }
1529  
1530      /// Helper to construct SearchResult from a row.
1531      fn search_result_from_row(row: &rusqlite::Row, is_outbound: bool) -> rusqlite::Result<SearchResult> {
1532          let message_id_vec: Vec<u8> = row.get(0)?;
1533          let message_id: MessageId = message_id_vec
1534              .try_into()
1535              .map_err(|_| rusqlite::Error::InvalidColumnType(0, "message_id".to_string(), rusqlite::types::Type::Blob))?;
1536          let contact_id_vec: Vec<u8> = row.get(1)?;
1537          let contact_id: ContactId = contact_id_vec
1538              .try_into()
1539              .map_err(|_| rusqlite::Error::InvalidColumnType(1, "contact_id".to_string(), rusqlite::types::Type::Blob))?;
1540          let content_type_int: i32 = row.get(2)?;
1541          let content_type = ContentType::from_byte(content_type_int as u8)
1542              .ok_or_else(|| rusqlite::Error::InvalidColumnType(2, "content_type".to_string(), rusqlite::types::Type::Integer))?;
1543          let plaintext: Vec<u8> = row.get(3)?;
1544          let filename: Option<String> = row.get(4)?;
1545          let timestamp_ts: i64 = row.get(5)?;
1546          let timestamp = Utc.timestamp_opt(timestamp_ts, 0).single().unwrap_or_else(Utc::now);
1547  
1548          Ok(SearchResult {
1549              message_id,
1550              contact_id,
1551              is_outbound,
1552              content_type,
1553              plaintext,
1554              filename,
1555              timestamp,
1556          })
1557      }
1558  }
1559  
1560  /// A stored document chunk.
1561  #[derive(Debug, Clone)]
1562  pub struct DocumentChunk {
1563      /// Unique document ID (groups chunks together).
1564      pub document_id: [u8; 16],
1565      /// Index of this chunk (0-based).
1566      pub chunk_index: u16,
1567      /// Total number of chunks.
1568      pub total_chunks: u16,
1569      /// Filename (from chunk 0).
1570      pub filename: String,
1571      /// Total file size in bytes.
1572      pub total_size: u64,
1573      /// Chunk data.
1574      pub data: Vec<u8>,
1575      /// Contact who sent this chunk.
1576      pub contact_id: ContactId,
1577  }
1578  
1579  /// A message search result.
1580  #[derive(Debug, Clone)]
1581  pub struct SearchResult {
1582      pub message_id: MessageId,
1583      pub contact_id: ContactId,
1584      pub is_outbound: bool,
1585      pub content_type: ContentType,
1586      pub plaintext: Vec<u8>,
1587      pub filename: Option<String>,
1588      pub timestamp: DateTime<Utc>,
1589  }
1590  
1591  #[cfg(test)]
1592  mod tests {
1593      use super::*;
1594      use crate::crypto::keys::{ExchangeKeyPair, IdentityKeyPair};
1595      use crate::protocol::messages::{encrypt_message, generate_message_id, PlaintextMessage, DEFAULT_MESSAGE_EXPIRY_DAYS};
1596      use crate::storage::contacts::Contact;
1597  
1598      fn test_db() -> Database {
1599          Database::open_in_memory(b"test_passphrase").unwrap()
1600      }
1601  
1602      fn create_test_outbound(recipient_id: ContactId) -> OutboundMessage {
1603          let sender = IdentityKeyPair::generate();
1604          let recipient = ExchangeKeyPair::generate();
1605  
1606          let plaintext = PlaintextMessage::text("Test message");
1607          let encrypted = encrypt_message(&plaintext, &sender, &recipient.public_bytes()).unwrap();
1608  
1609          let now = Utc::now();
1610  
1611          OutboundMessage {
1612              message_id: encrypted.message_id,
1613              recipient_id,
1614              content_type: ContentType::Text,
1615              plaintext: plaintext.content,
1616              filename: None,
1617              encrypted,
1618              state: OutboundState::Queued,
1619              created_at: now,
1620              expires_at: now + Duration::days(DEFAULT_MESSAGE_EXPIRY_DAYS),
1621              attempts: 0,
1622              document_id: None,
1623              group_id: None,
1624          }
1625      }
1626  
1627      fn create_test_inbound(sender_id: ContactId) -> InboundMessage {
1628          let now = Utc::now();
1629  
1630          InboundMessage {
1631              message_id: generate_message_id(),
1632              sender_id,
1633              content_type: ContentType::Text,
1634              plaintext: b"Received message".to_vec(),
1635              filename: None,
1636              state: InboundState::Received,
1637              received_at: now,
1638              read_at: None,
1639              delete_at: now + Duration::days(DEFAULT_AUTO_DELETE_DAYS),
1640              receive_transport: None,
1641              disappearing_duration: 0,
1642              sent_at: None,
1643              group_id: None,
1644          }
1645      }
1646  
1647      fn setup_contact(db: &Database) -> ContactId {
1648          let identity = IdentityKeyPair::generate();
1649          let exchange = ExchangeKeyPair::generate();
1650          let contact = Contact::new(identity.public_bytes(), exchange.public_bytes());
1651          let contact_id = contact.contact_id;
1652          db.add_contact(&contact).unwrap();
1653          contact_id
1654      }
1655  
1656      // ==================== Outbound Tests ====================
1657  
1658      #[test]
1659      fn test_queue_message() {
1660          let db = test_db();
1661          let contact_id = setup_contact(&db);
1662  
1663          let message = create_test_outbound(contact_id);
1664          db.queue_message(&message).unwrap();
1665  
1666          let retrieved = db.get_outbound_message(&message.message_id).unwrap().unwrap();
1667          assert_eq!(retrieved.message_id, message.message_id);
1668          assert_eq!(retrieved.recipient_id, contact_id);
1669          assert_eq!(retrieved.state, OutboundState::Queued);
1670      }
1671  
1672      #[test]
1673      fn test_get_queued_for_contact() {
1674          let db = test_db();
1675          let contact_id = setup_contact(&db);
1676  
1677          // Queue multiple messages
1678          for _ in 0..3 {
1679              let message = create_test_outbound(contact_id);
1680              db.queue_message(&message).unwrap();
1681          }
1682  
1683          let queued = db.get_queued_for_contact(&contact_id).unwrap();
1684          assert_eq!(queued.len(), 3);
1685      }
1686  
1687      #[test]
1688      fn test_get_all_queued() {
1689          let db = test_db();
1690  
1691          // Create messages for different contacts
1692          let contact1 = setup_contact(&db);
1693          let contact2 = setup_contact(&db);
1694  
1695          db.queue_message(&create_test_outbound(contact1)).unwrap();
1696          db.queue_message(&create_test_outbound(contact1)).unwrap();
1697          db.queue_message(&create_test_outbound(contact2)).unwrap();
1698  
1699          let all_queued = db.get_all_queued().unwrap();
1700          assert_eq!(all_queued.len(), 3);
1701      }
1702  
1703      #[test]
1704      fn test_update_outbound_state() {
1705          let db = test_db();
1706          let contact_id = setup_contact(&db);
1707  
1708          let message = create_test_outbound(contact_id);
1709          let message_id = message.message_id;
1710          db.queue_message(&message).unwrap();
1711  
1712          // Update to Transmitting
1713          db.update_outbound_state(&message_id, OutboundState::Transmitting).unwrap();
1714          let retrieved = db.get_outbound_message(&message_id).unwrap().unwrap();
1715          assert_eq!(retrieved.state, OutboundState::Transmitting);
1716  
1717          // Update to Delivered
1718          db.update_outbound_state(&message_id, OutboundState::Delivered).unwrap();
1719          let retrieved = db.get_outbound_message(&message_id).unwrap().unwrap();
1720          assert_eq!(retrieved.state, OutboundState::Delivered);
1721      }
1722  
1723      #[test]
1724      fn test_increment_outbound_attempts() {
1725          let db = test_db();
1726          let contact_id = setup_contact(&db);
1727  
1728          let message = create_test_outbound(contact_id);
1729          let message_id = message.message_id;
1730          db.queue_message(&message).unwrap();
1731  
1732          assert_eq!(db.increment_outbound_attempts(&message_id).unwrap(), 1);
1733          assert_eq!(db.increment_outbound_attempts(&message_id).unwrap(), 2);
1734          assert_eq!(db.increment_outbound_attempts(&message_id).unwrap(), 3);
1735      }
1736  
1737      #[test]
1738      fn test_delete_expired_outbound() {
1739          let db = test_db();
1740          let contact_id = setup_contact(&db);
1741  
1742          // Create an expired message
1743          let sender = IdentityKeyPair::generate();
1744          let recipient = ExchangeKeyPair::generate();
1745          let plaintext = PlaintextMessage::text("Expired");
1746          let encrypted = encrypt_message(&plaintext, &sender, &recipient.public_bytes()).unwrap();
1747  
1748          let now = Utc::now();
1749          let expired_message = OutboundMessage {
1750              message_id: encrypted.message_id,
1751              recipient_id: contact_id,
1752              content_type: ContentType::Text,
1753              plaintext: plaintext.content,
1754              filename: None,
1755              encrypted,
1756              state: OutboundState::Queued,
1757              created_at: now - Duration::days(40),
1758              expires_at: now - Duration::days(10), // Expired 10 days ago
1759              attempts: 0,
1760              document_id: None,
1761              group_id: None,
1762          };
1763  
1764          db.queue_message(&expired_message).unwrap();
1765  
1766          // Create a non-expired message
1767          let non_expired = create_test_outbound(contact_id);
1768          db.queue_message(&non_expired).unwrap();
1769  
1770          // Delete expired
1771          let deleted = db.delete_expired_outbound().unwrap();
1772          assert_eq!(deleted, 1);
1773  
1774          // Non-expired should still exist
1775          let remaining = db.get_all_queued().unwrap();
1776          assert_eq!(remaining.len(), 1);
1777      }
1778  
1779      #[test]
1780      fn test_count_outbound() {
1781          let db = test_db();
1782          let contact_id = setup_contact(&db);
1783  
1784          // Queue messages
1785          let msg1 = create_test_outbound(contact_id);
1786          let msg2 = create_test_outbound(contact_id);
1787          let msg3 = create_test_outbound(contact_id);
1788  
1789          db.queue_message(&msg1).unwrap();
1790          db.queue_message(&msg2).unwrap();
1791          db.queue_message(&msg3).unwrap();
1792  
1793          // Update some states
1794          db.update_outbound_state(&msg2.message_id, OutboundState::Transmitting).unwrap();
1795          db.update_outbound_state(&msg3.message_id, OutboundState::Delivered).unwrap();
1796  
1797          let counts = db.count_outbound().unwrap();
1798          assert_eq!(counts.queued, 1);
1799          assert_eq!(counts.transmitting, 1);
1800          assert_eq!(counts.delivered, 1);
1801      }
1802  
1803      // ==================== Inbound Tests ====================
1804  
1805      #[test]
1806      fn test_store_inbound() {
1807          let db = test_db();
1808          let contact_id = setup_contact(&db);
1809  
1810          let message = create_test_inbound(contact_id);
1811          db.store_inbound(&message).unwrap();
1812  
1813          let retrieved = db.get_inbound_message(&message.message_id).unwrap().unwrap();
1814          assert_eq!(retrieved.message_id, message.message_id);
1815          assert_eq!(retrieved.sender_id, contact_id);
1816          assert_eq!(retrieved.state, InboundState::Received);
1817      }
1818  
1819      #[test]
1820      fn test_get_messages_from() {
1821          let db = test_db();
1822          let contact_id = setup_contact(&db);
1823  
1824          // Store multiple messages
1825          for _ in 0..3 {
1826              let message = create_test_inbound(contact_id);
1827              db.store_inbound(&message).unwrap();
1828          }
1829  
1830          let messages = db.get_messages_from(&contact_id).unwrap();
1831          assert_eq!(messages.len(), 3);
1832      }
1833  
1834      #[test]
1835      fn test_get_unread() {
1836          let db = test_db();
1837          let contact_id = setup_contact(&db);
1838  
1839          // Store messages
1840          let msg1 = create_test_inbound(contact_id);
1841          let msg2 = create_test_inbound(contact_id);
1842          db.store_inbound(&msg1).unwrap();
1843          db.store_inbound(&msg2).unwrap();
1844  
1845          // Mark one as read
1846          db.mark_message_read(&msg1.message_id).unwrap();
1847  
1848          let unread = db.get_unread().unwrap();
1849          assert_eq!(unread.len(), 1);
1850          assert_eq!(unread[0].message_id, msg2.message_id);
1851      }
1852  
1853      #[test]
1854      fn test_mark_message_read() {
1855          let db = test_db();
1856          let contact_id = setup_contact(&db);
1857  
1858          let message = create_test_inbound(contact_id);
1859          db.store_inbound(&message).unwrap();
1860  
1861          db.mark_message_read(&message.message_id).unwrap();
1862  
1863          let retrieved = db.get_inbound_message(&message.message_id).unwrap().unwrap();
1864          assert_eq!(retrieved.state, InboundState::Read);
1865          assert!(retrieved.read_at.is_some());
1866      }
1867  
1868      #[test]
1869      fn test_delete_inbound_message() {
1870          let db = test_db();
1871          let contact_id = setup_contact(&db);
1872  
1873          let message = create_test_inbound(contact_id);
1874          db.store_inbound(&message).unwrap();
1875  
1876          db.delete_inbound_message(&message.message_id).unwrap();
1877  
1878          let retrieved = db.get_inbound_message(&message.message_id).unwrap().unwrap();
1879          assert_eq!(retrieved.state, InboundState::Deleted);
1880  
1881          // Should not appear in get_messages_from
1882          let messages = db.get_messages_from(&contact_id).unwrap();
1883          assert!(messages.is_empty());
1884      }
1885  
1886      #[test]
1887      fn test_count_unread() {
1888          let db = test_db();
1889          let contact_id = setup_contact(&db);
1890  
1891          // Store messages
1892          for _ in 0..3 {
1893              let message = create_test_inbound(contact_id);
1894              db.store_inbound(&message).unwrap();
1895          }
1896  
1897          assert_eq!(db.count_unread().unwrap(), 3);
1898          assert_eq!(db.count_unread_from(&contact_id).unwrap(), 3);
1899      }
1900  
1901      // ==================== Conversation Tests ====================
1902  
1903      #[test]
1904      fn test_get_conversation() {
1905          let db = test_db();
1906          let contact_id = setup_contact(&db);
1907  
1908          // Add outbound messages
1909          let outbound = create_test_outbound(contact_id);
1910          db.queue_message(&outbound).unwrap();
1911  
1912          // Add inbound messages
1913          let inbound = create_test_inbound(contact_id);
1914          db.store_inbound(&inbound).unwrap();
1915  
1916          let conversation = db.get_conversation(&contact_id).unwrap();
1917          assert_eq!(conversation.len(), 2);
1918  
1919          // Check that we have both types
1920          let outbound_count = conversation.iter().filter(|m| m.is_outbound).count();
1921          let inbound_count = conversation.iter().filter(|m| !m.is_outbound).count();
1922          assert_eq!(outbound_count, 1);
1923          assert_eq!(inbound_count, 1);
1924      }
1925  
1926      #[test]
1927      fn test_conversation_message_as_text() {
1928          let db = test_db();
1929          let contact_id = setup_contact(&db);
1930  
1931          let message = create_test_inbound(contact_id);
1932          db.store_inbound(&message).unwrap();
1933  
1934          let conversation = db.get_conversation(&contact_id).unwrap();
1935          assert!(!conversation.is_empty());
1936  
1937          let msg = &conversation[0];
1938          assert_eq!(msg.as_text(), Some("Received message".to_string()));
1939      }
1940  }