Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for L1 to L2 message log syncing #2251

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Pathfinder now fetches data concurrently from the feeder gateway when catching up. The `--gateway.fetch-concurrency` CLI option can be used to limit how many blocks are fetched concurrently (the default is 8).
- `--disable-version-update-check` CLI option has been added to disable the periodic checking for a new version.
- Pathfinder now syncs L1 to L2 message logs for improved cross-chain tracking.

### Changed

Expand Down
8 changes: 6 additions & 2 deletions crates/common/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use primitive_types::H256;

/// An L1 -> L2 message hash with the L1 tx hash where it was sent
use crate::TransactionHash;

/// An L1 -> L2 message log with the corresponding L1 and L2 tx hashes
#[derive(Debug, Clone)]
pub struct L1ToL2MessageLog {
pub message_hash: H256,
pub l1_tx_hash: H256,
pub l1_block_number: Option<u64>,
pub l1_tx_hash: Option<H256>,
pub l2_tx_hash: Option<TransactionHash>,
}
1 change: 0 additions & 1 deletion crates/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,3 @@ serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true }

[dev-dependencies]
281 changes: 265 additions & 16 deletions crates/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::collections::BTreeMap;
use std::future::Future;
use std::time::Duration;

use alloy::eips::{BlockId, BlockNumberOrTag, RpcBlockHash};
use alloy::primitives::{Address, B256};
use alloy::providers::{Provider, ProviderBuilder, WsConnect};
use alloy::rpc::types::Log;
use alloy::eips::{BlockId, BlockNumberOrTag};
use alloy::primitives::Address;
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
use alloy::pubsub::PubSubFrontend;
use alloy::rpc::types::{Filter, Log};
use anyhow::Context;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use pathfinder_common::{BlockHash, BlockNumber, EthereumChain, L1ToL2MessageLog, StateCommitment};
use primitive_types::{H160, H256, U256};
use reqwest::{IntoUrl, Url};
Expand Down Expand Up @@ -36,6 +37,29 @@ pub mod core_addr {
Decoder::Hex.decode(b"4737c0c1B4D5b1A687B42610DdabEE781152359c");
}

pub mod block_numbers {
use super::BlockNumber;
pub mod mainnet {
use super::BlockNumber;
/// The first v0.13.2 block number
pub const FIRST_V0_13_2_BLOCK: BlockNumber = BlockNumber::new_or_panic(671_813);
/// The first L1 block number with a state update corresponding to
/// v0.13.2 of Starknet
pub const FIRST_L1_BLOCK_STARKNET_V0_13_2: BlockNumber =
BlockNumber::new_or_panic(20_627_771);
}

pub mod sepolia {
use super::BlockNumber;
/// The first v0.13.2 block number
pub const FIRST_V0_13_2_BLOCK: BlockNumber = BlockNumber::new_or_panic(86_311);
/// The first L1 block number with a state update corresponding to
/// v0.13.2 of Starknet
pub const FIRST_L1_BLOCK_STARKNET_V0_13_2: BlockNumber =
BlockNumber::new_or_panic(6_453_990);
}
}

/// Events that can be emitted by the Ethereum client
#[derive(Debug)]
pub enum EthereumEvent {
Expand All @@ -49,13 +73,26 @@ pub struct EthereumStateUpdate {
pub state_root: StateCommitment,
pub block_number: BlockNumber,
pub block_hash: BlockHash,
pub l1_block_number: Option<BlockNumber>,
}

/// Ethereum API trait
#[async_trait::async_trait]
pub trait EthereumApi {
async fn get_starknet_state(&self, address: &H160) -> anyhow::Result<EthereumStateUpdate>;
async fn get_chain(&self) -> anyhow::Result<EthereumChain>;
async fn get_message_logs(
&self,
address: &H160,
from_block: BlockNumber,
to_block: BlockNumber,
) -> anyhow::Result<Vec<L1ToL2MessageLog>>;
async fn get_state_updates(
&self,
address: &H160,
from_block: BlockNumber,
to_block: BlockNumber,
) -> anyhow::Result<Vec<EthereumStateUpdate>>;
async fn listen<F, Fut>(
&mut self,
address: &H160,
Expand Down Expand Up @@ -91,20 +128,17 @@ impl EthereumClient {
Self::new(url)
}

/// Returns the hash of the last finalized block
async fn get_finalized_block_hash(&self) -> anyhow::Result<H256> {
/// Returns the block number of the last finalized block
async fn get_finalized_block_number(&self) -> anyhow::Result<BlockNumber> {
t00ts marked this conversation as resolved.
Show resolved Hide resolved
// Create a WebSocket connection
let ws = WsConnect::new(self.url.clone());
let provider = ProviderBuilder::new().on_ws(ws).await?;

// Fetch the finalized block hash
// Fetch the finalized block number
provider
.get_block_by_number(BlockNumberOrTag::Finalized, false)
.await?
.map(|block| {
let block_hash: [u8; 32] = block.header.hash.into();
H256::from(block_hash)
})
.map(|block| BlockNumber::new_or_panic(block.header.number))
.context("Failed to fetch finalized block hash")
}
}
Expand Down Expand Up @@ -176,6 +210,7 @@ impl EthereumApi for EthereumClient {
block_number,
block_hash: get_block_hash(state_update.inner.blockHash),
state_root: get_state_root(state_update.inner.globalRoot),
l1_block_number: None,
};
self.pending_state_updates.insert(eth_block, state_update);
} else {
Expand All @@ -188,7 +223,9 @@ impl EthereumApi for EthereumClient {
// Create L1ToL2MessageHash from the log data
let msg = L1ToL2MessageLog {
message_hash: H256::from(log.inner.message_hash().to_be_bytes()),
l1_tx_hash: log.transaction_hash.map(|hash| H256::from(hash.0)).unwrap_or_default(),
l1_block_number: Some(log.block_number.expect("missing eth block number")),
l1_tx_hash: log.transaction_hash.map(|hash| H256::from(hash.0)),
l2_tx_hash: None,
};
// Emit the message log
callback(EthereumEvent::MessageLog(msg)).await;
Expand Down Expand Up @@ -221,9 +258,8 @@ impl EthereumApi for EthereumClient {
let contract = StarknetCoreContract::new(address, provider);

// Get the finalized block hash
let finalized_block_hash = self.get_finalized_block_hash().await?;
let block_hash = B256::from(finalized_block_hash.0);
let block_id = BlockId::Hash(RpcBlockHash::from_hash(block_hash, None));
let finalized_block_number = self.get_finalized_block_number().await?;
let block_id = BlockId::Number(BlockNumberOrTag::Number(finalized_block_number.get()));

// Call the contract methods
let state_root = contract.stateRoot().block(block_id).call().await?;
Expand All @@ -235,6 +271,7 @@ impl EthereumApi for EthereumClient {
state_root: get_state_root(state_root._0),
block_hash: get_block_hash(block_hash._0),
block_number: get_block_number(block_number._0),
l1_block_number: Some(finalized_block_number),
})
}

Expand All @@ -255,4 +292,216 @@ impl EthereumApi for EthereumClient {
x => EthereumChain::Other(x),
})
}

/// Get the L1 to L2 message logs for a given address and block range
async fn get_message_logs(
&self,
address: &H160,
from_block: BlockNumber,
to_block: BlockNumber,
) -> anyhow::Result<Vec<L1ToL2MessageLog>> {
// Create a WebSocket connection
let ws = WsConnect::new(self.url.clone());
let provider = ProviderBuilder::new().on_ws(ws).await?;

// Create the StarknetCoreContract instance
let address = Address::new((*address).into());
let core_contract = StarknetCoreContract::new(address, provider.clone());

// Create the filter
let filter = core_contract.LogMessageToL2_filter().filter;

// Fetch the logs
let mut logs = Vec::new();
get_logs_recursive(
&provider,
&filter,
from_block.get(),
to_block.get(),
&mut logs,
10_000,
)
.await?;

tracing::debug!(
"Fetched {} `L1ToL2MessageLog` logs from {} to {}",
logs.len(),
from_block,
to_block
);
Comment on lines +326 to +331
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::debug!(
"Fetched {} `L1ToL2MessageLog` logs from {} to {}",
logs.len(),
from_block,
to_block
);
tracing::trace!(
number_of_logs=%logs.len,
%from_block,
%to_block,
"Fetched L1ToL2MessageLog logs"
);


let logs: Vec<L1ToL2MessageLog> = logs
.into_iter()
.filter_map(|log| {
log.log_decode::<StarknetCoreContract::LogMessageToL2>()
.ok()
.map(|decoded| L1ToL2MessageLog {
message_hash: H256::from(decoded.inner.message_hash().to_be_bytes()),
l1_block_number: Some(log.block_number.expect("missing eth block number")),
l1_tx_hash: log.transaction_hash.map(|hash| H256::from(hash.0)),
l2_tx_hash: None,
})
})
.collect();

Ok(logs)
}

/// Get state updates for a given address and block range
async fn get_state_updates(
&self,
address: &H160,
from_block: BlockNumber,
to_block: BlockNumber,
) -> anyhow::Result<Vec<EthereumStateUpdate>> {
// Create a WebSocket connection
let ws = WsConnect::new(self.url.clone());
let provider = ProviderBuilder::new().on_ws(ws).await?;

// Create the StarknetCoreContract instance
let address = Address::new((*address).into());
let core_contract = StarknetCoreContract::new(address, provider.clone());

// Create the filter
let filter = core_contract.LogStateUpdate_filter().filter;

// Fetch the logs
let mut logs = Vec::new();
get_logs_recursive(
&provider,
&filter,
from_block.get(),
to_block.get(),
&mut logs,
2_000,
)
.await?;

let logs: Vec<EthereumStateUpdate> = logs
.into_iter()
.filter_map(|log| {
log.log_decode::<StarknetCoreContract::LogStateUpdate>()
.ok()
.map(|decoded| EthereumStateUpdate {
state_root: get_state_root(decoded.inner.globalRoot),
block_hash: get_block_hash(decoded.inner.blockHash),
block_number: get_block_number(decoded.inner.blockNumber),
l1_block_number: Some(BlockNumber::new_or_panic(
log.block_number.expect("missing eth block number"),
)),
})
})
.collect();

Ok(logs)
}
}

/// Recursively fetches logs while respecting provider limits
fn get_logs_recursive<'a>(
provider: &'a RootProvider<PubSubFrontend>,
base_filter: &'a Filter,
from_block: u64,
to_block: u64,
logs: &'a mut Vec<Log>,
// Limits
max_block_range: u64,
) -> futures::future::BoxFuture<'a, anyhow::Result<()>> {
async move {
// Nothing to do
if from_block > to_block {
return Ok(());
}

// If the block range exceeds the maximum, split it
let block_range = to_block - from_block + 1;
if block_range > max_block_range {
let mid_block = from_block + block_range / 2;
get_logs_recursive(
provider,
base_filter,
from_block,
mid_block - 1,
logs,
max_block_range,
)
.await?;
get_logs_recursive(
provider,
base_filter,
mid_block,
to_block,
logs,
max_block_range,
)
.await?;

return Ok(());
}

// Adjust the base filter to the current block range
let from_block_id = BlockNumberOrTag::Number(from_block);
let to_block_id = BlockNumberOrTag::Number(to_block);
let filter = (*base_filter)
.clone()
.from_block(from_block_id)
.to_block(to_block_id);

// Attempt to fetch the logs
let result = provider.get_logs(&filter).await;
match result {
Ok(new_logs) => {
logs.extend(new_logs);
}
Err(e) => {
tracing::debug!("Get logs error at block {}: {}", from_block, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::debug!("Get logs error at block {}: {}", from_block, e);
tracing::debug!(%from_block, error=?e, "Get logs error at block");

if let Some(err) = e.as_error_resp() {
// Retry the request splitting the block range in half
//
// Multiple providers have multiple restrictions. The max range limit can help
// with the obvious restrictions, but not with those that depend on the response
// size. And because there's no way we can predict this, retrying with a smaller
// range is the best we can do.
if err.is_retry_err() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the right check here. I had a look at the is_retry_err() implementation and while there seems to be an overlap in errors covered it seems to be more concerned by filtering errors that are related to rate limiting than query limits.

We did have some code back in Pathfinder 0.5.7 to do this and for example there was a special case with Alchemy where they were returning JSON-RPC error code -32602 (invalid params) and a message starting with "Log response size exceeded" -- I think that's not explicitly covered here.

We could leave this as it is for now but we should put some more effort into finding out if this is sufficient in a follow-up task.

tracing::debug!(
"Retrying request (splitting) at block {}: {}",
from_block,
err
);
Comment on lines +466 to +470
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::debug!(
"Retrying request (splitting) at block {}: {}",
from_block,
err
);
tracing::debug!(
%from_block,
error=?err,
"Retrying request (splitting) at block"
);

let mid_block = from_block + block_range / 2;
get_logs_recursive(
provider,
base_filter,
from_block,
mid_block - 1,
logs,
max_block_range,
)
.await?;
get_logs_recursive(
provider,
base_filter,
mid_block,
to_block,
logs,
max_block_range,
)
.await?;
return Ok(());
} else {
tracing::error!(
"get_logs: Provider error at block {}: {}",
from_block,
err
);
Comment on lines +492 to +496
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::error!(
"get_logs: Provider error at block {}: {}",
from_block,
err
);
tracing::error!(
%from_block,
error=?err,
"get_logs provider error"
);

Comment on lines +492 to +496
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::error!(
"get_logs: Provider error at block {}: {}",
from_block,
err
);
tracing::error!(
%from_block,
error=?err,
"get_logs: Provider error"
);

}
} else {
tracing::error!("get_logs: Unknown error at block {}: {}", from_block, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::error!("get_logs: Unknown error at block {}: {}", from_block, e);
tracing::error!(%from_block, error=?e, "get_logs: Unknown error");

}
}
}

Ok(())
}
.boxed()
}
1 change: 1 addition & 0 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ fn parse_l1_checkpoint_or_exit(
state_root: dto.state_root,
block_number: dto.block_number,
block_hash: dto.block_hash,
l1_block_number: None,
}
})
}
Expand Down
Loading