Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pathfinder/state/sync): bulk sync should exit right after first error #2259

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 85 additions & 105 deletions crates/pathfinder/src/state/sync/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,17 +667,11 @@ where

async move {
let t_block = std::time::Instant::now();
let (block, state_update) = sequencer
.state_update_with_block(block_number)
.await
.map_err(|e| (block_number, e.into()))?;
let (block, state_update) = sequencer.state_update_with_block(block_number).await?;
let t_block = t_block.elapsed();

let t_signature = std::time::Instant::now();
let signature = sequencer
.signature(block_number.into())
.await
.map_err(|e| (block_number, e.into()))?;
let signature = sequencer.signature(block_number.into()).await?;
let t_signature = t_signature.elapsed();

let span = tracing::Span::current();
Expand Down Expand Up @@ -742,16 +736,14 @@ where
) = rx
.await
.expect("Panic on rayon thread while verifying block")
.context("Verifying block contents")
.map_err(|e| (block_number, e))?;
.context("Verifying block contents")?;

let t_declare = std::time::Instant::now();
let downloaded_classes = download_new_classes(&state_update, &sequencer, storage)
.await
.with_context(|| {
format!("Handling newly declared classes for block {block_number:?}")
})
.map_err(|e| (block_number, e))?;
})?;
let t_declare = t_declare.elapsed();

let timings = Timings {
Expand All @@ -760,7 +752,7 @@ where
signature_download: t_signature,
};

Ok::<_, (BlockNumber, anyhow::Error)>((
Ok::<_, anyhow::Error>((
block,
state_update,
signature,
Expand Down Expand Up @@ -795,28 +787,17 @@ where
futures::stream::iter(futures_chunk).buffer_unordered(fetch_concurrency.get());

let mut ordered_blocks = BTreeMap::new();
let mut failed_block = None;

while let Some(result) = stream.next().await {
let ok = match result {
Ok(x) => x,
Err((block, error)) => {
// We've hit an error, so we stop the loop and return. `head` has been updated
// to the last synced block so our "tracking" sync will just
// continue from there.
tracing::info!(
%block, %error,
"Error during bulk syncing blocks, falling back to normal sync",
);

if block == start {
return Ok(());
} else {
// We can still emit up to the failed block
failed_block = Some(block);
continue;
}
}
let Ok(ok) = result else {
// We've hit an error, so we stop the loop and return. `head` has been updated
// to the last synced block so our "tracking" sync will just
// continue from there.
tracing::info!(
"Error during bulk syncing blocks, falling back to normal sync: {}",
result.err().unwrap()
);
return Ok(());
};

ordered_blocks.insert(ok.0.block_number.get(), ok);
Expand Down Expand Up @@ -886,11 +867,6 @@ where
.await
.context("Event channel closed")?;
}

match failed_block {
Some(x) if start == x.get() => return Ok(()),
_ => {}
}
}
}

Expand Down Expand Up @@ -1578,6 +1554,19 @@ mod tests {
.return_once(move |_| returned_result);
}

fn expect_state_update_with_block_no_sequence_at_most_once(
mock: &mut MockGatewayApi,
block: BlockNumber,
returned_result: Result<(reply::Block, StateUpdate), SequencerError>,
) {
use mockall::predicate::eq;

mock.expect_state_update_with_block()
.with(eq(block))
.times(..=1)
.return_once(move |_| returned_result);
}

/// Convenience wrapper
fn expect_block_header(
mock: &mut MockGatewayApi,
Expand Down Expand Up @@ -1624,6 +1613,19 @@ mod tests {
.return_once(|_| returned_result);
}

fn expect_signature_no_sequence_at_most_once(
mock: &mut MockGatewayApi,
block: BlockId,
returned_result: Result<reply::BlockSignature, SequencerError>,
) {
use mockall::predicate::eq;

mock.expect_signature()
.with(eq(block))
.times(..=1)
.return_once(|_| returned_result);
}

/// Convenience wrapper
fn expect_class_by_hash(
mock: &mut MockGatewayApi,
Expand All @@ -1650,6 +1652,17 @@ mod tests {
.return_once(|_| returned_result);
}

fn expect_class_by_hash_no_sequence_at_most_once(
mock: &mut MockGatewayApi,
class_hash: ClassHash,
returned_result: Result<bytes::Bytes, SequencerError>,
) {
mock.expect_pending_class_by_hash()
.withf(move |x| x == &class_hash)
.times(..=1)
.return_once(|_| returned_result);
}

/// Convenience wrapper
fn block_not_found() -> SequencerError {
SequencerError::StarknetError(StarknetError {
Expand Down Expand Up @@ -3065,77 +3078,44 @@ mod tests {
assert_matches!(result, Ok(Some((BLOCK1_NUMBER, BLOCK1_HASH, _))));
}

mod no_such_block {
use pretty_assertions_sorted::{assert_eq, assert_eq_sorted};

use super::*;

#[tokio::test]
async fn first_in_batch() {
let (tx_event, mut rx_event) = tokio::sync::mpsc::channel(1);
let mut mock = MockGatewayApi::new();

// Downloading the genesis block fails with block not found
expect_state_update_with_block_no_sequence(
&mut mock,
BLOCK0_NUMBER,
Err(block_not_found()),
);

// Let's run the UUT
let jh = spawn_bulk_sync(tx_event, mock);

assert!(rx_event.recv().await.is_none());
#[tokio::test]
async fn no_such_block() {
let (tx_event, mut rx_event) = tokio::sync::mpsc::channel(1);
let mut mock = MockGatewayApi::new();

let result = jh.await.unwrap();
assert_matches!(result, Ok(None));
}
// Downloading the genesis block data is racing against the failure of block 1,
// hence "at most once"
expect_state_update_with_block_no_sequence_at_most_once(
&mut mock,
BLOCK0_NUMBER,
Ok((BLOCK0.clone(), STATE_UPDATE0.clone())),
);
expect_class_by_hash_no_sequence_at_most_once(
&mut mock,
CONTRACT0_HASH,
Ok(CONTRACT0_DEF.clone()),
);
expect_signature_no_sequence_at_most_once(
&mut mock,
BLOCK0_NUMBER.into(),
Ok(BLOCK0_SIGNATURE.clone()),
);
// Downloading block 1 fails with block not found
expect_state_update_with_block_no_sequence(
&mut mock,
BLOCK1_NUMBER,
Err(block_not_found()),
);

#[tokio::test]
async fn further_in_batch() {
let (tx_event, mut rx_event) = tokio::sync::mpsc::channel(1);
let mut mock = MockGatewayApi::new();
// Let's run the UUT
let jh = spawn_bulk_sync(tx_event, mock);

// Download the genesis block with respective state update and contracts
expect_state_update_with_block_no_sequence(
&mut mock,
BLOCK0_NUMBER,
Ok((BLOCK0.clone(), STATE_UPDATE0.clone())),
);
expect_class_by_hash_no_sequence(
&mut mock,
CONTRACT0_HASH,
Ok(CONTRACT0_DEF.clone()),
);
expect_signature_no_sequence(
&mut mock,
BLOCK0_NUMBER.into(),
Ok(BLOCK0_SIGNATURE.clone()),
);
// Downloading block 1 fails with block not found
expect_state_update_with_block_no_sequence(
&mut mock,
BLOCK1_NUMBER,
Err(block_not_found()),
);
// The entire unemitted, yet cached batch is rejected
assert!(rx_event.recv().await.is_none());

// Let's run the UUT
let jh = spawn_bulk_sync(tx_event, mock);

assert_matches!(rx_event.recv().await.unwrap(),
SyncEvent::CairoClass { hash, .. } => {
assert_eq!(hash, CONTRACT0_HASH);
});
assert_matches!(rx_event.recv().await.unwrap(), SyncEvent::Block((block, _), state_update, signature, _, _) => {
assert_eq!(*block, *BLOCK0);
assert_eq_sorted!(*state_update, *STATE_UPDATE0);
assert_eq!(*signature, BLOCK0_SIGNATURE.signature());
});

// Bulk sync should _not_ fail if the block is not found
let result = jh.await.unwrap();
assert_matches!(result, Ok(Some((BLOCK0_NUMBER, BLOCK0_HASH, _))));
}
// Bulk sync should _not_ fail if the block is not found
let result = jh.await.unwrap();
assert_matches!(result, Ok(None));
}
}
}
Expand Down