diff --git a/test/src/main.rs b/test/src/main.rs index d6a8187e70..038b947452 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -491,6 +491,7 @@ fn all_specs() -> Vec> { Box::new(RbfReplaceProposedSuccess), Box::new(RbfConcurrency), Box::new(RbfCellDepsCheck), + Box::new(RbfCyclingAttack), Box::new(CompactBlockEmpty), Box::new(CompactBlockEmptyParentUnknown), Box::new(CompactBlockPrefilled), diff --git a/test/src/specs/tx_pool/replace.rs b/test/src/specs/tx_pool/replace.rs index 3d5a61c6bc..b6ef175906 100644 --- a/test/src/specs/tx_pool/replace.rs +++ b/test/src/specs/tx_pool/replace.rs @@ -1013,6 +1013,142 @@ impl Spec for RbfCellDepsCheck { } } +pub struct RbfCyclingAttack; +impl Spec for RbfCyclingAttack { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + let initial_inputs = gen_spendable(node0, 3); + let input_a = &initial_inputs[0]; + let input_b = &initial_inputs[1]; + let input_c = &initial_inputs[2]; + + let input_c: CellInput = CellInput::new_builder() + .previous_output(input_c.out_point.clone()) + .build(); + + // Commit transaction root + let tx_a = { + let tx_a = always_success_transaction(node0, input_a); + node0.submit_transaction(&tx_a); + tx_a + }; + + let tx_b = { + let tx_b = always_success_transaction(node0, input_b); + node0.submit_transaction(&tx_b); + tx_b + }; + + let mut prev = tx_a.clone(); + // Create transaction chain, A0 -> A1 -> A2 + let mut txs_chain_a = vec![tx_a]; + for _i in 0..2 { + let input = + CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default()) + .out_point(OutPoint::new(prev.hash(), 0)) + .build(); + let cur = always_success_transaction(node0, &input); + txs_chain_a.push(cur.clone()); + let _ = node0.rpc_client().send_transaction(cur.data().into()); + prev = cur.clone(); + } + + // Create transaction chain, B0 -> B1 + let mut txs_chain_b = vec![tx_b.clone()]; + let mut prev = tx_b; + for _i in 0..1 { + let input = + CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default()) + .out_point(OutPoint::new(prev.hash(), 0)) + .build(); + let cur = always_success_transaction(node0, &input); + txs_chain_b.push(cur.clone()); + let _ = node0.rpc_client().send_transaction(cur.data().into()); + prev = cur.clone(); + } + let tx_b1 = txs_chain_b[1].clone(); + eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id()); + + // Create a child transaction consume B0 and A1 + // A0 ---> A1 ---> A2 + // | + // ----------> B2 + // | + // B0 ---> B1 + let tx_a1 = &txs_chain_a[1]; + let tx_b0 = &txs_chain_b[0]; + + let input_a1: CellInput = CellInput::new_builder() + .previous_output(OutPoint::new(tx_a1.hash(), 0)) + .build(); + let input_b0 = CellInput::new_builder() + .previous_output(OutPoint::new(tx_b0.hash(), 0)) + .build(); + + let tx_b2_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(200).pack()) + .build(); + let tx_b2 = tx_a1 + .as_advanced_builder() + .set_inputs(vec![input_a1, input_b0]) + .set_outputs(vec![tx_b2_output]) + .build(); + let res = node0.rpc_client().send_transaction(tx_b2.data().into()); + eprintln!("tx_b2 {:?}", res); + + // after A2 and B1 is replaced by B2 + // A0 ---> A1 + // | + // ----------> B2 + // | + // B0 + let res = node0.rpc_client().get_transaction(tx_b2.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + let res = node0.rpc_client().get_transaction(txs_chain_a[2].hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + let res = node0.rpc_client().get_transaction(txs_chain_b[1].hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + + // tx_b1 is still rejected + let res = node0.rpc_client().get_transaction(tx_b1.hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + + // Create a new transaction A3 consume A1, it will replace B2 + let input_a1 = CellInput::new_builder() + .previous_output(OutPoint::new(tx_a1.hash(), 0)) + .build(); + let tx_a3_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(100).pack()) + .build(); + let tx_a3 = tx_a1 + .as_advanced_builder() + .set_inputs(vec![input_a1, input_c]) + .set_outputs(vec![tx_a3_output]) + .build(); + let _res = node0.rpc_client().send_transaction(tx_a3.data().into()); + + // now result is: + // A0 ---> A1 -> A3 + // + // B0 -> B1 (B1 is recovered back) + // + let res = node0.rpc_client().get_transaction(tx_a3.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + let res = node0.rpc_client().get_transaction(tx_b2.hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id()); + + // B1 is expected by recovered back + let res = node0.rpc_client().get_transaction(tx_b1.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + fn run_spec_send_conflict_relay(nodes: &mut [Node]) { let node0 = &nodes[0]; let node1 = &nodes[1]; diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 54b2395e4c..da259d413c 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -28,6 +28,7 @@ use std::sync::Arc; const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; const CONFLICTES_CACHE_SIZE: usize = 10_000; +const CONFLICTES_INPUTS_CACHE_SIZE: usize = 30_000; const MAX_REPLACEMENT_CANDIDATES: usize = 100; /// Tx-pool implementation @@ -44,6 +45,8 @@ pub struct TxPool { pub(crate) expiry: u64, // conflicted transaction cache pub(crate) conflicts_cache: lru::LruCache, + // conflicted transaction outputs cache, input -> tx_short_id + pub(crate) conflicts_outputs_cache: lru::LruCache, } impl TxPool { @@ -59,6 +62,7 @@ impl TxPool { recent_reject, expiry, conflicts_cache: LruCache::new(CONFLICTES_CACHE_SIZE), + conflicts_outputs_cache: lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE), } } @@ -158,6 +162,9 @@ impl TxPool { pub(crate) fn record_conflict(&mut self, tx: TransactionView) { let short_id = tx.proposal_short_id(); + for inputs in tx.input_pts_iter() { + self.conflicts_outputs_cache.put(inputs, short_id.clone()); + } self.conflicts_cache.put(short_id.clone(), tx); debug!( "record_conflict {:?} now cache size: {}", @@ -167,7 +174,11 @@ impl TxPool { } pub(crate) fn remove_conflict(&mut self, short_id: &ProposalShortId) { - self.conflicts_cache.pop(short_id); + if let Some(tx) = self.conflicts_cache.pop(short_id) { + for inputs in tx.input_pts_iter() { + self.conflicts_outputs_cache.pop(&inputs); + } + } debug!( "remove_conflict {:?} now cache size: {}", short_id, @@ -175,6 +186,19 @@ impl TxPool { ); } + pub(crate) fn get_conflicted_txs_from_inputs( + &self, + inputs: impl Iterator, + ) -> Vec { + inputs + .filter_map(|input| { + self.conflicts_outputs_cache + .peek(&input) + .and_then(|id| self.conflicts_cache.peek(id).cloned()) + }) + .collect() + } + /// Returns tx with cycles corresponding to the id. pub(crate) fn get_tx_with_cycles( &self, @@ -493,6 +517,7 @@ impl TxPool { self.snapshot = snapshot; self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE); self.conflicts_cache = LruCache::new(CONFLICTES_CACHE_SIZE); + self.conflicts_outputs_cache = lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE); } pub(crate) fn package_proposals( diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 4ca169c2c1..2594901784 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -132,26 +132,11 @@ impl TxPoolService { time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?; } - // try to remove conflicted tx here - for id in conflicts.iter() { - let removed = tx_pool.pool_map.remove_entry_and_descendants(id); - for old in removed { - debug!( - "remove conflict tx {} for RBF by new tx {}", - old.transaction().hash(), - entry.transaction().hash() - ); - let reject = Reject::RBFRejected(format!( - "replaced by tx {}", - entry.transaction().hash() - )); - // RBF replace successfully, put old transactions into conflicts pool - tx_pool.record_conflict(old.transaction().clone()); - // after removing old tx from tx_pool, we call reject callbacks manually - self.callbacks.call_reject(tx_pool, &old, reject); - } - } + let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts); let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?; + + // in a corner case, a tx with lower fee rate may be rejected immediately + // after inserting into pool, return proper reject error here for evict in evicted { let reject = Reject::Invalidated(format!( "invalidated by tx {}", @@ -159,12 +144,23 @@ impl TxPoolService { )); self.callbacks.call_reject(tx_pool, &evict, reject); } + tx_pool.remove_conflict(&entry.proposal_short_id()); - // in a corner case, a tx with lower fee rate may be rejected immediately - // after inserting into pool, return proper reject error here tx_pool .limit_size(&self.callbacks, Some(&entry.proposal_short_id())) .map_or(Ok(()), Err)?; + + if !may_recovered_txs.is_empty() { + let self_clone = self.clone(); + tokio::spawn(async move { + // push the recovered txs back to verify queue, so that they can be verified and submitted again + let mut queue = self_clone.verify_queue.write().await; + for tx in may_recovered_txs { + debug!("recover back: {:?}", tx.proposal_short_id()); + let _ = queue.add_tx(tx, None); + } + }); + } Ok(()) }) .await; @@ -200,6 +196,55 @@ impl TxPoolService { } } + // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted + // since they maybe not conflicted anymore + fn process_rbf( + &self, + tx_pool: &mut TxPool, + entry: &TxEntry, + conflicts: &HashSet, + ) -> Vec { + let mut may_recovered_txs = vec![]; + let mut available_inputs = HashSet::new(); + + if conflicts.is_empty() { + return may_recovered_txs; + } + + let all_removed: Vec<_> = conflicts + .iter() + .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id)) + .collect(); + + available_inputs.extend( + all_removed + .iter() + .flat_map(|removed| removed.transaction().input_pts_iter()), + ); + + for input in entry.transaction().input_pts_iter() { + available_inputs.remove(&input); + } + + may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter()); + for old in all_removed { + debug!( + "remove conflict tx {} for RBF by new tx {}", + old.transaction().hash(), + entry.transaction().hash() + ); + let reject = + Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash())); + + // RBF replace successfully, put old transactions into conflicts pool + tx_pool.record_conflict(old.transaction().clone()); + // after removing old tx from tx_pool, we call reject callbacks manually + self.callbacks.call_reject(tx_pool, &old, reject); + } + assert!(!may_recovered_txs.contains(entry.transaction())); + may_recovered_txs + } + pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool { let queue = self.verify_queue.read().await; queue.contains_key(&tx.proposal_short_id())