Skip to content

Commit

Permalink
Revert "feat: emit from batch up to the failed block"
Browse files Browse the repository at this point in the history
This reverts commit fcacc67.
  • Loading branch information
kkovaacs committed Sep 23, 2024
1 parent f4fcad9 commit 50efa95
Showing 1 changed file with 14 additions and 38 deletions.
52 changes: 14 additions & 38 deletions crates/pathfinder/src/state/sync/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,17 +667,11 @@ where

async move {
let t_block = std::time::Instant::now();
let (block, state_update) = sequencer
.state_update_with_block(block_number)
.await
.map_err(|e| (block_number, e.into()))?;
let (block, state_update) = sequencer.state_update_with_block(block_number).await?;
let t_block = t_block.elapsed();

let t_signature = std::time::Instant::now();
let signature = sequencer
.signature(block_number.into())
.await
.map_err(|e| (block_number, e.into()))?;
let signature = sequencer.signature(block_number.into()).await?;
let t_signature = t_signature.elapsed();

let span = tracing::Span::current();
Expand Down Expand Up @@ -742,16 +736,14 @@ where
) = rx
.await
.expect("Panic on rayon thread while verifying block")
.context("Verifying block contents")
.map_err(|e| (block_number, e))?;
.context("Verifying block contents")?;

let t_declare = std::time::Instant::now();
let downloaded_classes = download_new_classes(&state_update, &sequencer, storage)
.await
.with_context(|| {
format!("Handling newly declared classes for block {block_number:?}")
})
.map_err(|e| (block_number, e))?;
})?;
let t_declare = t_declare.elapsed();

let timings = Timings {
Expand All @@ -760,7 +752,7 @@ where
signature_download: t_signature,
};

Ok::<_, (BlockNumber, anyhow::Error)>((
Ok::<_, anyhow::Error>((
block,
state_update,
signature,
Expand Down Expand Up @@ -795,28 +787,17 @@ where
futures::stream::iter(futures_chunk).buffer_unordered(fetch_concurrency.get());

let mut ordered_blocks = BTreeMap::new();
let mut failed_block = None;

while let Some(result) = stream.next().await {
let ok = match result {
Ok(x) => x,
Err((block, error)) => {
// We've hit an error, so we stop the loop and return. `head` has been updated
// to the last synced block so our "tracking" sync will just
// continue from there.
tracing::info!(
%block, %error,
"Error during bulk syncing blocks, falling back to normal sync",
);

if block == start {
return Ok(());
} else {
// We can still emit up to the failed block
failed_block = Some(block);
continue;
}
}
let Ok(ok) = result else {
// We've hit an error, so we stop the loop and return. `head` has been updated
// to the last synced block so our "tracking" sync will just
// continue from there.
tracing::info!(
"Error during bulk syncing blocks, falling back to normal sync: {}",
result.err().unwrap()
);
return Ok(());
};

ordered_blocks.insert(ok.0.block_number.get(), ok);
Expand Down Expand Up @@ -886,11 +867,6 @@ where
.await
.context("Event channel closed")?;
}

match failed_block {
Some(x) if start == x.get() => return Ok(()),
_ => {}
}
}
}

Expand Down

0 comments on commit 50efa95

Please sign in to comment.