Skip to content

Commit

Permalink
Refactor: Chunk read log entry and check range on startup
Browse files Browse the repository at this point in the history
- Implement chunk-based reading of committed log entries when
  re-applying to state machine upon startup.

- Add validation for log entry indexes, to avoid applying wrong entries
  to state machine.
  • Loading branch information
drmingdrmer committed Sep 14, 2024
1 parent bc0ffc1 commit f0b0aa5
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 5 deletions.
1 change: 1 addition & 0 deletions memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing = { workspace = true }
[dev-dependencies]

[features]
bt = ["openraft/bt"]

[package.metadata.docs.rs]
all-features = true
61 changes: 56 additions & 5 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::marker::PhantomData;
use std::sync::Arc;

use anyerror::AnyError;

use crate::display_ext::DisplayOptionExt;
use crate::engine::LogIdList;
use crate::entry::RaftPayload;
use crate::log_id::RaftLogId;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::raft_state::IOState;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::TypeConfigExt;
Expand All @@ -16,10 +17,12 @@ use crate::AsyncRuntime;
use crate::EffectiveMembership;
use crate::LogIdOptionExt;
use crate::MembershipState;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;
use crate::StoredMembership;

/// StorageHelper provides additional methods to access a [`RaftLogStorage`] and
Expand Down Expand Up @@ -94,10 +97,7 @@ where
let start = last_applied.next_index();
let end = committed.next_index();

tracing::info!("re-apply log {}..{} to state machine", start, end);

let entries = self.log_store.get_log_entries(start..end).await?;
self.state_machine.apply(entries).await?;
self.reapply_committed(start, end).await?;

last_applied = committed;
}
Expand Down Expand Up @@ -174,6 +174,57 @@ where
})
}

/// Read log entries from [`RaftLogReader`] in chunks, and apply them to the state machine.
pub(crate) async fn reapply_committed(&mut self, mut start: u64, end: u64) -> Result<(), StorageError<C::NodeId>> {
let chunk_size = 64;

tracing::info!(
"re-apply log [{}..{}) in {} item chunks to state machine",
chunk_size,
start,
end
);

let mut log_reader = self.log_store.get_log_reader().await;

while start < end {
let chunk_end = std::cmp::min(end, start + chunk_size);
let entries = log_reader.try_get_log_entries(start..chunk_end).await?;

let first = entries.first().map(|x| x.get_log_id().index);
let last = entries.last().map(|x| x.get_log_id().index);

let make_err = || {
let err = AnyError::error(format!(
"Failed to get log entries, expected index: [{}, {}), got [{:?}, {:?})",
start, chunk_end, first, last
));

tracing::error!("{}", err);
err
};

if first != Some(start) {
return Err(StorageIOError::read_log_at_index(start, make_err()).into());
}
if last != Some(chunk_end - 1) {
return Err(StorageIOError::read_log_at_index(chunk_end - 1, make_err()).into());
}

tracing::info!(
"re-apply {} log entries: [{}, {}),",
chunk_end - start,
start,
chunk_end
);
self.state_machine.apply(entries).await?;

start = chunk_end;
}

Ok(())
}

/// Returns the last 2 membership config found in log or state machine.
///
/// A raft node needs to store at most 2 membership config log:
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/storage_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ where NID: NodeId
Self::new(ErrorSubject::Log(log_id), ErrorVerb::Write, source)
}

pub fn read_log_at_index(log_index: u64, source: impl Into<AnyError>) -> Self {
Self::new(ErrorSubject::LogIndex(log_index), ErrorVerb::Read, source)
}

pub fn read_log_entry(log_id: LogId<NID>, source: impl Into<AnyError>) -> Self {
Self::new(ErrorSubject::Log(log_id), ErrorVerb::Read, source)
}
Expand Down
3 changes: 3 additions & 0 deletions sledstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ tracing = "0.1.29"
[dev-dependencies]
tempfile = { version = "3.4.0" }

[features]
bt = ["openraft/bt"]

[package.metadata.docs.rs]
all-features = true
3 changes: 3 additions & 0 deletions stores/rocksstore-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ tracing = "0.1.29"
[dev-dependencies]
tempfile = { version = "3.4.0" }

[features]
bt = ["openraft/bt"]

[package.metadata.docs.rs]
all-features = true

0 comments on commit f0b0aa5

Please sign in to comment.