messages.rs
1 //! Message storage for outbound queue and inbound messages. 2 //! 3 //! This module provides storage operations for both outbound (queued for 4 //! delivery) and inbound (received) messages. Messages are stored with 5 //! their encrypted form and plaintext content for display. 6 //! 7 //! # Outbound Messages 8 //! 9 //! Outbound messages go through the following lifecycle: 10 //! - `Queued` → `Transmitting` → `Delivered` → `Acknowledged` 11 //! - Messages can expire if not delivered within their TTL 12 //! 13 //! # Inbound Messages 14 //! 15 //! Inbound messages go through: 16 //! - `Received` → `Read` → auto-deleted after TTL 17 18 use chrono::{DateTime, Duration, TimeZone, Utc}; 19 use rusqlite::params; 20 21 use crate::error::{DeadDropError, Result}; 22 use crate::protocol::generate_message_id; 23 use crate::protocol::messages::{ 24 ContactId, ContentType, EncryptedMessage, InboundMessage, InboundState, MessageId, 25 OutboundMessage, OutboundState, DEFAULT_AUTO_DELETE_DAYS, 26 }; 27 use crate::storage::Database; 28 29 impl Database { 30 // ========================================================================= 31 // OUTBOUND MESSAGE OPERATIONS 32 // ========================================================================= 33 34 /// Queue a new outbound message for delivery. 35 /// 36 /// The message should already be encrypted before calling this method. 
37 /// 38 /// # Arguments 39 /// 40 /// * `message` - The outbound message to queue 41 /// 42 /// # Errors 43 /// 44 /// - `AlreadyExists` if a message with the same ID exists 45 /// - `Database` if storage fails 46 pub fn queue_message(&self, message: &OutboundMessage) -> Result<()> { 47 // Serialize the encrypted message 48 let encrypted_bytes = message.encrypted.to_bytes()?; 49 50 self.connection().execute( 51 "INSERT INTO outbound_messages ( 52 message_id, recipient_id, content_type, plaintext, 53 encrypted, state, created_at, expires_at, attempts, filename, document_id, group_id 54 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 55 params![ 56 message.message_id.as_slice(), 57 message.recipient_id.as_slice(), 58 message.content_type.to_byte() as i32, 59 message.plaintext.as_slice(), 60 encrypted_bytes.as_slice(), 61 message.state as i32, 62 message.created_at.timestamp(), 63 message.expires_at.timestamp(), 64 message.attempts as i32, 65 message.filename.as_deref(), 66 message.document_id.as_ref().map(|id| id.as_slice()), 67 message.group_id.as_ref().map(|id| id.as_slice()), 68 ], 69 )?; 70 71 Ok(()) 72 } 73 74 /// Get an outbound message by ID. 75 pub fn get_outbound_message(&self, message_id: &MessageId) -> Result<Option<OutboundMessage>> { 76 let result = self.connection().query_row( 77 "SELECT message_id, recipient_id, content_type, plaintext, 78 encrypted, state, created_at, expires_at, attempts, filename, document_id 79 FROM outbound_messages WHERE message_id = ?", 80 [message_id.as_slice()], 81 |row| Self::outbound_from_row(row), 82 ); 83 84 match result { 85 Ok(msg) => Ok(Some(msg)), 86 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 87 Err(e) => Err(DeadDropError::Database(e.to_string())), 88 } 89 } 90 91 /// Get all queued messages for a specific contact. 92 /// 93 /// Returns messages that are ready for delivery (state = Queued). 
94 pub fn get_queued_for_contact(&self, contact_id: &ContactId) -> Result<Vec<OutboundMessage>> { 95 let mut stmt = self.connection().prepare( 96 "SELECT message_id, recipient_id, content_type, plaintext, 97 encrypted, state, created_at, expires_at, attempts, filename, document_id 98 FROM outbound_messages 99 WHERE recipient_id = ? AND state IN (?, ?) 100 ORDER BY created_at ASC", 101 )?; 102 103 let messages = stmt 104 .query_map( 105 params![contact_id.as_slice(), OutboundState::Queued as i32, OutboundState::PendingRelay as i32], 106 |row| Self::outbound_from_row(row), 107 )? 108 .collect::<std::result::Result<Vec<_>, _>>()?; 109 110 Ok(messages) 111 } 112 113 /// Get all queued messages (for any contact). 114 pub fn get_all_queued(&self) -> Result<Vec<OutboundMessage>> { 115 let mut stmt = self.connection().prepare( 116 "SELECT message_id, recipient_id, content_type, plaintext, 117 encrypted, state, created_at, expires_at, attempts, filename, document_id 118 FROM outbound_messages 119 WHERE state IN (?, ?) 120 ORDER BY created_at ASC", 121 )?; 122 123 let messages = stmt 124 .query_map(params![OutboundState::Queued as i32, OutboundState::PendingRelay as i32], |row| { 125 Self::outbound_from_row(row) 126 })? 127 .collect::<std::result::Result<Vec<_>, _>>()?; 128 129 Ok(messages) 130 } 131 132 /// Get all outbound messages for a contact (any state). 133 pub fn get_outbound_for_contact(&self, contact_id: &ContactId) -> Result<Vec<OutboundMessage>> { 134 let mut stmt = self.connection().prepare( 135 "SELECT message_id, recipient_id, content_type, plaintext, 136 encrypted, state, created_at, expires_at, attempts, filename, document_id 137 FROM outbound_messages 138 WHERE recipient_id = ? 139 ORDER BY created_at DESC", 140 )?; 141 142 let messages = stmt 143 .query_map([contact_id.as_slice()], |row| Self::outbound_from_row(row))? 144 .collect::<std::result::Result<Vec<_>, _>>()?; 145 146 Ok(messages) 147 } 148 149 /// Update the state of an outbound message. 
150 pub fn update_outbound_state(&self, message_id: &MessageId, state: OutboundState) -> Result<()> { 151 let rows = self.connection().execute( 152 "UPDATE outbound_messages SET state = ? WHERE message_id = ?", 153 params![state as i32, message_id.as_slice()], 154 )?; 155 156 if rows == 0 { 157 return Err(DeadDropError::NotFound("Message not found".to_string())); 158 } 159 160 Ok(()) 161 } 162 163 /// Increment the delivery attempt counter. 164 pub fn increment_outbound_attempts(&self, message_id: &MessageId) -> Result<u32> { 165 self.connection().execute( 166 "UPDATE outbound_messages SET attempts = attempts + 1 WHERE message_id = ?", 167 [message_id.as_slice()], 168 )?; 169 170 let attempts: i32 = self.connection().query_row( 171 "SELECT attempts FROM outbound_messages WHERE message_id = ?", 172 [message_id.as_slice()], 173 |row| row.get(0), 174 )?; 175 176 Ok(attempts as u32) 177 } 178 179 /// Delete expired outbound messages. 180 /// 181 /// Returns the number of messages deleted. 182 pub fn delete_expired_outbound(&self) -> Result<u32> { 183 let now = Utc::now().timestamp(); 184 185 let count = self.connection().execute( 186 "DELETE FROM outbound_messages WHERE expires_at < ?", 187 [now], 188 )?; 189 190 Ok(count as u32) 191 } 192 193 /// Delete an outbound message. 194 pub fn delete_outbound_message(&self, message_id: &MessageId) -> Result<()> { 195 let rows = self.connection().execute( 196 "DELETE FROM outbound_messages WHERE message_id = ?", 197 [message_id.as_slice()], 198 )?; 199 200 if rows == 0 { 201 return Err(DeadDropError::NotFound("Message not found".to_string())); 202 } 203 204 Ok(()) 205 } 206 207 /// Count outbound messages by state. 
208 pub fn count_outbound(&self) -> Result<OutboundCounts> { 209 let queued: u32 = self.connection().query_row( 210 "SELECT COUNT(*) FROM outbound_messages WHERE state = ?", 211 [OutboundState::Queued as i32], 212 |row| row.get(0), 213 )?; 214 215 let transmitting: u32 = self.connection().query_row( 216 "SELECT COUNT(*) FROM outbound_messages WHERE state = ?", 217 [OutboundState::Transmitting as i32], 218 |row| row.get(0), 219 )?; 220 221 let delivered: u32 = self.connection().query_row( 222 "SELECT COUNT(*) FROM outbound_messages WHERE state = ?", 223 [OutboundState::Delivered as i32], 224 |row| row.get(0), 225 )?; 226 227 let failed: u32 = self.connection().query_row( 228 "SELECT COUNT(*) FROM outbound_messages WHERE state = ?", 229 [OutboundState::Failed as i32], 230 |row| row.get(0), 231 )?; 232 233 Ok(OutboundCounts { 234 queued, 235 transmitting, 236 delivered, 237 failed, 238 }) 239 } 240 241 /// Update delivery status of an outbound message. 242 /// 243 /// Records the transport used, delivery timestamp, and any error. 244 pub fn update_delivery_status( 245 &self, 246 message_id: &MessageId, 247 state: OutboundState, 248 transport: Option<DeliveryTransport>, 249 error: Option<&str>, 250 ) -> Result<()> { 251 let now = Utc::now().timestamp(); 252 let delivered_at = if state == OutboundState::Delivered || state == OutboundState::Acknowledged { 253 Some(now) 254 } else { 255 None 256 }; 257 258 // Only upgrade state, never downgrade (e.g. don't overwrite Acknowledged with Delivered). 259 // Exception: PendingRelay (7) can be overridden by Delivered (3) or Acknowledged (4) 260 // since PendingRelay is numerically higher but logically lower priority. 261 let rows = self.connection().execute( 262 "UPDATE outbound_messages 263 SET state = ?, delivery_transport = COALESCE(?, delivery_transport), 264 delivered_at = COALESCE(delivered_at, ?), last_error = ? 265 WHERE message_id = ? AND (state < ? 
OR state = ?)", 266 params![ 267 state as i32, 268 transport.map(|t| t as i32), 269 delivered_at, 270 error, 271 message_id.as_slice(), 272 state as i32, 273 OutboundState::PendingRelay as i32, 274 ], 275 )?; 276 277 // rows == 0 is OK if state was already >= target (not a downgrade error) 278 Ok(()) 279 } 280 281 /// Set the delivery transport for an outbound message (only if not already set). 282 pub fn set_outbound_transport(&self, message_id: &MessageId, transport: DeliveryTransport) -> Result<()> { 283 self.connection().execute( 284 "UPDATE outbound_messages SET delivery_transport = ? WHERE message_id = ? AND delivery_transport IS NULL", 285 params![transport as i32, message_id.as_slice()], 286 )?; 287 Ok(()) 288 } 289 290 /// Get all failed messages for retry. 291 pub fn get_failed_messages(&self) -> Result<Vec<OutboundMessage>> { 292 let mut stmt = self.connection().prepare( 293 "SELECT message_id, recipient_id, content_type, plaintext, 294 encrypted, state, created_at, expires_at, attempts, filename, document_id 295 FROM outbound_messages 296 WHERE state = ? 297 ORDER BY created_at ASC", 298 )?; 299 300 let messages = stmt 301 .query_map([OutboundState::Failed as i32], |row| { 302 Self::outbound_from_row(row) 303 })? 304 .collect::<std::result::Result<Vec<_>, _>>()?; 305 306 Ok(messages) 307 } 308 309 /// Retry a failed message by resetting its state to Queued. 310 pub fn retry_failed_message(&self, message_id: &MessageId) -> Result<()> { 311 let rows = self.connection().execute( 312 "UPDATE outbound_messages 313 SET state = ?, last_error = NULL 314 WHERE message_id = ? AND state = ?", 315 params![ 316 OutboundState::Queued as i32, 317 message_id.as_slice(), 318 OutboundState::Failed as i32, 319 ], 320 )?; 321 322 if rows == 0 { 323 return Err(DeadDropError::NotFound( 324 "Message not found or not in failed state".to_string(), 325 )); 326 } 327 328 Ok(()) 329 } 330 331 /// Retry messages stuck in Transmitting state by resetting to Queued. 
332 /// 333 /// Messages that have been in Transmitting for longer than `timeout_secs` 334 /// are reset to Queued so another transport can pick them up. 335 /// Max 5 retry attempts to prevent infinite loops. 336 /// Returns the number of messages retried. 337 pub fn retry_stuck_transmitting(&self, timeout_secs: i64) -> Result<i32> { 338 let cutoff = Utc::now().timestamp() - timeout_secs; 339 340 let rows = self.connection().execute( 341 "UPDATE outbound_messages 342 SET state = ?, attempts = attempts + 1, last_error = 'Retried: stuck in transmitting' 343 WHERE state = ? AND created_at < ? AND attempts < 5", 344 params![ 345 OutboundState::Queued as i32, 346 OutboundState::Transmitting as i32, 347 cutoff, 348 ], 349 )?; 350 351 Ok(rows as i32) 352 } 353 354 /// Promote outbound messages stuck in Queued to gossip relay. 355 /// 356 /// Finds messages that have been Queued for longer than `min_age_secs` and: 357 /// 1. Transitions to PendingRelay if already in forwarding store 358 /// 2. Re-stores into forwarding store + transitions if missing 359 /// 360 /// Returns the number of messages promoted. 361 pub fn promote_to_gossip_relay(&self, min_age_secs: i64) -> Result<u32> { 362 let cutoff = Utc::now().timestamp() - min_age_secs; 363 364 // Phase 1: Messages already in forwarding store → PendingRelay 365 let already_stored = self.connection().execute( 366 "UPDATE outbound_messages 367 SET state = ?, last_error = 'Waiting for mesh relay' 368 WHERE state = ? AND created_at < ? 369 AND message_id IN (SELECT message_id FROM forwarded_messages)", 370 params![ 371 OutboundState::PendingRelay as i32, 372 OutboundState::Queued as i32, 373 cutoff, 374 ], 375 )?; 376 377 // Phase 2: Queued messages NOT in forwarding store → re-store + promote 378 let mut stmt = self.connection().prepare( 379 "SELECT o.message_id, o.encrypted, c.exchange_public 380 FROM outbound_messages o 381 JOIN contacts c ON o.recipient_id = c.contact_id 382 WHERE o.state = ? AND o.created_at < ? 
383 AND o.message_id NOT IN (SELECT message_id FROM forwarded_messages)", 384 )?; 385 386 let missing: Vec<([u8; 16], Vec<u8>, [u8; 32])> = stmt 387 .query_map( 388 params![OutboundState::Queued as i32, cutoff], 389 |row| { 390 let msg_id_vec: Vec<u8> = row.get(0)?; 391 let encrypted_bytes: Vec<u8> = row.get(1)?; 392 let exchange_key_vec: Vec<u8> = row.get(2)?; 393 394 let mut msg_id = [0u8; 16]; 395 if msg_id_vec.len() == 16 { 396 msg_id.copy_from_slice(&msg_id_vec); 397 } 398 let mut exchange_key = [0u8; 32]; 399 if exchange_key_vec.len() == 32 { 400 exchange_key.copy_from_slice(&exchange_key_vec); 401 } 402 403 Ok((msg_id, encrypted_bytes, exchange_key)) 404 }, 405 )? 406 .collect::<std::result::Result<Vec<_>, _>>()?; 407 408 let mut re_stored = 0u32; 409 for (message_id, encrypted_bytes, exchange_key) in &missing { 410 // The encrypted column already contains bincode-serialized EncryptedMessage, 411 // which is exactly the payload format forwarded_messages expects. 412 let forwarded = crate::storage::forwarding::ForwardedMessage::new( 413 *message_id, 414 exchange_key, 415 encrypted_bytes.clone(), 416 None, 417 ); 418 419 let stored = self.store_for_forwarding(&forwarded).unwrap_or(false); 420 if stored { 421 re_stored += 1; 422 } 423 424 // Transition to PendingRelay regardless (message exists or store is full) 425 let _ = self.connection().execute( 426 "UPDATE outbound_messages 427 SET state = ?, last_error = 'Waiting for mesh relay' 428 WHERE message_id = ? AND state = ?", 429 params![ 430 OutboundState::PendingRelay as i32, 431 message_id.as_slice(), 432 OutboundState::Queued as i32, 433 ], 434 ); 435 } 436 437 Ok(already_stored as u32 + missing.len() as u32) 438 } 439 440 /// Expire stale outbound messages stuck in Queued or Transmitting state. 441 /// 442 /// Messages older than `max_age_secs` are marked as Failed with a timeout error. 443 /// Returns the number of expired messages. 
444 pub fn expire_stale_outbound(&self, max_age_secs: i64) -> Result<i32> { 445 let cutoff = Utc::now().timestamp() - max_age_secs; 446 447 // Phase 1: Messages with a copy in forwarded_messages → PendingRelay 448 // (still potentially deliverable via BLE gossip mesh) 449 let pending_relay_count = self.connection().execute( 450 "UPDATE outbound_messages 451 SET state = ?, last_error = 'Waiting for mesh relay' 452 WHERE state IN (?, ?) AND created_at < ? 453 AND message_id IN (SELECT message_id FROM forwarded_messages)", 454 params![ 455 OutboundState::PendingRelay as i32, 456 OutboundState::Queued as i32, 457 OutboundState::Transmitting as i32, 458 cutoff, 459 ], 460 )?; 461 462 // Phase 2: Remaining stale messages → Failed (truly undeliverable) 463 let failed_count = self.connection().execute( 464 "UPDATE outbound_messages 465 SET state = ?, last_error = 'Delivery timed out' 466 WHERE state IN (?, ?) AND created_at < ?", 467 params![ 468 OutboundState::Failed as i32, 469 OutboundState::Queued as i32, 470 OutboundState::Transmitting as i32, 471 cutoff, 472 ], 473 )?; 474 475 Ok((pending_relay_count + failed_count) as i32) 476 } 477 478 /// Expire PendingRelay messages older than `max_age_secs`. 479 /// 480 /// These have exceeded the forwarding store TTL window and will never 481 /// be delivered via gossip. Returns the number of messages marked Failed. 482 pub fn expire_stale_pending_relay(&self, max_age_secs: i64) -> Result<i32> { 483 let cutoff = Utc::now().timestamp() - max_age_secs; 484 let rows = self.connection().execute( 485 "UPDATE outbound_messages 486 SET state = ?, last_error = 'Gossip relay timed out' 487 WHERE state = ? AND created_at < ?", 488 params![ 489 OutboundState::Failed as i32, 490 OutboundState::PendingRelay as i32, 491 cutoff, 492 ], 493 )?; 494 Ok(rows as i32) 495 } 496 497 /// Get delivery info for a message. 
498 pub fn get_delivery_info( 499 &self, 500 message_id: &MessageId, 501 ) -> Result<Option<(Option<DeliveryTransport>, Option<DateTime<Utc>>, Option<String>)>> { 502 let result = self.connection().query_row( 503 "SELECT delivery_transport, delivered_at, last_error 504 FROM outbound_messages WHERE message_id = ?", 505 [message_id.as_slice()], 506 |row| { 507 let transport_int: Option<i32> = row.get(0)?; 508 let delivered_at_ts: Option<i64> = row.get(1)?; 509 let last_error: Option<String> = row.get(2)?; 510 511 let transport = transport_int.and_then(|t| DeliveryTransport::from_byte(t as u8)); 512 let delivered_at = delivered_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single()); 513 514 Ok((transport, delivered_at, last_error)) 515 }, 516 ); 517 518 match result { 519 Ok(info) => Ok(Some(info)), 520 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 521 Err(e) => Err(DeadDropError::Database(e.to_string())), 522 } 523 } 524 525 /// Count messages by transport type. 526 pub fn count_by_transport(&self) -> Result<TransportStats> { 527 let ble: u32 = self.connection().query_row( 528 "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?", 529 [DeliveryTransport::Ble as i32], 530 |row| row.get(0), 531 ).unwrap_or(0); 532 533 let dht: u32 = self.connection().query_row( 534 "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?", 535 [DeliveryTransport::Dht as i32], 536 |row| row.get(0), 537 ).unwrap_or(0); 538 539 let relay: u32 = self.connection().query_row( 540 "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?", 541 [DeliveryTransport::Relay as i32], 542 |row| row.get(0), 543 ).unwrap_or(0); 544 545 let iroh: u32 = self.connection().query_row( 546 "SELECT COUNT(*) FROM outbound_messages WHERE delivery_transport = ?", 547 [DeliveryTransport::IrohBlobs as i32], 548 |row| row.get(0), 549 ).unwrap_or(0); 550 551 Ok(TransportStats { ble, dht, relay, iroh }) 552 } 553 554 /// Helper to construct OutboundMessage from a row. 
555 fn outbound_from_row(row: &rusqlite::Row) -> rusqlite::Result<OutboundMessage> { 556 let message_id_vec: Vec<u8> = row.get(0)?; 557 let message_id: MessageId = message_id_vec 558 .try_into() 559 .map_err(|_| rusqlite::Error::InvalidColumnType(0, "message_id".to_string(), rusqlite::types::Type::Blob))?; 560 561 let recipient_id_vec: Vec<u8> = row.get(1)?; 562 let recipient_id: ContactId = recipient_id_vec 563 .try_into() 564 .map_err(|_| rusqlite::Error::InvalidColumnType(1, "recipient_id".to_string(), rusqlite::types::Type::Blob))?; 565 566 let content_type_int: i32 = row.get(2)?; 567 let content_type = ContentType::from_byte(content_type_int as u8) 568 .ok_or_else(|| rusqlite::Error::InvalidColumnType(2, "content_type".to_string(), rusqlite::types::Type::Integer))?; 569 570 let plaintext: Vec<u8> = row.get(3)?; 571 let encrypted_bytes: Vec<u8> = row.get(4)?; 572 573 let encrypted = EncryptedMessage::from_bytes(&encrypted_bytes) 574 .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Blob, Box::new(e)))?; 575 576 let state_int: i32 = row.get(5)?; 577 let state = OutboundState::from_byte(state_int as u8) 578 .ok_or_else(|| rusqlite::Error::InvalidColumnType(5, "state".to_string(), rusqlite::types::Type::Integer))?; 579 580 let created_at_ts: i64 = row.get(6)?; 581 let created_at = Utc.timestamp_opt(created_at_ts, 0).single().unwrap_or_else(Utc::now); 582 583 let expires_at_ts: i64 = row.get(7)?; 584 let expires_at = Utc.timestamp_opt(expires_at_ts, 0).single().unwrap_or_else(Utc::now); 585 586 let attempts: i32 = row.get(8)?; 587 let filename: Option<String> = row.get(9)?; 588 let document_id_vec: Option<Vec<u8>> = row.get(10)?; 589 let document_id: Option<MessageId> = document_id_vec.and_then(|v| v.try_into().ok()); 590 591 Ok(OutboundMessage { 592 message_id, 593 recipient_id, 594 content_type, 595 plaintext, 596 filename, 597 encrypted, 598 state, 599 created_at, 600 expires_at, 601 attempts: attempts as u32, 602 document_id, 603 
group_id: None, // Populated by caller when needed 604 }) 605 } 606 607 // ========================================================================= 608 // INBOUND MESSAGE OPERATIONS 609 // ========================================================================= 610 611 /// Store a received inbound message. 612 /// 613 /// The message should already be decrypted before calling this method. 614 pub fn store_inbound(&self, message: &InboundMessage) -> Result<()> { 615 self.connection().execute( 616 "INSERT INTO inbound_messages ( 617 message_id, sender_id, content_type, plaintext, filename, 618 state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at, group_id 619 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 620 params![ 621 message.message_id.as_slice(), 622 message.sender_id.as_slice(), 623 message.content_type.to_byte() as i32, 624 message.plaintext.as_slice(), 625 message.filename, 626 message.state as i32, 627 message.received_at.timestamp(), 628 message.read_at.map(|t| t.timestamp()), 629 message.delete_at.timestamp(), 630 message.receive_transport.map(|t| t as i32), 631 message.disappearing_duration as i64, 632 message.sent_at.map(|t| t.timestamp()), 633 message.group_id.as_ref().map(|id| id.as_slice()), 634 ], 635 )?; 636 637 Ok(()) 638 } 639 640 /// Get an inbound message by ID. 
641 pub fn get_inbound_message(&self, message_id: &MessageId) -> Result<Option<InboundMessage>> { 642 let result = self.connection().query_row( 643 "SELECT message_id, sender_id, content_type, plaintext, filename, 644 state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at 645 FROM inbound_messages WHERE message_id = ?", 646 [message_id.as_slice()], 647 |row| Self::inbound_from_row(row), 648 ); 649 650 match result { 651 Ok(msg) => Ok(Some(msg)), 652 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 653 Err(e) => Err(DeadDropError::Database(e.to_string())), 654 } 655 } 656 657 /// Get all messages from a specific contact. 658 pub fn get_messages_from(&self, contact_id: &ContactId) -> Result<Vec<InboundMessage>> { 659 let mut stmt = self.connection().prepare( 660 "SELECT message_id, sender_id, content_type, plaintext, filename, 661 state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at 662 FROM inbound_messages 663 WHERE sender_id = ? AND state != ? AND group_id IS NULL 664 ORDER BY received_at DESC", 665 )?; 666 667 let messages = stmt 668 .query_map( 669 params![contact_id.as_slice(), InboundState::Deleted as i32], 670 |row| Self::inbound_from_row(row), 671 )? 672 .collect::<std::result::Result<Vec<_>, _>>()?; 673 674 Ok(messages) 675 } 676 677 /// Get all unread messages. 678 pub fn get_unread(&self) -> Result<Vec<InboundMessage>> { 679 let mut stmt = self.connection().prepare( 680 "SELECT message_id, sender_id, content_type, plaintext, filename, 681 state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at 682 FROM inbound_messages 683 WHERE state = ? 684 ORDER BY received_at DESC", 685 )?; 686 687 let messages = stmt 688 .query_map([InboundState::Received as i32], |row| { 689 Self::inbound_from_row(row) 690 })? 691 .collect::<std::result::Result<Vec<_>, _>>()?; 692 693 Ok(messages) 694 } 695 696 /// Mark a message as read. 
697 /// 698 /// If the message has a disappearing_duration > 0, sets delete_at to 699 /// read_at + duration. Otherwise uses the default 7-day auto-delete. 700 pub fn mark_message_read(&self, message_id: &MessageId) -> Result<()> { 701 let now = Utc::now().timestamp(); 702 703 // Check if this message has a disappearing duration 704 let disappearing_duration: i64 = self.connection().query_row( 705 "SELECT COALESCE(disappearing_duration, 0) FROM inbound_messages WHERE message_id = ?", 706 [message_id.as_slice()], 707 |row| row.get(0), 708 ).unwrap_or(0); 709 710 let delete_at = if disappearing_duration > 0 { 711 now + disappearing_duration 712 } else { 713 (Utc::now() + Duration::days(DEFAULT_AUTO_DELETE_DAYS)).timestamp() 714 }; 715 716 let rows = self.connection().execute( 717 "UPDATE inbound_messages 718 SET state = ?, read_at = ?, delete_at = ? 719 WHERE message_id = ? AND state = ?", 720 params![ 721 InboundState::Read as i32, 722 now, 723 delete_at, 724 message_id.as_slice(), 725 InboundState::Received as i32, 726 ], 727 )?; 728 729 if rows == 0 { 730 // Check if message exists but is already read 731 let exists: u32 = self.connection().query_row( 732 "SELECT COUNT(*) FROM inbound_messages WHERE message_id = ?", 733 [message_id.as_slice()], 734 |row| row.get(0), 735 )?; 736 737 if exists == 0 { 738 return Err(DeadDropError::NotFound("Message not found".to_string())); 739 } 740 // Message exists but was already read - that's fine 741 } 742 743 Ok(()) 744 } 745 746 /// Delete an inbound message (mark as deleted). 747 pub fn delete_inbound_message(&self, message_id: &MessageId) -> Result<()> { 748 let rows = self.connection().execute( 749 "UPDATE inbound_messages SET state = ? WHERE message_id = ?", 750 params![InboundState::Deleted as i32, message_id.as_slice()], 751 )?; 752 753 if rows == 0 { 754 return Err(DeadDropError::NotFound("Message not found".to_string())); 755 } 756 757 Ok(()) 758 } 759 760 /// Permanently delete an inbound message from the database. 
761 pub fn purge_inbound_message(&self, message_id: &MessageId) -> Result<()> { 762 let rows = self.connection().execute( 763 "DELETE FROM inbound_messages WHERE message_id = ?", 764 [message_id.as_slice()], 765 )?; 766 767 if rows == 0 { 768 return Err(DeadDropError::NotFound("Message not found".to_string())); 769 } 770 771 Ok(()) 772 } 773 774 /// Clear all messages for a contact (hard delete). 775 pub fn clear_conversation(&self, contact_id: &ContactId) -> Result<u32> { 776 let conn = self.connection(); 777 let outbound = conn.execute( 778 "DELETE FROM outbound_messages WHERE recipient_id = ?", 779 [contact_id.as_slice()], 780 )?; 781 let inbound = conn.execute( 782 "DELETE FROM inbound_messages WHERE sender_id = ?", 783 [contact_id.as_slice()], 784 )?; 785 let _ = conn.execute( 786 "DELETE FROM document_chunks WHERE contact_id = ?", 787 [contact_id.as_slice()], 788 ); 789 Ok((outbound + inbound) as u32) 790 } 791 792 /// Delete messages past their auto-delete time. 793 /// 794 /// Returns the number of messages deleted. 795 pub fn delete_expired_inbound(&self) -> Result<u32> { 796 let now = Utc::now().timestamp(); 797 798 let count = self.connection().execute( 799 "DELETE FROM inbound_messages WHERE delete_at < ?", 800 [now], 801 )?; 802 803 Ok(count as u32) 804 } 805 806 /// Count unread messages. 807 pub fn count_unread(&self) -> Result<u32> { 808 let count: u32 = self.connection().query_row( 809 "SELECT COUNT(*) FROM inbound_messages WHERE state = ? AND group_id IS NULL", 810 [InboundState::Received as i32], 811 |row| row.get(0), 812 )?; 813 Ok(count) 814 } 815 816 /// Count unread messages from a specific contact. 817 pub fn count_unread_from(&self, contact_id: &ContactId) -> Result<u32> { 818 let count: u32 = self.connection().query_row( 819 "SELECT COUNT(*) FROM inbound_messages WHERE sender_id = ? AND state = ? 
AND group_id IS NULL", 820 params![contact_id.as_slice(), InboundState::Received as i32], 821 |row| row.get(0), 822 )?; 823 Ok(count) 824 } 825 826 /// Helper to construct InboundMessage from a row. 827 /// Expects columns: message_id, sender_id, content_type, plaintext, filename, 828 /// state, received_at, read_at, delete_at, receive_transport, disappearing_duration, sent_at 829 fn inbound_from_row(row: &rusqlite::Row) -> rusqlite::Result<InboundMessage> { 830 let message_id_vec: Vec<u8> = row.get(0)?; 831 let message_id: MessageId = message_id_vec 832 .try_into() 833 .map_err(|_| rusqlite::Error::InvalidColumnType(0, "message_id".to_string(), rusqlite::types::Type::Blob))?; 834 835 let sender_id_vec: Vec<u8> = row.get(1)?; 836 let sender_id: ContactId = sender_id_vec 837 .try_into() 838 .map_err(|_| rusqlite::Error::InvalidColumnType(1, "sender_id".to_string(), rusqlite::types::Type::Blob))?; 839 840 let content_type_int: i32 = row.get(2)?; 841 let content_type = ContentType::from_byte(content_type_int as u8) 842 .ok_or_else(|| rusqlite::Error::InvalidColumnType(2, "content_type".to_string(), rusqlite::types::Type::Integer))?; 843 844 let plaintext: Vec<u8> = row.get(3)?; 845 let filename: Option<String> = row.get(4)?; 846 847 let state_int: i32 = row.get(5)?; 848 let state = InboundState::from_byte(state_int as u8) 849 .ok_or_else(|| rusqlite::Error::InvalidColumnType(5, "state".to_string(), rusqlite::types::Type::Integer))?; 850 851 let received_at_ts: i64 = row.get(6)?; 852 let received_at = Utc.timestamp_opt(received_at_ts, 0).single().unwrap_or_else(Utc::now); 853 854 let read_at_ts: Option<i64> = row.get(7)?; 855 let read_at = read_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single()); 856 857 let delete_at_ts: i64 = row.get(8)?; 858 let delete_at = Utc.timestamp_opt(delete_at_ts, 0).single().unwrap_or_else(Utc::now); 859 860 let receive_transport: Option<i32> = row.get(9)?; 861 let disappearing_duration: i64 = row.get::<_, Option<i64>>(10)?.unwrap_or(0); 
862 let sent_at_ts: Option<i64> = row.get(11)?; 863 let sent_at = sent_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single()); 864 865 Ok(InboundMessage { 866 message_id, 867 sender_id, 868 content_type, 869 plaintext, 870 filename, 871 state, 872 received_at, 873 read_at, 874 delete_at, 875 receive_transport: receive_transport.map(|t| t as u8), 876 disappearing_duration: disappearing_duration as u64, 877 sent_at, 878 group_id: None, // Populated by caller when needed 879 }) 880 } 881 882 // ========================================================================= 883 // CONVERSATION VIEW 884 // ========================================================================= 885 886 /// Get a conversation with a contact (both inbound and outbound). 887 /// 888 /// Returns messages ordered by timestamp (newest first). 889 pub fn get_conversation(&self, contact_id: &ContactId) -> Result<Vec<ConversationMessage>> { 890 let mut messages = Vec::new(); 891 892 // Get outbound messages with delivery info (exclude group messages) 893 let mut stmt = self.connection().prepare( 894 "SELECT message_id, recipient_id, content_type, plaintext, 895 encrypted, state, created_at, expires_at, attempts, filename, document_id, 896 delivery_transport, delivered_at, last_error 897 FROM outbound_messages 898 WHERE recipient_id = ? AND group_id IS NULL 899 ORDER BY created_at DESC", 900 )?; 901 902 let outbound_messages = stmt 903 .query_map([contact_id.as_slice()], |row| { 904 let msg = Self::outbound_from_row(row)?; 905 906 // Get delivery info from additional columns (indices 11, 12, 13) 907 let transport_int: Option<i32> = row.get(11)?; 908 let delivered_at_ts: Option<i64> = row.get(12)?; 909 let last_error: Option<String> = row.get(13)?; 910 911 let transport = transport_int.and_then(|t| DeliveryTransport::from_byte(t as u8)); 912 let delivered_at = delivered_at_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single()); 913 914 Ok((msg, transport, delivered_at, last_error)) 915 })? 
916 .collect::<std::result::Result<Vec<_>, _>>()?; 917 918 for (msg, transport, delivered_at, last_error) in outbound_messages { 919 // Derive disappearing_duration from the message's own expires_at. 920 // Messages sent with disappearing enabled have a short TTL (seconds/hours), 921 // while normal messages have DEFAULT_MESSAGE_EXPIRY_DAYS (30 days). 922 let ttl_secs = (msg.expires_at - msg.created_at).num_seconds().max(0) as u64; 923 let is_disappearing = ttl_secs < 86400 * 2; // Less than 2 days = disappearing 924 messages.push(ConversationMessage { 925 message_id: msg.message_id, 926 is_outbound: true, 927 content_type: msg.content_type, 928 plaintext: msg.plaintext, 929 filename: msg.filename, 930 timestamp: msg.created_at, 931 outbound_state: Some(msg.state), 932 inbound_state: None, 933 delivery_transport: transport, 934 delivered_at, 935 last_error, 936 disappearing_duration: if is_disappearing { ttl_secs } else { 0 }, 937 sender_id: None, 938 group_id: None, 939 }); 940 } 941 942 // Get inbound messages 943 let inbound = self.get_messages_from(contact_id)?; 944 for msg in inbound { 945 // Convert receive_transport to DeliveryTransport for display 946 let transport = msg.receive_transport.and_then(DeliveryTransport::from_byte); 947 948 messages.push(ConversationMessage { 949 message_id: msg.message_id, 950 is_outbound: false, 951 content_type: msg.content_type, 952 plaintext: msg.plaintext.clone(), 953 filename: msg.filename.clone(), 954 timestamp: msg.sent_at.unwrap_or(msg.received_at), 955 outbound_state: None, 956 inbound_state: Some(msg.state), 957 delivery_transport: transport, // Transport used to receive the message 958 delivered_at: None, 959 disappearing_duration: msg.disappearing_duration, 960 last_error: None, 961 sender_id: Some(msg.sender_id), 962 group_id: None, 963 }); 964 } 965 966 // Sort by timestamp (newest first) 967 messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 968 969 Ok(messages) 970 } 971 } 972 973 /// Outbound message 
counts by state. 974 #[derive(Debug, Clone)] 975 pub struct OutboundCounts { 976 /// Messages waiting for delivery. 977 pub queued: u32, 978 /// Messages currently being transmitted. 979 pub transmitting: u32, 980 /// Messages delivered but not acknowledged. 981 pub delivered: u32, 982 /// Messages that failed to deliver. 983 pub failed: u32, 984 } 985 986 /// Transport type used for message delivery. 987 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 988 #[repr(u8)] 989 pub enum DeliveryTransport { 990 /// Bluetooth Low Energy. 991 Ble = 0, 992 /// DHT (Distributed Hash Table). 993 Dht = 1, 994 /// Relay server. 995 Relay = 2, 996 /// Tor P2P (direct .onion connection). 997 Tor = 3, 998 /// Iroh (QUIC-based P2P with NAT traversal). 999 IrohBlobs = 4, 1000 /// BLE Gossip (store-and-forward mesh relay). 1001 BleGossip = 5, 1002 } 1003 1004 impl DeliveryTransport { 1005 /// Convert from byte. 1006 pub fn from_byte(byte: u8) -> Option<Self> { 1007 match byte { 1008 0 => Some(DeliveryTransport::Ble), 1009 1 => Some(DeliveryTransport::Dht), 1010 2 => Some(DeliveryTransport::Relay), 1011 3 => Some(DeliveryTransport::Tor), 1012 4 => Some(DeliveryTransport::IrohBlobs), 1013 5 => Some(DeliveryTransport::BleGossip), 1014 _ => None, 1015 } 1016 } 1017 1018 /// Convert to string for display. 1019 pub fn as_str(&self) -> &'static str { 1020 match self { 1021 DeliveryTransport::Ble => "ble", 1022 DeliveryTransport::Dht => "dht", 1023 DeliveryTransport::Relay => "relay", 1024 DeliveryTransport::Tor => "tor", 1025 DeliveryTransport::IrohBlobs => "iroh", 1026 DeliveryTransport::BleGossip => "ble-gossip", 1027 } 1028 } 1029 } 1030 1031 /// Statistics for message delivery by transport. 1032 #[derive(Debug, Clone, Default)] 1033 pub struct TransportStats { 1034 /// Messages delivered via BLE. 1035 pub ble: u32, 1036 /// Messages delivered via DHT. 1037 pub dht: u32, 1038 /// Messages delivered via Relay. 1039 pub relay: u32, 1040 /// Messages delivered via Iroh. 
1041 pub iroh: u32, 1042 } 1043 1044 /// A message in a conversation (either inbound or outbound). 1045 #[derive(Debug, Clone)] 1046 pub struct ConversationMessage { 1047 /// Unique message ID. 1048 pub message_id: MessageId, 1049 /// True if this is an outbound message. 1050 pub is_outbound: bool, 1051 /// Content type. 1052 pub content_type: ContentType, 1053 /// Message content. 1054 pub plaintext: Vec<u8>, 1055 /// Filename (for documents). 1056 pub filename: Option<String>, 1057 /// Message timestamp. 1058 pub timestamp: DateTime<Utc>, 1059 /// Outbound state (if outbound). 1060 pub outbound_state: Option<OutboundState>, 1061 /// Inbound state (if inbound). 1062 pub inbound_state: Option<InboundState>, 1063 /// Transport used to deliver (if delivered). 1064 pub delivery_transport: Option<DeliveryTransport>, 1065 /// Delivery timestamp (if delivered). 1066 pub delivered_at: Option<DateTime<Utc>>, 1067 /// Last error message (if failed). 1068 pub last_error: Option<String>, 1069 /// Disappearing message duration in seconds (0 = standard). 1070 pub disappearing_duration: u64, 1071 /// Sender contact ID (for group inbound messages). 1072 pub sender_id: Option<ContactId>, 1073 /// Group ID (if this is a group message). 1074 pub group_id: Option<[u8; 16]>, 1075 } 1076 1077 impl ConversationMessage { 1078 /// Get content as text (if text type). 1079 pub fn as_text(&self) -> Option<String> { 1080 if self.content_type == ContentType::Text { 1081 String::from_utf8(self.plaintext.clone()).ok() 1082 } else { 1083 None 1084 } 1085 } 1086 } 1087 1088 // ========================================================================= 1089 // MESSAGE REACTIONS 1090 // ========================================================================= 1091 1092 /// A reaction to a message. 1093 #[derive(Debug, Clone)] 1094 pub struct MessageReaction { 1095 /// Unique reaction ID. 1096 pub reaction_id: [u8; 16], 1097 /// ID of the message being reacted to. 
1098 pub message_id: MessageId, 1099 /// Contact who added the reaction (all zeros for self). 1100 pub contact_id: ContactId, 1101 /// The emoji reaction. 1102 pub emoji: String, 1103 /// When the reaction was added. 1104 pub created_at: DateTime<Utc>, 1105 } 1106 1107 /// Summary of reactions for a message (grouped by emoji). 1108 #[derive(Debug, Clone)] 1109 pub struct ReactionSummary { 1110 /// The emoji. 1111 pub emoji: String, 1112 /// Number of this reaction. 1113 pub count: u32, 1114 /// Whether the current user has added this reaction. 1115 pub includes_self: bool, 1116 } 1117 1118 impl Database { 1119 /// Add a reaction to a message. 1120 /// 1121 /// If the same contact already has this emoji on this message, 1122 /// it will be replaced (upsert behavior via UNIQUE constraint). 1123 pub fn add_reaction( 1124 &self, 1125 message_id: &MessageId, 1126 contact_id: &ContactId, 1127 emoji: &str, 1128 ) -> Result<[u8; 16]> { 1129 let reaction_id = generate_message_id(); 1130 let now = Utc::now().timestamp(); 1131 1132 self.connection().execute( 1133 "INSERT OR REPLACE INTO message_reactions 1134 (reaction_id, message_id, contact_id, emoji, created_at) 1135 VALUES (?, ?, ?, ?, ?)", 1136 params![ 1137 reaction_id.as_slice(), 1138 message_id.as_slice(), 1139 contact_id.as_slice(), 1140 emoji, 1141 now, 1142 ], 1143 )?; 1144 1145 Ok(reaction_id) 1146 } 1147 1148 /// Remove a reaction from a message. 1149 pub fn remove_reaction( 1150 &self, 1151 message_id: &MessageId, 1152 contact_id: &ContactId, 1153 emoji: &str, 1154 ) -> Result<()> { 1155 self.connection().execute( 1156 "DELETE FROM message_reactions 1157 WHERE message_id = ? AND contact_id = ? AND emoji = ?", 1158 params![ 1159 message_id.as_slice(), 1160 contact_id.as_slice(), 1161 emoji, 1162 ], 1163 )?; 1164 Ok(()) 1165 } 1166 1167 /// Get all reactions for a message. 
1168 pub fn get_reactions(&self, message_id: &MessageId) -> Result<Vec<MessageReaction>> { 1169 let mut stmt = self.connection().prepare( 1170 "SELECT reaction_id, message_id, contact_id, emoji, created_at 1171 FROM message_reactions WHERE message_id = ?", 1172 )?; 1173 1174 let reactions = stmt 1175 .query_map([message_id.as_slice()], |row| { 1176 let reaction_id: Vec<u8> = row.get(0)?; 1177 let msg_id: Vec<u8> = row.get(1)?; 1178 let contact: Vec<u8> = row.get(2)?; 1179 let emoji: String = row.get(3)?; 1180 let created_at: i64 = row.get(4)?; 1181 1182 Ok(MessageReaction { 1183 reaction_id: reaction_id.try_into().unwrap_or([0u8; 16]), 1184 message_id: msg_id.try_into().unwrap_or([0u8; 16]), 1185 contact_id: contact.try_into().unwrap_or([0u8; 16]), 1186 emoji, 1187 created_at: Utc.timestamp_opt(created_at, 0).single().unwrap_or_else(Utc::now), 1188 }) 1189 })? 1190 .collect::<std::result::Result<Vec<_>, _>>()?; 1191 1192 Ok(reactions) 1193 } 1194 1195 /// Get reaction summaries for a message (grouped by emoji). 1196 pub fn get_reaction_summaries(&self, message_id: &MessageId) -> Result<Vec<ReactionSummary>> { 1197 let self_contact_id: ContactId = [0u8; 16]; 1198 1199 let mut stmt = self.connection().prepare( 1200 "SELECT emoji, COUNT(*) as count, 1201 MAX(CASE WHEN contact_id = ? THEN 1 ELSE 0 END) as includes_self 1202 FROM message_reactions 1203 WHERE message_id = ? 1204 GROUP BY emoji 1205 ORDER BY MIN(created_at)", 1206 )?; 1207 1208 let summaries = stmt 1209 .query_map(params![self_contact_id.as_slice(), message_id.as_slice()], |row| { 1210 Ok(ReactionSummary { 1211 emoji: row.get(0)?, 1212 count: row.get(1)?, 1213 includes_self: row.get::<_, i32>(2)? == 1, 1214 }) 1215 })? 
1216 .collect::<std::result::Result<Vec<_>, _>>()?; 1217 1218 Ok(summaries) 1219 } 1220 1221 // ========================================================================= 1222 // DOCUMENT CHUNKS 1223 // ========================================================================= 1224 1225 /// Store a document chunk. 1226 pub fn store_chunk(&self, chunk: &DocumentChunk) -> Result<()> { 1227 let now = Utc::now().timestamp(); 1228 1229 self.connection().execute( 1230 "INSERT OR REPLACE INTO document_chunks 1231 (document_id, chunk_index, total_chunks, filename, total_size, data, contact_id, received_at) 1232 VALUES (?, ?, ?, ?, ?, ?, ?, ?)", 1233 params![ 1234 chunk.document_id.as_slice(), 1235 chunk.chunk_index as i32, 1236 chunk.total_chunks as i32, 1237 &chunk.filename, 1238 chunk.total_size as i64, 1239 chunk.data.as_slice(), 1240 chunk.contact_id.as_slice(), 1241 now, 1242 ], 1243 )?; 1244 1245 Ok(()) 1246 } 1247 1248 /// Get all chunks for a document, ordered by chunk index. 1249 pub fn get_chunks(&self, document_id: &[u8; 16]) -> Result<Vec<DocumentChunk>> { 1250 let mut stmt = self.connection().prepare( 1251 "SELECT document_id, chunk_index, total_chunks, filename, total_size, data, contact_id 1252 FROM document_chunks 1253 WHERE document_id = ? 
1254 ORDER BY chunk_index ASC", 1255 )?; 1256 1257 let chunks = stmt 1258 .query_map([document_id.as_slice()], |row| { 1259 let doc_id: Vec<u8> = row.get(0)?; 1260 let chunk_index: i32 = row.get(1)?; 1261 let total_chunks: i32 = row.get(2)?; 1262 let filename: Option<String> = row.get(3)?; 1263 let total_size: i64 = row.get(4)?; 1264 let data: Vec<u8> = row.get(5)?; 1265 let contact: Vec<u8> = row.get(6)?; 1266 1267 Ok(DocumentChunk { 1268 document_id: doc_id.try_into().unwrap_or([0u8; 16]), 1269 chunk_index: chunk_index as u16, 1270 total_chunks: total_chunks as u16, 1271 filename: filename.unwrap_or_default(), 1272 total_size: total_size as u64, 1273 data, 1274 contact_id: contact.try_into().unwrap_or([0u8; 16]), 1275 }) 1276 })? 1277 .collect::<std::result::Result<Vec<_>, _>>()?; 1278 1279 Ok(chunks) 1280 } 1281 1282 /// Check if all chunks have been received for a document. 1283 pub fn is_document_complete(&self, document_id: &[u8; 16]) -> Result<bool> { 1284 let result: Option<(i32, i32)> = self.connection().query_row( 1285 "SELECT total_chunks, COUNT(*) as received 1286 FROM document_chunks 1287 WHERE document_id = ? 1288 GROUP BY document_id", 1289 [document_id.as_slice()], 1290 |row| Ok((row.get(0)?, row.get(1)?)), 1291 ).ok(); 1292 1293 match result { 1294 Some((total, received)) => Ok(total == received), 1295 None => Ok(false), 1296 } 1297 } 1298 1299 /// Get document info (filename and total size) from the first chunk. 1300 pub fn get_document_info(&self, document_id: &[u8; 16]) -> Result<Option<(String, u64)>> { 1301 let result = self.connection().query_row( 1302 "SELECT filename, total_size FROM document_chunks WHERE document_id = ? 
AND chunk_index = 0", 1303 [document_id.as_slice()], 1304 |row| { 1305 let filename: Option<String> = row.get(0)?; 1306 let total_size: i64 = row.get(1)?; 1307 Ok((filename.unwrap_or_default(), total_size as u64)) 1308 }, 1309 ); 1310 1311 match result { 1312 Ok(info) => Ok(Some(info)), 1313 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 1314 Err(e) => Err(DeadDropError::Database(e.to_string())), 1315 } 1316 } 1317 1318 /// Delete all chunks for a document (cleanup after reassembly). 1319 pub fn delete_chunks(&self, document_id: &[u8; 16]) -> Result<u32> { 1320 let count = self.connection().execute( 1321 "DELETE FROM document_chunks WHERE document_id = ?", 1322 [document_id.as_slice()], 1323 )?; 1324 1325 Ok(count as u32) 1326 } 1327 1328 /// Delete old chunks that haven't been completed (cleanup). 1329 /// 1330 /// Returns the number of chunks deleted. 1331 pub fn delete_stale_chunks(&self, max_age_secs: i64) -> Result<u32> { 1332 let cutoff = Utc::now().timestamp() - max_age_secs; 1333 1334 let count = self.connection().execute( 1335 "DELETE FROM document_chunks WHERE received_at < ?", 1336 [cutoff], 1337 )?; 1338 1339 Ok(count as u32) 1340 } 1341 1342 // ========================================================================= 1343 // DOCUMENT TRANSFER PROGRESS 1344 // ========================================================================= 1345 1346 /// Get send progress for a chunked document. 1347 /// Returns (delivered_count, total_chunks) or None if not a chunked document. 1348 pub fn get_send_progress(&self, document_id: &[u8; 16]) -> Result<Option<(u32, u32)>> { 1349 let result = self.connection().query_row( 1350 "SELECT COUNT(*), 1351 SUM(CASE WHEN state >= 2 THEN 1 ELSE 0 END) 1352 FROM outbound_messages 1353 WHERE document_id = ? 
AND content_type = ?", 1354 params![document_id.as_slice(), ContentType::DocumentChunk.to_byte() as i32], 1355 |row| { 1356 let total: i32 = row.get(0)?; 1357 let delivered: i32 = row.get::<_, Option<i32>>(1)?.unwrap_or(0); 1358 Ok((total, delivered)) 1359 }, 1360 ); 1361 match result { 1362 Ok((total, delivered)) if total > 0 => Ok(Some((delivered as u32, total as u32))), 1363 Ok(_) => Ok(None), 1364 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 1365 Err(e) => Err(DeadDropError::Database(e.to_string())), 1366 } 1367 } 1368 1369 /// Acknowledge all outbound chunks belonging to a document. 1370 /// Called when a delivery receipt is received for the parent document message, 1371 /// so that `get_send_progress` reflects completion. 1372 pub fn acknowledge_document_chunks(&self, document_id: &[u8; 16]) -> Result<usize> { 1373 let count = self.connection().execute( 1374 "UPDATE outbound_messages SET state = ? 1375 WHERE document_id = ? AND content_type = ? AND (state < ? OR state = ?)", 1376 params![ 1377 OutboundState::Acknowledged as i32, 1378 document_id.as_slice(), 1379 ContentType::DocumentChunk.to_byte() as i32, 1380 OutboundState::Acknowledged as i32, 1381 OutboundState::PendingRelay as i32, 1382 ], 1383 )?; 1384 Ok(count) 1385 } 1386 1387 /// Get receive progress for a chunked document. 1388 /// Returns (received_count, total_chunks) or None if no chunks found. 
1389 pub fn get_receive_progress(&self, document_id: &[u8; 16]) -> Result<Option<(u32, u32)>> { 1390 let result = self.connection().query_row( 1391 "SELECT COUNT(*), MAX(total_chunks) 1392 FROM document_chunks 1393 WHERE document_id = ?", 1394 [document_id.as_slice()], 1395 |row| { 1396 let received: i32 = row.get(0)?; 1397 let total: Option<i32> = row.get(1)?; 1398 Ok((received, total.unwrap_or(0))) 1399 }, 1400 ); 1401 match result { 1402 Ok((received, total)) if received > 0 => Ok(Some((received as u32, total as u32))), 1403 Ok(_) => Ok(None), 1404 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 1405 Err(e) => Err(DeadDropError::Database(e.to_string())), 1406 } 1407 } 1408 1409 // ========================================================================= 1410 // MESSAGE SEARCH 1411 // ========================================================================= 1412 1413 /// Search messages within a single conversation. 1414 /// Searches text content and document filenames, case-insensitive. 1415 pub fn search_conversation( 1416 &self, 1417 contact_id: &ContactId, 1418 query: &str, 1419 limit: u32, 1420 ) -> Result<Vec<SearchResult>> { 1421 let like_pattern = format!("%{}%", query); 1422 let mut results = Vec::new(); 1423 1424 // Search outbound text messages and document filenames 1425 { 1426 let mut stmt = self.connection().prepare( 1427 "SELECT message_id, recipient_id, content_type, plaintext, filename, created_at 1428 FROM outbound_messages 1429 WHERE recipient_id = ? 
1430 AND content_type IN (0, 1) 1431 AND ( 1432 (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE) 1433 OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE) 1434 ) 1435 ORDER BY created_at DESC 1436 LIMIT ?", 1437 )?; 1438 1439 let rows = stmt.query_map( 1440 params![contact_id.as_slice(), &like_pattern, limit as i64], 1441 |row| Self::search_result_from_row(row, true), 1442 )?.collect::<std::result::Result<Vec<_>, _>>()?; 1443 results.extend(rows); 1444 } 1445 1446 // Search inbound text messages and document filenames 1447 { 1448 let mut stmt = self.connection().prepare( 1449 "SELECT message_id, sender_id, content_type, plaintext, filename, received_at 1450 FROM inbound_messages 1451 WHERE sender_id = ? 1452 AND content_type IN (0, 1) 1453 AND ( 1454 (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE) 1455 OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE) 1456 ) 1457 ORDER BY received_at DESC 1458 LIMIT ?", 1459 )?; 1460 1461 let rows = stmt.query_map( 1462 params![contact_id.as_slice(), &like_pattern, limit as i64], 1463 |row| Self::search_result_from_row(row, false), 1464 )?.collect::<std::result::Result<Vec<_>, _>>()?; 1465 results.extend(rows); 1466 } 1467 1468 // Sort combined by timestamp, newest first 1469 results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 1470 results.truncate(limit as usize); 1471 Ok(results) 1472 } 1473 1474 /// Search across all conversations. 
1475 pub fn search_all_messages( 1476 &self, 1477 query: &str, 1478 limit: u32, 1479 ) -> Result<Vec<SearchResult>> { 1480 let like_pattern = format!("%{}%", query); 1481 let mut results = Vec::new(); 1482 1483 // Search outbound 1484 { 1485 let mut stmt = self.connection().prepare( 1486 "SELECT message_id, recipient_id, content_type, plaintext, filename, created_at 1487 FROM outbound_messages 1488 WHERE content_type IN (0, 1) 1489 AND ( 1490 (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE) 1491 OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE) 1492 ) 1493 ORDER BY created_at DESC 1494 LIMIT ?", 1495 )?; 1496 1497 let rows = stmt.query_map( 1498 params![&like_pattern, limit as i64], 1499 |row| Self::search_result_from_row(row, true), 1500 )?.collect::<std::result::Result<Vec<_>, _>>()?; 1501 results.extend(rows); 1502 } 1503 1504 // Search inbound 1505 { 1506 let mut stmt = self.connection().prepare( 1507 "SELECT message_id, sender_id, content_type, plaintext, filename, received_at 1508 FROM inbound_messages 1509 WHERE content_type IN (0, 1) 1510 AND ( 1511 (content_type = 0 AND CAST(plaintext AS TEXT) LIKE ?1 COLLATE NOCASE) 1512 OR (content_type = 1 AND filename LIKE ?1 COLLATE NOCASE) 1513 ) 1514 ORDER BY received_at DESC 1515 LIMIT ?", 1516 )?; 1517 1518 let rows = stmt.query_map( 1519 params![&like_pattern, limit as i64], 1520 |row| Self::search_result_from_row(row, false), 1521 )?.collect::<std::result::Result<Vec<_>, _>>()?; 1522 results.extend(rows); 1523 } 1524 1525 results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 1526 results.truncate(limit as usize); 1527 Ok(results) 1528 } 1529 1530 /// Helper to construct SearchResult from a row. 
1531 fn search_result_from_row(row: &rusqlite::Row, is_outbound: bool) -> rusqlite::Result<SearchResult> { 1532 let message_id_vec: Vec<u8> = row.get(0)?; 1533 let message_id: MessageId = message_id_vec 1534 .try_into() 1535 .map_err(|_| rusqlite::Error::InvalidColumnType(0, "message_id".to_string(), rusqlite::types::Type::Blob))?; 1536 let contact_id_vec: Vec<u8> = row.get(1)?; 1537 let contact_id: ContactId = contact_id_vec 1538 .try_into() 1539 .map_err(|_| rusqlite::Error::InvalidColumnType(1, "contact_id".to_string(), rusqlite::types::Type::Blob))?; 1540 let content_type_int: i32 = row.get(2)?; 1541 let content_type = ContentType::from_byte(content_type_int as u8) 1542 .ok_or_else(|| rusqlite::Error::InvalidColumnType(2, "content_type".to_string(), rusqlite::types::Type::Integer))?; 1543 let plaintext: Vec<u8> = row.get(3)?; 1544 let filename: Option<String> = row.get(4)?; 1545 let timestamp_ts: i64 = row.get(5)?; 1546 let timestamp = Utc.timestamp_opt(timestamp_ts, 0).single().unwrap_or_else(Utc::now); 1547 1548 Ok(SearchResult { 1549 message_id, 1550 contact_id, 1551 is_outbound, 1552 content_type, 1553 plaintext, 1554 filename, 1555 timestamp, 1556 }) 1557 } 1558 } 1559 1560 /// A stored document chunk. 1561 #[derive(Debug, Clone)] 1562 pub struct DocumentChunk { 1563 /// Unique document ID (groups chunks together). 1564 pub document_id: [u8; 16], 1565 /// Index of this chunk (0-based). 1566 pub chunk_index: u16, 1567 /// Total number of chunks. 1568 pub total_chunks: u16, 1569 /// Filename (from chunk 0). 1570 pub filename: String, 1571 /// Total file size in bytes. 1572 pub total_size: u64, 1573 /// Chunk data. 1574 pub data: Vec<u8>, 1575 /// Contact who sent this chunk. 1576 pub contact_id: ContactId, 1577 } 1578 1579 /// A message search result. 
///
/// Built by `search_result_from_row` for hits from either the outbound or the
/// inbound message table.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// ID of the matching message.
    pub message_id: MessageId,
    /// The other party: the recipient for outbound hits, the sender for inbound hits.
    pub contact_id: ContactId,
    /// True if the hit came from the outbound message table.
    pub is_outbound: bool,
    /// Content type of the matching message.
    pub content_type: ContentType,
    /// Stored message content.
    pub plaintext: Vec<u8>,
    /// Filename, if one is stored for the message.
    pub filename: Option<String>,
    /// `created_at` for outbound hits, `received_at` for inbound hits.
    pub timestamp: DateTime<Utc>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::keys::{ExchangeKeyPair, IdentityKeyPair};
    use crate::protocol::messages::{encrypt_message, generate_message_id, PlaintextMessage, DEFAULT_MESSAGE_EXPIRY_DAYS};
    use crate::storage::contacts::Contact;

    /// Open a fresh in-memory database for a single test.
    fn test_db() -> Database {
        Database::open_in_memory(b"test_passphrase").unwrap()
    }

    /// Build a queued text message for `recipient_id`, encrypted with freshly
    /// generated throwaway keys and expiring after the default expiry period.
    fn create_test_outbound(recipient_id: ContactId) -> OutboundMessage {
        let sender = IdentityKeyPair::generate();
        let recipient = ExchangeKeyPair::generate();

        let plaintext = PlaintextMessage::text("Test message");
        let encrypted = encrypt_message(&plaintext, &sender, &recipient.public_bytes()).unwrap();

        let now = Utc::now();

        OutboundMessage {
            message_id: encrypted.message_id,
            recipient_id,
            content_type: ContentType::Text,
            plaintext: plaintext.content,
            filename: None,
            encrypted,
            state: OutboundState::Queued,
            created_at: now,
            expires_at: now + Duration::days(DEFAULT_MESSAGE_EXPIRY_DAYS),
            attempts: 0,
            document_id: None,
            group_id: None,
        }
    }

    /// Build an unread text message from `sender_id` with a random message ID.
    fn create_test_inbound(sender_id: ContactId) -> InboundMessage {
        let now = Utc::now();

        InboundMessage {
            message_id: generate_message_id(),
            sender_id,
            content_type: ContentType::Text,
            plaintext: b"Received message".to_vec(),
            filename: None,
            state: InboundState::Received,
            received_at: now,
            read_at: None,
            delete_at: now + Duration::days(DEFAULT_AUTO_DELETE_DAYS),
            receive_transport: None,
            disappearing_duration: 0,
            sent_at: None,
            group_id: None,
        }
    }

    /// Insert a contact with freshly generated keys and return its ID.
    fn setup_contact(db: &Database) -> ContactId {
        let identity = IdentityKeyPair::generate();
        let exchange = ExchangeKeyPair::generate();
        let contact = Contact::new(identity.public_bytes(), exchange.public_bytes());
        let contact_id = contact.contact_id;
        db.add_contact(&contact).unwrap();
        contact_id
    }

    // ==================== Outbound Tests ====================

    /// A queued message can be read back with its ID, recipient and state intact.
    #[test]
    fn test_queue_message() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_outbound(contact_id);
        db.queue_message(&message).unwrap();

        let retrieved = db.get_outbound_message(&message.message_id).unwrap().unwrap();
        assert_eq!(retrieved.message_id, message.message_id);
        assert_eq!(retrieved.recipient_id, contact_id);
        assert_eq!(retrieved.state, OutboundState::Queued);
    }

    /// All messages queued for one contact are returned for that contact.
    #[test]
    fn test_get_queued_for_contact() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Queue multiple messages
        for _ in 0..3 {
            let message = create_test_outbound(contact_id);
            db.queue_message(&message).unwrap();
        }

        let queued = db.get_queued_for_contact(&contact_id).unwrap();
        assert_eq!(queued.len(), 3);
    }

    /// Queued messages are listed across contacts.
    #[test]
    fn test_get_all_queued() {
        let db = test_db();

        // Create messages for different contacts
        let contact1 = setup_contact(&db);
        let contact2 = setup_contact(&db);

        db.queue_message(&create_test_outbound(contact1)).unwrap();
        db.queue_message(&create_test_outbound(contact1)).unwrap();
        db.queue_message(&create_test_outbound(contact2)).unwrap();

        let all_queued = db.get_all_queued().unwrap();
        assert_eq!(all_queued.len(), 3);
    }

    /// State updates are persisted and visible on the next read.
    #[test]
    fn test_update_outbound_state() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_outbound(contact_id);
        let message_id = message.message_id;
        db.queue_message(&message).unwrap();

        // Update to Transmitting
        db.update_outbound_state(&message_id, OutboundState::Transmitting).unwrap();
        let retrieved = db.get_outbound_message(&message_id).unwrap().unwrap();
        assert_eq!(retrieved.state, OutboundState::Transmitting);

        // Update to Delivered
        db.update_outbound_state(&message_id, OutboundState::Delivered).unwrap();
        let retrieved = db.get_outbound_message(&message_id).unwrap().unwrap();
        assert_eq!(retrieved.state, OutboundState::Delivered);
    }

    /// Each increment returns the new attempt count.
    #[test]
    fn test_increment_outbound_attempts() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_outbound(contact_id);
        let message_id = message.message_id;
        db.queue_message(&message).unwrap();

        assert_eq!(db.increment_outbound_attempts(&message_id).unwrap(), 1);
        assert_eq!(db.increment_outbound_attempts(&message_id).unwrap(), 2);
        assert_eq!(db.increment_outbound_attempts(&message_id).unwrap(), 3);
    }

    /// Only messages whose `expires_at` is in the past are deleted.
    #[test]
    fn test_delete_expired_outbound() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Create an expired message
        let sender = IdentityKeyPair::generate();
        let recipient = ExchangeKeyPair::generate();
        let plaintext = PlaintextMessage::text("Expired");
        let encrypted = encrypt_message(&plaintext, &sender, &recipient.public_bytes()).unwrap();

        let now = Utc::now();
        let expired_message = OutboundMessage {
            message_id: encrypted.message_id,
            recipient_id: contact_id,
            content_type: ContentType::Text,
            plaintext: plaintext.content,
            filename: None,
            encrypted,
            state: OutboundState::Queued,
            created_at: now - Duration::days(40),
            expires_at: now - Duration::days(10), // Expired 10 days ago
            attempts: 0,
            document_id: None,
            group_id: None,
        };

        db.queue_message(&expired_message).unwrap();

        // Create a non-expired message
        let non_expired = create_test_outbound(contact_id);
        db.queue_message(&non_expired).unwrap();

        // Delete expired
        let deleted = db.delete_expired_outbound().unwrap();
        assert_eq!(deleted, 1);

        // Non-expired should still exist
        let remaining = db.get_all_queued().unwrap();
        assert_eq!(remaining.len(), 1);
    }

    /// Counts are reported per outbound state.
    #[test]
    fn test_count_outbound() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Queue messages
        let msg1 = create_test_outbound(contact_id);
        let msg2 = create_test_outbound(contact_id);
        let msg3 = create_test_outbound(contact_id);

        db.queue_message(&msg1).unwrap();
        db.queue_message(&msg2).unwrap();
        db.queue_message(&msg3).unwrap();

        // Update some states
        db.update_outbound_state(&msg2.message_id, OutboundState::Transmitting).unwrap();
        db.update_outbound_state(&msg3.message_id, OutboundState::Delivered).unwrap();

        let counts = db.count_outbound().unwrap();
        assert_eq!(counts.queued, 1);
        assert_eq!(counts.transmitting, 1);
        assert_eq!(counts.delivered, 1);
    }

    // ==================== Inbound Tests ====================

    /// A stored inbound message can be read back with its ID, sender and state intact.
    #[test]
    fn test_store_inbound() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_inbound(contact_id);
        db.store_inbound(&message).unwrap();

        let retrieved = db.get_inbound_message(&message.message_id).unwrap().unwrap();
        assert_eq!(retrieved.message_id, message.message_id);
        assert_eq!(retrieved.sender_id, contact_id);
        assert_eq!(retrieved.state, InboundState::Received);
    }

    /// All messages stored for one sender are returned for that sender.
    #[test]
    fn test_get_messages_from() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Store multiple messages
        for _ in 0..3 {
            let message = create_test_inbound(contact_id);
            db.store_inbound(&message).unwrap();
        }

        let messages = db.get_messages_from(&contact_id).unwrap();
        assert_eq!(messages.len(), 3);
    }

    /// Messages marked as read no longer appear in the unread list.
    #[test]
    fn test_get_unread() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Store messages
        let msg1 = create_test_inbound(contact_id);
        let msg2 = create_test_inbound(contact_id);
        db.store_inbound(&msg1).unwrap();
        db.store_inbound(&msg2).unwrap();

        // Mark one as read
        db.mark_message_read(&msg1.message_id).unwrap();

        let unread = db.get_unread().unwrap();
        assert_eq!(unread.len(), 1);
        assert_eq!(unread[0].message_id, msg2.message_id);
    }

    /// Marking a message read sets both the state and the read timestamp.
    #[test]
    fn test_mark_message_read() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_inbound(contact_id);
        db.store_inbound(&message).unwrap();

        db.mark_message_read(&message.message_id).unwrap();

        let retrieved = db.get_inbound_message(&message.message_id).unwrap().unwrap();
        assert_eq!(retrieved.state, InboundState::Read);
        assert!(retrieved.read_at.is_some());
    }

    /// Deleting keeps the row with state `Deleted` but hides it from listings.
    #[test]
    fn test_delete_inbound_message() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_inbound(contact_id);
        db.store_inbound(&message).unwrap();

        db.delete_inbound_message(&message.message_id).unwrap();

        let retrieved = db.get_inbound_message(&message.message_id).unwrap().unwrap();
        assert_eq!(retrieved.state, InboundState::Deleted);

        // Should not appear in get_messages_from
        let messages = db.get_messages_from(&contact_id).unwrap();
        assert!(messages.is_empty());
    }

    /// Unread counts agree globally and per contact.
    #[test]
    fn test_count_unread() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Store messages
        for _ in 0..3 {
            let message = create_test_inbound(contact_id);
            db.store_inbound(&message).unwrap();
        }

        assert_eq!(db.count_unread().unwrap(), 3);
        assert_eq!(db.count_unread_from(&contact_id).unwrap(), 3);
    }

    // ==================== Conversation Tests ====================

    /// A conversation contains both the outbound and the inbound message.
    #[test]
    fn test_get_conversation() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        // Add outbound messages
        let outbound = create_test_outbound(contact_id);
        db.queue_message(&outbound).unwrap();

        // Add inbound messages
        let inbound = create_test_inbound(contact_id);
        db.store_inbound(&inbound).unwrap();

        let conversation = db.get_conversation(&contact_id).unwrap();
        assert_eq!(conversation.len(), 2);

        // Check that we have both types
        let outbound_count = conversation.iter().filter(|m| m.is_outbound).count();
        let inbound_count = conversation.iter().filter(|m| !m.is_outbound).count();
        assert_eq!(outbound_count, 1);
        assert_eq!(inbound_count, 1);
    }

    /// Text content of a conversation message decodes back to the stored string.
    #[test]
    fn test_conversation_message_as_text() {
        let db = test_db();
        let contact_id = setup_contact(&db);

        let message = create_test_inbound(contact_id);
        db.store_inbound(&message).unwrap();

        let conversation = db.get_conversation(&contact_id).unwrap();
        assert!(!conversation.is_empty());

        let msg = &conversation[0];
        assert_eq!(msg.as_text(), Some("Received message".to_string()));
    }
}