Skip to content

Commit

Permalink
Merge pull request #2100 from eqlabs/krisztian/p2p-rollback-to-anchor
Browse files Browse the repository at this point in the history
feat(pathfinder/sync/checkpoint): add rollback_to_anchor implementation to checkpoint sync
  • Loading branch information
kkovaacs committed Jun 25, 2024
2 parents e53ee69 + f19c232 commit cf75275
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 25 deletions.
48 changes: 28 additions & 20 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,13 @@ impl MainLoop {
channel,
},
)) => {
tracing::debug!(%peer, %request_id, "Sync request sent");
tracing::debug!(%peer, %request_id, "Header sync request sent");

let _ = self
.pending_sync_requests
.headers
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Header sync request still to be pending")
.send(Ok(channel));
}
SwarmEvent::Behaviour(behaviour::Event::ClassesSync(
Expand Down Expand Up @@ -535,13 +535,13 @@ impl MainLoop {
channel,
},
)) => {
tracing::debug!(%peer, %request_id, "Sync request sent");
tracing::debug!(%peer, %request_id, "Classes sync request sent");

let _ = self
.pending_sync_requests
.classes
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Classes sync request still to be pending")
.send(Ok(channel));
}
SwarmEvent::Behaviour(behaviour::Event::StateDiffsSync(
Expand Down Expand Up @@ -570,13 +570,13 @@ impl MainLoop {
channel,
},
)) => {
tracing::debug!(%peer, %request_id, "Sync request sent");
tracing::debug!(%peer, %request_id, "State diff sync request sent");

let _ = self
.pending_sync_requests
.state_diffs
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("State diff sync request still to be pending")
.send(Ok(channel));
}
SwarmEvent::Behaviour(behaviour::Event::TransactionsSync(
Expand Down Expand Up @@ -605,13 +605,13 @@ impl MainLoop {
channel,
},
)) => {
tracing::debug!(%peer, %request_id, "Sync request sent");
tracing::debug!(%peer, %request_id, "Transaction sync request sent");

let _ = self
.pending_sync_requests
.transactions
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Transaction sync request still to be pending")
.send(Ok(channel));
}
SwarmEvent::Behaviour(behaviour::Event::EventsSync(
Expand Down Expand Up @@ -640,78 +640,86 @@ impl MainLoop {
channel,
},
)) => {
tracing::debug!(%peer, %request_id, "Sync request sent");
tracing::debug!(%peer, %request_id, "Event sync request sent");

let _ = self
.pending_sync_requests
.events
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Event sync request still to be pending")
.send(Ok(channel));
}
SwarmEvent::Behaviour(behaviour::Event::HeadersSync(
p2p_stream::Event::OutboundFailure {
request_id, error, ..
},
)) => {
tracing::warn!(?request_id, ?error, "Outbound request failed");
tracing::warn!(?request_id, ?error, "Outbound header sync request failed");
let _ = self
.pending_sync_requests
.headers
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Header sync request still to be pending")
.send(Err(error.into()));
}
SwarmEvent::Behaviour(behaviour::Event::ClassesSync(
p2p_stream::Event::OutboundFailure {
request_id, error, ..
},
)) => {
tracing::warn!(?request_id, ?error, "Outbound request failed");
tracing::warn!(?request_id, ?error, "Outbound event sync request failed");
let _ = self
.pending_sync_requests
.classes
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Event sync request still to be pending")
.send(Err(error.into()));
}
SwarmEvent::Behaviour(behaviour::Event::StateDiffsSync(
p2p_stream::Event::OutboundFailure {
request_id, error, ..
},
)) => {
tracing::warn!(?request_id, ?error, "Outbound request failed");
tracing::warn!(
?request_id,
?error,
"Outbound state diff sync request failed"
);
let _ = self
.pending_sync_requests
.state_diffs
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("State diff sync request still to be pending")
.send(Err(error.into()));
}
SwarmEvent::Behaviour(behaviour::Event::TransactionsSync(
p2p_stream::Event::OutboundFailure {
request_id, error, ..
},
)) => {
tracing::warn!(?request_id, ?error, "Outbound request failed");
tracing::warn!(
?request_id,
?error,
"Outbound transaction sync request failed"
);
let _ = self
.pending_sync_requests
.transactions
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Transaction sync request still to be pending")
.send(Err(error.into()));
}
SwarmEvent::Behaviour(behaviour::Event::EventsSync(
p2p_stream::Event::OutboundFailure {
request_id, error, ..
},
)) => {
tracing::warn!(?request_id, ?error, "Outbound request failed");
tracing::warn!(?request_id, ?error, "Outbound event sync request failed");
let _ = self
.pending_sync_requests
.events
.remove(&request_id)
.expect("Block sync request still to be pending")
.expect("Event sync request still to be pending")
.send(Err(error.into()));
}
// ===========================
Expand Down
39 changes: 35 additions & 4 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ impl CheckpointAnalysis {
%local, anchor=%anchor.unwrap_or_default(), %checkpoint,
"Rolling back local chain to latest anchor point. Local data is potentially invalid as the Ethereum checkpoint is newer the local chain."
);
rollback_to_anchor(storage, anchor)

rollback_to_anchor(storage, local, anchor)
.await
.context("Rolling back chain state to L1 anchor")?;
}
Expand All @@ -524,7 +525,8 @@ impl CheckpointAnalysis {
%block, %local, %checkpoint, ?anchor,
"Rolling back local chain to latest anchor point. Local data is invalid as it did not match the Ethereum checkpoint's hash."
);
rollback_to_anchor(storage, anchor)

rollback_to_anchor(storage, block, anchor)
.await
.context("Rolling back chain state to L1 anchor")?;
}
Expand Down Expand Up @@ -575,9 +577,38 @@ impl LocalState {

/// Rolls back local chain-state until the given anchor point, making it the tip
/// of the local chain. If this is ['None'] then all data will be rolled back.
async fn rollback_to_anchor(storage: Storage, anchor: Option<BlockNumber>) -> anyhow::Result<()> {
async fn rollback_to_anchor(
storage: Storage,
local: BlockNumber,
anchor: Option<BlockNumber>,
) -> anyhow::Result<()> {
spawn_blocking(move || {
todo!("Rollback storage to anchor point");
tracing::info!(%local, ?anchor, "Rolling back storage to anchor point");

let last_block_to_remove = anchor.map(|n| n + 1).unwrap_or_default();
let mut head = local;

let mut db = storage
.connection()
.context("Creating database connection")?;
let transaction = db.transaction().context("Create database transaction")?;

// TODO: roll back Merkle tree state once we're updating that

while head >= last_block_to_remove {
transaction
.purge_block(head)
.with_context(|| format!("Purging block {head} from database"))?;

// No further blocks to purge if we just purged genesis.
if head == BlockNumber::GENESIS {
break;
}

head -= 1;
}

Ok(())
})
.await
.context("Joining blocking task")?
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/src/sync/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<T: Send + 'static> SyncReceiver<T> {
let elements_per_sec = count as f32 / t.elapsed().as_secs_f32();
let queue_fullness = queue_capacity - self.inner.capacity();
let input_queue = Fullness(queue_fullness, queue_capacity);
tracing::debug!(stage=S::NAME, %input_queue, %elements_per_sec, "Item processed");
tracing::trace!(stage=S::NAME, %input_queue, %elements_per_sec, "Item processed");

output
}
Expand Down

0 comments on commit cf75275

Please sign in to comment.