Skip to content

Commit

Permalink
Merge pull request #1963 from eqlabs/krisztian/feeder-gateway-inject-…
Browse files Browse the repository at this point in the history
…reorg

feat(examples/feeder_gateway): add optional support for reorg simulation
  • Loading branch information
kkovaacs committed Apr 18, 2024
2 parents 6b06e4f + 004376c commit b0f0b87
Showing 1 changed file with 115 additions and 27 deletions.
142 changes: 115 additions & 27 deletions crates/pathfinder/examples/feeder_gateway.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
/// Serve feeder gateway REST endpoints required for pathfinder to sync.
///
/// Usage:
/// `cargo run --release -p pathfinder --example feeder_gateway ./testnet-sepolia.sqlite`
///
/// Then pathfinder can be run with the following arguments to use this tool as a sync source:
///
/// ```text
/// cargo run --release -p pathfinder -- \
/// --network custom --chain-id SN_SEPOLIA \
/// --ethereum.url https://eth-sepolia.alchemyapi.io/v2/YOUR_API_KEY
/// --gateway-url http://localhost:8080/gateway \
/// --feeder-gateway-url http://localhost:8080/feeder_gateway \
/// --data-directory /tmp
/// ```
///
/// Optionally this tool can simulate reorgs. To have the tool return data so that
/// pathfinder reorgs from block 50 to 40 use the following command line:
/// `cargo run --release -p pathfinder --example feeder_gateway ./testnet-sepolia.sqlite --reorg-at-block 50 --reorg-to-block 40`
use std::collections::HashMap;
use std::convert::Infallible;
use std::num::NonZeroU32;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::Context;
use clap::{Args, Parser};
use pathfinder_common::state_update::ContractClassUpdate;
use pathfinder_common::{
state_diff_commitment_bytes, BlockCommitmentSignature, BlockCommitmentSignatureElem, BlockHash,
Expand All @@ -19,39 +42,48 @@ use starknet_gateway_types::reply::{
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use warp::Filter;

/// Groups the Starknet contract addresses for a specific chain.
///
/// Getting addresses: <SEQUENCER_URL>/feeder_gateway/get_contract_addresses
pub struct ContractAddresses {
pub core: H160,
pub gps: H160,
#[derive(Parser)]
#[command(version)]
struct Cli {
#[arg(long_help = "Database path")]
pub database_path: PathBuf,
#[command(flatten)]
pub reorg: ReorgCli,
}

#[derive(Debug, Clone, Args)]
struct ReorgCli {
#[arg(long, long_help = "Reorg should happen after this block", value_parser = parse_block_number, requires = "reorg_to_block")]
pub reorg_at_block: Option<BlockNumber>,
#[arg(long, long_help = "Reorg should roll back state to this block", value_parser = parse_block_number, requires = "reorg_at_block")]
pub reorg_to_block: Option<BlockNumber>,
}

fn parse_block_number(s: &str) -> Result<BlockNumber, String> {
let n: u64 = s
.parse()
.map_err(|e| format!("Invalid block number '{s}': {e}"))?;
BlockNumber::new(n).ok_or_else(|| format!("Invalid block number '{s}'"))
}

/// Serve feeder gateway REST endpoints required for pathfinder to sync.
///
/// Usage:
/// `cargo run --release -p pathfinder --example feeder_gateway ./testnet-sepolia.sqlite`
///
/// Then pathfinder can be run with the following arguments to use this tool as a sync source:
///
/// ```text
/// cargo run --release -p pathfinder -- \
/// --network custom --chain-id SN_SEPOLIA \
/// --ethereum.url https://eth-sepolia.alchemyapi.io/v2/YOUR_API_KEY
/// --gateway-url http://localhost:8080/gateway \
/// --feeder-gateway-url http://localhost:8080/feeder_gateway \
/// --data-directory /tmp
/// ```
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();

tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
serve().await
serve(cli).await
}

#[derive(Debug, Clone)]
struct ReorgConfig {
pub reorg_at_block: BlockNumber,
pub reorg_to_block: BlockNumber,
}

async fn serve() -> anyhow::Result<()> {
async fn serve(cli: Cli) -> anyhow::Result<()> {
let database_path = std::env::args().nth(1).unwrap();
let storage = pathfinder_storage::StorageBuilder::file(database_path.into())
.migrate()?
Expand All @@ -64,6 +96,14 @@ async fn serve() -> anyhow::Result<()> {
get_chain(&tx)?
};

let reorg_config = cli.reorg.reorg_at_block.and_then(|reorg_at_block| {
cli.reorg.reorg_to_block.map(|reorg_to_block| ReorgConfig {
reorg_at_block,
reorg_to_block,
})
});
let reorged = Arc::new(AtomicBool::new(false));

let get_contract_addresses = warp::path("get_contract_addresses").map(move || {
let addresses = contract_addresses(chain).unwrap();
let reply =
Expand Down Expand Up @@ -190,8 +230,13 @@ async fn serve() -> anyhow::Result<()> {
.and(warp::query::<BlockIdParam>())
.and_then({
let storage = storage.clone();
let reorg_config = reorg_config.clone();
let reorged = reorged.clone();

move |block_id: BlockIdParam| {
let storage = storage.clone();
let reorg_config = reorg_config.clone();
let reorged = reorged.clone();
async move {
let include_block = block_id.include_block.unwrap_or(false);

Expand All @@ -201,7 +246,7 @@ async fn serve() -> anyhow::Result<()> {
let mut connection = storage.connection().unwrap();
let tx = connection.transaction().unwrap();

let state_update = resolve_state_update(&tx, block_id);
let state_update = resolve_state_update(&tx, block_id, reorg_config, reorged);
let block = resolve_block(&tx, block_id);

(state_update, block)
Expand All @@ -221,14 +266,14 @@ async fn serve() -> anyhow::Result<()> {
block,
};

Ok(warp::reply::json(&reply))
Ok(warp::reply::with_status(warp::reply::json(&reply), warp::http::StatusCode::OK))
} else {
Ok(warp::reply::json(&state_update))
Ok(warp::reply::with_status(warp::reply::json(&state_update), warp::http::StatusCode::OK))
}
},
_ => {
let error = serde_json::json!({"code": "StarknetErrorCode.BLOCK_NOT_FOUND", "message": "Block number not found"});
Ok(warp::reply::json(&error))
Ok(warp::reply::with_status(warp::reply::json(&error), warp::http::StatusCode::BAD_REQUEST))
}
}
},
Expand Down Expand Up @@ -336,6 +381,14 @@ fn contract_addresses(chain: Chain) -> anyhow::Result<ContractAddresses> {
})
}

/// Groups the Starknet contract addresses for a specific chain.
///
/// Getting addresses: <SEQUENCER_URL>/feeder_gateway/get_contract_addresses
pub struct ContractAddresses {
pub core: H160,
pub gps: H160,
}

#[tracing::instrument(level = "trace", skip(tx))]
fn resolve_block(
tx: &pathfinder_storage::Transaction<'_>,
Expand Down Expand Up @@ -423,7 +476,42 @@ fn resolve_signature(
fn resolve_state_update(
tx: &pathfinder_storage::Transaction<'_>,
block: BlockId,
reorg_config: Option<ReorgConfig>,
reorged: Arc<AtomicBool>,
) -> anyhow::Result<starknet_gateway_types::reply::StateUpdate> {
let block = if let Some(reorg_config) = reorg_config {
match block {
BlockId::Number(block_number) => {
if reorged.load(Ordering::Relaxed) {
// reorg is active
if block_number > reorg_config.reorg_to_block {
anyhow::bail!("Reorged block requested");
}
reorged.store(false, Ordering::Relaxed);
} else {
// reorg should start at this block
if block_number > reorg_config.reorg_at_block {
tracing::warn!(%reorg_config.reorg_to_block, "Reorg");
reorged.store(true, Ordering::Relaxed);
anyhow::bail!("Reorg happened");
}
}

block
}
BlockId::Latest => {
if reorged.load(Ordering::Relaxed) {
reorg_config.reorg_to_block.into()
} else {
block
}
}
_ => block,
}
} else {
block
};

tx.state_update(block)
.context("Fetching state update")?
.context("State update missing")
Expand Down

0 comments on commit b0f0b87

Please sign in to comment.