cursor_producer.rs
1 use std::{ 2 pin::Pin, 3 sync::Arc, 4 task::{self, Poll, Waker}, 5 }; 6 7 use apibara_core::{node::v1alpha2::DataFinality, starknet::v1alpha2}; 8 use apibara_node::{ 9 async_trait, 10 stream::{ 11 BatchCursor, CursorProducer, IngestionMessage, IngestionResponse, ReconfigureResponse, 12 StreamConfiguration, StreamError, 13 }, 14 }; 15 use futures::{stream::FusedStream, Stream}; 16 use tracing::{debug, instrument, trace}; 17 18 use crate::{core::GlobalBlockId, db::StorageReader}; 19 20 /// A [CursorProducer] that produces sequential cursors. 21 pub struct SequentialCursorProducer<R: StorageReader + Send + Sync + 'static> { 22 configuration: Option<BatchConfiguration>, 23 ingestion_state: Option<IngestionState>, 24 storage: Arc<R>, 25 waker: Option<Waker>, 26 } 27 28 struct BatchConfiguration { 29 current: Option<GlobalBlockId>, 30 pending_sent: bool, 31 data_finality: DataFinality, 32 batch_size: usize, 33 } 34 35 #[derive(Default, Debug)] 36 struct IngestionState { 37 finalized: Option<GlobalBlockId>, 38 accepted: Option<GlobalBlockId>, 39 pending: Option<GlobalBlockId>, 40 } 41 42 impl<R> SequentialCursorProducer<R> 43 where 44 R: StorageReader + Send + Sync + 'static, 45 { 46 pub fn new(storage: Arc<R>) -> Self { 47 SequentialCursorProducer { 48 configuration: None, 49 storage, 50 ingestion_state: None, 51 waker: None, 52 } 53 } 54 55 #[instrument(skip_all, level = "debug")] 56 pub fn next_cursor(&mut self) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> { 57 if self.configuration.is_some() { 58 self.next_cursor_with_configuration() 59 } else { 60 Ok(None) 61 } 62 } 63 64 #[instrument(skip_all, level = "debug")] 65 fn next_cursor_with_configuration( 66 &mut self, 67 ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> { 68 // We call this from inside a `is_some` check. 69 let state = self.get_ingestion_state()?; 70 // keep borrow checker happy 71 let pending_cursor = state.pending; 72 let accepted_cursor = state.accepted; 73 let finalized_cursor = state.finalized; 74 75 let configuration = self.configuration.as_mut().expect("configuration"); 76 let starting_cursor = configuration.current; 77 78 let next_block_number = configuration.current.map(|c| c.number() + 1).unwrap_or(0); 79 80 trace!( 81 next_block_number = %next_block_number, 82 finalized = ?finalized_cursor, 83 accepted = ?accepted_cursor, 84 pending = ?pending_cursor, 85 "deciding which cursor to generate" 86 ); 87 88 if let Some(finalized) = finalized_cursor { 89 if next_block_number <= finalized.number() { 90 return self.next_cursor_finalized(starting_cursor, next_block_number, &finalized); 91 } 92 } 93 94 if let Some(accepted) = accepted_cursor { 95 if next_block_number <= accepted.number() { 96 return self.next_cursor_accepted(starting_cursor, next_block_number); 97 } 98 } 99 100 if let Some(pending) = pending_cursor { 101 if next_block_number <= pending.number() { 102 return self.next_cursor_pending(starting_cursor, next_block_number); 103 } 104 } 105 106 Ok(None) 107 } 108 109 #[instrument(skip_all, level = "trace")] 110 fn next_cursor_finalized( 111 &mut self, 112 starting_cursor: Option<GlobalBlockId>, 113 next_block_number: u64, 114 finalized: &GlobalBlockId, 115 ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> { 116 // always send finalized data. 117 let configuration = self.configuration.as_mut().expect("configuration"); 118 let mut cursors = Vec::with_capacity(configuration.batch_size); 119 let final_block_number = u64::min( 120 finalized.number(), 121 next_block_number + (configuration.batch_size as u64) - 1, 122 ); 123 for block_number in next_block_number..=final_block_number { 124 match self.storage.canonical_block_id(block_number)? { 125 Some(cursor) => { 126 cursors.push(cursor); 127 } 128 None => break, 129 } 130 } 131 132 if cursors.is_empty() { 133 return Ok(None); 134 } 135 136 let batch_cursor = BatchCursor::new_finalized(starting_cursor, cursors); 137 configuration.current = Some(*batch_cursor.end_cursor()); 138 Ok(Some(batch_cursor)) 139 } 140 141 #[instrument(skip_all, level = "trace")] 142 fn next_cursor_accepted( 143 &mut self, 144 starting_cursor: Option<GlobalBlockId>, 145 next_block_number: u64, 146 ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> { 147 let configuration = self.configuration.as_mut().expect("configuration"); 148 if configuration.data_finality == DataFinality::DataStatusFinalized 149 || configuration.data_finality == DataFinality::DataStatusUnknown 150 { 151 return Ok(None); 152 } 153 154 match self.storage.canonical_block_id(next_block_number)? { 155 Some(cursor) => { 156 let batch_cursor = BatchCursor::new_accepted(starting_cursor, cursor); 157 configuration.current = Some(*batch_cursor.end_cursor()); 158 Ok(Some(batch_cursor)) 159 } 160 None => Ok(None), 161 } 162 } 163 164 #[instrument(skip_all, level = "trace")] 165 fn next_cursor_pending( 166 &mut self, 167 starting_cursor: Option<GlobalBlockId>, 168 next_block_number: u64, 169 ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> { 170 let configuration = self.configuration.as_mut().expect("configuration"); 171 if configuration.data_finality != DataFinality::DataStatusPending 172 || configuration.pending_sent 173 { 174 return Ok(None); 175 } 176 177 // Cursor won't be part of the canonical chain since it's not accepted yet. 178 // Optimistically send the pending block cursor since if we reached this point it's 179 // already been produced. 180 let cursor = GlobalBlockId::from_u64(next_block_number); 181 let batch_cursor = BatchCursor::new_pending(starting_cursor, cursor); 182 trace!(cursor = ?batch_cursor, "sending pending data"); 183 configuration.pending_sent = true; 184 Ok(Some(batch_cursor)) 185 } 186 187 fn get_ingestion_state(&mut self) -> Result<&IngestionState, R::Error> { 188 let state = self.get_ingestion_state_mut()?; 189 Ok(state) 190 } 191 192 fn get_ingestion_state_mut(&mut self) -> Result<&mut IngestionState, R::Error> { 193 // Read new state only if we don't have one yet. 194 // Initialize with default value otherwise to make the borrow checker happy. 195 let new_state = if self.ingestion_state.is_some() { 196 IngestionState::default() 197 } else { 198 let accepted = self.storage.highest_accepted_block()?; 199 let finalized = self.storage.highest_finalized_block()?; 200 IngestionState { 201 accepted, 202 finalized, 203 pending: None, 204 } 205 }; 206 207 Ok(self.ingestion_state.get_or_insert(new_state)) 208 } 209 210 /// wake up the stream if it was waiting for a new block 211 fn wake(&mut self) { 212 if let Some(waker) = self.waker.take() { 213 waker.wake(); 214 } 215 } 216 } 217 218 fn lowest_cursor(a: GlobalBlockId, b: GlobalBlockId) -> GlobalBlockId { 219 if a.number() < b.number() { 220 a 221 } else { 222 b 223 } 224 } 225 226 #[async_trait] 227 impl<R> CursorProducer for SequentialCursorProducer<R> 228 where 229 R: StorageReader + Send + Sync + 'static, 230 { 231 type Cursor = GlobalBlockId; 232 type Filter = v1alpha2::Filter; 233 234 async fn reconfigure( 235 &mut self, 236 configuration: &StreamConfiguration<Self::Cursor, Self::Filter>, 237 ) -> Result<ReconfigureResponse<Self::Cursor>, StreamError> { 238 let (current, response) = match configuration.starting_cursor { 239 None => (None, ReconfigureResponse::Ok), 240 Some(starting_cursor) => { 241 let starting_cursor = if starting_cursor.hash().is_zero() { 242 // the user specified a block number but not a hash. Find the hash 243 // corresponding to the block number. 244 match self 245 .storage 246 .canonical_block_id(starting_cursor.number()) 247 .map_err(StreamError::internal)? 248 { 249 Some(starting_cursor) => starting_cursor, 250 None => return Ok(ReconfigureResponse::MissingStartingCursor), 251 } 252 } else { 253 starting_cursor 254 }; 255 256 debug!(starting_cursor = ?starting_cursor, "reconfigure stream with starting cursor"); 257 let starting_status = match self 258 .storage 259 .read_status(&starting_cursor) 260 .map_err(StreamError::internal)? 261 { 262 None => return Ok(ReconfigureResponse::MissingStartingCursor), 263 Some(starting_status) => starting_status, 264 }; 265 266 if starting_status.is_accepted() || starting_status.is_finalized() { 267 (Some(starting_cursor), ReconfigureResponse::Ok) 268 } else { 269 // the user-specified cursor is not part of the canonical chain anymore. 270 // walk bakcwards until finding a canonical chain and use that as starting 271 // cursor. 272 let mut new_root = starting_cursor; 273 loop { 274 let status = match self 275 .storage 276 .read_status(&new_root) 277 .map_err(StreamError::internal)? 278 { 279 None => return Ok(ReconfigureResponse::MissingStartingCursor), 280 Some(status) => status, 281 }; 282 283 if status.is_accepted() || status.is_finalized() { 284 break; 285 } 286 287 let header = match self 288 .storage 289 .read_header(&new_root) 290 .map_err(StreamError::internal)? 291 { 292 None => return Ok(ReconfigureResponse::MissingStartingCursor), 293 Some(header) => header, 294 }; 295 296 new_root = GlobalBlockId::from_block_header_parent(&header) 297 .map_err(StreamError::internal)?; 298 } 299 300 (Some(new_root), ReconfigureResponse::Invalidate(new_root)) 301 } 302 } 303 }; 304 305 let configuration = BatchConfiguration { 306 data_finality: configuration.finality, 307 pending_sent: false, 308 current, 309 batch_size: configuration.batch_size, 310 }; 311 self.configuration = Some(configuration); 312 313 self.wake(); 314 315 Ok(response) 316 } 317 318 async fn handle_ingestion_message( 319 &mut self, 320 message: &IngestionMessage<Self::Cursor>, 321 ) -> Result<IngestionResponse<Self::Cursor>, StreamError> { 322 let state = self 323 .get_ingestion_state_mut() 324 .map_err(StreamError::internal)?; 325 let response = match message { 326 IngestionMessage::Pending(cursor) => { 327 state.pending = Some(*cursor); 328 // mark pending as ready to send 329 if let Some(configuration) = self.configuration.as_mut() { 330 configuration.pending_sent = false; 331 } 332 IngestionResponse::Ok 333 } 334 IngestionMessage::Accepted(cursor) => { 335 state.accepted = Some(*cursor); 336 IngestionResponse::Ok 337 } 338 IngestionMessage::Finalized(cursor) => { 339 state.finalized = Some(*cursor); 340 IngestionResponse::Ok 341 } 342 IngestionMessage::Invalidate(cursor) => { 343 state.pending = None; 344 state.accepted = state.accepted.map(|c| lowest_cursor(c, *cursor)); 345 state.finalized = state.finalized.map(|c| lowest_cursor(c, *cursor)); 346 // if the current cursor is after the new head, then data was invalidated. 347 if let Some(configuration) = self.configuration.as_mut() { 348 let is_invalidated = configuration 349 .current 350 .map(|c| c.number() > cursor.number()) 351 .unwrap_or(false); 352 353 configuration.current = 354 configuration.current.map(|c| lowest_cursor(c, *cursor)); 355 356 if is_invalidated { 357 IngestionResponse::Invalidate(*cursor) 358 } else { 359 IngestionResponse::Ok 360 } 361 } else { 362 IngestionResponse::Ok 363 } 364 } 365 }; 366 367 self.wake(); 368 369 Ok(response) 370 } 371 } 372 373 impl<R> Stream for SequentialCursorProducer<R> 374 where 375 R: StorageReader + Send + Sync + 'static, 376 { 377 type Item = Result<BatchCursor<GlobalBlockId>, StreamError>; 378 379 fn poll_next( 380 mut self: Pin<&mut Self>, 381 cx: &mut task::Context<'_>, 382 ) -> task::Poll<Option<Self::Item>> { 383 match self.next_cursor() { 384 Err(err) => { 385 let err = StreamError::internal(err); 386 Poll::Ready(Some(Err(err))) 387 } 388 Ok(None) => { 389 // no new block yet, store waker and wake after a new ingestion message 390 self.waker = Some(cx.waker().clone()); 391 Poll::Pending 392 } 393 Ok(Some(batch_cursor)) => Poll::Ready(Some(Ok(batch_cursor))), 394 } 395 } 396 } 397 398 impl<R> FusedStream for SequentialCursorProducer<R> 399 where 400 R: StorageReader + Send + Sync + 'static, 401 { 402 fn is_terminated(&self) -> bool { 403 false 404 } 405 } 406 407 #[cfg(test)] 408 mod tests { 409 use std::sync::Arc; 410 411 use apibara_core::{ 412 node::v1alpha2::DataFinality, 413 starknet::v1alpha2::{BlockHeader, BlockStatus, Filter}, 414 }; 415 use apibara_node::stream::{ 416 CursorProducer, IngestionMessage, ReconfigureResponse, StreamConfiguration, 417 }; 418 use assert_matches::assert_matches; 419 use futures::{FutureExt, StreamExt, TryStreamExt}; 420 use mockall::predicate::eq; 421 422 use crate::{ 423 core::{BlockHash, GlobalBlockId}, 424 db::{MockStorageReader, StorageReader}, 425 }; 426 427 use super::SequentialCursorProducer; 428 429 fn new_block_hash(n: u64, c: u8) -> BlockHash { 430 let mut b = [0; 32]; 431 b[24..].copy_from_slice(&n.to_be_bytes()); 432 b[0] = c; 433 BlockHash::from_slice(&b).unwrap() 434 } 435 436 fn new_block_id(num: u64) -> GlobalBlockId { 437 let hash = new_block_hash(num, 0); 438 GlobalBlockId::new(num, hash) 439 } 440 441 fn new_block_header( 442 number: u64, 443 hash: GlobalBlockId, 444 parent_hash: GlobalBlockId, 445 ) -> BlockHeader { 446 BlockHeader { 447 block_number: number, 448 block_hash: Some(hash.hash().into()), 449 parent_block_hash: Some(parent_hash.hash().into()), 450 ..BlockHeader::default() 451 } 452 } 453 454 fn new_configuration( 455 starting_cursor: Option<GlobalBlockId>, 456 finality: DataFinality, 457 ) -> StreamConfiguration<GlobalBlockId, Filter> { 458 StreamConfiguration { 459 batch_size: 3, 460 stream_id: 0, 461 finality, 462 starting_cursor, 463 filter: vec![Filter::default()], 464 } 465 } 466 467 async fn new_producer<R>( 468 cursor: Option<GlobalBlockId>, 469 finality: DataFinality, 470 storage: Arc<R>, 471 ) -> SequentialCursorProducer<R> 472 where 473 R: StorageReader + Send + Sync + 'static, 474 { 475 let mut producer = SequentialCursorProducer::new(storage); 476 producer 477 .reconfigure(&new_configuration(cursor, finality)) 478 .await 479 .unwrap(); 480 producer 481 } 482 483 /// This test checks that the cursor producer keeps producing finalized batches with the 484 /// requested number of cursors. 485 /// 486 /// Finality: FINALIZED 487 #[tokio::test] 488 async fn test_produce_full_batch_finalized() { 489 let mut storage = MockStorageReader::new(); 490 storage 491 .expect_canonical_block_id() 492 .returning(|i| Ok(Some(new_block_id(i)))); 493 storage 494 .expect_highest_accepted_block() 495 .returning(|| Ok(Some(new_block_id(100)))); 496 storage 497 .expect_highest_finalized_block() 498 .returning(|| Ok(Some(new_block_id(90)))); 499 500 let producer = 501 new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await; 502 503 let batches: Vec<_> = producer.take(5).try_collect().await.unwrap(); 504 assert_eq!(batches.len(), 5); 505 let mut i = 0; 506 for batch in batches { 507 let cursors = batch.as_finalized().unwrap(); 508 for cursor in cursors { 509 assert_eq!(cursor.number(), i as u64); 510 i += 1; 511 } 512 } 513 } 514 515 /// This test checks that the producer doesn't produce any cursor if the requested block is 516 /// after the most recent finalized block. 517 /// 518 /// Finality: FINALIZED 519 #[tokio::test] 520 async fn test_produce_nothing_if_after_finalized_as_finalized() { 521 let mut storage = MockStorageReader::new(); 522 storage 523 .expect_read_status() 524 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 525 storage 526 .expect_canonical_block_id() 527 .returning(|i| Ok(Some(new_block_id(i)))); 528 storage 529 .expect_highest_accepted_block() 530 .returning(|| Ok(Some(new_block_id(100)))); 531 storage 532 .expect_highest_finalized_block() 533 .returning(|| Ok(Some(new_block_id(90)))); 534 535 let mut producer = new_producer( 536 Some(new_block_id(90)), 537 DataFinality::DataStatusFinalized, 538 Arc::new(storage), 539 ) 540 .await; 541 542 let batch = producer.try_next().now_or_never(); 543 assert!(batch.is_none()); 544 } 545 546 /// This test checks the transition between finalized and accepted. Since the requested data is 547 /// finalized, the producer should produce partial batches with only the finalized cursors. 548 /// 549 /// Finality: FINALIZED 550 #[tokio::test] 551 async fn test_reach_accepted_as_finalized() { 552 let mut storage = MockStorageReader::new(); 553 storage 554 .expect_canonical_block_id() 555 .returning(|i| Ok(Some(new_block_id(i)))); 556 storage 557 .expect_highest_accepted_block() 558 .returning(|| Ok(Some(new_block_id(15)))); 559 storage 560 .expect_highest_finalized_block() 561 .returning(|| Ok(Some(new_block_id(10)))); 562 563 let producer = 564 new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await; 565 566 let batches: Vec<_> = producer.take(4).try_collect().await.unwrap(); 567 assert_eq!(batches.len(), 4); 568 let mut i = 0; 569 for (batch_idx, batch) in batches.iter().enumerate() { 570 let cursors = batch.as_finalized().unwrap(); 571 if batch_idx == 3 { 572 // last batch is partial because it cannot contain block 11, which is accepted 573 assert_eq!(cursors.len(), 2); 574 } else { 575 assert_eq!(cursors.len(), 3); 576 } 577 for cursor in cursors { 578 assert_eq!(cursor.number(), i as u64); 579 i += 1; 580 } 581 } 582 } 583 584 /// This test checks that the producer starts producing new batches after the chain finality 585 /// status is updated. 586 /// 587 /// Finality: FINALIZED 588 #[tokio::test] 589 async fn test_handle_finalized_message_as_finalized() { 590 let mut storage = MockStorageReader::new(); 591 storage 592 .expect_canonical_block_id() 593 .returning(|i| Ok(Some(new_block_id(i)))); 594 storage 595 .expect_highest_accepted_block() 596 .returning(|| Ok(Some(new_block_id(15)))); 597 storage 598 .expect_highest_finalized_block() 599 .returning(|| Ok(Some(new_block_id(10)))); 600 601 let mut producer = 602 new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await; 603 604 for _ in 0..4 { 605 let batch = producer.try_next().await.unwrap().unwrap(); 606 assert!(batch.as_finalized().is_some()); 607 } 608 609 let batch = producer.try_next().now_or_never(); 610 assert!(batch.is_none()); 611 612 producer 613 .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(14))) 614 .await 615 .unwrap(); 616 617 let mut expected_block = 11; 618 for _ in 0..2 { 619 let batch = producer.try_next().await.unwrap().unwrap(); 620 let cursors = batch.as_finalized().unwrap(); 621 for cursor in cursors { 622 assert_eq!(cursor.number(), expected_block); 623 expected_block += 1; 624 } 625 } 626 627 let batch = producer.try_next().now_or_never(); 628 assert!(batch.is_none()); 629 } 630 631 /// This test checks that the producer produces messages after the invalidated cursor. 632 /// 633 /// Finality: FINALIZED 634 #[tokio::test] 635 async fn test_handle_invalidate_message_as_finalized() { 636 let mut storage = MockStorageReader::new(); 637 storage 638 .expect_read_status() 639 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 640 storage 641 .expect_canonical_block_id() 642 .returning(|i| Ok(Some(new_block_id(i)))); 643 storage 644 .expect_highest_accepted_block() 645 .returning(|| Ok(Some(new_block_id(15)))); 646 storage 647 .expect_highest_finalized_block() 648 .returning(|| Ok(Some(new_block_id(10)))); 649 650 let mut producer = new_producer( 651 Some(new_block_id(8)), 652 DataFinality::DataStatusFinalized, 653 Arc::new(storage), 654 ) 655 .await; 656 657 let batch = producer.try_next().await.unwrap().unwrap(); 658 assert!(batch.as_finalized().is_some()); 659 660 let batch = producer.try_next().now_or_never(); 661 assert!(batch.is_none()); 662 663 // invalidate after current. nothing happens 664 producer 665 .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(14))) 666 .await 667 .unwrap(); 668 669 let batch = producer.try_next().now_or_never(); 670 assert!(batch.is_none()); 671 672 // invalidate before current. goes back 673 producer 674 .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(4))) 675 .await 676 .unwrap(); 677 678 // still no new finalized. 679 let batch = producer.try_next().now_or_never(); 680 assert!(batch.is_none()); 681 682 producer 683 .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(6))) 684 .await 685 .unwrap(); 686 687 let batch = producer.try_next().await.unwrap().unwrap(); 688 assert!(batch.as_finalized().is_some()); 689 } 690 691 /// This test checks that no data is produced if the node has not ingested any finalized block 692 /// yet. 693 /// 694 /// Finality: FINALIZED 695 #[tokio::test] 696 async fn test_no_finalized_as_finalized() { 697 let mut storage = MockStorageReader::new(); 698 storage 699 .expect_canonical_block_id() 700 .returning(|i| Ok(Some(new_block_id(i)))); 701 storage 702 .expect_highest_accepted_block() 703 .returning(|| Ok(Some(new_block_id(14)))); 704 storage 705 .expect_highest_finalized_block() 706 .returning(|| Ok(None)); 707 708 let mut producer = 709 new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await; 710 711 let batch = producer.try_next().now_or_never(); 712 assert!(batch.is_none()); 713 714 producer 715 .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(13))) 716 .await 717 .unwrap(); 718 719 let batch = producer.try_next().await.unwrap().unwrap(); 720 assert!(batch.as_finalized().is_some()); 721 } 722 723 /// This test checks that no data is produced if the node has not ingested any finalized block 724 /// yet. 725 /// 726 /// Finality: FINALIZED 727 #[tokio::test] 728 async fn test_no_accepted_as_finalized() { 729 let mut storage = MockStorageReader::new(); 730 storage 731 .expect_canonical_block_id() 732 .returning(|i| Ok(Some(new_block_id(i)))); 733 storage 734 .expect_highest_accepted_block() 735 .returning(|| Ok(None)); 736 storage 737 .expect_highest_finalized_block() 738 .returning(|| Ok(Some(new_block_id(15)))); 739 740 let mut producer = 741 new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await; 742 743 let batch = producer.try_next().await.unwrap().unwrap(); 744 assert!(batch.as_finalized().is_some()); 745 } 746 747 /// This test checks that the producer switches between producing finalized cursors and 748 /// accepted cursors. 749 /// 750 /// Finality: ACCEPTED 751 #[tokio::test] 752 async fn test_full_batch_as_accepted() { 753 let mut storage = MockStorageReader::new(); 754 storage 755 .expect_read_status() 756 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 757 storage 758 .expect_canonical_block_id() 759 .returning(|i| Ok(Some(new_block_id(i)))); 760 storage 761 .expect_highest_accepted_block() 762 .returning(|| Ok(Some(new_block_id(15)))); 763 storage 764 .expect_highest_finalized_block() 765 .returning(|| Ok(Some(new_block_id(10)))); 766 767 let mut producer = new_producer( 768 Some(new_block_id(8)), 769 DataFinality::DataStatusAccepted, 770 Arc::new(storage), 771 ) 772 .await; 773 774 // finalized batch 775 let batch = producer.try_next().await.unwrap().unwrap(); 776 assert!(batch.as_finalized().is_some()); 777 778 // accepted batches 779 for block_num in 11..=15 { 780 let batch = producer.try_next().await.unwrap().unwrap(); 781 assert!(batch.as_finalized().is_none()); 782 let accepted = batch.as_accepted().unwrap(); 783 assert_eq!(accepted.number(), block_num); 784 } 785 786 let batch = producer.try_next().now_or_never(); 787 assert!(batch.is_none()); 788 } 789 790 /// This test checks that the producer goes back to producing finalized blocks after receiving 791 /// a finalized message, if the new finalized cursor is after the current cursor. 792 /// 793 /// Finality: ACCEPTED 794 #[tokio::test] 795 async fn test_handle_finalized_message_as_accepted() { 796 let mut storage = MockStorageReader::new(); 797 storage 798 .expect_read_status() 799 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 800 storage 801 .expect_canonical_block_id() 802 .returning(|i| Ok(Some(new_block_id(i)))); 803 storage 804 .expect_highest_accepted_block() 805 .returning(|| Ok(Some(new_block_id(15)))); 806 storage 807 .expect_highest_finalized_block() 808 .returning(|| Ok(Some(new_block_id(10)))); 809 810 let mut producer = new_producer( 811 Some(new_block_id(8)), 812 DataFinality::DataStatusAccepted, 813 Arc::new(storage), 814 ) 815 .await; 816 817 // finalized batch 818 let batch = producer.try_next().await.unwrap().unwrap(); 819 assert!(batch.as_finalized().is_some()); 820 821 // one finalized batch 822 let batch = producer.try_next().await.unwrap().unwrap(); 823 assert!(batch.as_finalized().is_none()); 824 let accepted = batch.as_accepted().unwrap(); 825 assert_eq!(accepted.number(), 11); 826 827 producer 828 .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(13))) 829 .await 830 .unwrap(); 831 832 // finalized with block 12, 13 833 let batch = producer.try_next().await.unwrap().unwrap(); 834 assert!(batch.as_finalized().is_some()); 835 836 // one finalized batch 837 let batch = producer.try_next().await.unwrap().unwrap(); 838 assert!(batch.as_finalized().is_none()); 839 let accepted = batch.as_accepted().unwrap(); 840 assert_eq!(accepted.number(), 14); 841 } 842 843 /// This test checks that the producer resumes producing accepted cursors after receiving an 844 /// accepted message. 845 /// 846 /// Finality: ACCEPTED 847 #[tokio::test] 848 async fn test_handle_accepted_message_as_accepted() { 849 let mut storage = MockStorageReader::new(); 850 storage 851 .expect_read_status() 852 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 853 storage 854 .expect_canonical_block_id() 855 .returning(|i| Ok(Some(new_block_id(i)))); 856 storage 857 .expect_highest_accepted_block() 858 .returning(|| Ok(Some(new_block_id(15)))); 859 storage 860 .expect_highest_finalized_block() 861 .returning(|| Ok(Some(new_block_id(10)))); 862 863 let mut producer = new_producer( 864 Some(new_block_id(11)), 865 DataFinality::DataStatusAccepted, 866 Arc::new(storage), 867 ) 868 .await; 869 870 // accepted batches 871 for block_num in 12..=15 { 872 let batch = producer.try_next().await.unwrap().unwrap(); 873 assert!(batch.as_finalized().is_none()); 874 let accepted = batch.as_accepted().unwrap(); 875 assert_eq!(accepted.number(), block_num); 876 } 877 878 let batch = producer.try_next().now_or_never(); 879 assert!(batch.is_none()); 880 881 producer 882 .handle_ingestion_message(&IngestionMessage::Accepted(new_block_id(16))) 883 .await 884 .unwrap(); 885 886 // one finalized batch 887 let batch = producer.try_next().await.unwrap().unwrap(); 888 assert!(batch.as_finalized().is_none()); 889 let accepted = batch.as_accepted().unwrap(); 890 assert_eq!(accepted.number(), 16); 891 892 let batch = producer.try_next().now_or_never(); 893 assert!(batch.is_none()); 894 } 895 896 /// This test checks that the producer produces messages after the invalidated cursor. 897 /// 898 /// Finality: ACCEPTED 899 #[tokio::test] 900 async fn test_handle_invalidate_message_as_accepted() { 901 let mut storage = MockStorageReader::new(); 902 storage 903 .expect_read_status() 904 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 905 storage 906 .expect_canonical_block_id() 907 .returning(|i| Ok(Some(new_block_id(i)))); 908 storage 909 .expect_highest_accepted_block() 910 .returning(|| Ok(Some(new_block_id(15)))); 911 storage 912 .expect_highest_finalized_block() 913 .returning(|| Ok(Some(new_block_id(10)))); 914 915 let mut producer = new_producer( 916 Some(new_block_id(11)), 917 DataFinality::DataStatusAccepted, 918 Arc::new(storage), 919 ) 920 .await; 921 922 for _ in 0..2 { 923 let batch = producer.try_next().await.unwrap().unwrap(); 924 assert!(batch.as_accepted().is_some()); 925 } 926 927 // invalidate after current. nothing happens 928 producer 929 .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(14))) 930 .await 931 .unwrap(); 932 933 let batch = producer.try_next().await.unwrap().unwrap(); 934 assert_eq!(batch.as_accepted().unwrap().number(), 14); 935 936 // invalidate before current. goes back 937 producer 938 .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(11))) 939 .await 940 .unwrap(); 941 942 // still no new accepted. 943 let batch = producer.try_next().now_or_never(); 944 assert!(batch.is_none()); 945 946 producer 947 .handle_ingestion_message(&IngestionMessage::Accepted(new_block_id(15))) 948 .await 949 .unwrap(); 950 951 let batch = producer.try_next().await.unwrap().unwrap(); 952 assert_eq!(batch.as_accepted().unwrap().number(), 12); 953 } 954 955 /// This test checks that data is produced if the node has not ingested any finalized data, but 956 /// the client requested accepted data. This happens on devnet. 957 /// 958 /// Finality: ACCEPTED 959 #[tokio::test] 960 async fn test_no_finalized_as_accepted() { 961 let mut storage = MockStorageReader::new(); 962 storage 963 .expect_canonical_block_id() 964 .returning(|i| Ok(Some(new_block_id(i)))); 965 storage 966 .expect_highest_accepted_block() 967 .returning(|| Ok(Some(new_block_id(14)))); 968 storage 969 .expect_highest_finalized_block() 970 .returning(|| Ok(None)); 971 972 let mut producer = 973 new_producer(None, DataFinality::DataStatusAccepted, Arc::new(storage)).await; 974 975 let batch = producer.try_next().await.unwrap().unwrap(); 976 assert_eq!(batch.as_accepted().unwrap().number(), 0); 977 } 978 979 /// This test checks that finalized cursors are produced even if no accepted data has been 980 /// ingested. This happens when initially syncing the node. 981 /// 982 /// Finality: ACCEPTED 983 #[tokio::test] 984 async fn test_no_accepted_as_accepted() { 985 let mut storage = MockStorageReader::new(); 986 storage 987 .expect_canonical_block_id() 988 .returning(|i| Ok(Some(new_block_id(i)))); 989 storage 990 .expect_highest_accepted_block() 991 .returning(|| Ok(None)); 992 storage 993 .expect_highest_finalized_block() 994 .returning(|| Ok(Some(new_block_id(15)))); 995 996 let mut producer = 997 new_producer(None, DataFinality::DataStatusAccepted, Arc::new(storage)).await; 998 999 let batch = producer.try_next().await.unwrap().unwrap(); 1000 assert!(batch.as_finalized().is_some()); 1001 } 1002 1003 /// This test checks that the pending producer produces finalized/accepted cursors until 1004 /// reaching the head. At that point, it produces one pending block (if any). 1005 /// 1006 /// Finality: PENDING 1007 #[tokio::test] 1008 async fn test_produce_full_batch_pending() { 1009 let mut storage = MockStorageReader::new(); 1010 storage 1011 .expect_read_status() 1012 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 1013 storage 1014 .expect_canonical_block_id() 1015 .returning(|i| Ok(Some(new_block_id(i)))); 1016 storage 1017 .expect_highest_accepted_block() 1018 .returning(|| Ok(Some(new_block_id(15)))); 1019 storage 1020 .expect_highest_finalized_block() 1021 .returning(|| Ok(Some(new_block_id(10)))); 1022 1023 let mut producer = new_producer( 1024 Some(new_block_id(8)), 1025 DataFinality::DataStatusPending, 1026 Arc::new(storage), 1027 ) 1028 .await; 1029 1030 let batch = producer.try_next().await.unwrap().unwrap(); 1031 assert!(batch.as_finalized().is_some()); 1032 1033 for i in 11..=15 { 1034 let batch = producer.try_next().await.unwrap().unwrap(); 1035 assert_eq!(batch.as_accepted().unwrap().number(), i); 1036 } 1037 1038 // no pending block yet. 1039 let batch = producer.try_next().now_or_never(); 1040 assert!(batch.is_none()); 1041 1042 producer 1043 .handle_ingestion_message(&IngestionMessage::Pending(new_block_id(16))) 1044 .await 1045 .unwrap(); 1046 1047 let batch = producer.try_next().await.unwrap().unwrap(); 1048 assert_eq!(batch.as_pending().unwrap().number(), 16); 1049 1050 // only produce one pending. 1051 let batch = producer.try_next().now_or_never(); 1052 assert!(batch.is_none()); 1053 1054 producer 1055 .handle_ingestion_message(&IngestionMessage::Accepted(new_block_id(16))) 1056 .await 1057 .unwrap(); 1058 1059 let batch = producer.try_next().await.unwrap().unwrap(); 1060 assert_eq!(batch.as_accepted().unwrap().number(), 16); 1061 1062 // no pending block yet. 1063 let batch = producer.try_next().now_or_never(); 1064 assert!(batch.is_none()); 1065 } 1066 1067 #[tokio::test] 1068 async fn test_configure_with_valid_starting_cursor() { 1069 let mut storage = MockStorageReader::new(); 1070 storage 1071 .expect_read_status() 1072 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 1073 storage 1074 .expect_canonical_block_id() 1075 .returning(|i| Ok(Some(new_block_id(i)))); 1076 storage 1077 .expect_highest_accepted_block() 1078 .returning(|| Ok(Some(new_block_id(15)))); 1079 storage 1080 .expect_highest_finalized_block() 1081 .returning(|| Ok(Some(new_block_id(10)))); 1082 1083 let cursor = new_block_id(8); 1084 let mut producer = SequentialCursorProducer::new(Arc::new(storage)); 1085 let response = producer 1086 .reconfigure(&new_configuration( 1087 Some(cursor), 1088 DataFinality::DataStatusAccepted, 1089 )) 1090 .await 1091 .unwrap(); 1092 assert_matches!(response, ReconfigureResponse::Ok); 1093 } 1094 1095 #[tokio::test] 1096 async fn test_configure_with_invalidated_starting_cursor() { 1097 let mut storage = MockStorageReader::new(); 1098 storage 1099 .expect_read_status() 1100 .with(eq(new_block_id(8))) 1101 .returning(|_| Ok(Some(BlockStatus::Rejected))); 1102 storage 1103 .expect_read_status() 1104 .with(eq(new_block_id(7))) 1105 .returning(|_| Ok(Some(BlockStatus::Rejected))); 1106 storage 1107 .expect_read_status() 1108 .with(eq(new_block_id(6))) 1109 .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1))); 1110 storage 1111 .expect_read_header() 1112 .with(eq(new_block_id(8))) 1113 .returning(|_| Ok(Some(new_block_header(8, new_block_id(8), new_block_id(7))))); 1114 storage 1115 .expect_read_header() 1116 .with(eq(new_block_id(7))) 1117 .returning(|_| Ok(Some(new_block_header(7, new_block_id(7), new_block_id(6))))); 1118 storage 1119 .expect_canonical_block_id() 1120 .returning(|i| Ok(Some(new_block_id(i)))); 1121 storage 1122 .expect_highest_accepted_block() 1123 .returning(|| Ok(Some(new_block_id(15)))); 1124 storage 1125 .expect_highest_finalized_block() 1126 .returning(|| Ok(Some(new_block_id(10)))); 1127 1128 let cursor = new_block_id(8); 1129 let mut producer = SequentialCursorProducer::new(Arc::new(storage)); 1130 let response = producer 1131 .reconfigure(&new_configuration( 1132 Some(cursor), 1133 DataFinality::DataStatusAccepted, 1134 )) 1135 .await 1136 .unwrap(); 1137 assert_matches!(response, ReconfigureResponse::Invalidate(_)); 1138 } 1139 1140 #[tokio::test] 1141 async fn test_configure_with_non_existing_starting_cursor() { 1142 let mut storage = MockStorageReader::new(); 1143 storage.expect_read_status().returning(|_| Ok(None)); 1144 storage 1145 .expect_canonical_block_id() 1146 .returning(|i| Ok(Some(new_block_id(i)))); 1147 storage 1148 .expect_highest_accepted_block() 1149 .returning(|| Ok(Some(new_block_id(15)))); 1150 storage 1151 .expect_highest_finalized_block() 1152 .returning(|| Ok(Some(new_block_id(10)))); 1153 1154 let cursor = new_block_id(8); 1155 let mut producer = SequentialCursorProducer::new(Arc::new(storage)); 1156 let response = producer 1157 .reconfigure(&new_configuration( 1158 Some(cursor), 1159 DataFinality::DataStatusAccepted, 1160 )) 1161 .await 1162 .unwrap(); 1163 assert_matches!(response, ReconfigureResponse::MissingStartingCursor); 1164 } 1165 }