From 99df3002508d65f49ebb829b9a104dee4e577ea6 Mon Sep 17 00:00:00 2001 From: pompon0 Date: Mon, 22 Jul 2024 19:43:17 +0200 Subject: [PATCH 1/4] fixed PushBatchStoreStateReq message (#156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit it was pushing the last batch, instead of just a batch number. Additionally I've: * tweaked names a bit * replaced context() with wrap() to propagate cancellation * fixed gossipnet msg size limits * added Debug for the public type --------- Co-authored-by: Bruno França --- node/Cargo.lock | 4 +- node/actors/executor/src/attestation.rs | 10 +- node/actors/executor/src/lib.rs | 9 +- node/actors/network/src/gossip/batch_votes.rs | 30 ++-- node/actors/network/src/gossip/mod.rs | 7 +- node/actors/network/src/gossip/runner.rs | 151 ++++++++---------- node/actors/network/src/gossip/testonly.rs | 1 + .../network/src/gossip/tests/fetch_batches.rs | 8 +- node/actors/network/src/lib.rs | 7 +- node/actors/network/src/proto/gossip.proto | 6 +- .../network/src/rpc/push_batch_store_state.rs | 6 +- node/deny.toml | 5 - node/libs/concurrency/src/ctx/mod.rs | 5 + node/libs/concurrency/src/error.rs | 8 +- node/libs/roles/src/attester/tests.rs | 8 +- node/libs/storage/src/batch_store.rs | 48 +++--- node/libs/storage/src/testonly/in_memory.rs | 6 +- node/libs/storage/src/testonly/mod.rs | 2 +- node/tools/src/config.rs | 5 +- 19 files changed, 145 insertions(+), 181 deletions(-) diff --git a/node/Cargo.lock b/node/Cargo.lock index 929747b2..c73e59a5 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -411,9 +411,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "bytesize" diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index b7ab3166..d7d21918 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -1,16 +1,13 @@ //! Module to publish attestations over batches. -use std::sync::Arc; - +use crate::Attester; use anyhow::Context; -use zksync_concurrency::ctx; -use zksync_concurrency::time; +use std::sync::Arc; +use zksync_concurrency::{ctx, time}; use zksync_consensus_network::gossip::BatchVotesPublisher; use zksync_consensus_roles::attester; use zksync_consensus_storage::{BatchStore, BlockStore}; -use crate::Attester; - const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); /// Polls the database for new batches to be signed and publishes them to the gossip channel. @@ -56,7 +53,6 @@ impl AttesterRunner { .await .context("wait_until_persisted")? .last - .map(|b| b.number) .unwrap_or_default(); // Determine the batch to start signing from. diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index e9317f40..f0475687 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -9,7 +9,7 @@ use std::{ }; use zksync_concurrency::{ctx, limiter, net, scope, time}; use zksync_consensus_bft as bft; -use zksync_consensus_network::{self as network}; +use zksync_consensus_network as network; use zksync_consensus_roles::{attester, node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore}; use zksync_consensus_utils::pipe; @@ -50,7 +50,7 @@ pub struct Config { /// Maximal size of the block payload. pub max_payload_size: usize, /// Maximal size of a batch, which includes `max_payload_size` per block in the batch, - /// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB). + /// plus the size of the Merkle proof of the commitment being included on L1. pub max_batch_size: usize, /// Key of this node. It uniquely identifies the node. /// It should match the secret key provided in the `node_key` file. @@ -132,6 +132,7 @@ impl Executor { tracing::debug!("Starting actors in separate threads."); scope::run!(ctx, |ctx, s| async { + s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") }); let (net, runner) = network::Network::new( network_config, self.block_store.clone(), @@ -139,8 +140,6 @@ impl Executor { network_actor_pipe, ); net.register_metrics(); - - s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") }); s.spawn(async { runner.run(ctx).await.context("Network stopped") }); if let Some(attester) = self.attester { @@ -151,7 +150,7 @@ impl Executor { attester, net.batch_vote_publisher(), ); - s.spawn::<()>(async { + s.spawn(async { runner.run(ctx).await?; Ok(()) }); diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs index cc934f67..ecff00ef 100644 --- a/node/actors/network/src/gossip/batch_votes.rs +++ b/node/actors/network/src/gossip/batch_votes.rs @@ -1,10 +1,10 @@ //! Global state distributed by active attesters, observed by all the nodes in the network. use crate::watch::Watch; -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, fmt, sync::Arc}; use zksync_concurrency::sync; -use zksync_consensus_roles::attester::{self, Batch}; +use zksync_consensus_roles::attester; -/// Represents the currents state of node's knowledge about the attester votes. +/// Represents the current state of node's knowledge about the attester votes. /// /// Eventually this data structure will have to track voting potentially happening /// simultaneously on multiple heights, if we decrease the batch interval to be @@ -24,7 +24,7 @@ pub(crate) struct BatchVotes { /// for now, hoping that with 1 minute batches there's plenty of time for /// the quorum to be reached, but eventually we'll have to allow multiple /// votes across different heights. - pub(crate) votes: im::HashMap>>, + pub(crate) votes: im::HashMap>>, /// Total weight of votes at different heights and hashes. /// @@ -40,7 +40,7 @@ pub(crate) struct BatchVotes { impl BatchVotes { /// Returns a set of votes of `self` which are newer than the entries in `b`. - pub(super) fn get_newer(&self, b: &Self) -> Vec>> { + pub(super) fn get_newer(&self, b: &Self) -> Vec>> { let mut newer = vec![]; for (k, v) in &self.votes { if let Some(bv) = b.votes.get(k) { @@ -61,7 +61,7 @@ impl BatchVotes { pub(super) fn update( &mut self, attesters: &attester::Committee, - data: &[Arc>], + data: &[Arc>], ) -> anyhow::Result { let mut changed = false; @@ -133,7 +133,7 @@ impl BatchVotes { sigs }); attester::BatchQC { - message: Batch { + message: attester::Batch { number: *number, hash: *hash, }, @@ -151,13 +151,12 @@ impl BatchVotes { self.min_batch_number = min_batch_number; self.votes.retain(|_, v| v.msg.number >= min_batch_number); if let Some(prev) = min_batch_number.prev() { - let (_, support) = self.support.split(&prev); - self.support = support; + self.support = self.support.split(&prev).1; } } /// Add an already validated vote from an attester into the register. - fn add(&mut self, vote: Arc>, weight: attester::Weight) { + fn add(&mut self, vote: Arc>, weight: attester::Weight) { self.remove(&vote.key, weight); let batch = self.support.entry(vote.msg.number).or_default(); @@ -213,7 +212,7 @@ impl BatchVotesWatch { pub(crate) async fn update( &self, attesters: &attester::Committee, - data: &[Arc>], + data: &[Arc>], ) -> anyhow::Result<()> { let this = self.0.lock().await; let mut votes = this.borrow().clone(); @@ -233,13 +232,20 @@ impl BatchVotesWatch { /// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key. pub struct BatchVotesPublisher(pub(crate) Arc); +impl fmt::Debug for BatchVotesPublisher { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BatchVotesPublisher") + .finish_non_exhaustive() + } +} + impl BatchVotesPublisher { /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network. pub async fn publish( &self, attesters: &attester::Committee, attester: &attester::SecretKey, - batch: Batch, + batch: attester::Batch, ) -> anyhow::Result<()> { if !attesters.contains(&attester.public()) { return Ok(()); diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index b0d74a4e..9010f99e 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -15,11 +15,10 @@ pub use self::batch_votes::BatchVotesPublisher; use self::batch_votes::BatchVotesWatch; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; -use anyhow::Context as _; use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; pub(crate) use validator_addrs::*; -use zksync_concurrency::{ctx, ctx::channel, scope, sync}; +use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -170,9 +169,9 @@ impl Network { let next_batch_number = qc.message.number.next(); self.batch_store - .queue_batch_qc(ctx, qc) + .persist_batch_qc(ctx, qc) .await - .context("queue_batch_qc")?; + .wrap("queue_batch_qc")?; self.batch_votes .set_min_batch_number(next_batch_number) diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index a4d3ae61..ff36888c 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -9,10 +9,34 @@ use zksync_consensus_roles::{attester::BatchNumber, node}; use zksync_consensus_storage::{BatchStore, BatchStoreState, BlockStore, BlockStoreState}; use zksync_protobuf::kB; -struct PushValidatorAddrsServer<'a>(&'a Network); +/// Receiver of push messages from the peers. +struct PushServer<'a> { + blocks: sync::watch::Sender, + batches: sync::watch::Sender, + /// The network is required for the verification of messages. + net: &'a Network, +} + +impl<'a> PushServer<'a> { + fn new(net: &'a Network) -> Self { + Self { + blocks: sync::watch::channel(BlockStoreState { + first: net.genesis().first_block, + last: None, + }) + .0, + batches: sync::watch::channel(BatchStoreState { + first: BatchNumber(0), + last: None, + }) + .0, + net, + } + } +} #[async_trait] -impl rpc::Handler for PushValidatorAddrsServer<'_> { +impl rpc::Handler for &PushServer<'_> { fn max_req_size(&self) -> usize { 100 * kB } @@ -21,34 +45,29 @@ impl rpc::Handler for PushValidatorAddrsServer<' _ctx: &ctx::Ctx, req: rpc::push_validator_addrs::Req, ) -> anyhow::Result<()> { - self.0 + self.net .push_validator_addrs_calls .fetch_add(1, Ordering::SeqCst); - self.0 + self.net .validator_addrs - .update(&self.0.genesis().validators, &req.0) + .update(&self.net.genesis().validators, &req.0) .await?; Ok(()) } } -/// Receive the snapshot of known batch votes from a remote peer. -/// -/// The server receives the *diff* from remote peers, which it merges into the common register. -struct PushBatchVotesServer<'a>(&'a Network); - #[async_trait::async_trait] -impl rpc::Handler for PushBatchVotesServer<'_> { +impl rpc::Handler for &PushServer<'_> { /// Here we bound the buffering of incoming batch messages. fn max_req_size(&self) -> usize { 100 * kB } async fn handle(&self, _ctx: &ctx::Ctx, req: rpc::push_batch_votes::Req) -> anyhow::Result<()> { - self.0 + self.net .batch_votes .update( - self.0.genesis().attesters.as_ref().context("attesters")?, + self.net.genesis().attesters.as_ref().context("attesters")?, &req.0, ) .await?; @@ -56,48 +75,8 @@ impl rpc::Handler for PushBatchVotesServer<'_> { } } -/// Represents what we know about the state of available blocks on the remote peer. -struct PushBlockStoreStateServer<'a> { - state: sync::watch::Sender, - /// The network is required for the verification of messages. - net: &'a Network, -} - -impl<'a> PushBlockStoreStateServer<'a> { - fn new(net: &'a Network) -> Self { - Self { - state: sync::watch::channel(BlockStoreState { - first: net.genesis().first_block, - last: None, - }) - .0, - net, - } - } -} - -/// Represents what we know about the state of available batches on the remote peer. -struct PushBatchStoreStateServer { - state: sync::watch::Sender, - max_batch_size: usize, -} - -impl PushBatchStoreStateServer { - /// Start out not knowing anything about the remote peer. - fn new(max_batch_size: usize) -> Self { - Self { - state: sync::watch::channel(BatchStoreState { - first: BatchNumber(0), - last: None, - }) - .0, - max_batch_size, - } - } -} - #[async_trait] -impl rpc::Handler for &PushBlockStoreStateServer<'_> { +impl rpc::Handler for &PushServer<'_> { fn max_req_size(&self) -> usize { 10 * kB } @@ -107,15 +86,15 @@ impl rpc::Handler for &PushBlockStoreStateServ req: rpc::push_block_store_state::Req, ) -> anyhow::Result<()> { req.0.verify(self.net.genesis())?; - self.state.send_replace(req.0); + self.blocks.send_replace(req.0); Ok(()) } } #[async_trait] -impl rpc::Handler for &PushBatchStoreStateServer { +impl rpc::Handler for &PushServer<'_> { fn max_req_size(&self) -> usize { - self.max_batch_size.saturating_add(kB) + 10 * kB } async fn handle( &self, @@ -123,7 +102,7 @@ impl rpc::Handler for &PushBatchStoreStateServ req: rpc::push_batch_store_state::Req, ) -> anyhow::Result<()> { req.0.verify()?; - self.state.send_replace(req.0); + self.batches.send_replace(req.0); Ok(()) } } @@ -159,66 +138,66 @@ impl rpc::Handler for &BatchStore { impl Network { /// Manages lifecycle of a single connection. async fn run_stream(&self, ctx: &ctx::Ctx, stream: noise::Stream) -> anyhow::Result<()> { + let push_server = PushServer::new(self); let push_validator_addrs_client = rpc::Client::::new( ctx, self.cfg.rpc.push_validator_addrs_rate, ); - let push_validator_addrs_server = PushValidatorAddrsServer(self); let push_block_store_state_client = rpc::Client::::new( ctx, self.cfg.rpc.push_block_store_state_rate, ); - let push_block_store_state_server = PushBlockStoreStateServer::new(self); - let get_block_client = - rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate); - let get_batch_client = - rpc::Client::::new(ctx, self.cfg.rpc.get_batch_rate); let push_batch_store_state_client = rpc::Client::::new( ctx, self.cfg.rpc.push_batch_store_state_rate, ); - let push_batch_store_state_server = PushBatchStoreStateServer::new(self.cfg.max_batch_size); + let get_block_client = + rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate); + let get_batch_client = + rpc::Client::::new(ctx, self.cfg.rpc.get_batch_rate); + scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_client(&push_validator_addrs_client) - .add_server( + .add_server::( ctx, - push_validator_addrs_server, + &push_server, self.cfg.rpc.push_validator_addrs_rate, ) .add_client(&push_block_store_state_client) - .add_server( + .add_client(&push_batch_store_state_client) + .add_server::( ctx, - &push_block_store_state_server, + &push_server, self.cfg.rpc.push_block_store_state_rate, ) + .add_server::( + ctx, + &push_server, + self.cfg.rpc.push_batch_store_state_rate, + ) .add_client(&get_block_client) .add_server(ctx, &*self.block_store, self.cfg.rpc.get_block_rate) .add_client(&get_batch_client) .add_server(ctx, &*self.batch_store, self.cfg.rpc.get_batch_rate) - .add_client(&push_batch_store_state_client) - .add_server( - ctx, - &push_batch_store_state_server, - self.cfg.rpc.get_batch_rate, - ) .add_server(ctx, rpc::ping::Server, rpc::ping::RATE); // If there is an attester committee then if self.genesis().attesters.as_ref().is_some() { - let push_signature_client = rpc::Client::::new( - ctx, - self.cfg.rpc.push_batch_votes_rate, - ); - let push_signature_server = PushBatchVotesServer(self); - service = service.add_client(&push_signature_client).add_server( + let push_batch_votes_client = rpc::Client::::new( ctx, - push_signature_server, self.cfg.rpc.push_batch_votes_rate, ); + service = service + .add_client(&push_batch_votes_client) + .add_server::( + ctx, + &push_server, + self.cfg.rpc.push_batch_votes_rate, + ); // Push L1 batch votes updates to peer. s.spawn::<()>(async { - let push_signature_client = push_signature_client; + let push_batch_votes_client = push_batch_votes_client; // Snapshot of the batches when we last pushed to the peer. let mut old = BatchVotes::default(); // Subscribe to what we know about the state of the whole network. @@ -233,7 +212,7 @@ impl Network { } old = new; let req = rpc::push_batch_votes::Req(diff); - push_signature_client.call(ctx, &req, kB).await?; + push_batch_votes_client.call(ctx, &req, kB).await?; } }); } @@ -290,7 +269,7 @@ impl Network { // Perform get_block calls to peer. s.spawn::<()>(async { // Gossiped state of what range of blocks is available on the remote peer. - let state = &mut push_block_store_state_server.state.subscribe(); + let state = &mut push_server.blocks.subscribe(); loop { let call = get_block_client.reserve(ctx).await?; let (req, send_resp) = self.fetch_queue.accept_block(ctx, state).await?; @@ -335,7 +314,7 @@ impl Network { // Perform get_batch calls to peer. s.spawn::<()>(async { - let state = &mut push_batch_store_state_server.state.subscribe(); + let state = &mut push_server.batches.subscribe(); loop { let call = get_batch_client.reserve(ctx).await?; let (req, send_resp) = self.fetch_queue.accept_batch(ctx, state).await?; diff --git a/node/actors/network/src/gossip/testonly.rs b/node/actors/network/src/gossip/testonly.rs index 07f7ab1a..d37419c0 100644 --- a/node/actors/network/src/gossip/testonly.rs +++ b/node/actors/network/src/gossip/testonly.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use super::*; use crate::{frame, mux, noise, preface, rpc, Config, GossipConfig}; +use anyhow::Context as _; use rand::Rng; use std::collections::BTreeMap; use zksync_concurrency::{ctx, limiter}; diff --git a/node/actors/network/src/gossip/tests/fetch_batches.rs b/node/actors/network/src/gossip/tests/fetch_batches.rs index 34f0c5d0..12b8e840 100644 --- a/node/actors/network/src/gossip/tests/fetch_batches.rs +++ b/node/actors/network/src/gossip/tests/fetch_batches.rs @@ -85,7 +85,7 @@ async fn test_simple() { ctx, &rpc::push_batch_store_state::Req(BatchStoreState { first: setup.batches[1].number, - last: Some(setup.batches[1].clone()), + last: Some(setup.batches[1].number), }), ) .await @@ -158,7 +158,7 @@ async fn test_concurrent_requests() { ctx, &rpc::push_batch_store_state::Req(BatchStoreState { first: setup.batches[0].number, - last: Some(setup.batches.last().unwrap().clone()), + last: Some(setup.batches.last().unwrap().number), }), ) .await @@ -218,7 +218,7 @@ async fn test_bad_responses() { let state = rpc::push_batch_store_state::Req(BatchStoreState { first: setup.batches[0].number, - last: Some(setup.batches[0].clone()), + last: Some(setup.batches[0].number), }); for resp in [ @@ -295,7 +295,7 @@ async fn test_retry() { let state = rpc::push_batch_store_state::Req(BatchStoreState { first: setup.batches[0].number, - last: Some(setup.batches[0].clone()), + last: Some(setup.batches[0].number), }); tracing::info!("establish a bunch of connections"); diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 349ee5e2..cc0f3aa9 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -131,12 +131,7 @@ impl Runner { }); // Update QC batches in the background. - s.spawn(async { - match self.net.gossip.run_batch_qc_finder(ctx).await { - Err(ctx::Error::Canceled(_)) => Ok(()), - other => other, - } - }); + s.spawn(self.net.gossip.run_batch_qc_finder(ctx)); // Fetch missing batches in the background. s.spawn(async { diff --git a/node/actors/network/src/proto/gossip.proto b/node/actors/network/src/proto/gossip.proto index 6d2957c4..33e3d16a 100644 --- a/node/actors/network/src/proto/gossip.proto +++ b/node/actors/network/src/proto/gossip.proto @@ -37,10 +37,12 @@ message PushBlockStoreState { // A node is expected to store a continuous range of batches at all times // and actively fetch newest batch. message PushBatchStoreState { + reserved 2; + reserved "last"; // First batch that the node has locally. optional uint64 first = 1; // required; BatchNumber - // Last batch QC that the node has locally. - optional roles.attester.SyncBatch last = 2; // optional + // Last batch that the node has locally. + optional uint64 last_v2 = 3; // optional } // Asks the server to send an L2 block (including its transactions). diff --git a/node/actors/network/src/rpc/push_batch_store_state.rs b/node/actors/network/src/rpc/push_batch_store_state.rs index 53e50535..aa098a5a 100644 --- a/node/actors/network/src/rpc/push_batch_store_state.rs +++ b/node/actors/network/src/rpc/push_batch_store_state.rs @@ -3,7 +3,7 @@ use crate::{mux, proto::gossip as proto}; use anyhow::Context as _; use zksync_consensus_roles::attester; use zksync_consensus_storage::BatchStoreState; -use zksync_protobuf::{read_optional, required, ProtoFmt}; +use zksync_protobuf::{required, ProtoFmt}; /// PushBatchStoreState RPC. #[derive(Debug)] @@ -28,14 +28,14 @@ impl ProtoFmt for Req { fn read(message: &Self::Proto) -> anyhow::Result { Ok(Self(BatchStoreState { first: attester::BatchNumber(*required(&message.first).context("first")?), - last: read_optional(&message.last).context("last")?, + last: message.last_v2.map(attester::BatchNumber), })) } fn build(&self) -> Self::Proto { Self::Proto { first: Some(self.0.first.0), - last: self.0.last.as_ref().map(|x| x.build()), + last_v2: self.0.last.as_ref().map(|n| n.0), } } } diff --git a/node/deny.toml b/node/deny.toml index 5d63ea5d..d135b269 100644 --- a/node/deny.toml +++ b/node/deny.toml @@ -76,13 +76,8 @@ skip = [ { name = "http", version = "0.2.12"}, { name = "http-body", version = "0.4.6"}, { name = "hyper", version = "0.14.28"}, - - # Old versions required by tls-listener. - { name = "tokio-rustls", version = "0.25"}, - { name = "rustls", version = "0.22.4"}, ] [sources] unknown-registry = "deny" unknown-git = "deny" -allow-org = { github = ["matter-labs"] } # TODO: remove once `vise` is published diff --git a/node/libs/concurrency/src/ctx/mod.rs b/node/libs/concurrency/src/ctx/mod.rs index e716b84e..bf4f01d5 100644 --- a/node/libs/concurrency/src/ctx/mod.rs +++ b/node/libs/concurrency/src/ctx/mod.rs @@ -253,6 +253,11 @@ impl Ctx { /// anyhow::Error + "canceled" variant. /// Useful for working with concurrent code which doesn't need structured errors, /// but needs to handle cancelation explicitly. +/// +/// WARNING: this error type implements both `Wrap` and `From`. +/// You should be careful to NOT use `context()` instead of `wrap()`, +/// because otherwise the `Canceled` error will get silently translated +/// to `Internal` error. #[derive(thiserror::Error, Debug)] pub enum Error { /// Context has been canceled before call completion. diff --git a/node/libs/concurrency/src/error.rs b/node/libs/concurrency/src/error.rs index 18621112..086b6138 100644 --- a/node/libs/concurrency/src/error.rs +++ b/node/libs/concurrency/src/error.rs @@ -1,8 +1,12 @@ //! Generalization of anyhow::Context to more structured errors. use std::fmt::Display; -/// Trait complementary to anyhow::Context which allows for -/// adding context to error types which contain anyhow::Error. +/// Trait complementary to `anyhow::Context` which allows for +/// adding context to error types which contain `anyhow::Error`. +/// +/// If an error type implements both `Wrap` and `From` +/// you should be careful to NOT use `context()` instead of `wrap()`, +/// because `context()` will just hide all the error details. pub trait Wrap: Sized { /// Appends context `c` to the error. fn wrap(self, c: C) -> Self { diff --git a/node/libs/roles/src/attester/tests.rs b/node/libs/roles/src/attester/tests.rs index b7d2f4c0..57c577a8 100644 --- a/node/libs/roles/src/attester/tests.rs +++ b/node/libs/roles/src/attester/tests.rs @@ -134,10 +134,6 @@ fn test_agg_signature_verify() { .is_err()); } -fn make_batch_msg(rng: &mut impl Rng) -> Batch { - rng.gen() -} - #[test] fn test_batch_qc() { use BatchQCVerifyError as Error; @@ -169,7 +165,7 @@ fn test_batch_qc() { // Create QCs with increasing number of attesters. for i in 0..setup1.attester_keys.len() + 1 { - let mut qc = BatchQC::new(make_batch_msg(rng)); + let mut qc = BatchQC::new(rng.gen()); for key in &setup1.attester_keys[0..i] { qc.add(&key.sign_msg(qc.message.clone()), &setup1.genesis) .unwrap(); @@ -201,7 +197,7 @@ fn test_attester_committee_weights() { // Expected sum of the attesters weights let sums = [1000, 1600, 2400, 8400, 9300, 10000]; - let msg = make_batch_msg(rng); + let msg: Batch = rng.gen(); let mut qc = BatchQC::new(msg.clone()); for (n, weight) in sums.iter().enumerate() { let key = &setup.attester_keys[n]; diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index 7eb2059a..29b64e3a 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -1,7 +1,7 @@ //! Defines storage layer for batches of blocks. use anyhow::Context as _; use std::{collections::VecDeque, fmt, sync::Arc}; -use zksync_concurrency::{ctx, scope, sync}; +use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; use zksync_consensus_roles::{attester, validator}; /// State of the `BatchStore`: continuous range of batches. @@ -12,33 +12,33 @@ pub struct BatchStoreState { pub first: attester::BatchNumber, /// The last stored L1 batch. /// None iff store is empty. - pub last: Option, + pub last: Option, } impl BatchStoreState { /// Checks whether batch with the given number is stored in the `BatchStore`. pub fn contains(&self, number: attester::BatchNumber) -> bool { - let Some(last) = &self.last else { return false }; - self.first <= number && number <= last.number + let Some(last) = self.last else { return false }; + self.first <= number && number <= last } /// Number of the next batch that can be stored in the `BatchStore`. /// (i.e. `last` + 1). pub fn next(&self) -> attester::BatchNumber { match &self.last { - Some(last) => last.number.next(), + Some(last) => last.next(), None => self.first, } } /// Verifies `BatchStoreState'. pub fn verify(&self) -> anyhow::Result<()> { - if let Some(last) = &self.last { + if let Some(last) = self.last { anyhow::ensure!( - self.first <= last.clone().number, + self.first <= last, "first batch {} has bigger number than the last batch {}", self.first, - last.number + last ); } Ok(()) @@ -51,11 +51,6 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { /// Range of batches persisted in storage. fn persisted(&self) -> sync::watch::Receiver; - /// Get the L1 batch from storage with the highest number. - /// - /// Returns `None` if no batches have been created yet. - async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result>; - /// Get the earliest of L1 batches which are missing the corresponding L1 batch quorum certificates /// and potentially need to be signed by attesters. /// @@ -99,7 +94,7 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync { number: attester::BatchNumber, ) -> ctx::Result>; - /// Store the given batch QC in the storage. + /// Store the given batch QC in the storage persistently. async fn store_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()>; /// Queue the batch to be persisted in storage. @@ -149,7 +144,7 @@ impl Inner { if self.queued.next() != batch.number { return false; } - self.queued.last = Some(batch.clone()); + self.queued.last = Some(batch.number); self.cache.push_back(batch.clone()); self.truncate_cache(); true @@ -264,12 +259,10 @@ impl BatchStore { return Ok(Some(batch)); } } - let batch = self - .persistent + self.persistent .get_batch(ctx, number) .await - .context("persistent.batch()")?; - Ok(batch) + .wrap("persistent.batch()") } /// Retrieve the minimum batch number that doesn't have a QC yet and potentially need to be signed. @@ -285,12 +278,10 @@ impl BatchStore { &self, ctx: &ctx::Ctx, ) -> ctx::Result> { - let unsigned = self - .persistent + self.persistent .earliest_batch_number_to_sign(ctx) .await - .context("persistent.get_batch_to_sign()")?; - Ok(unsigned) + .wrap("persistent.get_batch_to_sign()") } /// Retrieve a batch to be signed. @@ -302,12 +293,10 @@ impl BatchStore { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::Result> { - let batch = self - .persistent + self.persistent .get_batch_to_sign(ctx, number) .await - .context("persistent.get_batch_to_sign()")?; - Ok(batch) + .wrap("persistent.get_batch_to_sign()") } /// Append batch to a queue to be persisted eventually. @@ -337,7 +326,7 @@ impl BatchStore { } /// Wait until the database has a batch, then attach the corresponding QC. - pub async fn queue_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { + pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload // isn't yet available, but to be safe we can wait for the availability signal here as well. sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { @@ -345,8 +334,7 @@ impl BatchStore { }) .await?; // Now it's definitely safe to store it. - self.persistent.store_qc(ctx, qc).await?; - Ok(()) + self.persistent.store_qc(ctx, qc).await } /// Waits until the given batch is queued (in memory, or persisted). diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index eb57170d..9cb583a4 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -131,10 +131,6 @@ impl PersistentBatchStore for BatchStore { self.0.persisted.subscribe() } - async fn last_batch(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(self.0.persisted.borrow().last.clone().map(|qc| qc.number)) - } - async fn last_batch_qc(&self, _ctx: &ctx::Ctx) -> ctx::Result> { let certs = self.0.certs.lock().unwrap(); let last_batch_number = certs.keys().max().unwrap(); @@ -215,7 +211,7 @@ impl PersistentBatchStore for BatchStore { } self.0 .persisted - .send_modify(|p| p.last = Some(batch.clone())); + .send_modify(|p| p.last = Some(batch.number)); batches.push_back(batch); Ok(()) } diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index 1c11e024..38c4a4ba 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -154,7 +154,7 @@ pub async fn dump_batch( let after = state .last .as_ref() - .map(|sb| sb.number.next()) + .map(|sb| sb.next()) .unwrap_or(state.first); for n in (state.first.0..after.0).map(attester::BatchNumber) { let batch = store.get_batch(ctx, n).await.unwrap().unwrap(); diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 11081d9d..221baf04 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -134,7 +134,10 @@ impl ProtoFmt for AppConfig { let max_batch_size = match &r.max_batch_size { Some(x) => (*x).try_into().context("max_payload_size")?, - None => max_payload_size * 100 + kB, // Merkle proof is ~1kB and we have a batch per minute. + // Arbitrary estimate of 100 blocks + 1kB for the merkle proof. + // NOTE: this test node currently doesn't implement batches at all. + // Once it does, do the estimates again. + None => max_payload_size * 100 + kB, }; Ok(Self { From da5e506a685513327126e37c114d45b5275d2998 Mon Sep 17 00:00:00 2001 From: pompon0 Date: Tue, 23 Jul 2024 19:07:26 +0200 Subject: [PATCH 2/4] crates.io v0.1.0 rc.3 branch (#158) also set publish = false for zksync_consensus_tools --- node/Cargo.lock | 24 ++++++++++++------------ node/Cargo.toml | 24 ++++++++++++------------ node/tools/Cargo.toml | 1 + 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/node/Cargo.lock b/node/Cargo.lock index c73e59a5..e36bb5b3 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3305,7 +3305,7 @@ dependencies = [ [[package]] name = "tester" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "clap", @@ -4059,7 +4059,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "assert_matches", @@ -4077,7 +4077,7 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "assert_matches", @@ -4101,7 +4101,7 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "blst", @@ -4124,7 +4124,7 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "rand 0.8.5", @@ -4144,7 +4144,7 @@ dependencies = [ [[package]] name = "zksync_consensus_network" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "assert_matches", @@ -4180,7 +4180,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "assert_matches", @@ -4201,7 +4201,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "assert_matches", @@ -4223,7 +4223,7 @@ dependencies = [ [[package]] name = "zksync_consensus_tools" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "async-trait", @@ -4258,7 +4258,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "rand 0.8.5", @@ -4268,7 +4268,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "bit-vec", @@ -4290,7 +4290,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" dependencies = [ "anyhow", "heck", diff --git a/node/Cargo.toml b/node/Cargo.toml index 820559db..26f2ab86 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -22,23 +22,23 @@ homepage = "https://matter-labs.io/" repository = "https://github.com/matter-labs/era-consensus" license = "MIT OR Apache-2.0" keywords = ["blockchain", "zksync"] -version = "0.1.0-rc.2" +version = "0.1.0-rc.3" [workspace.dependencies] # Crates from this repo. -zksync_consensus_bft = { version = "=0.1.0-rc.2", path = "actors/bft" } -zksync_consensus_crypto = { version = "=0.1.0-rc.2", path = "libs/crypto" } -zksync_consensus_executor = { version = "=0.1.0-rc.2", path = "actors/executor" } -zksync_consensus_network = { version = "=0.1.0-rc.2", path = "actors/network" } -zksync_consensus_roles = { version = "=0.1.0-rc.2", path = "libs/roles" } -zksync_consensus_storage = { version = "=0.1.0-rc.2", path = "libs/storage" } -zksync_consensus_tools = { version = "=0.1.0-rc.2", path = "tools" } -zksync_consensus_utils = { version = "=0.1.0-rc.2", path = "libs/utils" } +zksync_consensus_bft = { version = "=0.1.0-rc.3", path = "actors/bft" } +zksync_consensus_crypto = { version = "=0.1.0-rc.3", path = "libs/crypto" } +zksync_consensus_executor = { version = "=0.1.0-rc.3", path = "actors/executor" } +zksync_consensus_network = { version = "=0.1.0-rc.3", path = "actors/network" } +zksync_consensus_roles = { version = "=0.1.0-rc.3", path = "libs/roles" } +zksync_consensus_storage = { version = "=0.1.0-rc.3", path = "libs/storage" } +zksync_consensus_tools = { version = "=0.1.0-rc.3", path = "tools" } +zksync_consensus_utils = { version = "=0.1.0-rc.3", path = "libs/utils" } # Crates from this repo that might become independent in the future. -zksync_concurrency = { version = "=0.1.0-rc.2", path = "libs/concurrency" } -zksync_protobuf = { version = "=0.1.0-rc.2", path = "libs/protobuf" } -zksync_protobuf_build = { version = "=0.1.0-rc.2", path = "libs/protobuf_build" } +zksync_concurrency = { version = "=0.1.0-rc.3", path = "libs/concurrency" } +zksync_protobuf = { version = "=0.1.0-rc.3", path = "libs/protobuf" } +zksync_protobuf_build = { version = "=0.1.0-rc.3", path = "libs/protobuf_build" } # Crates from Matter Labs. pairing = { package = "pairing_ce", version = "=0.28.6" } diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml index 98797b50..2134e4b1 100644 --- a/node/tools/Cargo.toml +++ b/node/tools/Cargo.toml @@ -7,6 +7,7 @@ homepage.workspace = true license.workspace = true repository.workspace = true keywords.workspace = true +publish = false default-run = "executor" description = "ZKsync consensus tools" From 1075fc6f1ad3646b664f4f6dd34a7ae68111a79e Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 24 Jul 2024 09:48:19 +0100 Subject: [PATCH 3/4] feat: Batch vote metrics (BFT-486) (#157) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Adds metrics around the gossiping of batch votes and the storage of batches. Added `gossip::metrics::BATCH_VOTES_METRICS`: - [x] `committee_size` to show how many attesters we have, which can be correlated with the number of votes we receive over time. Wanted to add a weight based one as well, but not sure if counting the weights monotonically would not overflow the counter. - [x] `votes_added` counts the votes received since start; with its rate-of-change we can get an idea of how many attesters are producing votes - [x] `weight_added` counts the total weight of votes added since start, each normalized to [0; 1] range with the total committee weight; this way it shouldn't overflow, and its rate of growth should be 0.8-1.0 per batch. - [x] `min_batch_number` shows the minimum batch we expect votes for; going up will indicate that QCs are formed - [x] `last_added_vote_batch_number` is the number from the last vote we registered; apart from Byzantine votes it should average to go up as L1 batches are created and votes are received, even if a QC cannot be formed - [x] `last_signed_batch_number` indicates that the current node is publishing its votes and keeping up with the L1 batches Also implement metrics for the `BatchStore` similar to how the `BlockStore` has them: - [x] `next` for `queued` and `persisted` - [x] latencies for the `PersistedBatchStore` operations - [x] latencies for the `wait_for_...` methods - [x] last batch QC saved ## Why ❔ To make the voting process observable. --- node/actors/network/src/gossip/batch_votes.rs | 63 +++++++++++-- node/actors/network/src/gossip/metrics.rs | 37 ++++++++ node/actors/network/src/gossip/mod.rs | 3 +- node/libs/storage/src/batch_store/metrics.rs | 50 +++++++++++ .../{batch_store.rs => batch_store/mod.rs} | 89 +++++++++++++++---- node/libs/storage/src/block_store/metrics.rs | 2 +- node/libs/storage/src/block_store/mod.rs | 7 +- 7 files changed, 223 insertions(+), 28 deletions(-) create mode 100644 node/actors/network/src/gossip/metrics.rs create mode 100644 node/libs/storage/src/batch_store/metrics.rs rename node/libs/storage/src/{batch_store.rs => batch_store/mod.rs} (87%) diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs index ecff00ef..a80f0e03 100644 --- a/node/actors/network/src/gossip/batch_votes.rs +++ b/node/actors/network/src/gossip/batch_votes.rs @@ -4,6 +4,23 @@ use std::{collections::HashSet, fmt, sync::Arc}; use zksync_concurrency::sync; use zksync_consensus_roles::attester; +use super::metrics; + +#[derive(Debug, Default)] +pub(super) struct BatchUpdateStats { + num_added: usize, + weight_added: u64, + last_added: Option, +} + +impl BatchUpdateStats { + fn added(&mut self, number: attester::BatchNumber, weight: u64) { + self.num_added += 1; + self.weight_added += weight; + self.last_added = Some(number); + } +} + /// Represents the current state of node's knowledge about the attester votes. /// /// Eventually this data structure will have to track voting potentially happening @@ -57,13 +74,14 @@ impl BatchVotes { /// It exits as soon as an invalid entry is found. /// `self` might get modified even if an error is returned /// (all entries verified so far are added). - /// Returns true iff some new entry was added. + /// + /// Returns statistics about new entries added. pub(super) fn update( &mut self, attesters: &attester::Committee, data: &[Arc>], - ) -> anyhow::Result { - let mut changed = false; + ) -> anyhow::Result { + let mut stats = BatchUpdateStats::default(); let mut done = HashSet::new(); for d in data { @@ -97,10 +115,10 @@ impl BatchVotes { d.verify()?; self.add(d.clone(), weight); - - changed = true; + stats.added(d.msg.number, weight); } - Ok(changed) + + Ok(stats) } /// Check if we have achieved quorum for any of the batch hashes. @@ -216,9 +234,31 @@ impl BatchVotesWatch { ) -> anyhow::Result<()> { let this = self.0.lock().await; let mut votes = this.borrow().clone(); - if votes.update(attesters, data)? { + let stats = votes.update(attesters, data)?; + + if let Some(last_added) = stats.last_added { this.send_replace(votes); + + #[allow(clippy::float_arithmetic)] + let weight_added = stats.weight_added as f64 / attesters.total_weight() as f64; + + metrics::BATCH_VOTES_METRICS + .last_added_vote_batch_number + .set(last_added.0); + + metrics::BATCH_VOTES_METRICS + .votes_added + .inc_by(stats.num_added as u64); + + metrics::BATCH_VOTES_METRICS + .weight_added + .inc_by(weight_added); } + + metrics::BATCH_VOTES_METRICS + .committee_size + .set(attesters.len()); + Ok(()) } @@ -226,6 +266,10 @@ impl BatchVotesWatch { pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { let this = self.0.lock().await; this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); + + metrics::BATCH_VOTES_METRICS + .min_batch_number + .set(min_batch_number.0); } } @@ -251,6 +295,11 @@ impl BatchVotesPublisher { return Ok(()); } let attestation = attester.sign_msg(batch); + + metrics::BATCH_VOTES_METRICS + .last_signed_batch_number + .set(attestation.msg.number.0); + self.0.update(attesters, &[Arc::new(attestation)]).await } } diff --git a/node/actors/network/src/gossip/metrics.rs b/node/actors/network/src/gossip/metrics.rs new file mode 100644 index 00000000..d9df9d0a --- /dev/null +++ b/node/actors/network/src/gossip/metrics.rs @@ -0,0 +1,37 @@ +/// Metrics related to the gossiping of L1 batch votes. +#[derive(Debug, vise::Metrics)] +#[metrics(prefix = "network_gossip_batch_votes")] +pub(crate) struct BatchVotesMetrics { + /// Number of members in the attester committee. + pub(crate) committee_size: vise::Gauge, + + /// Number of votes added to the tally. + /// + /// Its rate of change should correlate with the attester committee size, + /// save for any new joiner casting their historic votes in a burst. + pub(crate) votes_added: vise::Counter, + + /// Weight of votes added to the tally normalized by the total committee weight. + /// + /// Its rate of change should correlate with the attester committee weight and batch production rate, + /// that is, it should go up up by 1.0 with each new batch if everyone attests. + pub(crate) weight_added: vise::Counter, + + /// The minimum batch number we still expect votes for. + /// + /// This should go up as the main node indicates the finalisation of batches, + /// or as soon as batch QCs are found and persisted. + pub(crate) min_batch_number: vise::Gauge, + + /// Batch number in the last vote added to the register. + /// + /// This should go up as L1 batches are created, save for any temporary + /// outlier from lagging attesters or ones sending votes far in the future. + pub(crate) last_added_vote_batch_number: vise::Gauge, + + /// Batch number of the last batch signed by this attester. + pub(crate) last_signed_batch_number: vise::Gauge, +} + +#[vise::register] +pub(super) static BATCH_VOTES_METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 9010f99e..d3af5f4a 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -26,6 +26,7 @@ mod batch_votes; mod fetch; mod handshake; pub mod loadtest; +mod metrics; mod runner; #[cfg(test)] mod testonly; @@ -171,7 +172,7 @@ impl Network { self.batch_store .persist_batch_qc(ctx, qc) .await - .wrap("queue_batch_qc")?; + .wrap("persist_batch_qc")?; self.batch_votes .set_min_batch_number(next_batch_number) diff --git a/node/libs/storage/src/batch_store/metrics.rs b/node/libs/storage/src/batch_store/metrics.rs new file mode 100644 index 00000000..cabe7e34 --- /dev/null +++ b/node/libs/storage/src/batch_store/metrics.rs @@ -0,0 +1,50 @@ +//! Storage metrics. +use std::time; + +#[derive(Debug, vise::Metrics)] +#[metrics(prefix = "zksync_consensus_storage_persistent_batch_store")] +pub(super) struct PersistentBatchStore { + /// Latency of a successful `get_batch()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) batch_latency: vise::Histogram, + /// Latency of a successful `earliest_batch_number_to_sign()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) earliest_batch_latency: vise::Histogram, + /// Latency of a successful `get_batch_to_sign()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) batch_to_sign_latency: vise::Histogram, +} + +#[vise::register] +pub(super) static PERSISTENT_BATCH_STORE: vise::Global = vise::Global::new(); + +#[derive(Debug, vise::Metrics)] +#[metrics(prefix = "zksync_consensus_storage_batch_store")] +pub(super) struct BatchStore { + /// Overall latency of a `queue_batch()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) queue_batch: vise::Histogram, + /// Overall latency of a `persist_batch_qc()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) persist_batch_qc: vise::Histogram, + /// Overall latency of a `wait_until_queued()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) wait_until_queued: vise::Histogram, + /// Overall latency of a `wait_until_persisted()` call. + #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] + pub(super) wait_until_persisted: vise::Histogram, + /// Last persisted batch QC. + pub(super) last_persisted_batch_qc: vise::Gauge, +} + +#[vise::register] +pub(super) static BATCH_STORE: vise::Global = vise::Global::new(); + +#[derive(Debug, vise::Metrics)] +#[metrics(prefix = "zksync_consensus_storage_batch_store")] +pub(super) struct BatchStoreState { + /// BatchNumber of the next batch to queue. + pub(super) next_queued_batch: vise::Gauge, + /// BatchNumber of the next batch to persist. + pub(super) next_persisted_batch: vise::Gauge, +} diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store/mod.rs similarity index 87% rename from node/libs/storage/src/batch_store.rs rename to node/libs/storage/src/batch_store/mod.rs index 29b64e3a..3b6b6fe9 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store/mod.rs @@ -4,6 +4,8 @@ use std::{collections::VecDeque, fmt, sync::Arc}; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; use zksync_consensus_roles::{attester, validator}; +mod metrics; + /// State of the `BatchStore`: continuous range of batches. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BatchStoreState { @@ -181,6 +183,12 @@ pub struct BatchStoreRunner(Arc); impl BatchStoreRunner { /// Runs the background tasks of the BatchStore. pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + #[vise::register] + static COLLECTOR: vise::Collector> = + vise::Collector::new(); + let store_ref = Arc::downgrade(&self.0); + let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics())); + let res = scope::run!(ctx, |ctx, s| async { let persisted = self.0.persistent.persisted(); let mut queue_next = persisted.borrow().next(); @@ -259,10 +267,16 @@ impl BatchStore { return Ok(Some(batch)); } } - self.persistent + let t = metrics::PERSISTENT_BATCH_STORE.batch_latency.start(); + + let batch = self + .persistent .get_batch(ctx, number) .await - .wrap("persistent.batch()") + .wrap("persistent.get_batch()")?; + + t.observe(); + Ok(batch) } /// Retrieve the minimum batch number that doesn't have a QC yet and potentially need to be signed. @@ -278,10 +292,18 @@ impl BatchStore { &self, ctx: &ctx::Ctx, ) -> ctx::Result> { - self.persistent + let t = metrics::PERSISTENT_BATCH_STORE + .earliest_batch_latency + .start(); + + let batch = self + .persistent .earliest_batch_number_to_sign(ctx) .await - .wrap("persistent.get_batch_to_sign()") + .wrap("persistent.get_batch_to_sign()")?; + + t.observe(); + Ok(batch) } /// Retrieve a batch to be signed. @@ -293,10 +315,18 @@ impl BatchStore { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::Result> { - self.persistent + let t = metrics::PERSISTENT_BATCH_STORE + .batch_to_sign_latency + .start(); + + let batch = self + .persistent .get_batch_to_sign(ctx, number) .await - .wrap("persistent.get_batch_to_sign()") + .wrap("persistent.get_batch_to_sign()")?; + + t.observe(); + Ok(batch) } /// Append batch to a queue to be persisted eventually. @@ -311,6 +341,8 @@ impl BatchStore { batch: attester::SyncBatch, _genesis: validator::Genesis, ) -> ctx::Result<()> { + let t = metrics::BATCH_STORE.queue_batch.start(); + // XXX: Once we can validate `SyncBatch::proof` we should do it before adding the // batch to the cache, otherwise a malicious peer could serve us data that prevents // other inputs from entering the queue. It will also cause it to be gossiped at the moment. @@ -322,11 +354,13 @@ impl BatchStore { self.inner .send_if_modified(|inner| inner.try_push(batch.clone())); + t.observe(); Ok(()) } /// Wait until the database has a batch, then attach the corresponding QC. pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { + let t = metrics::BATCH_STORE.persist_batch_qc.start(); // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload // isn't yet available, but to be safe we can wait for the availability signal here as well. sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { @@ -334,7 +368,14 @@ impl BatchStore { }) .await?; // Now it's definitely safe to store it. - self.persistent.store_qc(ctx, qc).await + metrics::BATCH_STORE + .last_persisted_batch_qc + .set(qc.message.number.0); + + self.persistent.store_qc(ctx, qc).await?; + + t.observe(); + Ok(()) } /// Waits until the given batch is queued (in memory, or persisted). @@ -344,12 +385,17 @@ impl BatchStore { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::OrCanceled { - Ok(sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| { + let t = metrics::BATCH_STORE.wait_until_queued.start(); + + let state = sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| { number < inner.queued.next() }) .await? .queued - .clone()) + .clone(); + + t.observe(); + Ok(state) } /// Waits until the given batch is stored persistently. @@ -359,12 +405,23 @@ impl BatchStore { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::OrCanceled { - Ok( - sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { - number < persisted.next() - }) - .await? - .clone(), - ) + let t = metrics::BATCH_STORE.wait_until_persisted.start(); + + let state = sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { + number < persisted.next() + }) + .await? + .clone(); + + t.observe(); + Ok(state) + } + + fn scrape_metrics(&self) -> metrics::BatchStoreState { + let m = metrics::BatchStoreState::default(); + let inner = self.inner.borrow(); + m.next_queued_batch.set(inner.queued.next().0); + m.next_persisted_batch.set(inner.persisted.next().0); + m } } diff --git a/node/libs/storage/src/block_store/metrics.rs b/node/libs/storage/src/block_store/metrics.rs index 34954f69..04f87bd0 100644 --- a/node/libs/storage/src/block_store/metrics.rs +++ b/node/libs/storage/src/block_store/metrics.rs @@ -23,7 +23,7 @@ pub(super) static PERSISTENT_BLOCK_STORE: vise::Global = v #[derive(Debug, vise::Metrics)] #[metrics(prefix = "zksync_consensus_storage_block_store")] -pub(super) struct BlockStore { +pub(super) struct BlockStoreState { /// BlockNumber of the next block to queue. pub(super) next_queued_block: vise::Gauge, /// BlockNumber of the next block to persist. diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs index b29a1036..a901c0fe 100644 --- a/node/libs/storage/src/block_store/mod.rs +++ b/node/libs/storage/src/block_store/mod.rs @@ -170,7 +170,8 @@ impl BlockStoreRunner { /// Runs the background tasks of the BlockStore. pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { #[vise::register] - static COLLECTOR: vise::Collector> = vise::Collector::new(); + static COLLECTOR: vise::Collector> = + vise::Collector::new(); let store_ref = Arc::downgrade(&self.0); let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics())); @@ -337,8 +338,8 @@ impl BlockStore { ) } - fn scrape_metrics(&self) -> metrics::BlockStore { - let m = metrics::BlockStore::default(); + fn scrape_metrics(&self) -> metrics::BlockStoreState { + let m = metrics::BlockStoreState::default(); let inner = self.inner.borrow(); m.next_queued_block.set(inner.queued.next().0); m.next_persisted_block.set(inner.persisted.next().0); From 0d27fcfcd2419e54f56551472864c74b9efc8e95 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Wed, 24 Jul 2024 10:02:07 +0100 Subject: [PATCH 4/4] Bump version to 0.1.0-rc.4 (#159) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Bumps the version to 0.1.0-rc.4 for a new release. ## Why ❔ So that I can deploy the batch vote gossip metrics and add some of them to the Grafana dashboards (need to bump the era-consensus version in zksync-era). --- node/Cargo.lock | 24 ++++++++++++------------ node/Cargo.toml | 24 ++++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/node/Cargo.lock b/node/Cargo.lock index e36bb5b3..22ab5e9e 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3305,7 +3305,7 @@ dependencies = [ [[package]] name = "tester" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "clap", @@ -4059,7 +4059,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "assert_matches", @@ -4077,7 +4077,7 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "assert_matches", @@ -4101,7 +4101,7 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "blst", @@ -4124,7 +4124,7 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "rand 0.8.5", @@ -4144,7 +4144,7 @@ dependencies = [ [[package]] name = "zksync_consensus_network" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "assert_matches", @@ -4180,7 +4180,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "assert_matches", @@ -4201,7 +4201,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "assert_matches", @@ -4223,7 +4223,7 @@ dependencies = [ [[package]] name = "zksync_consensus_tools" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "async-trait", @@ -4258,7 +4258,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "rand 0.8.5", @@ -4268,7 +4268,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "bit-vec", @@ -4290,7 +4290,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" dependencies = [ "anyhow", "heck", diff --git a/node/Cargo.toml b/node/Cargo.toml index 26f2ab86..c99cd15c 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -22,23 +22,23 @@ homepage = "https://matter-labs.io/" repository = "https://github.com/matter-labs/era-consensus" license = "MIT OR Apache-2.0" keywords = ["blockchain", "zksync"] -version = "0.1.0-rc.3" +version = "0.1.0-rc.4" [workspace.dependencies] # Crates from this repo. -zksync_consensus_bft = { version = "=0.1.0-rc.3", path = "actors/bft" } -zksync_consensus_crypto = { version = "=0.1.0-rc.3", path = "libs/crypto" } -zksync_consensus_executor = { version = "=0.1.0-rc.3", path = "actors/executor" } -zksync_consensus_network = { version = "=0.1.0-rc.3", path = "actors/network" } -zksync_consensus_roles = { version = "=0.1.0-rc.3", path = "libs/roles" } -zksync_consensus_storage = { version = "=0.1.0-rc.3", path = "libs/storage" } -zksync_consensus_tools = { version = "=0.1.0-rc.3", path = "tools" } -zksync_consensus_utils = { version = "=0.1.0-rc.3", path = "libs/utils" } +zksync_consensus_bft = { version = "=0.1.0-rc.4", path = "actors/bft" } +zksync_consensus_crypto = { version = "=0.1.0-rc.4", path = "libs/crypto" } +zksync_consensus_executor = { version = "=0.1.0-rc.4", path = "actors/executor" } +zksync_consensus_network = { version = "=0.1.0-rc.4", path = "actors/network" } +zksync_consensus_roles = { version = "=0.1.0-rc.4", path = "libs/roles" } +zksync_consensus_storage = { version = "=0.1.0-rc.4", path = "libs/storage" } +zksync_consensus_tools = { version = "=0.1.0-rc.4", path = "tools" } +zksync_consensus_utils = { version = "=0.1.0-rc.4", path = "libs/utils" } # Crates from this repo that might become independent in the future. -zksync_concurrency = { version = "=0.1.0-rc.3", path = "libs/concurrency" } -zksync_protobuf = { version = "=0.1.0-rc.3", path = "libs/protobuf" } -zksync_protobuf_build = { version = "=0.1.0-rc.3", path = "libs/protobuf_build" } +zksync_concurrency = { version = "=0.1.0-rc.4", path = "libs/concurrency" } +zksync_protobuf = { version = "=0.1.0-rc.4", path = "libs/protobuf" } +zksync_protobuf_build = { version = "=0.1.0-rc.4", path = "libs/protobuf_build" } # Crates from Matter Labs. pairing = { package = "pairing_ce", version = "=0.28.6" }