Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover possible transaction in conflicted cache when RBF #4561

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(RbfReplaceProposedSuccess),
Box::new(RbfConcurrency),
Box::new(RbfCellDepsCheck),
Box::new(RbfCyclingAttack),
Box::new(CompactBlockEmpty),
Box::new(CompactBlockEmptyParentUnknown),
Box::new(CompactBlockPrefilled),
Expand Down
136 changes: 136 additions & 0 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,142 @@ impl Spec for RbfCellDepsCheck {
}
}

pub struct RbfCyclingAttack;
impl Spec for RbfCyclingAttack {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];

let initial_inputs = gen_spendable(node0, 3);
let input_a = &initial_inputs[0];
let input_b = &initial_inputs[1];
let input_c = &initial_inputs[2];

let input_c: CellInput = CellInput::new_builder()
.previous_output(input_c.out_point.clone())
.build();

// Commit transaction root
let tx_a = {
let tx_a = always_success_transaction(node0, input_a);
node0.submit_transaction(&tx_a);
tx_a
};

let tx_b = {
let tx_b = always_success_transaction(node0, input_b);
node0.submit_transaction(&tx_b);
tx_b
};

let mut prev = tx_a.clone();
// Create transaction chain, A0 -> A1 -> A2
let mut txs_chain_a = vec![tx_a];
for _i in 0..2 {
let input =
CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default())
.out_point(OutPoint::new(prev.hash(), 0))
.build();
let cur = always_success_transaction(node0, &input);
txs_chain_a.push(cur.clone());
let _ = node0.rpc_client().send_transaction(cur.data().into());
prev = cur.clone();
}

// Create transaction chain, B0 -> B1
let mut txs_chain_b = vec![tx_b.clone()];
let mut prev = tx_b;
for _i in 0..1 {
let input =
CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default())
.out_point(OutPoint::new(prev.hash(), 0))
.build();
let cur = always_success_transaction(node0, &input);
txs_chain_b.push(cur.clone());
let _ = node0.rpc_client().send_transaction(cur.data().into());
prev = cur.clone();
}
let tx_b1 = txs_chain_b[1].clone();
eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id());

// Create a child transaction consume B0 and A1
// A0 ---> A1 ---> A2
// |
// ----------> B2
// |
// B0 ---> B1
let tx_a1 = &txs_chain_a[1];
let tx_b0 = &txs_chain_b[0];

let input_a1: CellInput = CellInput::new_builder()
.previous_output(OutPoint::new(tx_a1.hash(), 0))
.build();
let input_b0 = CellInput::new_builder()
.previous_output(OutPoint::new(tx_b0.hash(), 0))
.build();

let tx_b2_output = CellOutputBuilder::default()
.capacity(capacity_bytes!(200).pack())
.build();
let tx_b2 = tx_a1
.as_advanced_builder()
.set_inputs(vec![input_a1, input_b0])
.set_outputs(vec![tx_b2_output])
.build();
let res = node0.rpc_client().send_transaction(tx_b2.data().into());
eprintln!("tx_b2 {:?}", res);

// after A2 and B1 is replaced by B2
// A0 ---> A1
// |
// ----------> B2
// |
// B0
let res = node0.rpc_client().get_transaction(tx_b2.hash());
assert_eq!(res.tx_status.status, Status::Pending);
let res = node0.rpc_client().get_transaction(txs_chain_a[2].hash());
assert_eq!(res.tx_status.status, Status::Rejected);
let res = node0.rpc_client().get_transaction(txs_chain_b[1].hash());
assert_eq!(res.tx_status.status, Status::Rejected);

// tx_b1 is still rejected
let res = node0.rpc_client().get_transaction(tx_b1.hash());
assert_eq!(res.tx_status.status, Status::Rejected);

// Create a new transaction A3 consume A1, it will replace B2
let input_a1 = CellInput::new_builder()
.previous_output(OutPoint::new(tx_a1.hash(), 0))
.build();
let tx_a3_output = CellOutputBuilder::default()
.capacity(capacity_bytes!(100).pack())
.build();
let tx_a3 = tx_a1
.as_advanced_builder()
.set_inputs(vec![input_a1, input_c])
.set_outputs(vec![tx_a3_output])
.build();
let _res = node0.rpc_client().send_transaction(tx_a3.data().into());

// now result is:
// A0 ---> A1 -> A3
//
// B0 -> B1 (B1 is recovered back)
//
let res = node0.rpc_client().get_transaction(tx_a3.hash());
assert_eq!(res.tx_status.status, Status::Pending);
let res = node0.rpc_client().get_transaction(tx_b2.hash());
assert_eq!(res.tx_status.status, Status::Rejected);
eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id());

// B1 is expected by recovered back
let res = node0.rpc_client().get_transaction(tx_b1.hash());
assert_eq!(res.tx_status.status, Status::Pending);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

fn run_spec_send_conflict_relay(nodes: &mut [Node]) {
let node0 = &nodes[0];
let node1 = &nodes[1];
Expand Down
27 changes: 26 additions & 1 deletion tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::sync::Arc;

const COMMITTED_HASH_CACHE_SIZE: usize = 100_000;
const CONFLICTES_CACHE_SIZE: usize = 10_000;
const CONFLICTES_INPUTS_CACHE_SIZE: usize = 30_000;
const MAX_REPLACEMENT_CANDIDATES: usize = 100;

/// Tx-pool implementation
Expand All @@ -44,6 +45,8 @@ pub struct TxPool {
pub(crate) expiry: u64,
// conflicted transaction cache
pub(crate) conflicts_cache: lru::LruCache<ProposalShortId, TransactionView>,
// conflicted transaction outputs cache, input -> tx_short_id
pub(crate) conflicts_outputs_cache: lru::LruCache<OutPoint, ProposalShortId>,
}

impl TxPool {
Expand All @@ -59,6 +62,7 @@ impl TxPool {
recent_reject,
expiry,
conflicts_cache: LruCache::new(CONFLICTES_CACHE_SIZE),
conflicts_outputs_cache: lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE),
Comment on lines 64 to +65
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may add two config options for CONFLICTES_CACHE_SIZE and CONFLICTES_INPUTS_CACHE_SIZE

}
}

Expand Down Expand Up @@ -158,6 +162,9 @@ impl TxPool {

pub(crate) fn record_conflict(&mut self, tx: TransactionView) {
let short_id = tx.proposal_short_id();
for inputs in tx.input_pts_iter() {
self.conflicts_outputs_cache.put(inputs, short_id.clone());
}
self.conflicts_cache.put(short_id.clone(), tx);
debug!(
"record_conflict {:?} now cache size: {}",
Expand All @@ -167,14 +174,31 @@ impl TxPool {
}

pub(crate) fn remove_conflict(&mut self, short_id: &ProposalShortId) {
self.conflicts_cache.pop(short_id);
if let Some(tx) = self.conflicts_cache.pop(short_id) {
for inputs in tx.input_pts_iter() {
self.conflicts_outputs_cache.pop(&inputs);
}
}
debug!(
"remove_conflict {:?} now cache size: {}",
short_id,
self.conflicts_cache.len()
);
}

pub(crate) fn get_conflicted_txs_from_inputs(
&self,
inputs: impl Iterator<Item = OutPoint>,
) -> Vec<TransactionView> {
inputs
.filter_map(|input| {
self.conflicts_outputs_cache
.peek(&input)
.and_then(|id| self.conflicts_cache.peek(id).cloned())
})
.collect()
}

/// Returns tx with cycles corresponding to the id.
pub(crate) fn get_tx_with_cycles(
&self,
Expand Down Expand Up @@ -493,6 +517,7 @@ impl TxPool {
self.snapshot = snapshot;
self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE);
self.conflicts_cache = LruCache::new(CONFLICTES_CACHE_SIZE);
self.conflicts_outputs_cache = lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE);
}

pub(crate) fn package_proposals(
Expand Down
87 changes: 66 additions & 21 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,39 +132,35 @@ impl TxPoolService {
time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
}

// try to remove conflicted tx here
for id in conflicts.iter() {
let removed = tx_pool.pool_map.remove_entry_and_descendants(id);
for old in removed {
debug!(
"remove conflict tx {} for RBF by new tx {}",
old.transaction().hash(),
entry.transaction().hash()
);
let reject = Reject::RBFRejected(format!(
"replaced by tx {}",
entry.transaction().hash()
));
// RBF replace successfully, put old transactions into conflicts pool
tx_pool.record_conflict(old.transaction().clone());
// after removing old tx from tx_pool, we call reject callbacks manually
self.callbacks.call_reject(tx_pool, &old, reject);
}
}
let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

// in a corner case, a tx with lower fee rate may be rejected immediately
// after inserting into pool, return proper reject error here
for evict in evicted {
let reject = Reject::Invalidated(format!(
"invalidated by tx {}",
evict.transaction().hash()
));
self.callbacks.call_reject(tx_pool, &evict, reject);
}

tx_pool.remove_conflict(&entry.proposal_short_id());
// in a corner case, a tx with lower fee rate may be rejected immediately
// after inserting into pool, return proper reject error here
tx_pool
.limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
.map_or(Ok(()), Err)?;

if !may_recovered_txs.is_empty() {
let self_clone = self.clone();
tokio::spawn(async move {
// push the recovered txs back to verify queue, so that they can be verified and submitted again
let mut queue = self_clone.verify_queue.write().await;
for tx in may_recovered_txs {
debug!("recover back: {:?}", tx.proposal_short_id());
let _ = queue.add_tx(tx, None);
}
});
}
Ok(())
})
.await;
Expand Down Expand Up @@ -200,6 +196,55 @@ impl TxPoolService {
}
}

// try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
// since they maybe not conflicted anymore
fn process_rbf(
&self,
tx_pool: &mut TxPool,
entry: &TxEntry,
conflicts: &HashSet<ProposalShortId>,
) -> Vec<TransactionView> {
let mut may_recovered_txs = vec![];
let mut available_inputs = HashSet::new();

if conflicts.is_empty() {
return may_recovered_txs;
}

let all_removed: Vec<_> = conflicts
.iter()
.flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
.collect();

available_inputs.extend(
all_removed
.iter()
.flat_map(|removed| removed.transaction().input_pts_iter()),
);

for input in entry.transaction().input_pts_iter() {
available_inputs.remove(&input);
}

may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
for old in all_removed {
debug!(
"remove conflict tx {} for RBF by new tx {}",
old.transaction().hash(),
entry.transaction().hash()
);
let reject =
Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));

// RBF replace successfully, put old transactions into conflicts pool
tx_pool.record_conflict(old.transaction().clone());
// after removing old tx from tx_pool, we call reject callbacks manually
self.callbacks.call_reject(tx_pool, &old, reject);
}
assert!(!may_recovered_txs.contains(entry.transaction()));
may_recovered_txs
}

pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
let queue = self.verify_queue.read().await;
queue.contains_key(&tx.proposal_short_id())
Expand Down
Loading