//! Sequential cursor producer for the StarkNet stream (starknet/src/stream/cursor_producer.rs).
   1  use std::{
   2      pin::Pin,
   3      sync::Arc,
   4      task::{self, Poll, Waker},
   5  };
   6  
   7  use apibara_core::{node::v1alpha2::DataFinality, starknet::v1alpha2};
   8  use apibara_node::{
   9      async_trait,
  10      stream::{
  11          BatchCursor, CursorProducer, IngestionMessage, IngestionResponse, ReconfigureResponse,
  12          StreamConfiguration, StreamError,
  13      },
  14  };
  15  use futures::{stream::FusedStream, Stream};
  16  use tracing::{debug, instrument, trace};
  17  
  18  use crate::{core::GlobalBlockId, db::StorageReader};
  19  
/// A [CursorProducer] that produces sequential cursors.
pub struct SequentialCursorProducer<R: StorageReader + Send + Sync + 'static> {
    // Per-stream configuration; `None` until `reconfigure` is called.
    configuration: Option<BatchConfiguration>,
    // Cached chain heads; lazily initialized from storage on first access.
    ingestion_state: Option<IngestionState>,
    // Storage used to resolve canonical block ids and chain heads.
    storage: Arc<R>,
    // Waker registered by `poll_next` when no cursor is available yet.
    waker: Option<Waker>,
}
  27  
/// Stream configuration together with the stream's current position.
struct BatchConfiguration {
    // Most recently produced cursor; `None` before the first cursor is produced.
    current: Option<GlobalBlockId>,
    // Whether the current pending block has already been sent; reset when a new
    // pending block is ingested.
    pending_sent: bool,
    // Finality level requested by the client.
    data_finality: DataFinality,
    // Maximum number of cursors in a finalized batch.
    batch_size: usize,
}
  34  
/// Snapshot of the chain heads as seen by the ingestion service.
#[derive(Default, Debug)]
struct IngestionState {
    // Highest finalized block ingested, if any.
    finalized: Option<GlobalBlockId>,
    // Highest accepted block ingested, if any.
    accepted: Option<GlobalBlockId>,
    // Current pending block, if any.
    pending: Option<GlobalBlockId>,
}
  41  
  42  impl<R> SequentialCursorProducer<R>
  43  where
  44      R: StorageReader + Send + Sync + 'static,
  45  {
  46      pub fn new(storage: Arc<R>) -> Self {
  47          SequentialCursorProducer {
  48              configuration: None,
  49              storage,
  50              ingestion_state: None,
  51              waker: None,
  52          }
  53      }
  54  
  55      #[instrument(skip_all, level = "debug")]
  56      pub fn next_cursor(&mut self) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> {
  57          if self.configuration.is_some() {
  58              self.next_cursor_with_configuration()
  59          } else {
  60              Ok(None)
  61          }
  62      }
  63  
  64      #[instrument(skip_all, level = "debug")]
  65      fn next_cursor_with_configuration(
  66          &mut self,
  67      ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> {
  68          // We call this from inside a `is_some` check.
  69          let state = self.get_ingestion_state()?;
  70          // keep borrow checker happy
  71          let pending_cursor = state.pending;
  72          let accepted_cursor = state.accepted;
  73          let finalized_cursor = state.finalized;
  74  
  75          let configuration = self.configuration.as_mut().expect("configuration");
  76          let starting_cursor = configuration.current;
  77  
  78          let next_block_number = configuration.current.map(|c| c.number() + 1).unwrap_or(0);
  79  
  80          trace!(
  81              next_block_number = %next_block_number,
  82              finalized = ?finalized_cursor,
  83              accepted = ?accepted_cursor,
  84              pending = ?pending_cursor,
  85              "deciding which cursor to generate"
  86          );
  87  
  88          if let Some(finalized) = finalized_cursor {
  89              if next_block_number <= finalized.number() {
  90                  return self.next_cursor_finalized(starting_cursor, next_block_number, &finalized);
  91              }
  92          }
  93  
  94          if let Some(accepted) = accepted_cursor {
  95              if next_block_number <= accepted.number() {
  96                  return self.next_cursor_accepted(starting_cursor, next_block_number);
  97              }
  98          }
  99  
 100          if let Some(pending) = pending_cursor {
 101              if next_block_number <= pending.number() {
 102                  return self.next_cursor_pending(starting_cursor, next_block_number);
 103              }
 104          }
 105  
 106          Ok(None)
 107      }
 108  
 109      #[instrument(skip_all, level = "trace")]
 110      fn next_cursor_finalized(
 111          &mut self,
 112          starting_cursor: Option<GlobalBlockId>,
 113          next_block_number: u64,
 114          finalized: &GlobalBlockId,
 115      ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> {
 116          // always send finalized data.
 117          let configuration = self.configuration.as_mut().expect("configuration");
 118          let mut cursors = Vec::with_capacity(configuration.batch_size);
 119          let final_block_number = u64::min(
 120              finalized.number(),
 121              next_block_number + (configuration.batch_size as u64) - 1,
 122          );
 123          for block_number in next_block_number..=final_block_number {
 124              match self.storage.canonical_block_id(block_number)? {
 125                  Some(cursor) => {
 126                      cursors.push(cursor);
 127                  }
 128                  None => break,
 129              }
 130          }
 131  
 132          if cursors.is_empty() {
 133              return Ok(None);
 134          }
 135  
 136          let batch_cursor = BatchCursor::new_finalized(starting_cursor, cursors);
 137          configuration.current = Some(*batch_cursor.end_cursor());
 138          Ok(Some(batch_cursor))
 139      }
 140  
 141      #[instrument(skip_all, level = "trace")]
 142      fn next_cursor_accepted(
 143          &mut self,
 144          starting_cursor: Option<GlobalBlockId>,
 145          next_block_number: u64,
 146      ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> {
 147          let configuration = self.configuration.as_mut().expect("configuration");
 148          if configuration.data_finality == DataFinality::DataStatusFinalized
 149              || configuration.data_finality == DataFinality::DataStatusUnknown
 150          {
 151              return Ok(None);
 152          }
 153  
 154          match self.storage.canonical_block_id(next_block_number)? {
 155              Some(cursor) => {
 156                  let batch_cursor = BatchCursor::new_accepted(starting_cursor, cursor);
 157                  configuration.current = Some(*batch_cursor.end_cursor());
 158                  Ok(Some(batch_cursor))
 159              }
 160              None => Ok(None),
 161          }
 162      }
 163  
 164      #[instrument(skip_all, level = "trace")]
 165      fn next_cursor_pending(
 166          &mut self,
 167          starting_cursor: Option<GlobalBlockId>,
 168          next_block_number: u64,
 169      ) -> Result<Option<BatchCursor<GlobalBlockId>>, R::Error> {
 170          let configuration = self.configuration.as_mut().expect("configuration");
 171          if configuration.data_finality != DataFinality::DataStatusPending
 172              || configuration.pending_sent
 173          {
 174              return Ok(None);
 175          }
 176  
 177          // Cursor won't be part of the canonical chain since it's not accepted yet.
 178          // Optimistically send the pending block cursor since if we reached this point it's
 179          // already been produced.
 180          let cursor = GlobalBlockId::from_u64(next_block_number);
 181          let batch_cursor = BatchCursor::new_pending(starting_cursor, cursor);
 182          trace!(cursor = ?batch_cursor, "sending pending data");
 183          configuration.pending_sent = true;
 184          Ok(Some(batch_cursor))
 185      }
 186  
 187      fn get_ingestion_state(&mut self) -> Result<&IngestionState, R::Error> {
 188          let state = self.get_ingestion_state_mut()?;
 189          Ok(state)
 190      }
 191  
 192      fn get_ingestion_state_mut(&mut self) -> Result<&mut IngestionState, R::Error> {
 193          // Read new state only if we don't have one yet.
 194          // Initialize with default value otherwise to make the borrow checker happy.
 195          let new_state = if self.ingestion_state.is_some() {
 196              IngestionState::default()
 197          } else {
 198              let accepted = self.storage.highest_accepted_block()?;
 199              let finalized = self.storage.highest_finalized_block()?;
 200              IngestionState {
 201                  accepted,
 202                  finalized,
 203                  pending: None,
 204              }
 205          };
 206  
 207          Ok(self.ingestion_state.get_or_insert(new_state))
 208      }
 209  
 210      /// wake up the stream if it was waiting for a new block
 211      fn wake(&mut self) {
 212          if let Some(waker) = self.waker.take() {
 213              waker.wake();
 214          }
 215      }
 216  }
 217  
 218  fn lowest_cursor(a: GlobalBlockId, b: GlobalBlockId) -> GlobalBlockId {
 219      if a.number() < b.number() {
 220          a
 221      } else {
 222          b
 223      }
 224  }
 225  
#[async_trait]
impl<R> CursorProducer for SequentialCursorProducer<R>
where
    R: StorageReader + Send + Sync + 'static,
{
    type Cursor = GlobalBlockId;
    type Filter = v1alpha2::Filter;

    // Installs a new stream configuration and resolves the starting cursor.
    //
    // If the user-provided cursor is no longer canonical, walks the chain
    // backwards to the closest canonical ancestor and returns
    // `ReconfigureResponse::Invalidate` with it.
    async fn reconfigure(
        &mut self,
        configuration: &StreamConfiguration<Self::Cursor, Self::Filter>,
    ) -> Result<ReconfigureResponse<Self::Cursor>, StreamError> {
        let (current, response) = match configuration.starting_cursor {
            None => (None, ReconfigureResponse::Ok),
            Some(starting_cursor) => {
                let starting_cursor = if starting_cursor.hash().is_zero() {
                    // the user specified a block number but not a hash. Find the hash
                    // corresponding to the block number.
                    match self
                        .storage
                        .canonical_block_id(starting_cursor.number())
                        .map_err(StreamError::internal)?
                    {
                        Some(starting_cursor) => starting_cursor,
                        None => return Ok(ReconfigureResponse::MissingStartingCursor),
                    }
                } else {
                    starting_cursor
                };

                debug!(starting_cursor = ?starting_cursor, "reconfigure stream with starting cursor");
                let starting_status = match self
                    .storage
                    .read_status(&starting_cursor)
                    .map_err(StreamError::internal)?
                {
                    None => return Ok(ReconfigureResponse::MissingStartingCursor),
                    Some(starting_status) => starting_status,
                };

                if starting_status.is_accepted() || starting_status.is_finalized() {
                    (Some(starting_cursor), ReconfigureResponse::Ok)
                } else {
                    // the user-specified cursor is not part of the canonical chain anymore.
                    // walk backwards until finding a canonical chain and use that as starting
                    // cursor.
                    let mut new_root = starting_cursor;
                    loop {
                        let status = match self
                            .storage
                            .read_status(&new_root)
                            .map_err(StreamError::internal)?
                        {
                            None => return Ok(ReconfigureResponse::MissingStartingCursor),
                            Some(status) => status,
                        };

                        if status.is_accepted() || status.is_finalized() {
                            break;
                        }

                        // follow the parent hash one block back.
                        let header = match self
                            .storage
                            .read_header(&new_root)
                            .map_err(StreamError::internal)?
                        {
                            None => return Ok(ReconfigureResponse::MissingStartingCursor),
                            Some(header) => header,
                        };

                        new_root = GlobalBlockId::from_block_header_parent(&header)
                            .map_err(StreamError::internal)?;
                    }

                    (Some(new_root), ReconfigureResponse::Invalidate(new_root))
                }
            }
        };

        let configuration = BatchConfiguration {
            data_finality: configuration.finality,
            pending_sent: false,
            current,
            batch_size: configuration.batch_size,
        };
        self.configuration = Some(configuration);

        // a new configuration may unblock a parked `poll_next`.
        self.wake();

        Ok(response)
    }

    // Updates the cached ingestion state from an ingestion message and wakes
    // the stream so it can re-evaluate whether a cursor can be produced.
    async fn handle_ingestion_message(
        &mut self,
        message: &IngestionMessage<Self::Cursor>,
    ) -> Result<IngestionResponse<Self::Cursor>, StreamError> {
        let state = self
            .get_ingestion_state_mut()
            .map_err(StreamError::internal)?;
        let response = match message {
            IngestionMessage::Pending(cursor) => {
                state.pending = Some(*cursor);
                // mark pending as ready to send
                if let Some(configuration) = self.configuration.as_mut() {
                    configuration.pending_sent = false;
                }
                IngestionResponse::Ok
            }
            IngestionMessage::Accepted(cursor) => {
                state.accepted = Some(*cursor);
                IngestionResponse::Ok
            }
            IngestionMessage::Finalized(cursor) => {
                state.finalized = Some(*cursor);
                IngestionResponse::Ok
            }
            IngestionMessage::Invalidate(cursor) => {
                // the chain reorganized: clamp all heads to the new head.
                state.pending = None;
                state.accepted = state.accepted.map(|c| lowest_cursor(c, *cursor));
                state.finalized = state.finalized.map(|c| lowest_cursor(c, *cursor));
                // if the current cursor is after the new head, then data was invalidated.
                if let Some(configuration) = self.configuration.as_mut() {
                    let is_invalidated = configuration
                        .current
                        .map(|c| c.number() > cursor.number())
                        .unwrap_or(false);

                    configuration.current =
                        configuration.current.map(|c| lowest_cursor(c, *cursor));

                    if is_invalidated {
                        IngestionResponse::Invalidate(*cursor)
                    } else {
                        IngestionResponse::Ok
                    }
                } else {
                    IngestionResponse::Ok
                }
            }
        };

        self.wake();

        Ok(response)
    }
}
 372  
 373  impl<R> Stream for SequentialCursorProducer<R>
 374  where
 375      R: StorageReader + Send + Sync + 'static,
 376  {
 377      type Item = Result<BatchCursor<GlobalBlockId>, StreamError>;
 378  
 379      fn poll_next(
 380          mut self: Pin<&mut Self>,
 381          cx: &mut task::Context<'_>,
 382      ) -> task::Poll<Option<Self::Item>> {
 383          match self.next_cursor() {
 384              Err(err) => {
 385                  let err = StreamError::internal(err);
 386                  Poll::Ready(Some(Err(err)))
 387              }
 388              Ok(None) => {
 389                  // no new block yet, store waker and wake after a new ingestion message
 390                  self.waker = Some(cx.waker().clone());
 391                  Poll::Pending
 392              }
 393              Ok(Some(batch_cursor)) => Poll::Ready(Some(Ok(batch_cursor))),
 394          }
 395      }
 396  }
 397  
impl<R> FusedStream for SequentialCursorProducer<R>
where
    R: StorageReader + Send + Sync + 'static,
{
    fn is_terminated(&self) -> bool {
        // the producer never terminates: new blocks can always be ingested.
        false
    }
}
 406  
 407  #[cfg(test)]
 408  mod tests {
 409      use std::sync::Arc;
 410  
 411      use apibara_core::{
 412          node::v1alpha2::DataFinality,
 413          starknet::v1alpha2::{BlockHeader, BlockStatus, Filter},
 414      };
 415      use apibara_node::stream::{
 416          CursorProducer, IngestionMessage, ReconfigureResponse, StreamConfiguration,
 417      };
 418      use assert_matches::assert_matches;
 419      use futures::{FutureExt, StreamExt, TryStreamExt};
 420      use mockall::predicate::eq;
 421  
 422      use crate::{
 423          core::{BlockHash, GlobalBlockId},
 424          db::{MockStorageReader, StorageReader},
 425      };
 426  
 427      use super::SequentialCursorProducer;
 428  
 429      fn new_block_hash(n: u64, c: u8) -> BlockHash {
 430          let mut b = [0; 32];
 431          b[24..].copy_from_slice(&n.to_be_bytes());
 432          b[0] = c;
 433          BlockHash::from_slice(&b).unwrap()
 434      }
 435  
 436      fn new_block_id(num: u64) -> GlobalBlockId {
 437          let hash = new_block_hash(num, 0);
 438          GlobalBlockId::new(num, hash)
 439      }
 440  
 441      fn new_block_header(
 442          number: u64,
 443          hash: GlobalBlockId,
 444          parent_hash: GlobalBlockId,
 445      ) -> BlockHeader {
 446          BlockHeader {
 447              block_number: number,
 448              block_hash: Some(hash.hash().into()),
 449              parent_block_hash: Some(parent_hash.hash().into()),
 450              ..BlockHeader::default()
 451          }
 452      }
 453  
 454      fn new_configuration(
 455          starting_cursor: Option<GlobalBlockId>,
 456          finality: DataFinality,
 457      ) -> StreamConfiguration<GlobalBlockId, Filter> {
 458          StreamConfiguration {
 459              batch_size: 3,
 460              stream_id: 0,
 461              finality,
 462              starting_cursor,
 463              filter: vec![Filter::default()],
 464          }
 465      }
 466  
 467      async fn new_producer<R>(
 468          cursor: Option<GlobalBlockId>,
 469          finality: DataFinality,
 470          storage: Arc<R>,
 471      ) -> SequentialCursorProducer<R>
 472      where
 473          R: StorageReader + Send + Sync + 'static,
 474      {
 475          let mut producer = SequentialCursorProducer::new(storage);
 476          producer
 477              .reconfigure(&new_configuration(cursor, finality))
 478              .await
 479              .unwrap();
 480          producer
 481      }
 482  
 483      /// This test checks that the cursor producer keeps producing finalized batches with the
 484      /// requested number of cursors.
 485      ///
 486      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_produce_full_batch_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(100))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(90))));

        let producer =
            new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await;

        // 5 batches of size 3 covers blocks 0..=14, all below the finalized head.
        let batches: Vec<_> = producer.take(5).try_collect().await.unwrap();
        assert_eq!(batches.len(), 5);
        let mut i = 0;
        for batch in batches {
            let cursors = batch.as_finalized().unwrap();
            for cursor in cursors {
                // cursors are sequential with no gaps.
                assert_eq!(cursor.number(), i as u64);
                i += 1;
            }
        }
    }
 514  
 515      /// This test checks that the producer doesn't produce any cursor if the requested block is
 516      /// after the most recent finalized block.
 517      ///
 518      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_produce_nothing_if_after_finalized_as_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_read_status()
            .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(100))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(90))));

        // start exactly at the finalized head: the next block (91) is only accepted.
        let mut producer = new_producer(
            Some(new_block_id(90)),
            DataFinality::DataStatusFinalized,
            Arc::new(storage),
        )
        .await;

        // the stream must be pending, not yield a batch.
        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());
    }
 545  
 546      /// This test checks the transition between finalized and accepted. Since the requested data is
 547      /// finalized, the producer should produce partial batches with only the finalized cursors.
 548      ///
 549      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_reach_accepted_as_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(15))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(10))));

        let producer =
            new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await;

        // blocks 0..=10 are finalized: three full batches of 3, then a partial one.
        let batches: Vec<_> = producer.take(4).try_collect().await.unwrap();
        assert_eq!(batches.len(), 4);
        let mut i = 0;
        for (batch_idx, batch) in batches.iter().enumerate() {
            let cursors = batch.as_finalized().unwrap();
            if batch_idx == 3 {
                // last batch is partial because it cannot contain block 11, which is accepted
                assert_eq!(cursors.len(), 2);
            } else {
                assert_eq!(cursors.len(), 3);
            }
            for cursor in cursors {
                assert_eq!(cursor.number(), i as u64);
                i += 1;
            }
        }
    }
 583  
 584      /// This test checks that the producer starts producing new batches after the chain finality
 585      /// status is updated.
 586      ///
 587      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_handle_finalized_message_as_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(15))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(10))));

        let mut producer =
            new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await;

        // drain all finalized data (blocks 0..=10, four batches).
        for _ in 0..4 {
            let batch = producer.try_next().await.unwrap().unwrap();
            assert!(batch.as_finalized().is_some());
        }

        // no more finalized data: the stream is pending.
        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());

        // advance the finalized head to block 14.
        producer
            .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(14)))
            .await
            .unwrap();

        // the producer resumes right after the old head (blocks 11..=14).
        let mut expected_block = 11;
        for _ in 0..2 {
            let batch = producer.try_next().await.unwrap().unwrap();
            let cursors = batch.as_finalized().unwrap();
            for cursor in cursors {
                assert_eq!(cursor.number(), expected_block);
                expected_block += 1;
            }
        }

        // finalized data exhausted again.
        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());
    }
 630  
 631      /// This test checks that the producer produces messages after the invalidated cursor.
 632      ///
 633      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_handle_invalidate_message_as_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_read_status()
            .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(15))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(10))));

        let mut producer = new_producer(
            Some(new_block_id(8)),
            DataFinality::DataStatusFinalized,
            Arc::new(storage),
        )
        .await;

        // first batch: blocks 9..=10 (up to the finalized head).
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());

        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());

        // invalidate after current. nothing happens
        producer
            .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(14)))
            .await
            .unwrap();

        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());

        // invalidate before current. goes back
        producer
            .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(4)))
            .await
            .unwrap();

        // still no new finalized.
        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());

        // a new finalized head after the rollback point unblocks the stream.
        producer
            .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(6)))
            .await
            .unwrap();

        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());
    }
 690  
 691      /// This test checks that no data is produced if the node has not ingested any finalized block
 692      /// yet.
 693      ///
 694      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_no_finalized_as_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(14))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(None));

        let mut producer =
            new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await;

        // no finalized head yet: the stream must be pending.
        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());

        // the first finalized message unblocks the stream.
        producer
            .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(13)))
            .await
            .unwrap();

        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());
    }
 722  
 723      /// This test checks that no data is produced if the node has not ingested any finalized block
 724      /// yet.
 725      ///
 726      /// Finality: FINALIZED
    #[tokio::test]
    async fn test_no_accepted_as_finalized() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(None));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(15))));

        let mut producer =
            new_producer(None, DataFinality::DataStatusFinalized, Arc::new(storage)).await;

        // a finalized head alone is enough to produce finalized batches.
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());
    }
 746  
 747      /// This test checks that the producer switches between producing finalized cursors and
 748      /// accepted cursors.
 749      ///
 750      /// Finality: ACCEPTED
    #[tokio::test]
    async fn test_full_batch_as_accepted() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_read_status()
            .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(15))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(10))));

        let mut producer = new_producer(
            Some(new_block_id(8)),
            DataFinality::DataStatusAccepted,
            Arc::new(storage),
        )
        .await;

        // finalized batch
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());

        // accepted batches
        // accepted cursors are produced one block at a time.
        for block_num in 11..=15 {
            let batch = producer.try_next().await.unwrap().unwrap();
            assert!(batch.as_finalized().is_none());
            let accepted = batch.as_accepted().unwrap();
            assert_eq!(accepted.number(), block_num);
        }

        // head reached: the stream is pending.
        let batch = producer.try_next().now_or_never();
        assert!(batch.is_none());
    }
 789  
 790      /// This test checks that the producer goes back to producing finalized blocks after receiving
 791      /// a finalized message, if the new finalized cursor is after the current cursor.
 792      ///
 793      /// Finality: ACCEPTED
    #[tokio::test]
    async fn test_handle_finalized_message_as_accepted() {
        let mut storage = MockStorageReader::new();
        storage
            .expect_read_status()
            .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(15))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(10))));

        let mut producer = new_producer(
            Some(new_block_id(8)),
            DataFinality::DataStatusAccepted,
            Arc::new(storage),
        )
        .await;

        // finalized batch
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());

        // one accepted batch (block 11)
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_none());
        let accepted = batch.as_accepted().unwrap();
        assert_eq!(accepted.number(), 11);

        // advance the finalized head past the current cursor.
        producer
            .handle_ingestion_message(&IngestionMessage::Finalized(new_block_id(13)))
            .await
            .unwrap();

        // finalized with block 12, 13
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_some());

        // one accepted batch (block 14)
        let batch = producer.try_next().await.unwrap().unwrap();
        assert!(batch.as_finalized().is_none());
        let accepted = batch.as_accepted().unwrap();
        assert_eq!(accepted.number(), 14);
    }
 842  
 843      /// This test checks that the producer resumes producing accepted cursors after receiving an
 844      /// accepted message.
 845      ///
 846      /// Finality: ACCEPTED
 847      #[tokio::test]
 848      async fn test_handle_accepted_message_as_accepted() {
 849          let mut storage = MockStorageReader::new();
 850          storage
 851              .expect_read_status()
 852              .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
 853          storage
 854              .expect_canonical_block_id()
 855              .returning(|i| Ok(Some(new_block_id(i))));
 856          storage
 857              .expect_highest_accepted_block()
 858              .returning(|| Ok(Some(new_block_id(15))));
 859          storage
 860              .expect_highest_finalized_block()
 861              .returning(|| Ok(Some(new_block_id(10))));
 862  
 863          let mut producer = new_producer(
 864              Some(new_block_id(11)),
 865              DataFinality::DataStatusAccepted,
 866              Arc::new(storage),
 867          )
 868          .await;
 869  
 870          // accepted batches
 871          for block_num in 12..=15 {
 872              let batch = producer.try_next().await.unwrap().unwrap();
 873              assert!(batch.as_finalized().is_none());
 874              let accepted = batch.as_accepted().unwrap();
 875              assert_eq!(accepted.number(), block_num);
 876          }
 877  
 878          let batch = producer.try_next().now_or_never();
 879          assert!(batch.is_none());
 880  
 881          producer
 882              .handle_ingestion_message(&IngestionMessage::Accepted(new_block_id(16)))
 883              .await
 884              .unwrap();
 885  
 886          // one finalized batch
 887          let batch = producer.try_next().await.unwrap().unwrap();
 888          assert!(batch.as_finalized().is_none());
 889          let accepted = batch.as_accepted().unwrap();
 890          assert_eq!(accepted.number(), 16);
 891  
 892          let batch = producer.try_next().now_or_never();
 893          assert!(batch.is_none());
 894      }
 895  
 896      /// This test checks that the producer produces messages after the invalidated cursor.
 897      ///
 898      /// Finality: ACCEPTED
 899      #[tokio::test]
 900      async fn test_handle_invalidate_message_as_accepted() {
 901          let mut storage = MockStorageReader::new();
 902          storage
 903              .expect_read_status()
 904              .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
 905          storage
 906              .expect_canonical_block_id()
 907              .returning(|i| Ok(Some(new_block_id(i))));
 908          storage
 909              .expect_highest_accepted_block()
 910              .returning(|| Ok(Some(new_block_id(15))));
 911          storage
 912              .expect_highest_finalized_block()
 913              .returning(|| Ok(Some(new_block_id(10))));
 914  
 915          let mut producer = new_producer(
 916              Some(new_block_id(11)),
 917              DataFinality::DataStatusAccepted,
 918              Arc::new(storage),
 919          )
 920          .await;
 921  
 922          for _ in 0..2 {
 923              let batch = producer.try_next().await.unwrap().unwrap();
 924              assert!(batch.as_accepted().is_some());
 925          }
 926  
 927          // invalidate after current. nothing happens
 928          producer
 929              .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(14)))
 930              .await
 931              .unwrap();
 932  
 933          let batch = producer.try_next().await.unwrap().unwrap();
 934          assert_eq!(batch.as_accepted().unwrap().number(), 14);
 935  
 936          // invalidate before current. goes back
 937          producer
 938              .handle_ingestion_message(&IngestionMessage::Invalidate(new_block_id(11)))
 939              .await
 940              .unwrap();
 941  
 942          // still no new accepted.
 943          let batch = producer.try_next().now_or_never();
 944          assert!(batch.is_none());
 945  
 946          producer
 947              .handle_ingestion_message(&IngestionMessage::Accepted(new_block_id(15)))
 948              .await
 949              .unwrap();
 950  
 951          let batch = producer.try_next().await.unwrap().unwrap();
 952          assert_eq!(batch.as_accepted().unwrap().number(), 12);
 953      }
 954  
 955      /// This test checks that data is produced if the node has not ingested any finalized data, but
 956      /// the client requested accepted data. This happens on devnet.
 957      ///
 958      /// Finality: ACCEPTED
 959      #[tokio::test]
 960      async fn test_no_finalized_as_accepted() {
 961          let mut storage = MockStorageReader::new();
 962          storage
 963              .expect_canonical_block_id()
 964              .returning(|i| Ok(Some(new_block_id(i))));
 965          storage
 966              .expect_highest_accepted_block()
 967              .returning(|| Ok(Some(new_block_id(14))));
 968          storage
 969              .expect_highest_finalized_block()
 970              .returning(|| Ok(None));
 971  
 972          let mut producer =
 973              new_producer(None, DataFinality::DataStatusAccepted, Arc::new(storage)).await;
 974  
 975          let batch = producer.try_next().await.unwrap().unwrap();
 976          assert_eq!(batch.as_accepted().unwrap().number(), 0);
 977      }
 978  
 979      /// This test checks that finalized cursors are produced even if no accepted data has been
 980      /// ingested. This happens when initially syncing the node.
 981      ///
 982      /// Finality: ACCEPTED
 983      #[tokio::test]
 984      async fn test_no_accepted_as_accepted() {
 985          let mut storage = MockStorageReader::new();
 986          storage
 987              .expect_canonical_block_id()
 988              .returning(|i| Ok(Some(new_block_id(i))));
 989          storage
 990              .expect_highest_accepted_block()
 991              .returning(|| Ok(None));
 992          storage
 993              .expect_highest_finalized_block()
 994              .returning(|| Ok(Some(new_block_id(15))));
 995  
 996          let mut producer =
 997              new_producer(None, DataFinality::DataStatusAccepted, Arc::new(storage)).await;
 998  
 999          let batch = producer.try_next().await.unwrap().unwrap();
1000          assert!(batch.as_finalized().is_some());
1001      }
1002  
1003      /// This test checks that the pending producer produces finalized/accepted cursors until
1004      /// reaching the head. At that point, it produces one pending block (if any).
1005      ///
1006      /// Finality: PENDING
1007      #[tokio::test]
1008      async fn test_produce_full_batch_pending() {
1009          let mut storage = MockStorageReader::new();
1010          storage
1011              .expect_read_status()
1012              .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
1013          storage
1014              .expect_canonical_block_id()
1015              .returning(|i| Ok(Some(new_block_id(i))));
1016          storage
1017              .expect_highest_accepted_block()
1018              .returning(|| Ok(Some(new_block_id(15))));
1019          storage
1020              .expect_highest_finalized_block()
1021              .returning(|| Ok(Some(new_block_id(10))));
1022  
1023          let mut producer = new_producer(
1024              Some(new_block_id(8)),
1025              DataFinality::DataStatusPending,
1026              Arc::new(storage),
1027          )
1028          .await;
1029  
1030          let batch = producer.try_next().await.unwrap().unwrap();
1031          assert!(batch.as_finalized().is_some());
1032  
1033          for i in 11..=15 {
1034              let batch = producer.try_next().await.unwrap().unwrap();
1035              assert_eq!(batch.as_accepted().unwrap().number(), i);
1036          }
1037  
1038          // no pending block yet.
1039          let batch = producer.try_next().now_or_never();
1040          assert!(batch.is_none());
1041  
1042          producer
1043              .handle_ingestion_message(&IngestionMessage::Pending(new_block_id(16)))
1044              .await
1045              .unwrap();
1046  
1047          let batch = producer.try_next().await.unwrap().unwrap();
1048          assert_eq!(batch.as_pending().unwrap().number(), 16);
1049  
1050          // only produce one pending.
1051          let batch = producer.try_next().now_or_never();
1052          assert!(batch.is_none());
1053  
1054          producer
1055              .handle_ingestion_message(&IngestionMessage::Accepted(new_block_id(16)))
1056              .await
1057              .unwrap();
1058  
1059          let batch = producer.try_next().await.unwrap().unwrap();
1060          assert_eq!(batch.as_accepted().unwrap().number(), 16);
1061  
1062          // no pending block yet.
1063          let batch = producer.try_next().now_or_never();
1064          assert!(batch.is_none());
1065      }
1066  
1067      #[tokio::test]
1068      async fn test_configure_with_valid_starting_cursor() {
1069          let mut storage = MockStorageReader::new();
1070          storage
1071              .expect_read_status()
1072              .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
1073          storage
1074              .expect_canonical_block_id()
1075              .returning(|i| Ok(Some(new_block_id(i))));
1076          storage
1077              .expect_highest_accepted_block()
1078              .returning(|| Ok(Some(new_block_id(15))));
1079          storage
1080              .expect_highest_finalized_block()
1081              .returning(|| Ok(Some(new_block_id(10))));
1082  
1083          let cursor = new_block_id(8);
1084          let mut producer = SequentialCursorProducer::new(Arc::new(storage));
1085          let response = producer
1086              .reconfigure(&new_configuration(
1087                  Some(cursor),
1088                  DataFinality::DataStatusAccepted,
1089              ))
1090              .await
1091              .unwrap();
1092          assert_matches!(response, ReconfigureResponse::Ok);
1093      }
1094  
    #[tokio::test]
    async fn test_configure_with_invalidated_starting_cursor() {
        let mut storage = MockStorageReader::new();
        // Blocks 8 and 7 are rejected; block 6 is the first ancestor that is
        // still accepted. The matcher-specific expectations below are
        // registered most-recent-block first — presumably reconfigure walks
        // parent headers from the starting cursor until it finds an accepted
        // block (TODO confirm against the producer implementation).
        storage
            .expect_read_status()
            .with(eq(new_block_id(8)))
            .returning(|_| Ok(Some(BlockStatus::Rejected)));
        storage
            .expect_read_status()
            .with(eq(new_block_id(7)))
            .returning(|_| Ok(Some(BlockStatus::Rejected)));
        storage
            .expect_read_status()
            .with(eq(new_block_id(6)))
            .returning(|_| Ok(Some(BlockStatus::AcceptedOnL1)));
        // Headers provide the parent links used to walk back the chain:
        // 8 -> 7 -> 6.
        storage
            .expect_read_header()
            .with(eq(new_block_id(8)))
            .returning(|_| Ok(Some(new_block_header(8, new_block_id(8), new_block_id(7)))));
        storage
            .expect_read_header()
            .with(eq(new_block_id(7)))
            .returning(|_| Ok(Some(new_block_header(7, new_block_id(7), new_block_id(6)))));
        storage
            .expect_canonical_block_id()
            .returning(|i| Ok(Some(new_block_id(i))));
        storage
            .expect_highest_accepted_block()
            .returning(|| Ok(Some(new_block_id(15))));
        storage
            .expect_highest_finalized_block()
            .returning(|| Ok(Some(new_block_id(10))));

        // Start from the rejected block 8.
        let cursor = new_block_id(8);
        let mut producer = SequentialCursorProducer::new(Arc::new(storage));
        let response = producer
            .reconfigure(&new_configuration(
                Some(cursor),
                DataFinality::DataStatusAccepted,
            ))
            .await
            .unwrap();
        // The rejected starting cursor is reported back as an invalidation.
        assert_matches!(response, ReconfigureResponse::Invalidate(_));
    }
1139  
1140      #[tokio::test]
1141      async fn test_configure_with_non_existing_starting_cursor() {
1142          let mut storage = MockStorageReader::new();
1143          storage.expect_read_status().returning(|_| Ok(None));
1144          storage
1145              .expect_canonical_block_id()
1146              .returning(|i| Ok(Some(new_block_id(i))));
1147          storage
1148              .expect_highest_accepted_block()
1149              .returning(|| Ok(Some(new_block_id(15))));
1150          storage
1151              .expect_highest_finalized_block()
1152              .returning(|| Ok(Some(new_block_id(10))));
1153  
1154          let cursor = new_block_id(8);
1155          let mut producer = SequentialCursorProducer::new(Arc::new(storage));
1156          let response = producer
1157              .reconfigure(&new_configuration(
1158                  Some(cursor),
1159                  DataFinality::DataStatusAccepted,
1160              ))
1161              .await
1162              .unwrap();
1163          assert_matches!(response, ReconfigureResponse::MissingStartingCursor);
1164      }
1165  }