diff --git a/crates/p2p/src/behaviour/builder.rs b/crates/p2p/src/behaviour/builder.rs
index 94bb378d1..1463c38e5 100644
--- a/crates/p2p/src/behaviour/builder.rs
+++ b/crates/p2p/src/behaviour/builder.rs
@@ -106,6 +106,7 @@ impl Builder {
         kademlia_config.set_record_ttl(Some(Duration::from_secs(0)));
         kademlia_config.set_provider_record_ttl(Some(PROVIDER_PUBLICATION_INTERVAL * 3));
         kademlia_config.set_provider_publication_interval(Some(PROVIDER_PUBLICATION_INTERVAL));
+        kademlia_config.set_periodic_bootstrap_interval(Some(cfg.bootstrap_period));
 
         let peer_id = identity.public().to_peer_id();
         let secret = Secret::new(&identity);
diff --git a/crates/p2p/src/builder.rs b/crates/p2p/src/builder.rs
index 4a428ae93..6d7d86443 100644
--- a/crates/p2p/src/builder.rs
+++ b/crates/p2p/src/builder.rs
@@ -49,7 +49,7 @@ impl Builder {
         let client = Client::new(command_sender, local_peer_id);
 
         let (behaviour, relay_transport) = behaviour_builder
-            .unwrap_or_else(|| Behaviour::builder(keypair.clone(), chain_id, cfg.clone()))
+            .unwrap_or_else(|| Behaviour::builder(keypair.clone(), chain_id, cfg))
             .build(client.clone());
 
         let swarm = Swarm::new(
@@ -65,7 +65,7 @@
         (
             client,
             event_receiver,
-            MainLoop::new(swarm, command_receiver, event_sender, cfg),
+            MainLoop::new(swarm, command_receiver, event_sender),
         )
     }
 }
diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs
index f2f23c277..a2225e3e0 100644
--- a/crates/p2p/src/lib.rs
+++ b/crates/p2p/src/lib.rs
@@ -56,14 +56,12 @@ pub struct Config {
     pub max_inbound_relayed_peers: usize,
     /// Maximum number of outbound peers.
     pub max_outbound_peers: usize,
-    /// The minimum number of peers to maintain. If the number of outbound peers
-    /// drops below this number, the node will attempt to connect to more
-    /// peers.
-    pub low_watermark: usize,
     /// How long to prevent evicted peers from reconnecting.
     pub eviction_timeout: Duration,
     pub ip_whitelist: Vec<IpNet>,
-    pub bootstrap: BootstrapConfig,
+    /// The node attempts a periodic Kademlia bootstrap at this interval, in addition
+    /// to the bootstrap that libp2p triggers itself when the DHT peer count is low.
+    pub bootstrap_period: Duration,
     pub inbound_connections_rate_limit: RateLimit,
     /// Custom protocol name for Kademlia
     pub kad_name: Option<String>,
@@ -79,21 +77,6 @@ pub struct RateLimit {
     pub interval: Duration,
 }
 
-#[derive(Copy, Clone, Debug)]
-pub struct BootstrapConfig {
-    pub start_offset: Duration,
-    pub period: Duration,
-}
-
-impl Default for BootstrapConfig {
-    fn default() -> Self {
-        Self {
-            start_offset: Duration::from_secs(5),
-            period: Duration::from_secs(2 * 60),
-        }
-    }
-}
-
 pub type HeadTx = tokio::sync::watch::Sender<Option<(BlockNumber, BlockHash)>>;
 pub type HeadRx = tokio::sync::watch::Receiver<Option<(BlockNumber, BlockHash)>>;
 
diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs
index 7a25ce1f0..93d7fc683 100644
--- a/crates/p2p/src/main_loop.rs
+++ b/crates/p2p/src/main_loop.rs
@@ -1,18 +1,11 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
+use std::num::NonZeroUsize;
 
 use futures::channel::mpsc::Receiver as ResponseReceiver;
 use futures::StreamExt;
 use libp2p::gossipsub::{self, IdentTopic};
-use libp2p::kad::{
-    self,
-    BootstrapError,
-    BootstrapOk,
-    ProgressStep,
-    QueryId,
-    QueryInfo,
-    QueryResult,
-};
+use libp2p::kad::{self, BootstrapError, BootstrapOk, QueryId, QueryResult};
 use libp2p::multiaddr::Protocol;
 use libp2p::swarm::dial_opts::DialOpts;
 use libp2p::swarm::SwarmEvent;
@@ -29,10 +22,9 @@ use tokio::time::Duration;
 
 #[cfg(test)]
 use crate::test_utils;
-use crate::{behaviour, Command, Config, EmptyResultSender, Event, TestCommand, TestEvent};
+use crate::{behaviour, Command, EmptyResultSender, Event, TestCommand, TestEvent};
 
 pub struct MainLoop {
-    cfg: crate::Config,
     swarm: libp2p::swarm::Swarm<behaviour::Behaviour>,
     command_receiver: mpsc::Receiver<Command>,
     event_sender: mpsc::Sender<Event>,
@@ -46,8 +38,6 @@ pub struct MainLoop {
     // 2. update the sync head info of our peers using a different mechanism
     // request_sync_status: HashSetDelay,
     pending_queries: PendingQueries,
-    /// Ongoing Kademlia bootstrap query.
-    ongoing_bootstrap: Option<QueryId>,
     _pending_test_queries: TestQueries,
 }
 
@@ -86,36 +76,24 @@ impl MainLoop {
         swarm: libp2p::swarm::Swarm<behaviour::Behaviour>,
         command_receiver: mpsc::Receiver<Command>,
         event_sender: mpsc::Sender<Event>,
-        cfg: Config,
     ) -> Self {
         Self {
-            cfg,
             swarm,
             command_receiver,
             event_sender,
             pending_dials: Default::default(),
            pending_sync_requests: Default::default(),
             pending_queries: Default::default(),
-            ongoing_bootstrap: None,
             _pending_test_queries: Default::default(),
         }
     }
 
     pub async fn run(mut self) {
-        // Delay bootstrap so that by the time we attempt it we've connected to the
-        // bootstrap node
-        let bootstrap_start = tokio::time::Instant::now() + self.cfg.bootstrap.start_offset;
-        let mut bootstrap_interval =
-            tokio::time::interval_at(bootstrap_start, self.cfg.bootstrap.period);
-
         let mut network_status_interval = tokio::time::interval(Duration::from_secs(5));
         let mut peer_status_interval = tokio::time::interval(Duration::from_secs(30));
         let me = *self.swarm.local_peer_id();
 
         loop {
-            let bootstrap_interval_tick = bootstrap_interval.tick();
-            tokio::pin!(bootstrap_interval_tick);
-
             let network_status_interval_tick = network_status_interval.tick();
             tokio::pin!(network_status_interval_tick);
 
@@ -162,31 +140,6 @@ impl MainLoop {
                         dht,
                     );
                 }
-                _ = bootstrap_interval_tick => {
-                    tracing::debug!("Checking low watermark");
-                    if let Some(query_id) = self.ongoing_bootstrap {
-                        match self.swarm.behaviour_mut().kademlia_mut().query_mut(&query_id) {
-                            Some(mut query) if matches!(query.info(), QueryInfo::Bootstrap {
-                                step: ProgressStep { last: false, .. }, .. }
-                            ) => {
-                                tracing::debug!("Previous bootstrap still in progress, aborting it");
-                                query.finish();
-                                continue;
-                            }
-                            _ => self.ongoing_bootstrap = None,
-                        }
-                    }
-                    if self.swarm.behaviour_mut().outbound_peers().count() < self.cfg.low_watermark {
-                        if let Ok(query_id) = self.swarm.behaviour_mut().kademlia_mut().bootstrap() {
-                            self.ongoing_bootstrap = Some(query_id);
-                            send_test_event(
-                                &self.event_sender,
-                                TestEvent::KademliaBootstrapStarted,
-                            )
-                            .await;
-                        }
-                    }
-                }
                 command = self.command_receiver.recv() => {
                     match command {
                         Some(c) => self.handle_command(c).await,
@@ -397,7 +350,6 @@ impl MainLoop {
                         Err(peer)
                     }
                 };
-                self.ongoing_bootstrap = None;
                 send_test_event(
                     &self.event_sender,
                     TestEvent::KademliaBootstrapCompleted(result),
@@ -451,31 +403,52 @@ impl MainLoop {
                         }
                         _ => self.test_query_completed(id, result).await,
                     }
-                } else if let QueryResult::GetProviders(result) = result {
-                    use libp2p::kad::GetProvidersOk;
-
-                    let result = match result {
-                        Ok(GetProvidersOk::FoundProviders { providers, .. }) => Ok(providers),
-                        Ok(_) => Ok(Default::default()),
-                        Err(_) => {
-                            unreachable!(
-                                "when a query times out libp2p makes it the last stage"
-                            )
-                        }
-                    };
-
-                    let sender = self
-                        .pending_queries
-                        .get_providers
-                        .get(&id)
-                        .expect("Query to be pending");
-
-                    sender
-                        .send(result)
-                        .await
-                        .expect("Receiver not to be dropped");
                 } else {
-                    self.test_query_progressed(id, result).await;
+                    match result {
+                        QueryResult::GetProviders(result) => {
+                            use libp2p::kad::GetProvidersOk;
+
+                            let result = match result {
+                                Ok(GetProvidersOk::FoundProviders { providers, .. }) => {
+                                    Ok(providers)
+                                }
+                                Ok(_) => Ok(Default::default()),
+                                Err(_) => {
+                                    unreachable!(
+                                        "when a query times out libp2p makes it the last stage"
+                                    )
+                                }
+                            };
+
+                            let sender = self
+                                .pending_queries
+                                .get_providers
+                                .get(&id)
+                                .expect("Query to be pending");
+
+                            sender
+                                .send(result)
+                                .await
+                                .expect("Receiver not to be dropped");
+                        }
+                        QueryResult::Bootstrap(_) => {
+                            tracing::debug!("Checking low watermark");
+                            // Starting from libp2p-v0.54.1 bootstrap queries are started
+                            // automatically in the kad behaviour:
+                            // 1. periodically,
+                            // 2. after a peer is added to the routing table, if the number of
+                            //    peers in the DHT is lower than 20. See `bootstrap_on_low_peers` for more details:
+                            // https://github.com/libp2p/rust-libp2p/blob/d7beb55f672dce54017fa4b30f67ecb8d66b9810/protocols/kad/src/behaviour.rs#L1401).
+                            if step.count == NonZeroUsize::new(1).expect("1>0") {
+                                send_test_event(
+                                    &self.event_sender,
+                                    TestEvent::KademliaBootstrapStarted,
+                                )
+                                .await;
+                            }
+                        }
+                        _ => self.test_query_progressed(id, result).await,
+                    }
                 }
             }
             kad::Event::RoutingUpdated {
diff --git a/crates/p2p/src/test_utils/peer.rs b/crates/p2p/src/test_utils/peer.rs
index 72c4defdb..72911864f 100644
--- a/crates/p2p/src/test_utils/peer.rs
+++ b/crates/p2p/src/test_utils/peer.rs
@@ -35,9 +35,8 @@ impl Config {
             max_inbound_direct_peers: 10,
             max_inbound_relayed_peers: 10,
             max_outbound_peers: 10,
-            low_watermark: 10,
-            ip_whitelist: vec!["::/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
-            bootstrap: Default::default(),
+            ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
+            bootstrap_period: Duration::from_millis(500),
             eviction_timeout: Duration::from_secs(15 * 60),
             inbound_connections_rate_limit: RateLimit {
                 max: 1000,
diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs
index 1723a627b..3a80f46c7 100644
--- a/crates/p2p/src/tests.rs
+++ b/crates/p2p/src/tests.rs
@@ -15,7 +15,7 @@ use rstest::rstest;
 
 use crate::sync::codec;
 use crate::test_utils::peer::TestPeer;
-use crate::{BootstrapConfig, Config, Event, EventReceiver, RateLimit, TestEvent};
+use crate::{Config, Event, EventReceiver, RateLimit, TestEvent};
 
 /// [`MainLoop`](p2p::MainLoop)'s event channel size is 1, so we need to consume
 /// all events as soon as they're sent otherwise the main loop will stall.
@@ -161,25 +161,8 @@ async fn periodic_bootstrap() {
     const BOOTSTRAP_PERIOD: Duration = Duration::from_millis(500);
 
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
-        max_inbound_direct_peers: 10,
-        max_inbound_relayed_peers: 10,
-        max_outbound_peers: 10,
-        low_watermark: 3,
-        bootstrap: BootstrapConfig {
-            period: BOOTSTRAP_PERIOD,
-            start_offset: Duration::from_secs(1),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        bootstrap_period: BOOTSTRAP_PERIOD,
+        ..Config::for_test()
     };
     let mut boot = TestPeer::new(cfg.clone());
     let mut peer1 = TestPeer::new(cfg.clone());
@@ -270,41 +253,6 @@ async fn periodic_bootstrap() {
         peer2.client.for_test().get_peers_from_dht().await,
         [boot.peer_id, peer1.peer_id].into()
     );
-
-    // Start a new peer and connect to the other peers, immediately reaching the low
-    // watermark.
-    let mut peer3 = TestPeer::new(cfg);
-
-    peer3
-        .client
-        .dial(boot.peer_id, boot_addr.clone())
-        .await
-        .unwrap();
-    peer3
-        .client
-        .dial(peer1.peer_id, addr1.clone())
-        .await
-        .unwrap();
-    peer3
-        .client
-        .dial(peer2.peer_id, addr2.clone())
-        .await
-        .unwrap();
-
-    consume_accumulated_events(&mut peer3.event_receiver).await;
-
-    // The low watermark is reached for peer3, so no more bootstrap attempts are
-    // made.
-    let timeout = tokio::time::timeout(
-        BOOTSTRAP_PERIOD + Duration::from_millis(100),
-        wait_for_event(&mut peer3.event_receiver, |event| match event {
-            Event::Test(TestEvent::KademliaBootstrapStarted) => Some(()),
-            _ => None,
-        }),
-    )
-    .await;
-
-    assert!(timeout.is_err());
 }
 
 /// Test that if a peer attempts to reconnect too quickly, the connection is
@@ -314,24 +262,7 @@ async fn reconnect_too_quickly() {
     const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
 
     let cfg = Config {
         direct_connection_timeout: CONNECTION_TIMEOUT,
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
-        max_inbound_direct_peers: 10,
-        max_inbound_relayed_peers: 10,
-        max_outbound_peers: 10,
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
     let mut peer1 = TestPeer::new(cfg.clone());
@@ -418,25 +349,7 @@ async fn duplicate_connection() {
     const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50);
     let cfg = Config {
         direct_connection_timeout: CONNECTION_TIMEOUT,
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
-        max_inbound_direct_peers: 10,
-        max_inbound_relayed_peers: 10,
-        max_outbound_peers: 10,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
     let keypair = Keypair::generate_ed25519();
     let mut peer1 = TestPeer::with_keypair(keypair.clone(), cfg.clone());
@@ -506,26 +419,10 @@
 #[test_log::test(tokio::test)]
 async fn outbound_peer_eviction() {
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
         max_inbound_direct_peers: 2,
         max_inbound_relayed_peers: 0,
         max_outbound_peers: 2,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
 
     let mut peer = TestPeer::new(cfg.clone());
@@ -638,26 +535,10 @@
 #[test_log::test(tokio::test)]
 async fn inbound_peer_eviction() {
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
         max_inbound_direct_peers: 25,
         max_inbound_relayed_peers: 0,
         max_outbound_peers: 100,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
 
     let mut peer = TestPeer::new(cfg.clone());
@@ -727,26 +608,10 @@
 #[test_log::test(tokio::test)]
 async fn evicted_peer_reconnection() {
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
         max_inbound_direct_peers: 1000,
         max_inbound_relayed_peers: 0,
         max_outbound_peers: 1,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
 
     let mut peer1 = TestPeer::new(cfg.clone());
@@ -821,26 +686,8 @@
 #[test_log::test(tokio::test)]
 async fn ip_whitelist() {
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
         ip_whitelist: vec!["127.0.0.2/32".parse().unwrap()],
-        max_inbound_direct_peers: 10,
-        max_inbound_relayed_peers: 10,
-        max_outbound_peers: 10,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
     let mut peer1 = TestPeer::new(cfg.clone());
     let peer2 = TestPeer::new(cfg.clone());
@@ -857,26 +704,8 @@
 
     // Start another peer accepting connections from 127.0.0.1.
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
         ip_whitelist: vec!["127.0.0.1/32".parse().unwrap()],
-        max_inbound_direct_peers: 10,
-        max_inbound_relayed_peers: 10,
-        max_outbound_peers: 10,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
-        inbound_connections_rate_limit: RateLimit {
-            max: 1000,
-            interval: Duration::from_secs(1),
-        },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
 
     let mut peer3 = TestPeer::new(cfg);
 
@@ -894,26 +723,11 @@ async fn rate_limit() {
     const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(1);
 
     let cfg = Config {
-        direct_connection_timeout: Duration::from_secs(0),
-        relay_connection_timeout: Duration::from_secs(0),
-        ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
-        max_inbound_direct_peers: 10,
-        max_inbound_relayed_peers: 10,
-        max_outbound_peers: 10,
-        // Don't open connections automatically.
-        low_watermark: 0,
-        bootstrap: BootstrapConfig {
-            period: Duration::from_millis(500),
-            start_offset: Duration::from_secs(10),
-        },
-        eviction_timeout: Duration::from_secs(15 * 60),
         inbound_connections_rate_limit: RateLimit {
             max: 2,
             interval: RATE_LIMIT_INTERVAL,
         },
-        kad_name: Default::default(),
-        stream_timeout: Duration::from_secs(10),
-        max_concurrent_streams: 100,
+        ..Config::for_test()
     };
 
     let mut peer1 = TestPeer::new(cfg.clone());
 
diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs
index f01f4b519..6a6a3c01b 100644
--- a/crates/pathfinder/src/bin/pathfinder/config.rs
+++ b/crates/pathfinder/src/bin/pathfinder/config.rs
@@ -458,17 +458,6 @@ Example:
     )]
     max_outbound_connections: u32,
 
-    #[arg(
-        long = "p2p.low-watermark",
-        long_help = "The minimum number of outbound peers to maintain. If the number of outbound \
-                     peers drops below this number, the node will attempt to connect to more \
-                     peers.",
-        value_name = "LOW_WATERMARK",
-        env = "PATHFINDER_LOW_WATERMARK",
-        default_value = "20"
-    )]
-    low_watermark: u32,
-
     #[arg(
         long = "p2p.ip-whitelist",
         long_help = "Comma separated list of IP addresses or IP address ranges (in CIDR) to \
@@ -739,7 +728,6 @@ pub struct P2PConfig {
     pub max_inbound_relayed_connections: usize,
     pub max_outbound_connections: usize,
     pub ip_whitelist: Vec<IpNet>,
-    pub low_watermark: usize,
     pub kad_name: Option<String>,
     pub l1_checkpoint_override: Option<EthereumStateUpdate>,
     pub stream_timeout: Duration,
@@ -850,15 +838,6 @@ impl P2PConfig {
                 .exit()
         }
 
-        if args.low_watermark > args.max_outbound_connections {
-            Cli::command()
-                .error(
-                    ErrorKind::ValueValidation,
-                    "p2p.low-watermark must be less than or equal to p2p.max_outbound_connections",
-                )
-                .exit()
-        }
-
         if args.kad_name.iter().any(|x| !x.starts_with('/')) {
             Cli::command()
                 .error(
@@ -886,7 +865,6 @@
             ),
             predefined_peers: parse_multiaddr_vec("p2p.predefined-peers", args.predefined_peers),
             ip_whitelist: args.ip_whitelist,
-            low_watermark: 0,
             kad_name: args.kad_name,
             l1_checkpoint_override,
             stream_timeout: Duration::from_secs(args.stream_timeout.into()),
diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs
index 7ab60493b..6ad4a4651 100644
--- a/crates/pathfinder/src/bin/pathfinder/main.rs
+++ b/crates/pathfinder/src/bin/pathfinder/main.rs
@@ -447,9 +447,8 @@ async fn start_p2p(
         max_inbound_direct_peers: config.max_inbound_direct_connections,
         max_inbound_relayed_peers: config.max_inbound_relayed_connections,
         max_outbound_peers: config.max_outbound_connections,
-        low_watermark: config.low_watermark,
         ip_whitelist: config.ip_whitelist,
-        bootstrap: Default::default(),
+        bootstrap_period: Duration::from_secs(2 * 60),
         eviction_timeout: config.eviction_timeout,
         inbound_connections_rate_limit: p2p::RateLimit {
             max: 10,
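
Reviewer note (not part of the patch): the core of the change is that the Kademlia behaviour now owns the bootstrap schedule. A minimal sketch of how `set_periodic_bootstrap_interval` is wired, assuming a libp2p version with this API (the patch comments reference libp2p-v0.54.1); the protocol name is hypothetical and only there to make the snippet self-contained:

```rust
use std::time::Duration;

use libp2p::kad;
use libp2p::StreamProtocol;

/// Build a kad config whose periodic bootstrap is driven by libp2p itself,
/// so the swarm's main loop no longer needs its own bootstrap timer.
fn kad_config(bootstrap_period: Duration) -> kad::Config {
    // "/example/kad/1.0.0" is a placeholder protocol name for illustration.
    let mut cfg = kad::Config::new(StreamProtocol::new("/example/kad/1.0.0"));
    cfg.set_periodic_bootstrap_interval(Some(bootstrap_period));
    cfg
}
```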
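The `step` used in the new `QueryResult::Bootstrap(_)` arm is not introduced by the hunk itself; it comes from destructuring the surrounding `kad::Event::OutboundQueryProgressed` event. A standalone sketch of that detection logic, under the assumption that the handler receives the event by value:

```rust
use std::num::NonZeroUsize;

use libp2p::kad;

/// Emit a "bootstrap started" notification exactly once per bootstrap query:
/// `ProgressStep::count` is 1 for the first progress report of any query.
fn on_kad_event(event: kad::Event) {
    if let kad::Event::OutboundQueryProgressed { result, step, .. } = event {
        if let kad::QueryResult::Bootstrap(_) = result {
            if step.count == NonZeroUsize::new(1).expect("1 > 0") {
                // e.g. send TestEvent::KademliaBootstrapStarted here
            }
        }
    }
}
```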
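The tests now fill most fields with `..Config::for_test()`. That helper is not shown in this excerpt; the sketch below is an assumed shape, reconstructed from the `test_utils/peer.rs` hunk and the crate's `Config`/`RateLimit` types, so field values and the exact location of the impl are illustrative only:

```rust
use std::time::Duration;

impl Config {
    /// Assumed test-only constructor backing the `..Config::for_test()` updates.
    pub fn for_test() -> Self {
        Self {
            direct_connection_timeout: Duration::from_secs(0),
            relay_connection_timeout: Duration::from_secs(0),
            max_inbound_direct_peers: 10,
            max_inbound_relayed_peers: 10,
            max_outbound_peers: 10,
            ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
            bootstrap_period: Duration::from_millis(500),
            eviction_timeout: Duration::from_secs(15 * 60),
            inbound_connections_rate_limit: RateLimit {
                max: 1000,
                interval: Duration::from_secs(1),
            },
            kad_name: None,
            stream_timeout: Duration::from_secs(10),
            max_concurrent_streams: 100,
        }
    }
}
```

Tests that need a non-default value override just that field, e.g. `Config { bootstrap_period: BOOTSTRAP_PERIOD, ..Config::for_test() }`.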