Skip to content

Commit

Permalink
Refactor: Add test to ensure a Leader re-apply all logs upon startup,…
Browse files Browse the repository at this point in the history
… even when committed log id not saved

When a node starts up as the Leader, it now re-applies all logs at once.
Even when `save_committed()` is not implemented.

- Related issue: #1246
  • Loading branch information
drmingdrmer committed Sep 14, 2024
1 parent 805729b commit bc0ffc1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 2 deletions.
20 changes: 19 additions & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Cursor;
use std::ops::RangeBounds;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;

Expand Down Expand Up @@ -117,6 +119,11 @@ pub enum BlockOperation {
pub struct MemStore {
last_purged_log_id: RwLock<Option<LogId<MemNodeId>>>,

/// Saving committed log id is optional in Openraft.
///
/// This flag switches on the saving for testing purposes.
pub enable_saving_committed: AtomicBool,

committed: RwLock<Option<LogId<MemNodeId>>>,

/// The Raft log. Logs are stored in serialized json.
Expand Down Expand Up @@ -146,6 +153,7 @@ impl MemStore {

Self {
last_purged_log_id: RwLock::new(None),
enable_saving_committed: AtomicBool::new(true),
committed: RwLock::new(None),
log,
sm,
Expand Down Expand Up @@ -325,13 +333,23 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {
}

async fn save_committed(&mut self, committed: Option<LogId<MemNodeId>>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!(?committed, "save_committed");
let enabled = self.enable_saving_committed.load(Ordering::Relaxed);
tracing::debug!(?committed, "save_committed, enabled: {}", enabled);
if !enabled {
return Ok(());
}
let mut c = self.committed.write().await;
*c = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<MemNodeId>>, StorageError<MemNodeId>> {
let enabled = self.enable_saving_committed.load(Ordering::Relaxed);
tracing::debug!("read_committed, enabled: {}", enabled);
if !enabled {
return Ok(None);
}

Ok(*self.committed.read().await)
}

Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fn eng() -> Engine<UTConfig> {
eng
}

/// It is a Leader but not yet append any logs.
#[test]
fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
let mut eng = eng();
Expand Down
8 changes: 7 additions & 1 deletion tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ pub struct TypedRaftRouter {
#[allow(clippy::type_complexity)]
nodes: Arc<Mutex<BTreeMap<MemNodeId, (MemRaft, MemLogStore, MemStateMachine)>>>,

/// Whether to save the committed entries to the RaftLogStorage.
pub enable_saving_committed: bool,

/// Whether to fail a network RPC that is sent from/to a node.
/// And it defines what kind of error to return.
fail_rpc: Arc<Mutex<HashMap<(MemNodeId, Direction), RPCErrorType>>>,
Expand Down Expand Up @@ -290,6 +293,7 @@ impl Builder {
TypedRaftRouter {
config: self.config,
nodes: Default::default(),
enable_saving_committed: true,
fail_rpc: Default::default(),
send_delay: Arc::new(AtomicU64::new(send_delay)),
append_entries_quota: Arc::new(Mutex::new(None)),
Expand Down Expand Up @@ -449,7 +453,9 @@ impl TypedRaftRouter {

pub fn new_store(&mut self) -> (MemLogStore, MemStateMachine) {
let store = Arc::new(MemStore::default());
Adaptor::new(store)
store.enable_saving_committed.store(self.enable_saving_committed, Ordering::Relaxed);
let (log, sm) = Adaptor::new(store);
(log, sm)
}

#[tracing::instrument(level = "debug", skip_all)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use crate::fixtures::RaftRouter;

/// A single leader should re-apply all logs upon startup,
/// because itself is a quorum.
///
/// This test disables save_committed() to ensure that logs are still re-applied because the leader
/// itself forms a quorum.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {
let config = Arc::new(
Expand All @@ -24,6 +27,7 @@ async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {
);

let mut router = RaftRouter::new(config.clone());
router.enable_saving_committed = false;

tracing::info!("--- bring up cluster of 1 node");
let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;
Expand Down

0 comments on commit bc0ffc1

Please sign in to comment.