From 441ed66de8ae0430a7d7adf0fe3c91d7f95dc542 Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Wed, 16 Aug 2023 12:42:51 +0300 Subject: [PATCH] Finality loop refactoring (#2357) --- bridges/relays/finality/Cargo.toml | 2 +- bridges/relays/finality/src/finality_loop.rs | 1072 ++++++++--------- .../finality/src/finality_loop_tests.rs | 604 ---------- .../relays/finality/src/finality_proofs.rs | 227 ++++ bridges/relays/finality/src/headers.rs | 237 ++++ bridges/relays/finality/src/lib.rs | 40 +- bridges/relays/finality/src/mock.rs | 209 ++++ .../src/finality/source.rs | 2 +- bridges/relays/utils/src/relay_loop.rs | 12 +- 9 files changed, 1226 insertions(+), 1179 deletions(-) delete mode 100644 bridges/relays/finality/src/finality_loop_tests.rs create mode 100644 bridges/relays/finality/src/finality_proofs.rs create mode 100644 bridges/relays/finality/src/headers.rs create mode 100644 bridges/relays/finality/src/mock.rs diff --git a/bridges/relays/finality/Cargo.toml b/bridges/relays/finality/Cargo.toml index ab75533b023a..7fcd08ef6f5f 100644 --- a/bridges/relays/finality/Cargo.toml +++ b/bridges/relays/finality/Cargo.toml @@ -12,7 +12,7 @@ async-trait = "0.1" backoff = "0.4" bp-header-chain = { path = "../../primitives/header-chain" } futures = "0.3.28" -log = "0.4.17" +log = "0.4.20" num-traits = "0.2" relay-utils = { path = "../utils" } diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index 7c8217c209fd..b1f1f018c0ed 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -19,23 +19,23 @@ //! is the mandatory headers, which we always submit to the target node. For such headers, we //! assume that the persistent proof either exists, or will eventually become available. +use crate::{sync_loop_metrics::SyncLoopMetrics, Error, FinalitySyncPipeline, SourceHeader}; + use crate::{ - sync_loop_metrics::SyncLoopMetrics, FinalityPipeline, FinalitySyncPipeline, SourceClientBase, - SourceHeader, + base::SourceClientBase, + finality_proofs::{FinalityProofsBuf, FinalityProofsStream}, + headers::{JustifiedHeader, JustifiedHeaderSelector}, }; - use async_trait::async_trait; -use backoff::backoff::Backoff; -use bp_header_chain::FinalityProof; -use futures::{select, Future, FutureExt, Stream, StreamExt}; -use num_traits::{One, Saturating}; +use backoff::{backoff::Backoff, ExponentialBackoff}; +use futures::{future::Fuse, select, Future, FutureExt}; +use num_traits::Saturating; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, }; use std::{ fmt::Debug, - pin::Pin, time::{Duration, Instant}, }; @@ -104,653 +104,593 @@ pub fn metrics_prefix() -> String { format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME) } -/// Run finality proofs synchronization loop. -pub async fn run( - source_client: impl SourceClient

, - target_client: impl TargetClient

, - sync_params: FinalitySyncParams, - metrics_params: MetricsParams, - exit_signal: impl Future + 'static + Send, -) -> Result<(), relay_utils::Error> { - let exit_signal = exit_signal.shared(); - relay_utils::relay_loop(source_client, target_client) - .with_metrics(metrics_params) - .loop_metric(SyncLoopMetrics::new( - Some(&metrics_prefix::

()), - "source", - "source_at_target", - )?)? - .expose() - .await? - .run(metrics_prefix::

(), move |source_client, target_client, metrics| { - run_until_connection_lost( - source_client, - target_client, - sync_params.clone(), - metrics, - exit_signal.clone(), - ) - }) - .await +pub struct SyncInfo { + pub best_number_at_source: P::Number, + pub best_number_at_target: P::Number, + pub is_using_same_fork: bool, } -/// Unjustified headers container. Ordered by header number. -pub(crate) type UnjustifiedHeaders = Vec; -/// Finality proofs container. Ordered by target header number. -pub(crate) type FinalityProofs

= - Vec<(

::Number,

::FinalityProof)>; -/// Reference to finality proofs container. -pub(crate) type FinalityProofsRef<'a, P> = - &'a [(

::Number,

::FinalityProof)]; - -/// Error that may happen inside finality synchronization loop. -#[derive(Debug)] -pub(crate) enum Error { - /// Source client request has failed with given error. - Source(SourceError), - /// Target client request has failed with given error. - Target(TargetError), - /// Finality proof for mandatory header is missing from the source node. - MissingMandatoryFinalityProof(P::Number), -} +impl SyncInfo

{ + /// Checks if both clients are on the same fork. + async fn is_on_same_fork>( + source_client: &SC, + id_at_target: &HeaderId, + ) -> Result { + let header_at_source = source_client.header_and_finality_proof(id_at_target.0).await?.0; + let header_hash_at_source = header_at_source.hash(); + Ok(if id_at_target.1 == header_hash_at_source { + true + } else { + log::error!( + target: "bridge", + "Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \ + at-source {:?} vs at-target {:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + id_at_target.0, + header_hash_at_source, + id_at_target.1, + ); + + false + }) + } -impl Error -where - P: FinalitySyncPipeline, - SourceError: MaybeConnectionError, - TargetError: MaybeConnectionError, -{ - fn fail_if_connection_error(&self) -> Result<(), FailedClient> { - match *self { - Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source), - Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target), - _ => Ok(()), + async fn new, TC: TargetClient

>( + source_client: &SC, + target_client: &TC, + ) -> Result> { + let best_number_at_source = + source_client.best_finalized_block_number().await.map_err(Error::Source)?; + let best_id_at_target = + target_client.best_finalized_source_block_id().await.map_err(Error::Target)?; + let best_number_at_target = best_id_at_target.0; + + let is_using_same_fork = Self::is_on_same_fork(source_client, &best_id_at_target) + .await + .map_err(Error::Source)?; + + Ok(Self { best_number_at_source, best_number_at_target, is_using_same_fork }) + } + + fn update_metrics(&self, metrics_sync: &Option) { + if let Some(metrics_sync) = metrics_sync { + metrics_sync.update_best_block_at_source(self.best_number_at_source); + metrics_sync.update_best_block_at_target(self.best_number_at_target); + metrics_sync.update_using_same_fork(self.is_using_same_fork); } } + + pub fn num_headers(&self) -> P::Number { + self.best_number_at_source.saturating_sub(self.best_number_at_target) + } } /// Information about transaction that we have submitted. #[derive(Debug, Clone)] -pub(crate) struct Transaction { +pub struct Transaction { /// Submitted transaction tracker. - pub tracker: Tracker, + tracker: Tracker, /// The number of the header we have submitted. - pub submitted_header_number: Number, + header_number: Number, } impl Transaction { pub async fn submit< - C: TargetClient, P: FinalitySyncPipeline, + TC: TargetClient, >( - target_client: &C, + target_client: &TC, header: P::Header, justification: P::FinalityProof, - ) -> Result { - let submitted_header_number = header.number(); + ) -> Result { + let header_number = header.number(); log::debug!( target: "bridge", "Going to submit finality proof of {} header #{:?} to {}", P::SOURCE_NAME, - submitted_header_number, + header_number, P::TARGET_NAME, ); let tracker = target_client.submit_finality_proof(header, justification).await?; - Ok(Transaction { tracker, submitted_header_number }) + Ok(Transaction { tracker, header_number }) } - pub async fn track, P: FinalitySyncPipeline>( + async fn track< + P: FinalitySyncPipeline, + SC: SourceClient

, + TC: TargetClient

, + >( self, - target_client: &C, - ) -> Result<(), String> { + target_client: TC, + ) -> Result<(), Error> { match self.tracker.wait().await { TrackedTransactionStatus::Finalized(_) => { // The transaction has been finalized, but it may have been finalized in the // "failed" state. So let's check if the block number was actually updated. - // If it wasn't then we are stalled. - // - // Please also note that we're returning an error if we fail to read required data - // from the target client - that's the best we can do here to avoid actual stall. target_client .best_finalized_source_block_id() .await - .map_err(|e| format!("failed to read best block from target node: {e:?}")) + .map_err(Error::Target) .and_then(|best_id_at_target| { - if self.submitted_header_number > best_id_at_target.0 { - return Err(format!( - "best block at target after tx is {:?} and we've submitted {:?}", - best_id_at_target.0, self.submitted_header_number, - )) + if self.header_number > best_id_at_target.0 { + return Err(Error::ProofSubmissionTxFailed { + submitted_number: self.header_number, + best_number_at_target: best_id_at_target.0, + }) } Ok(()) }) }, - TrackedTransactionStatus::Lost => Err("transaction failed".to_string()), + TrackedTransactionStatus::Lost => Err(Error::ProofSubmissionTxLost), } } } -/// Finality proofs stream that may be restarted. -pub(crate) struct RestartableFinalityProofsStream { - /// Flag that the stream needs to be restarted. - pub(crate) needs_restart: bool, - /// The stream itself. - stream: Pin>, -} +/// Finality synchronization loop state. +struct FinalityLoop, TC: TargetClient

> { + source_client: SC, + target_client: TC, -impl RestartableFinalityProofsStream { - pub async fn create_raw_stream< - C: SourceClient, - P: FinalitySyncPipeline, - >( - source_client: &C, - ) -> Result { - source_client.finality_proofs().await.map_err(|error| { - log::error!( - target: "bridge", - "Failed to subscribe to {} justifications: {:?}. Going to reconnect", - P::SOURCE_NAME, - error, - ); + sync_params: FinalitySyncParams, + metrics_sync: Option, - FailedClient::Source - }) + progress: (Instant, Option), + retry_backoff: ExponentialBackoff, + finality_proofs_stream: FinalityProofsStream, + finality_proofs_buf: FinalityProofsBuf

, + best_submitted_number: Option, +} + +impl, TC: TargetClient

> FinalityLoop { + pub fn new( + source_client: SC, + target_client: TC, + sync_params: FinalitySyncParams, + metrics_sync: Option, + ) -> Self { + Self { + source_client, + target_client, + sync_params, + metrics_sync, + progress: (Instant::now(), None), + retry_backoff: retry_backoff(), + finality_proofs_stream: FinalityProofsStream::new(), + finality_proofs_buf: FinalityProofsBuf::new(vec![]), + best_submitted_number: None, + } } - pub async fn restart_if_scheduled< - C: SourceClient, - P: FinalitySyncPipeline, - >( - &mut self, - source_client: &C, - ) -> Result<(), FailedClient> { - if self.needs_restart { - log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME); + fn update_progress(&mut self, info: &SyncInfo

) { + let (prev_time, prev_best_number_at_target) = self.progress; + let now = Instant::now(); + + let needs_update = now - prev_time > Duration::from_secs(10) || + prev_best_number_at_target + .map(|prev_best_number_at_target| { + info.best_number_at_target.saturating_sub(prev_best_number_at_target) > + 10.into() + }) + .unwrap_or(true); - self.needs_restart = false; - self.stream = Box::pin(Self::create_raw_stream(source_client).await?); + if !needs_update { + return } - Ok(()) + + log::info!( + target: "bridge", + "Synced {:?} of {:?} headers", + info.best_number_at_target, + info.best_number_at_source, + ); + + self.progress = (now, Some(info.best_number_at_target)) } - pub fn next(&mut self) -> Option { - match self.stream.next().now_or_never() { - Some(Some(finality_proof)) => Some(finality_proof), - Some(None) => { - self.needs_restart = true; - None - }, - None => None, + pub async fn select_header_to_submit( + &mut self, + info: &SyncInfo

, + ) -> Result>, Error> { + // to see that the loop is progressing + log::trace!( + target: "bridge", + "Considering range of headers ({}; {}]", + info.best_number_at_target, + info.best_number_at_source + ); + + // read missing headers + let selector = JustifiedHeaderSelector::new::(&self.source_client, info).await?; + // if we see that the header schedules GRANDPA change, we need to submit it + if self.sync_params.only_mandatory_headers { + return Ok(selector.select_mandatory()) } - } -} -impl From for RestartableFinalityProofsStream { - fn from(stream: S) -> Self { - RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) } + // all headers that are missing from the target client are non-mandatory + // => even if we have already selected some header and its persistent finality proof, + // we may try to select better header by reading non-persistent proofs from the stream + self.finality_proofs_buf.fill(&mut self.finality_proofs_stream); + let maybe_justified_header = selector.select(&self.finality_proofs_buf); + + // remove obsolete 'recent' finality proofs + keep its size under certain limit + let oldest_finality_proof_to_keep = maybe_justified_header + .as_ref() + .map(|justified_header| justified_header.number()) + .unwrap_or(info.best_number_at_target); + self.finality_proofs_buf + .prune(oldest_finality_proof_to_keep, self.sync_params.recent_finality_proofs_limit); + + Ok(maybe_justified_header) } -} -/// Finality synchronization loop state. -pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { - /// Synchronization loop progress. - pub(crate) progress: &'a mut (Instant, Option), - /// Finality proofs stream. - pub(crate) finality_proofs_stream: - &'a mut RestartableFinalityProofsStream, - /// Recent finality proofs that we have read from the stream. - pub(crate) recent_finality_proofs: &'a mut FinalityProofs

, - /// Number of the last header, submitted to the target node. - pub(crate) submitted_header_number: Option, -} + pub async fn run_iteration( + &mut self, + ) -> Result< + Option>, + Error, + > { + // read best source headers ids from source and target nodes + let info = SyncInfo::new(&self.source_client, &self.target_client).await?; + info.update_metrics(&self.metrics_sync); + self.update_progress(&info); + + // if we have already submitted header, then we just need to wait for it + // if we're waiting too much, then we believe our transaction has been lost and restart sync + if Some(info.best_number_at_target) < self.best_submitted_number { + return Ok(None) + } -/// Run finality relay loop until connection to one of nodes is lost. -pub(crate) async fn run_until_connection_lost( - source_client: impl SourceClient

, - target_client: impl TargetClient

, - sync_params: FinalitySyncParams, - metrics_sync: Option, - exit_signal: impl Future, -) -> Result<(), FailedClient> { - let last_transaction_tracker = futures::future::Fuse::terminated(); - let exit_signal = exit_signal.fuse(); - futures::pin_mut!(last_transaction_tracker, exit_signal); - - let mut finality_proofs_stream = - RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into(); - let mut recent_finality_proofs = Vec::new(); - - let mut progress = (Instant::now(), None); - let mut retry_backoff = retry_backoff(); - let mut last_submitted_header_number = None; - - loop { - // run loop iteration - let iteration_result = run_loop_iteration( - &source_client, - &target_client, - FinalityLoopState { - progress: &mut progress, - finality_proofs_stream: &mut finality_proofs_stream, - recent_finality_proofs: &mut recent_finality_proofs, - submitted_header_number: last_submitted_header_number, - }, - &sync_params, - &metrics_sync, - ) - .await; - - // deal with errors - let next_tick = match iteration_result { - Ok(Some(updated_transaction)) => { - last_submitted_header_number = Some(updated_transaction.submitted_header_number); - last_transaction_tracker.set(updated_transaction.track(&target_client).fuse()); - retry_backoff.reset(); - sync_params.tick - }, - Ok(None) => { - retry_backoff.reset(); - sync_params.tick - }, - Err(error) => { - log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error); - error.fail_if_connection_error()?; - retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) + // submit new header if we have something new + match self.select_header_to_submit(&info).await? { + Some(header) => { + let transaction = + Transaction::submit(&self.target_client, header.header, header.proof) + .await + .map_err(Error::Target)?; + self.best_submitted_number = Some(transaction.header_number); + Ok(Some(transaction)) }, - }; - finality_proofs_stream.restart_if_scheduled(&source_client).await?; - - // wait till exit signal, or new source block - select! { - transaction_result = last_transaction_tracker => { - transaction_result.map_err(|e| { - log::error!( - target: "bridge", - "Finality synchronization from {} to {} has stalled with error: {}. Going to restart", - P::SOURCE_NAME, - P::TARGET_NAME, - e, - ); - - // Restart the loop if we're stalled. - FailedClient::Both - })? - }, - _ = async_std::task::sleep(next_tick).fuse() => {}, - _ = exit_signal => return Ok(()), + None => Ok(None), } } -} -pub(crate) async fn run_loop_iteration( - source_client: &SC, - target_client: &TC, - state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, - sync_params: &FinalitySyncParams, - metrics_sync: &Option, -) -> Result>, Error> -where - P: FinalitySyncPipeline, - SC: SourceClient

, - TC: TargetClient

, -{ - // read best source headers ids from source and target nodes - let best_number_at_source = - source_client.best_finalized_block_number().await.map_err(Error::Source)?; - let best_id_at_target = - target_client.best_finalized_source_block_id().await.map_err(Error::Target)?; - let best_number_at_target = best_id_at_target.0; - - let different_hash_at_source = ensure_same_fork::(&best_id_at_target, source_client) - .await - .map_err(Error::Source)?; - let using_same_fork = different_hash_at_source.is_none(); - if let Some(ref different_hash_at_source) = different_hash_at_source { - log::error!( - target: "bridge", - "Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \ - at-source {:?} vs at-target {:?}", - P::SOURCE_NAME, - P::TARGET_NAME, - best_number_at_target, - different_hash_at_source, - best_id_at_target.1, - ); - } + async fn ensure_finality_proofs_stream(&mut self) -> Result<(), FailedClient> { + if let Err(e) = self.finality_proofs_stream.ensure_stream(&self.source_client).await { + if e.is_connection_error() { + return Err(FailedClient::Source) + } + } - if let Some(ref metrics_sync) = *metrics_sync { - metrics_sync.update_best_block_at_source(best_number_at_source); - metrics_sync.update_best_block_at_target(best_number_at_target); - metrics_sync.update_using_same_fork(using_same_fork); + Ok(()) } - *state.progress = - print_sync_progress::

(*state.progress, best_number_at_source, best_number_at_target); - - // if we have already submitted header, then we just need to wait for it - // if we're waiting too much, then we believe our transaction has been lost and restart sync - if let Some(submitted_header_number) = state.submitted_header_number { - if best_number_at_target >= submitted_header_number { - // transaction has been mined && we can continue - } else { - return Ok(None) + + /// Run finality relay loop until connection to one of nodes is lost. + async fn run_until_connection_lost( + &mut self, + exit_signal: impl Future, + ) -> Result<(), FailedClient> { + self.ensure_finality_proofs_stream().await?; + let proof_submission_tx_tracker = Fuse::terminated(); + let exit_signal = exit_signal.fuse(); + futures::pin_mut!(exit_signal, proof_submission_tx_tracker); + + loop { + // run loop iteration + let next_tick = match self.run_iteration().await { + Ok(Some(tx)) => { + proof_submission_tx_tracker + .set(tx.track::(self.target_client.clone()).fuse()); + self.retry_backoff.reset(); + self.sync_params.tick + }, + Ok(None) => { + self.retry_backoff.reset(); + self.sync_params.tick + }, + Err(error) => { + log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error); + error.fail_if_connection_error()?; + self.retry_backoff + .next_backoff() + .unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) + }, + }; + self.ensure_finality_proofs_stream().await?; + + // wait till exit signal, or new source block + select! { + proof_submission_result = proof_submission_tx_tracker => { + if let Err(e) = proof_submission_result { + log::error!( + target: "bridge", + "Finality sync proof submission tx to {} has failed with error: {:?}.", + P::TARGET_NAME, + e, + ); + self.best_submitted_number = None; + e.fail_if_connection_error()?; + } + }, + _ = async_std::task::sleep(next_tick).fuse() => {}, + _ = exit_signal => return Ok(()), + } } } - // submit new header if we have something new - match select_header_to_submit( - source_client, - target_client, - state.finality_proofs_stream, - state.recent_finality_proofs, - best_number_at_source, - best_number_at_target, - sync_params, - ) - .await? - { - Some((header, justification)) => { - let transaction = Transaction::submit(target_client, header, justification) - .await - .map_err(Error::Target)?; - Ok(Some(transaction)) - }, - None => Ok(None), + pub async fn run( + source_client: SC, + target_client: TC, + sync_params: FinalitySyncParams, + metrics_sync: Option, + exit_signal: impl Future, + ) -> Result<(), FailedClient> { + let mut finality_loop = Self::new(source_client, target_client, sync_params, metrics_sync); + finality_loop.run_until_connection_lost(exit_signal).await } } -pub(crate) async fn select_header_to_submit( - source_client: &SC, - target_client: &TC, - finality_proofs_stream: &mut RestartableFinalityProofsStream, - recent_finality_proofs: &mut FinalityProofs

, - best_number_at_source: P::Number, - best_number_at_target: P::Number, - sync_params: &FinalitySyncParams, -) -> Result, Error> -where - P: FinalitySyncPipeline, - SC: SourceClient

, - TC: TargetClient

, -{ - // to see that the loop is progressing - log::trace!( - target: "bridge", - "Considering range of headers ({:?}; {:?}]", - best_number_at_target, - best_number_at_source, - ); - - // read missing headers. if we see that the header schedules GRANDPA change, we need to - // submit this header - let selected_finality_proof = read_missing_headers::( - source_client, - target_client, - best_number_at_source, - best_number_at_target, - ) - .await?; - let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof { - SelectedFinalityProof::Mandatory(header, finality_proof) => - return Ok(Some((header, finality_proof))), - _ if sync_params.only_mandatory_headers => { - // we are not reading finality proofs from the stream, so eventually it'll break - // but we don't care about transient proofs at all, so it is acceptable - return Ok(None) - }, - SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => - (unjustified_headers, Some((header, finality_proof))), - SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None), - }; - - // all headers that are missing from the target client are non-mandatory - // => even if we have already selected some header and its persistent finality proof, - // we may try to select better header by reading non-persistent proofs from the stream - read_finality_proofs_from_stream::(finality_proofs_stream, recent_finality_proofs); - selected_finality_proof = select_better_recent_finality_proof::

( - recent_finality_proofs, - &mut unjustified_headers, - selected_finality_proof, - ); - - // remove obsolete 'recent' finality proofs + keep its size under certain limit - let oldest_finality_proof_to_keep = selected_finality_proof - .as_ref() - .map(|(header, _)| header.number()) - .unwrap_or(best_number_at_target); - prune_recent_finality_proofs::

( - oldest_finality_proof_to_keep, - recent_finality_proofs, - sync_params.recent_finality_proofs_limit, - ); - - Ok(selected_finality_proof) -} - -/// Ensures that both clients are on the same fork. -/// -/// Returns `Some(_)` with header has at the source client if headers are different. -async fn ensure_same_fork>( - best_id_at_target: &HeaderId, - source_client: &SC, -) -> Result, SC::Error> { - let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0; - let header_hash_at_source = header_at_source.hash(); - Ok(if best_id_at_target.1 == header_hash_at_source { - None - } else { - Some(header_hash_at_source) - }) -} - -/// Finality proof that has been selected by the `read_missing_headers` function. -pub(crate) enum SelectedFinalityProof { - /// Mandatory header and its proof has been selected. We shall submit proof for this header. - Mandatory(Header, FinalityProof), - /// Regular header and its proof has been selected. We may submit this proof, or proof for - /// some better header. - Regular(UnjustifiedHeaders

, Header, FinalityProof), - /// We haven't found any missing header with persistent proof at the target client. - None(UnjustifiedHeaders
), +/// Run finality proofs synchronization loop. +pub async fn run( + source_client: impl SourceClient

, + target_client: impl TargetClient

, + sync_params: FinalitySyncParams, + metrics_params: MetricsParams, + exit_signal: impl Future + 'static + Send, +) -> Result<(), relay_utils::Error> { + let exit_signal = exit_signal.shared(); + relay_utils::relay_loop(source_client, target_client) + .with_metrics(metrics_params) + .loop_metric(SyncLoopMetrics::new( + Some(&metrics_prefix::

()), + "source", + "source_at_target", + )?)? + .expose() + .await? + .run(metrics_prefix::

(), move |source_client, target_client, metrics| { + FinalityLoop::run( + source_client, + target_client, + sync_params.clone(), + metrics, + exit_signal.clone(), + ) + }) + .await } -/// Read missing headers and their persistent finality proofs from the target client. -/// -/// If we have found some header with known proof, it is returned. -/// Otherwise, `SelectedFinalityProof::None` is returned. -/// -/// Unless we have found mandatory header, all missing headers are collected and returned. -pub(crate) async fn read_missing_headers< - P: FinalitySyncPipeline, - SC: SourceClient

, - TC: TargetClient

, ->( - source_client: &SC, - _target_client: &TC, - best_number_at_source: P::Number, - best_number_at_target: P::Number, -) -> Result, Error> { - let mut unjustified_headers = Vec::new(); - let mut selected_finality_proof = None; - let mut header_number = best_number_at_target + One::one(); - while header_number <= best_number_at_source { - let (header, finality_proof) = source_client - .header_and_finality_proof(header_number) - .await - .map_err(Error::Source)?; - let is_mandatory = header.is_mandatory(); - - match (is_mandatory, finality_proof) { - (true, Some(finality_proof)) => { - log::trace!(target: "bridge", "Header {:?} is mandatory", header_number); - return Ok(SelectedFinalityProof::Mandatory(header, finality_proof)) +#[cfg(test)] +mod tests { + use super::*; + + use crate::mock::*; + use futures::{FutureExt, StreamExt}; + use parking_lot::Mutex; + use relay_utils::{FailedClient, HeaderId, TrackedTransactionStatus}; + use std::{collections::HashMap, sync::Arc}; + + fn prepare_test_clients( + exit_sender: futures::channel::mpsc::UnboundedSender<()>, + state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, + source_headers: HashMap)>, + ) -> (TestSourceClient, TestTargetClient) { + let internal_state_function: Arc = + Arc::new(move |data| { + if state_function(data) { + exit_sender.unbounded_send(()).unwrap(); + } + }); + let clients_data = Arc::new(Mutex::new(ClientsData { + source_best_block_number: 10, + source_headers, + source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], + + target_best_block_id: HeaderId(5, 5), + target_headers: vec![], + target_transaction_tracker: TestTransactionTracker( + TrackedTransactionStatus::Finalized(Default::default()), + ), + })); + ( + TestSourceClient { + on_method_call: internal_state_function.clone(), + data: clients_data.clone(), }, - (true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())), - (false, Some(finality_proof)) => { - log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number); - unjustified_headers.clear(); - selected_finality_proof = Some((header, finality_proof)); - }, - (false, None) => { - unjustified_headers.push(header); - }, - } - - header_number = header_number + One::one(); + TestTargetClient { on_method_call: internal_state_function, data: clients_data }, + ) } - log::trace!( - target: "bridge", - "Read {} {} headers. Selected finality proof for header: {:?}", - best_number_at_source.saturating_sub(best_number_at_target), - P::SOURCE_NAME, - selected_finality_proof.as_ref().map(|(header, _)| header), - ); - - Ok(match selected_finality_proof { - Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof), - None => SelectedFinalityProof::None(unjustified_headers), - }) -} - -/// Read finality proofs from the stream. -pub(crate) fn read_finality_proofs_from_stream< - P: FinalitySyncPipeline, - FPS: Stream, ->( - finality_proofs_stream: &mut RestartableFinalityProofsStream, - recent_finality_proofs: &mut FinalityProofs

, -) { - let mut proofs_count = 0; - let mut first_header_number = None; - let mut last_header_number = None; - while let Some(finality_proof) = finality_proofs_stream.next() { - let target_header_number = finality_proof.target_header_number(); - if first_header_number.is_none() { - first_header_number = Some(target_header_number); + fn test_sync_params() -> FinalitySyncParams { + FinalitySyncParams { + tick: Duration::from_secs(0), + recent_finality_proofs_limit: 1024, + stall_timeout: Duration::from_secs(1), + only_mandatory_headers: false, } - last_header_number = Some(target_header_number); - proofs_count += 1; + } - recent_finality_proofs.push((target_header_number, finality_proof)); + fn run_sync_loop( + state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, + ) -> (ClientsData, Result<(), FailedClient>) { + let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); + let (source_client, target_client) = prepare_test_clients( + exit_sender, + state_function, + vec![ + (5, (TestSourceHeader(false, 5, 5), None)), + (6, (TestSourceHeader(false, 6, 6), None)), + (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), + (8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))), + (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), + (10, (TestSourceHeader(false, 10, 10), None)), + ] + .into_iter() + .collect(), + ); + let sync_params = test_sync_params(); + + let clients_data = source_client.data.clone(); + let result = async_std::task::block_on(FinalityLoop::run( + source_client, + target_client, + sync_params, + None, + exit_receiver.into_future().map(|(_, _)| ()), + )); + + let clients_data = clients_data.lock().clone(); + (clients_data, result) } - if proofs_count != 0 { - log::trace!( - target: "bridge", - "Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]", - proofs_count, - P::SOURCE_NAME, - first_header_number, - last_header_number, + #[test] + fn finality_sync_loop_works() { + let (client_data, result) = run_sync_loop(|data| { + // header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, + // because header#8 has persistent finality proof && it is mandatory => it is submitted + // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, + // because there are no more persistent finality proofs + // + // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 + // from the stream + if data.target_best_block_id.0 == 9 { + data.source_best_block_number = 14; + data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None)); + data.source_headers + .insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12)))); + data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None)); + data.source_headers + .insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14)))); + } + // once this ^^^ is done, we generate more blocks && read persistent proof for block 16 + if data.target_best_block_id.0 == 14 { + data.source_best_block_number = 17; + data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None)); + data.source_headers + .insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16)))); + data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None)); + } + + data.target_best_block_id.0 == 16 + }); + + assert_eq!(result, Ok(())); + assert_eq!( + client_data.target_headers, + vec![ + // before adding 11..14: finality proof for mandatory header#8 + (TestSourceHeader(true, 8, 8), TestFinalityProof(8)), + // before adding 11..14: persistent finality proof for non-mandatory header#9 + (TestSourceHeader(false, 9, 9), TestFinalityProof(9)), + // after adding 11..14: ephemeral finality proof for non-mandatory header#14 + (TestSourceHeader(false, 14, 14), TestFinalityProof(14)), + // after adding 15..17: persistent finality proof for non-mandatory header#16 + (TestSourceHeader(false, 16, 16), TestFinalityProof(16)), + ], ); } -} -/// Try to select better header and its proof, given finality proofs that we -/// have recently read from the stream. -pub(crate) fn select_better_recent_finality_proof( - recent_finality_proofs: FinalityProofsRef

, - unjustified_headers: &mut UnjustifiedHeaders, - selected_finality_proof: Option<(P::Header, P::FinalityProof)>, -) -> Option<(P::Header, P::FinalityProof)> { - if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() { - log::trace!( - target: "bridge", - "Can not improve selected {} finality proof {:?}. No unjustified headers and recent proofs", - P::SOURCE_NAME, - selected_finality_proof.as_ref().map(|(h, _)| h.number()), + fn run_only_mandatory_headers_mode_test( + only_mandatory_headers: bool, + has_mandatory_headers: bool, + ) -> Option> { + let (exit_sender, _) = futures::channel::mpsc::unbounded(); + let (source_client, target_client) = prepare_test_clients( + exit_sender, + |_| false, + vec![ + (6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))), + (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), + (8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))), + (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), + (10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))), + ] + .into_iter() + .collect(), ); - return selected_finality_proof + async_std::task::block_on(async { + let mut finality_loop = FinalityLoop::new( + source_client, + target_client, + FinalitySyncParams { + tick: Duration::from_secs(0), + recent_finality_proofs_limit: 0, + stall_timeout: Duration::from_secs(0), + only_mandatory_headers, + }, + None, + ); + let info = SyncInfo { + best_number_at_source: 10, + best_number_at_target: 5, + is_using_same_fork: true, + }; + finality_loop.select_header_to_submit(&info).await.unwrap() + }) } - const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed"; - - // we need proofs for headers in range unjustified_range_begin..=unjustified_range_end - let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number(); - let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number(); - - // we have proofs for headers in range buffered_range_begin..=buffered_range_end - let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0; - let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0; - - // we have two ranges => find intersection - let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin); - let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end); - let intersection = intersection_begin..=intersection_end; - - // find last proof from intersection - let selected_finality_proof_index = recent_finality_proofs - .binary_search_by_key(intersection.end(), |(number, _)| *number) - .unwrap_or_else(|index| index.saturating_sub(1)); - let (selected_header_number, finality_proof) = - &recent_finality_proofs[selected_finality_proof_index]; - let has_selected_finality_proof = intersection.contains(selected_header_number); - log::trace!( - target: "bridge", - "Trying to improve selected {} finality proof {:?}. Headers range: [{:?}; {:?}]. Proofs range: [{:?}; {:?}].\ - Trying to improve to: {:?}. Result: {}", - P::SOURCE_NAME, - selected_finality_proof.as_ref().map(|(h, _)| h.number()), - unjustified_range_begin, - unjustified_range_end, - buffered_range_begin, - buffered_range_end, - selected_header_number, - if has_selected_finality_proof { "improved" } else { "not improved" }, - ); - if !has_selected_finality_proof { - return selected_finality_proof + #[test] + fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required( + ) { + assert_eq!(run_only_mandatory_headers_mode_test(true, false), None); + assert_eq!( + run_only_mandatory_headers_mode_test(false, false), + Some(JustifiedHeader { + header: TestSourceHeader(false, 10, 10), + proof: TestFinalityProof(10) + }), + ); } - // now remove all obsolete headers and extract selected header - let selected_header_position = unjustified_headers - .binary_search_by_key(selected_header_number, |header| header.number()) - .expect("unjustified_headers contain all headers from intersection; qed"); - let selected_header = unjustified_headers.swap_remove(selected_header_position); - Some((selected_header, finality_proof.clone())) -} + #[test] + fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() + { + assert_eq!( + run_only_mandatory_headers_mode_test(true, true), + Some(JustifiedHeader { + header: TestSourceHeader(true, 8, 8), + proof: TestFinalityProof(8) + }), + ); + assert_eq!( + run_only_mandatory_headers_mode_test(false, true), + Some(JustifiedHeader { + header: TestSourceHeader(true, 8, 8), + proof: TestFinalityProof(8) + }), + ); + } -pub(crate) fn prune_recent_finality_proofs( - justified_header_number: P::Number, - recent_finality_proofs: &mut FinalityProofs

, - recent_finality_proofs_limit: usize, -) { - let justified_header_idx = recent_finality_proofs - .binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number) - .map(|idx| idx + 1) - .unwrap_or_else(|idx| idx); - let proofs_limit_idx = - recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit); - - *recent_finality_proofs = - recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx)); -} + #[test] + fn different_forks_at_source_and_at_target_are_detected() { + let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded(); + let (source_client, target_client) = prepare_test_clients( + exit_sender, + |_| false, + vec![ + (5, (TestSourceHeader(false, 5, 42), None)), + (6, (TestSourceHeader(false, 6, 6), None)), + (7, (TestSourceHeader(false, 7, 7), None)), + (8, (TestSourceHeader(false, 8, 8), None)), + (9, (TestSourceHeader(false, 9, 9), None)), + (10, (TestSourceHeader(false, 10, 10), None)), + ] + .into_iter() + .collect(), + ); -fn print_sync_progress( - progress_context: (Instant, Option), - best_number_at_source: P::Number, - best_number_at_target: P::Number, -) -> (Instant, Option) { - let (prev_time, prev_best_number_at_target) = progress_context; - let now = Instant::now(); - - let need_update = now - prev_time > Duration::from_secs(10) || - prev_best_number_at_target - .map(|prev_best_number_at_target| { - best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into() - }) - .unwrap_or(true); - - if !need_update { - return (prev_time, prev_best_number_at_target) - } + let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap(); + async_std::task::block_on(async { + let mut finality_loop = FinalityLoop::new( + source_client, + target_client, + test_sync_params(), + Some(metrics_sync.clone()), + ); + finality_loop.run_iteration().await.unwrap() + }); - log::info!( - target: "bridge", - "Synced {:?} of {:?} headers", - best_number_at_target, - best_number_at_source, - ); - (now, Some(best_number_at_target)) + assert!(!metrics_sync.is_using_same_fork()); + } } diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs deleted file mode 100644 index 774a5c0c6730..000000000000 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ /dev/null @@ -1,604 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Tests for finality synchronization loop. - -#![cfg(test)] - -use crate::{ - finality_loop::{ - prune_recent_finality_proofs, read_finality_proofs_from_stream, run_loop_iteration, - run_until_connection_lost, select_better_recent_finality_proof, select_header_to_submit, - FinalityLoopState, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, - SourceClient, TargetClient, - }, - sync_loop_metrics::SyncLoopMetrics, - FinalityPipeline, FinalitySyncPipeline, SourceClientBase, SourceHeader, -}; - -use async_trait::async_trait; -use bp_header_chain::{FinalityProof, GrandpaConsensusLogReader}; -use futures::{FutureExt, Stream, StreamExt}; -use parking_lot::Mutex; -use relay_utils::{ - relay_loop::Client as RelayClient, FailedClient, HeaderId, MaybeConnectionError, - TrackedTransactionStatus, TransactionTracker, -}; -use std::{ - collections::HashMap, - pin::Pin, - sync::Arc, - time::{Duration, Instant}, -}; - -type IsMandatory = bool; -type TestNumber = u64; -type TestHash = u64; - -#[derive(Clone, Debug)] -struct TestTransactionTracker(TrackedTransactionStatus>); - -impl Default for TestTransactionTracker { - fn default() -> TestTransactionTracker { - TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default())) - } -} - -#[async_trait] -impl TransactionTracker for TestTransactionTracker { - type HeaderId = HeaderId; - - async fn wait(self) -> TrackedTransactionStatus> { - self.0 - } -} - -#[derive(Debug, Clone)] -enum TestError { - NonConnection, -} - -impl MaybeConnectionError for TestError { - fn is_connection_error(&self) -> bool { - false - } -} - -#[derive(Debug, Clone)] -struct TestFinalitySyncPipeline; - -impl FinalityPipeline for TestFinalitySyncPipeline { - const SOURCE_NAME: &'static str = "TestSource"; - const TARGET_NAME: &'static str = "TestTarget"; - - type Hash = TestHash; - type Number = TestNumber; - type FinalityProof = TestFinalityProof; -} - -impl FinalitySyncPipeline for TestFinalitySyncPipeline { - type ConsensusLogReader = GrandpaConsensusLogReader; - type Header = TestSourceHeader; -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct TestSourceHeader(IsMandatory, TestNumber, TestHash); - -impl SourceHeader> - for TestSourceHeader -{ - fn hash(&self) -> TestHash { - self.2 - } - - fn number(&self) -> TestNumber { - self.1 - } - - fn is_mandatory(&self) -> bool { - self.0 - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct TestFinalityProof(TestNumber); - -impl FinalityProof for TestFinalityProof { - fn target_header_number(&self) -> TestNumber { - self.0 - } -} - -#[derive(Debug, Clone, Default)] -struct ClientsData { - source_best_block_number: TestNumber, - source_headers: HashMap)>, - source_proofs: Vec, - - target_best_block_id: HeaderId, - target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, - target_transaction_tracker: TestTransactionTracker, -} - -#[derive(Clone)] -struct TestSourceClient { - on_method_call: Arc, - data: Arc>, -} - -#[async_trait] -impl RelayClient for TestSourceClient { - type Error = TestError; - - async fn reconnect(&mut self) -> Result<(), TestError> { - unreachable!() - } -} - -#[async_trait] -impl SourceClientBase for TestSourceClient { - type FinalityProofsStream = Pin + 'static + Send>>; - - async fn finality_proofs(&self) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(&mut data); - Ok(futures::stream::iter(data.source_proofs.clone()).boxed()) - } -} - -#[async_trait] -impl SourceClient for TestSourceClient { - async fn best_finalized_block_number(&self) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(&mut data); - Ok(data.source_best_block_number) - } - - async fn header_and_finality_proof( - &self, - number: TestNumber, - ) -> Result<(TestSourceHeader, Option), TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(&mut data); - data.source_headers.get(&number).cloned().ok_or(TestError::NonConnection) - } -} - -#[derive(Clone)] -struct TestTargetClient { - on_method_call: Arc, - data: Arc>, -} - -#[async_trait] -impl RelayClient for TestTargetClient { - type Error = TestError; - - async fn reconnect(&mut self) -> Result<(), TestError> { - unreachable!() - } -} - -#[async_trait] -impl TargetClient for TestTargetClient { - type TransactionTracker = TestTransactionTracker; - - async fn best_finalized_source_block_id( - &self, - ) -> Result, TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(&mut data); - Ok(data.target_best_block_id) - } - - async fn submit_finality_proof( - &self, - header: TestSourceHeader, - proof: TestFinalityProof, - ) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(&mut data); - data.target_best_block_id = HeaderId(header.number(), header.hash()); - data.target_headers.push((header, proof)); - (self.on_method_call)(&mut data); - Ok(data.target_transaction_tracker.clone()) - } -} - -fn prepare_test_clients( - exit_sender: futures::channel::mpsc::UnboundedSender<()>, - state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, - source_headers: HashMap)>, -) -> (TestSourceClient, TestTargetClient) { - let internal_state_function: Arc = - Arc::new(move |data| { - if state_function(data) { - exit_sender.unbounded_send(()).unwrap(); - } - }); - let clients_data = Arc::new(Mutex::new(ClientsData { - source_best_block_number: 10, - source_headers, - source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], - - target_best_block_id: HeaderId(5, 5), - target_headers: vec![], - target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized( - Default::default(), - )), - })); - ( - TestSourceClient { - on_method_call: internal_state_function.clone(), - data: clients_data.clone(), - }, - TestTargetClient { on_method_call: internal_state_function, data: clients_data }, - ) -} - -fn test_sync_params() -> FinalitySyncParams { - FinalitySyncParams { - tick: Duration::from_secs(0), - recent_finality_proofs_limit: 1024, - stall_timeout: Duration::from_secs(1), - only_mandatory_headers: false, - } -} - -fn run_sync_loop( - state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, -) -> (ClientsData, Result<(), FailedClient>) { - let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); - let (source_client, target_client) = prepare_test_clients( - exit_sender, - state_function, - vec![ - (5, (TestSourceHeader(false, 5, 5), None)), - (6, (TestSourceHeader(false, 6, 6), None)), - (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), - (8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))), - (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), - (10, (TestSourceHeader(false, 10, 10), None)), - ] - .into_iter() - .collect(), - ); - let sync_params = test_sync_params(); - - let clients_data = source_client.data.clone(); - let result = async_std::task::block_on(run_until_connection_lost( - source_client, - target_client, - sync_params, - None, - exit_receiver.into_future().map(|(_, _)| ()), - )); - - let clients_data = clients_data.lock().clone(); - (clients_data, result) -} - -#[test] -fn finality_sync_loop_works() { - let (client_data, result) = run_sync_loop(|data| { - // header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, - // because header#8 has persistent finality proof && it is mandatory => it is submitted - // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, - // because there are no more persistent finality proofs - // - // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from - // the stream - if data.target_best_block_id.0 == 9 { - data.source_best_block_number = 14; - data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None)); - data.source_headers - .insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12)))); - data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None)); - data.source_headers - .insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14)))); - } - // once this ^^^ is done, we generate more blocks && read persistent proof for block 16 - if data.target_best_block_id.0 == 14 { - data.source_best_block_number = 17; - data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None)); - data.source_headers - .insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16)))); - data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None)); - } - - data.target_best_block_id.0 == 16 - }); - - assert_eq!(result, Ok(())); - assert_eq!( - client_data.target_headers, - vec![ - // before adding 11..14: finality proof for mandatory header#8 - (TestSourceHeader(true, 8, 8), TestFinalityProof(8)), - // before adding 11..14: persistent finality proof for non-mandatory header#9 - (TestSourceHeader(false, 9, 9), TestFinalityProof(9)), - // after adding 11..14: ephemeral finality proof for non-mandatory header#14 - (TestSourceHeader(false, 14, 14), TestFinalityProof(14)), - // after adding 15..17: persistent finality proof for non-mandatory header#16 - (TestSourceHeader(false, 16, 16), TestFinalityProof(16)), - ], - ); -} - -fn run_only_mandatory_headers_mode_test( - only_mandatory_headers: bool, - has_mandatory_headers: bool, -) -> Option<(TestSourceHeader, TestFinalityProof)> { - let (exit_sender, _) = futures::channel::mpsc::unbounded(); - let (source_client, target_client) = prepare_test_clients( - exit_sender, - |_| false, - vec![ - (6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))), - (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), - (8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))), - (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), - (10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))), - ] - .into_iter() - .collect(), - ); - async_std::task::block_on(select_header_to_submit( - &source_client, - &target_client, - &mut RestartableFinalityProofsStream::from(futures::stream::empty().boxed()), - &mut vec![], - 10, - 5, - &FinalitySyncParams { - tick: Duration::from_secs(0), - recent_finality_proofs_limit: 0, - stall_timeout: Duration::from_secs(0), - only_mandatory_headers, - }, - )) - .unwrap() -} - -#[test] -fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required() { - assert_eq!(run_only_mandatory_headers_mode_test(true, false), None); - assert_eq!( - run_only_mandatory_headers_mode_test(false, false), - Some((TestSourceHeader(false, 10, 10), TestFinalityProof(10))), - ); -} - -#[test] -fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() { - assert_eq!( - run_only_mandatory_headers_mode_test(true, true), - Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))), - ); - assert_eq!( - run_only_mandatory_headers_mode_test(false, true), - Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))), - ); -} - -#[test] -fn select_better_recent_finality_proof_works() { - // if there are no unjustified headers, nothing is changed - assert_eq!( - select_better_recent_finality_proof::( - &[(5, TestFinalityProof(5))], - &mut vec![], - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ), - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ); - - // if there are no recent finality proofs, nothing is changed - assert_eq!( - select_better_recent_finality_proof::( - &[], - &mut vec![TestSourceHeader(false, 5, 5)], - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ), - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ); - - // if there's no intersection between recent finality proofs and unjustified headers, nothing is - // changed - let mut unjustified_headers = - vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)]; - assert_eq!( - select_better_recent_finality_proof::( - &[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))], - &mut unjustified_headers, - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ), - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ); - - // if there's intersection between recent finality proofs and unjustified headers, but there are - // no proofs in this intersection, nothing is changed - let mut unjustified_headers = vec![ - TestSourceHeader(false, 8, 8), - TestSourceHeader(false, 9, 9), - TestSourceHeader(false, 10, 10), - ]; - assert_eq!( - select_better_recent_finality_proof::( - &[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))], - &mut unjustified_headers, - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ), - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ); - assert_eq!( - unjustified_headers, - vec![ - TestSourceHeader(false, 8, 8), - TestSourceHeader(false, 9, 9), - TestSourceHeader(false, 10, 10) - ] - ); - - // if there's intersection between recent finality proofs and unjustified headers and there's - // a proof in this intersection: - // - this better (last from intersection) proof is selected; - // - 'obsolete' unjustified headers are pruned. - let mut unjustified_headers = vec![ - TestSourceHeader(false, 8, 8), - TestSourceHeader(false, 9, 9), - TestSourceHeader(false, 10, 10), - ]; - assert_eq!( - select_better_recent_finality_proof::( - &[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))], - &mut unjustified_headers, - Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), - ), - Some((TestSourceHeader(false, 9, 9), TestFinalityProof(9))), - ); -} - -#[test] -fn read_finality_proofs_from_stream_works() { - // when stream is currently empty, nothing is changed - let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))]; - let mut stream = futures::stream::pending().into(); - read_finality_proofs_from_stream::( - &mut stream, - &mut recent_finality_proofs, - ); - assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]); - assert!(!stream.needs_restart); - - // when stream has entry with target, it is added to the recent proofs container - let mut stream = futures::stream::iter(vec![TestFinalityProof(4)]) - .chain(futures::stream::pending()) - .into(); - read_finality_proofs_from_stream::( - &mut stream, - &mut recent_finality_proofs, - ); - assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]); - assert!(!stream.needs_restart); - - // when stream has ended, we'll need to restart it - let mut stream = futures::stream::empty().into(); - read_finality_proofs_from_stream::( - &mut stream, - &mut recent_finality_proofs, - ); - assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]); - assert!(stream.needs_restart); -} - -#[test] -fn prune_recent_finality_proofs_works() { - let original_recent_finality_proofs: FinalityProofs = vec![ - (10, TestFinalityProof(10)), - (13, TestFinalityProof(13)), - (15, TestFinalityProof(15)), - (17, TestFinalityProof(17)), - (19, TestFinalityProof(19)), - ] - .into_iter() - .collect(); - - // when there's proof for justified header in the vec - let mut recent_finality_proofs = original_recent_finality_proofs.clone(); - prune_recent_finality_proofs::(10, &mut recent_finality_proofs, 1024); - assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,); - - // when there are no proof for justified header in the vec - let mut recent_finality_proofs = original_recent_finality_proofs.clone(); - prune_recent_finality_proofs::(11, &mut recent_finality_proofs, 1024); - assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,); - - // when there are too many entries after initial prune && they also need to be pruned - let mut recent_finality_proofs = original_recent_finality_proofs.clone(); - prune_recent_finality_proofs::(10, &mut recent_finality_proofs, 2); - assert_eq!(&original_recent_finality_proofs[3..], recent_finality_proofs,); - - // when last entry is pruned - let mut recent_finality_proofs = original_recent_finality_proofs.clone(); - prune_recent_finality_proofs::(19, &mut recent_finality_proofs, 2); - assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,); - - // when post-last entry is pruned - let mut recent_finality_proofs = original_recent_finality_proofs.clone(); - prune_recent_finality_proofs::(20, &mut recent_finality_proofs, 2); - assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,); -} - -#[test] -fn different_forks_at_source_and_at_target_are_detected() { - let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded(); - let (source_client, target_client) = prepare_test_clients( - exit_sender, - |_| false, - vec![ - (5, (TestSourceHeader(false, 5, 42), None)), - (6, (TestSourceHeader(false, 6, 6), None)), - (7, (TestSourceHeader(false, 7, 7), None)), - (8, (TestSourceHeader(false, 8, 8), None)), - (9, (TestSourceHeader(false, 9, 9), None)), - (10, (TestSourceHeader(false, 10, 10), None)), - ] - .into_iter() - .collect(), - ); - - let mut progress = (Instant::now(), None); - let mut finality_proofs_stream = futures::stream::iter(vec![]).boxed().into(); - let mut recent_finality_proofs = Vec::new(); - let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap(); - async_std::task::block_on(run_loop_iteration::( - &source_client, - &target_client, - FinalityLoopState { - progress: &mut progress, - finality_proofs_stream: &mut finality_proofs_stream, - recent_finality_proofs: &mut recent_finality_proofs, - submitted_header_number: None, - }, - &test_sync_params(), - &Some(metrics_sync.clone()), - )) - .unwrap(); - - assert!(!metrics_sync.is_using_same_fork()); -} - -#[test] -fn stalls_when_transaction_tracker_returns_error() { - let (_, result) = run_sync_loop(|data| { - data.target_transaction_tracker = TestTransactionTracker(TrackedTransactionStatus::Lost); - data.target_best_block_id = HeaderId(5, 5); - data.target_best_block_id.0 == 16 - }); - - assert_eq!(result, Err(FailedClient::Both)); -} - -#[test] -fn stalls_when_transaction_tracker_returns_finalized_but_transaction_fails() { - let (_, result) = run_sync_loop(|data| { - data.target_best_block_id = HeaderId(5, 5); - data.target_best_block_id.0 == 16 - }); - - assert_eq!(result, Err(FailedClient::Both)); -} diff --git a/bridges/relays/finality/src/finality_proofs.rs b/bridges/relays/finality/src/finality_proofs.rs new file mode 100644 index 000000000000..d457c0693bf3 --- /dev/null +++ b/bridges/relays/finality/src/finality_proofs.rs @@ -0,0 +1,227 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::{base::SourceClientBase, FinalityPipeline}; + +use bp_header_chain::FinalityProof; +use futures::{FutureExt, Stream, StreamExt}; +use std::pin::Pin; + +/// Finality proofs container. Ordered by target header number. +pub type FinalityProofs

= + Vec<(

::Number,

::FinalityProof)>; + +/// Source finality proofs stream that may be restarted. +pub struct FinalityProofsStream> { + /// The underlying stream. + stream: Option>>, +} + +impl> FinalityProofsStream { + pub fn new() -> Self { + Self { stream: None } + } + + fn next(&mut self) -> Option<::Item> { + let stream = match &mut self.stream { + Some(stream) => stream, + None => return None, + }; + + match stream.next().now_or_never() { + Some(Some(finality_proof)) => Some(finality_proof), + Some(None) => { + self.stream = None; + None + }, + None => None, + } + } + + pub async fn ensure_stream(&mut self, source_client: &SC) -> Result<(), SC::Error> { + if self.stream.is_none() { + log::warn!(target: "bridge", "{} finality proofs stream is being started / restarted", + P::SOURCE_NAME); + + let stream = source_client.finality_proofs().await.map_err(|error| { + log::error!( + target: "bridge", + "Failed to subscribe to {} justifications: {:?}", + P::SOURCE_NAME, + error, + ); + + error + })?; + self.stream = Some(Box::pin(stream)); + } + + Ok(()) + } +} + +/// Source finality proofs buffer. +pub struct FinalityProofsBuf { + /// Proofs buffer. + buf: FinalityProofs

, +} + +impl FinalityProofsBuf

{ + pub fn new(buf: FinalityProofs

) -> Self { + Self { buf } + } + + pub fn buf(&self) -> &FinalityProofs

{ + &self.buf + } + + pub fn fill>(&mut self, stream: &mut FinalityProofsStream) { + let mut proofs_count = 0; + let mut first_header_number = None; + let mut last_header_number = None; + while let Some(finality_proof) = stream.next() { + let target_header_number = finality_proof.target_header_number(); + first_header_number.get_or_insert(target_header_number); + last_header_number = Some(target_header_number); + proofs_count += 1; + + self.buf.push((target_header_number, finality_proof)); + } + + if proofs_count != 0 { + log::trace!( + target: "bridge", + "Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]", + proofs_count, + P::SOURCE_NAME, + first_header_number, + last_header_number, + ); + } + } + + pub fn prune(&mut self, until_hdr_num: P::Number, buf_limit: usize) { + let kept_hdr_idx = self + .buf + .binary_search_by_key(&until_hdr_num, |(hdr_num, _)| *hdr_num) + .map(|idx| idx + 1) + .unwrap_or_else(|idx| idx); + let buf_limit_idx = self.buf.len().saturating_sub(buf_limit); + + self.buf = self.buf.split_off(std::cmp::max(kept_hdr_idx, buf_limit_idx)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mock::*; + + impl> FinalityProofsStream { + fn from_stream(stream: SC::FinalityProofsStream) -> Self { + Self { stream: Some(Box::pin(stream)) } + } + } + + #[test] + fn finality_proofs_buf_fill_works() { + // when stream is currently empty, nothing is changed + let mut finality_proofs_buf = + FinalityProofsBuf:: { buf: vec![(1, TestFinalityProof(1))] }; + let mut stream = + FinalityProofsStream::::from_stream( + Box::pin(futures::stream::pending()), + ); + finality_proofs_buf.fill(&mut stream); + assert_eq!(finality_proofs_buf.buf, vec![(1, TestFinalityProof(1))]); + assert!(stream.stream.is_some()); + + // when stream has entry with target, it is added to the recent proofs container + let mut stream = + FinalityProofsStream::::from_stream( + Box::pin( + futures::stream::iter(vec![TestFinalityProof(4)]) + .chain(futures::stream::pending()), + ), + ); + finality_proofs_buf.fill(&mut stream); + assert_eq!( + finality_proofs_buf.buf, + vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))] + ); + assert!(stream.stream.is_some()); + + // when stream has ended, we'll need to restart it + let mut stream = + FinalityProofsStream::::from_stream( + Box::pin(futures::stream::empty()), + ); + finality_proofs_buf.fill(&mut stream); + assert_eq!( + finality_proofs_buf.buf, + vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))] + ); + assert!(stream.stream.is_none()); + } + + #[test] + fn finality_proofs_buf_prune_works() { + let original_finality_proofs_buf: FinalityProofs = vec![ + (10, TestFinalityProof(10)), + (13, TestFinalityProof(13)), + (15, TestFinalityProof(15)), + (17, TestFinalityProof(17)), + (19, TestFinalityProof(19)), + ] + .into_iter() + .collect(); + + // when there's proof for justified header in the vec + let mut finality_proofs_buf = FinalityProofsBuf:: { + buf: original_finality_proofs_buf.clone(), + }; + finality_proofs_buf.prune(10, 1024); + assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,); + + // when there are no proof for justified header in the vec + let mut finality_proofs_buf = FinalityProofsBuf:: { + buf: original_finality_proofs_buf.clone(), + }; + finality_proofs_buf.prune(11, 1024); + assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,); + + // when there are too many entries after initial prune && they also need to be pruned + let mut finality_proofs_buf = FinalityProofsBuf:: { + buf: original_finality_proofs_buf.clone(), + }; + finality_proofs_buf.prune(10, 2); + assert_eq!(&original_finality_proofs_buf[3..], finality_proofs_buf.buf,); + + // when last entry is pruned + let mut finality_proofs_buf = FinalityProofsBuf:: { + buf: original_finality_proofs_buf.clone(), + }; + finality_proofs_buf.prune(19, 2); + assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,); + + // when post-last entry is pruned + let mut finality_proofs_buf = FinalityProofsBuf:: { + buf: original_finality_proofs_buf.clone(), + }; + finality_proofs_buf.prune(20, 2); + assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,); + } +} diff --git a/bridges/relays/finality/src/headers.rs b/bridges/relays/finality/src/headers.rs new file mode 100644 index 000000000000..bdb05c9d9b72 --- /dev/null +++ b/bridges/relays/finality/src/headers.rs @@ -0,0 +1,237 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::{ + finality_loop::SyncInfo, finality_proofs::FinalityProofsBuf, Error, FinalitySyncPipeline, + SourceClient, SourceHeader, TargetClient, +}; + +use std::cmp::Ordering; + +/// Unjustified headers container. Ordered by header number. +pub type UnjustifiedHeaders = Vec; + +#[derive(Debug)] +#[cfg_attr(test, derive(Clone, PartialEq))] +pub struct JustifiedHeader { + pub header: P::Header, + pub proof: P::FinalityProof, +} + +impl JustifiedHeader

{ + pub fn number(&self) -> P::Number { + self.header.number() + } +} + +/// Finality proof that has been selected by the `read_missing_headers` function. +pub enum JustifiedHeaderSelector { + /// Mandatory header and its proof has been selected. We shall submit proof for this header. + Mandatory(JustifiedHeader

), + /// Regular header and its proof has been selected. We may submit this proof, or proof for + /// some better header. + Regular(UnjustifiedHeaders, JustifiedHeader

), + /// We haven't found any missing header with persistent proof at the target client. + None(UnjustifiedHeaders), +} + +impl JustifiedHeaderSelector

{ + pub(crate) async fn new, TC: TargetClient

>( + source_client: &SC, + info: &SyncInfo

, + ) -> Result> { + let mut unjustified_headers = Vec::new(); + let mut maybe_justified_header = None; + + let mut header_number = info.best_number_at_target + 1.into(); + while header_number <= info.best_number_at_source { + let (header, maybe_proof) = source_client + .header_and_finality_proof(header_number) + .await + .map_err(Error::Source)?; + + match (header.is_mandatory(), maybe_proof) { + (true, Some(proof)) => { + log::trace!(target: "bridge", "Header {:?} is mandatory", header_number); + return Ok(Self::Mandatory(JustifiedHeader { header, proof })) + }, + (true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())), + (false, Some(proof)) => { + log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number); + unjustified_headers.clear(); + maybe_justified_header = Some(JustifiedHeader { header, proof }); + }, + (false, None) => { + unjustified_headers.push(header); + }, + } + + header_number = header_number + 1.into(); + } + + log::trace!( + target: "bridge", + "Read {} {} headers. Selected finality proof for header: {:?}", + info.num_headers(), + P::SOURCE_NAME, + maybe_justified_header.as_ref().map(|justified_header| &justified_header.header), + ); + + Ok(match maybe_justified_header { + Some(justified_header) => Self::Regular(unjustified_headers, justified_header), + None => Self::None(unjustified_headers), + }) + } + + pub fn select_mandatory(self) -> Option> { + match self { + JustifiedHeaderSelector::Mandatory(header) => Some(header), + _ => None, + } + } + + pub fn select(self, buf: &FinalityProofsBuf

) -> Option> { + let (unjustified_headers, maybe_justified_header) = match self { + JustifiedHeaderSelector::Mandatory(justified_header) => return Some(justified_header), + JustifiedHeaderSelector::Regular(unjustified_headers, justified_header) => + (unjustified_headers, Some(justified_header)), + JustifiedHeaderSelector::None(unjustified_headers) => (unjustified_headers, None), + }; + + let mut finality_proofs_iter = buf.buf().iter().rev(); + let mut maybe_finality_proof = finality_proofs_iter.next(); + + let mut unjustified_headers_iter = unjustified_headers.iter().rev(); + let mut maybe_unjustified_header = unjustified_headers_iter.next(); + + while let (Some(finality_proof), Some(unjustified_header)) = + (maybe_finality_proof, maybe_unjustified_header) + { + match finality_proof.0.cmp(&unjustified_header.number()) { + Ordering::Equal => { + log::trace!( + target: "bridge", + "Managed to improve selected {} finality proof {:?} to {:?}.", + P::SOURCE_NAME, + maybe_justified_header.as_ref().map(|justified_header| justified_header.number()), + finality_proof.0 + ); + return Some(JustifiedHeader { + header: unjustified_header.clone(), + proof: finality_proof.1.clone(), + }) + }, + Ordering::Less => maybe_unjustified_header = unjustified_headers_iter.next(), + Ordering::Greater => { + maybe_finality_proof = finality_proofs_iter.next(); + }, + } + } + + log::trace!( + target: "bridge", + "Could not improve selected {} finality proof {:?}.", + P::SOURCE_NAME, + maybe_justified_header.as_ref().map(|justified_header| justified_header.number()) + ); + maybe_justified_header + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mock::*; + + #[test] + fn select_better_recent_finality_proof_works() { + // if there are no unjustified headers, nothing is changed + let finality_proofs_buf = + FinalityProofsBuf::::new(vec![(5, TestFinalityProof(5))]); + let justified_header = + JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; + let selector = JustifiedHeaderSelector::Regular(vec![], justified_header.clone()); + assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header)); + + // if there are no buffered finality proofs, nothing is changed + let finality_proofs_buf = FinalityProofsBuf::::new(vec![]); + let justified_header = + JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; + let selector = JustifiedHeaderSelector::Regular( + vec![TestSourceHeader(false, 5, 5)], + justified_header.clone(), + ); + assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header)); + + // if there's no intersection between recent finality proofs and unjustified headers, + // nothing is changed + let finality_proofs_buf = FinalityProofsBuf::::new(vec![ + (1, TestFinalityProof(1)), + (4, TestFinalityProof(4)), + ]); + let justified_header = + JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; + let selector = JustifiedHeaderSelector::Regular( + vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)], + justified_header.clone(), + ); + assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header)); + + // if there's intersection between recent finality proofs and unjustified headers, but there + // are no proofs in this intersection, nothing is changed + let finality_proofs_buf = FinalityProofsBuf::::new(vec![ + (7, TestFinalityProof(7)), + (11, TestFinalityProof(11)), + ]); + let justified_header = + JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; + let selector = JustifiedHeaderSelector::Regular( + vec![ + TestSourceHeader(false, 8, 8), + TestSourceHeader(false, 9, 9), + TestSourceHeader(false, 10, 10), + ], + justified_header.clone(), + ); + assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header)); + + // if there's intersection between recent finality proofs and unjustified headers and + // there's a proof in this intersection: + // - this better (last from intersection) proof is selected; + // - 'obsolete' unjustified headers are pruned. + let finality_proofs_buf = FinalityProofsBuf::::new(vec![ + (7, TestFinalityProof(7)), + (9, TestFinalityProof(9)), + ]); + let justified_header = + JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; + let selector = JustifiedHeaderSelector::Regular( + vec![ + TestSourceHeader(false, 8, 8), + TestSourceHeader(false, 9, 9), + TestSourceHeader(false, 10, 10), + ], + justified_header, + ); + assert_eq!( + selector.select(&finality_proofs_buf), + Some(JustifiedHeader { + header: TestSourceHeader(false, 9, 9), + proof: TestFinalityProof(9) + }) + ); + } +} diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs index 599cf2f9f9de..51cd9a093553 100644 --- a/bridges/relays/finality/src/lib.rs +++ b/bridges/relays/finality/src/lib.rs @@ -26,11 +26,14 @@ pub use crate::{ }; use bp_header_chain::ConsensusLogReader; +use relay_utils::{FailedClient, MaybeConnectionError}; use std::fmt::Debug; mod base; mod finality_loop; -mod finality_loop_tests; +mod finality_proofs; +mod headers; +mod mock; mod sync_loop_metrics; /// Finality proofs synchronization pipeline. @@ -50,3 +53,38 @@ pub trait SourceHeader: Clone + Debug + PartialEq + Send + /// Returns true if this header needs to be submitted to target node. fn is_mandatory(&self) -> bool; } + +/// Error that may happen inside finality synchronization loop. +#[derive(Debug)] +enum Error { + /// Source client request has failed with given error. + Source(SourceError), + /// Target client request has failed with given error. + Target(TargetError), + /// Finality proof for mandatory header is missing from the source node. + MissingMandatoryFinalityProof(P::Number), + /// `submit_finality_proof` transaction failed + ProofSubmissionTxFailed { + #[allow(dead_code)] + submitted_number: P::Number, + #[allow(dead_code)] + best_number_at_target: P::Number, + }, + /// `submit_finality_proof` transaction lost + ProofSubmissionTxLost, +} + +impl Error +where + P: FinalitySyncPipeline, + SourceError: MaybeConnectionError, + TargetError: MaybeConnectionError, +{ + fn fail_if_connection_error(&self) -> Result<(), FailedClient> { + match *self { + Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source), + Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target), + _ => Ok(()), + } + } +} diff --git a/bridges/relays/finality/src/mock.rs b/bridges/relays/finality/src/mock.rs new file mode 100644 index 000000000000..181504ce2607 --- /dev/null +++ b/bridges/relays/finality/src/mock.rs @@ -0,0 +1,209 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Tests for finality synchronization loop. + +#![cfg(test)] + +use crate::{ + base::SourceClientBase, + finality_loop::{SourceClient, TargetClient}, + FinalityPipeline, FinalitySyncPipeline, SourceHeader, +}; + +use async_trait::async_trait; +use bp_header_chain::{FinalityProof, GrandpaConsensusLogReader}; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use relay_utils::{ + relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError, TrackedTransactionStatus, + TransactionTracker, +}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; + +type IsMandatory = bool; +pub type TestNumber = u64; +type TestHash = u64; + +#[derive(Clone, Debug)] +pub struct TestTransactionTracker(pub TrackedTransactionStatus>); + +impl Default for TestTransactionTracker { + fn default() -> TestTransactionTracker { + TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default())) + } +} + +#[async_trait] +impl TransactionTracker for TestTransactionTracker { + type HeaderId = HeaderId; + + async fn wait(self) -> TrackedTransactionStatus> { + self.0 + } +} + +#[derive(Debug, Clone)] +pub enum TestError { + NonConnection, +} + +impl MaybeConnectionError for TestError { + fn is_connection_error(&self) -> bool { + false + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TestFinalitySyncPipeline; + +impl FinalityPipeline for TestFinalitySyncPipeline { + const SOURCE_NAME: &'static str = "TestSource"; + const TARGET_NAME: &'static str = "TestTarget"; + + type Hash = TestHash; + type Number = TestNumber; + type FinalityProof = TestFinalityProof; +} + +impl FinalitySyncPipeline for TestFinalitySyncPipeline { + type ConsensusLogReader = GrandpaConsensusLogReader; + type Header = TestSourceHeader; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TestSourceHeader(pub IsMandatory, pub TestNumber, pub TestHash); + +impl SourceHeader> + for TestSourceHeader +{ + fn hash(&self) -> TestHash { + self.2 + } + + fn number(&self) -> TestNumber { + self.1 + } + + fn is_mandatory(&self) -> bool { + self.0 + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TestFinalityProof(pub TestNumber); + +impl FinalityProof for TestFinalityProof { + fn target_header_number(&self) -> TestNumber { + self.0 + } +} + +#[derive(Debug, Clone, Default)] +pub struct ClientsData { + pub source_best_block_number: TestNumber, + pub source_headers: HashMap)>, + pub source_proofs: Vec, + + pub target_best_block_id: HeaderId, + pub target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, + pub target_transaction_tracker: TestTransactionTracker, +} + +#[derive(Clone)] +pub struct TestSourceClient { + pub on_method_call: Arc, + pub data: Arc>, +} + +#[async_trait] +impl RelayClient for TestSourceClient { + type Error = TestError; + + async fn reconnect(&mut self) -> Result<(), TestError> { + unreachable!() + } +} + +#[async_trait] +impl SourceClientBase for TestSourceClient { + type FinalityProofsStream = Pin + 'static + Send>>; + + async fn finality_proofs(&self) -> Result { + let mut data = self.data.lock(); + (self.on_method_call)(&mut data); + Ok(futures::stream::iter(data.source_proofs.clone()).boxed()) + } +} + +#[async_trait] +impl SourceClient for TestSourceClient { + async fn best_finalized_block_number(&self) -> Result { + let mut data = self.data.lock(); + (self.on_method_call)(&mut data); + Ok(data.source_best_block_number) + } + + async fn header_and_finality_proof( + &self, + number: TestNumber, + ) -> Result<(TestSourceHeader, Option), TestError> { + let mut data = self.data.lock(); + (self.on_method_call)(&mut data); + data.source_headers.get(&number).cloned().ok_or(TestError::NonConnection) + } +} + +#[derive(Clone)] +pub struct TestTargetClient { + pub on_method_call: Arc, + pub data: Arc>, +} + +#[async_trait] +impl RelayClient for TestTargetClient { + type Error = TestError; + + async fn reconnect(&mut self) -> Result<(), TestError> { + unreachable!() + } +} + +#[async_trait] +impl TargetClient for TestTargetClient { + type TransactionTracker = TestTransactionTracker; + + async fn best_finalized_source_block_id( + &self, + ) -> Result, TestError> { + let mut data = self.data.lock(); + (self.on_method_call)(&mut data); + Ok(data.target_best_block_id) + } + + async fn submit_finality_proof( + &self, + header: TestSourceHeader, + proof: TestFinalityProof, + ) -> Result { + let mut data = self.data.lock(); + (self.on_method_call)(&mut data); + data.target_best_block_id = HeaderId(header.number(), header.hash()); + data.target_headers.push((header, proof)); + (self.on_method_call)(&mut data); + Ok(data.target_transaction_tracker.clone()) + } +} diff --git a/bridges/relays/lib-substrate-relay/src/finality/source.rs b/bridges/relays/lib-substrate-relay/src/finality/source.rs index 41c6c53daf4d..c94af6108957 100644 --- a/bridges/relays/lib-substrate-relay/src/finality/source.rs +++ b/bridges/relays/lib-substrate-relay/src/finality/source.rs @@ -125,7 +125,7 @@ impl SubstrateFinalitySource

{ Error, > { let client = self.client.clone(); - let best_finalized_block_number = self.client.best_finalized_header_number().await?; + let best_finalized_block_number = client.best_finalized_header_number().await?; Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move { // if we've passed the `best_finalized_block_number`, we no longer need persistent // justifications diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index 11e14744a075..dad7293de6d2 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -130,19 +130,19 @@ impl Loop { match result { Ok(()) => break, - Err(failed_client) => + Err(failed_client) => { + log::debug!(target: "bridge", "Restarting relay loop"); + reconnect_failed_client( failed_client, self.reconnect_delay, &mut self.source_client, &mut self.target_client, ) - .await, + .await + }, } - - log::debug!(target: "bridge", "Restarting relay loop"); } - Ok(()) }; @@ -194,7 +194,7 @@ impl LoopMetrics { Err(err) => { log::trace!( target: "bridge-metrics", - "Failed to create tokio runtime. Prometheus meterics are not available: {:?}", + "Failed to create tokio runtime. Prometheus metrics are not available: {:?}", err, ); return