event_cache/timeline: have the event cache handle restarting a back-pagination that failed under our feet

When a timeline reset happens while we're back-paginating, the event
cache method to run back-pagination would return a success result
indicating that the pagination token disappeared. After thinking about
it, that's not the best API in the world; ideally, the back-pagination
mechanism would restart automatically.
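
To illustrate the intent, here is a toy sketch (not the SDK's real types:
`run_backwards`, `paginate_until_events`, and this `BackPaginationOutcome`
are simplified stand-ins modeled on the code in this diff) contrasting
"surface a lost-token outcome to every caller" with "loop internally until
the pagination can make progress":

```rust
// Toy model only; these are simplified stand-ins, not matrix-sdk definitions.
enum BackPaginationOutcome {
    Success { events: Vec<String>, reached_start: bool },
    /// The pagination token vanished, e.g. because the timeline was reset.
    UnknownBackpaginationToken,
}

/// Hypothetical stand-in for the event cache's back-pagination request.
fn run_backwards(batch_size: u16, attempt: &mut u32) -> BackPaginationOutcome {
    *attempt += 1;
    if *attempt == 1 {
        // Simulate a timeline reset invalidating the token mid-pagination.
        BackPaginationOutcome::UnknownBackpaginationToken
    } else {
        BackPaginationOutcome::Success {
            events: (0..batch_size).map(|i| format!("event-{i}")).collect(),
            reached_start: true,
        }
    }
}

/// With the restart handled internally, callers never see the lost token.
fn paginate_until_events(batch_size: u16) -> Vec<String> {
    let mut attempt = 0;
    loop {
        match run_backwards(batch_size, &mut attempt) {
            BackPaginationOutcome::Success { events, .. } => return events,
            // Previously this outcome leaked to the caller; here we just retry.
            BackPaginationOutcome::UnknownBackpaginationToken => continue,
        }
    }
}

fn main() {
    let events = paginate_until_events(20);
    println!("received {} events after one automatic restart", events.len());
}
```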

Now, this was handled in the timeline before, and the reason it was
handled there was that it was possible to back-paginate and ask for a
certain number of timeline items. I've removed that feature, so that
back-pagination on a live timeline matches the capabilities of a
focused-timeline back-pagination: one can only ask for a given number of
*events*, not timeline items.
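
For illustration, the resulting call surface on the timeline side (a usage
sketch only; it assumes an existing `Timeline` handle and the `anyhow` crate
for error handling, neither of which this commit prescribes):

```rust
use matrix_sdk_ui::timeline::Timeline;

/// Back-paginate a live or focused timeline by a number of events.
async fn load_older_events(timeline: &Timeline) -> anyhow::Result<()> {
    // Ask for up to 20 more *events*; the return value tells us whether
    // the start of the timeline has been reached.
    let reached_start = timeline.paginate_backwards(20).await?;
    if reached_start {
        println!("reached the start of the room's history");
    }
    Ok(())
}
```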

As a matter of fact, this simplifies the code a lot by removing many
data structures that were also exposed (and unused, since recent
changes) in the FFI layer.
bnjbvr committed May 1, 2024
1 parent f69ad5d commit 20404fb
Showing 8 changed files with 113 additions and 395 deletions.
26 changes: 0 additions & 26 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
@@ -981,32 +981,6 @@ impl SendAttachmentJoinHandle {
}
}

#[derive(uniffi::Enum)]
pub enum PaginationOptions {
SimpleRequest { event_limit: u16, wait_for_token: bool },
UntilNumItems { event_limit: u16, items: u16, wait_for_token: bool },
}

impl From<PaginationOptions> for matrix_sdk_ui::timeline::PaginationOptions<'static> {
fn from(value: PaginationOptions) -> Self {
use matrix_sdk_ui::timeline::PaginationOptions as Opts;
let (wait_for_token, mut opts) = match value {
PaginationOptions::SimpleRequest { event_limit, wait_for_token } => {
(wait_for_token, Opts::simple_request(event_limit))
}
PaginationOptions::UntilNumItems { event_limit, items, wait_for_token } => {
(wait_for_token, Opts::until_num_items(event_limit, items))
}
};

if wait_for_token {
opts = opts.wait_for_token();
}

opts
}
}

/// A [`TimelineItem`](super::TimelineItem) that doesn't correspond to an event.
#[derive(uniffi::Enum)]
pub enum VirtualTimelineItem {
1 change: 0 additions & 1 deletion crates/matrix-sdk-ui/src/timeline/mod.rs
@@ -95,7 +95,6 @@ pub use self::{
event_type_filter::TimelineEventTypeFilter,
inner::default_event_filter,
item::{TimelineItem, TimelineItemKind},
pagination::{PaginationOptions, PaginationOutcome},
polls::PollResult,
reactions::ReactionSenderData,
sliding_sync_ext::SlidingSyncRoomExt,
314 changes: 34 additions & 280 deletions crates/matrix-sdk-ui/src/timeline/pagination.rs
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt, ops::ControlFlow, sync::Arc};

use eyeball::Subscriber;
use matrix_sdk::event_cache::{
self,
@@ -32,9 +30,7 @@ impl super::Timeline {
#[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
pub async fn paginate_backwards(&self, num_events: u16) -> Result<bool, Error> {
if self.inner.is_live().await {
Ok(self
.live_paginate_backwards(PaginationOptions::until_num_items(num_events, 20))
.await?)
Ok(self.live_paginate_backwards(num_events).await?)
} else {
Ok(self.focused_paginate_backwards(num_events).await?)
}
@@ -64,79 +60,50 @@ impl super::Timeline {
/// on a specific event.
///
/// Returns whether we hit the start of the timeline.
#[instrument(skip_all, fields(room_id = ?self.room().room_id(), ?options))]
pub async fn live_paginate_backwards(
&self,
mut options: PaginationOptions<'_>,
) -> event_cache::Result<bool> {
let initial_options = options.clone();
let mut outcome = PaginationOutcome::default();

#[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
let pagination = self.event_cache.pagination();

while let Some(batch_size) = options.next_event_limit(outcome) {
loop {
let result = pagination.run_backwards(batch_size).await;

let event_cache_outcome = match result {
Ok(outcome) => outcome,

Err(EventCacheError::BackpaginationError(
PaginatorError::InvalidPreviousState { actual, .. },
)) if actual == PaginatorState::Paginating => {
warn!("Another pagination request is already happening, returning early");
return Ok(false);
}

Err(err) => return Err(err),
};

match event_cache_outcome {
BackPaginationOutcome::Success { events, reached_start } => {
let num_events = events.len();
trace!("Back-pagination succeeded with {num_events} events");

let handle_many_res = self
.inner
.add_events_at(
events,
TimelineEnd::Front,
RemoteEventOrigin::Pagination,
)
.await;

if reached_start {
return Ok(true);
}

outcome.events_received = num_events as u64;
outcome.total_events_received += outcome.events_received;

outcome.items_added = handle_many_res.items_added;
outcome.items_updated = handle_many_res.items_updated;
outcome.total_items_added += outcome.items_added;
outcome.total_items_updated += outcome.items_updated;

if num_events == 0 {
// As an exceptional contract: if there were no events in the response,
// and we've not hit the start of the timeline, so retry until we get
// some events.
continue;
}
}

BackPaginationOutcome::UnknownBackpaginationToken => {
// The token has been lost.
// It's possible the timeline has been cleared; restart the whole
// back-pagination.
outcome = Default::default();
options = initial_options.clone();
}
}

// Exit the inner loop, and ask for another limit.
break;
}
}

loop {
let result = pagination.run_backwards(batch_size).await;

let event_cache_outcome = match result {
Ok(outcome) => outcome,

Err(EventCacheError::BackpaginationError(
PaginatorError::InvalidPreviousState {
actual: PaginatorState::Paginating, ..
},
)) => {
warn!("Another pagination request is already happening, returning early");
return Ok(false);
}

Err(err) => return Err(err),
};

let BackPaginationOutcome { events, reached_start } = event_cache_outcome;

let num_events = events.len();
trace!("Back-pagination succeeded with {num_events} events");

self.inner
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
.await;

if reached_start {
return Ok(true);
}

if num_events == 0 {
// As an exceptional contract: if there were no events in the response,
// and we've not hit the start of the timeline, retry until we get
// some events or reach the start of the timeline.
continue;
}

// Exit the inner loop, and ask for another limit.
break;
}

Ok(false)
Expand All @@ -150,216 +117,3 @@ impl super::Timeline {
self.event_cache.pagination().status()
}
}

/// Options for pagination.
#[derive(Clone)]
pub struct PaginationOptions<'a> {
inner: PaginationOptionsInner<'a>,
pub(super) wait_for_token: bool,
}

impl<'a> PaginationOptions<'a> {
/// Do pagination requests until we receive some events, asking the server
/// for the given maximum number of events.
///
/// The server may choose to return fewer events, even if the start or end
/// of the visible timeline is not yet reached.
pub fn simple_request(event_limit: u16) -> Self {
Self::new(PaginationOptionsInner::SingleRequest { event_limit_if_first: Some(event_limit) })
}

/// Continually paginate with a fixed `limit` until at least the given
/// amount of timeline items have been added (unless the start or end of the
/// visible timeline is reached).
///
/// The `event_limit` represents the maximum number of events the server
/// should return in one batch. It may choose to return fewer events per
/// response.
pub fn until_num_items(event_limit: u16, items: u16) -> Self {
Self::new(PaginationOptionsInner::UntilNumItems { event_limit, items })
}

/// Paginate once with the given initial maximum number of events, then
/// do more requests based on the user-provided strategy
/// callback.
///
/// The callback is given numbers on the events and resulting timeline
/// items for the last request as well as summed over all
/// requests in a `paginate_backwards` call, and can decide
/// whether to do another request (by returning
/// `ControlFlow::Continue(next_event_limit)`) or not (by returning
/// `ControlFlow::Break(())`).
pub fn custom(
initial_event_limit: u16,
pagination_strategy: impl Fn(PaginationOutcome) -> ControlFlow<(), u16> + Send + Sync + 'a,
) -> Self {
Self::new(PaginationOptionsInner::Custom {
event_limit_if_first: Some(initial_event_limit),
strategy: Arc::new(pagination_strategy),
})
}

/// Whether to wait for a pagination token to be set before starting.
///
/// This is not something you should normally do since it can lead to very
/// long wait times, however in the specific case of using sliding sync with
/// the current proxy and subscribing to the room in a way that you know a
/// sync will be coming in soon, it can be useful to reduce unnecessary
/// traffic from duplicated events and avoid ordering issues from the sync
/// proxy returning older data than pagination.
pub fn wait_for_token(mut self) -> Self {
self.wait_for_token = true;
self
}

pub(super) fn next_event_limit(
&mut self,
pagination_outcome: PaginationOutcome,
) -> Option<u16> {
match &mut self.inner {
PaginationOptionsInner::SingleRequest { event_limit_if_first } => {
event_limit_if_first.take()
}
PaginationOptionsInner::UntilNumItems { items, event_limit } => {
(pagination_outcome.total_items_added < *items as u64).then_some(*event_limit)
}
PaginationOptionsInner::Custom { event_limit_if_first, strategy } => {
event_limit_if_first.take().or_else(|| match strategy(pagination_outcome) {
ControlFlow::Continue(event_limit) => Some(event_limit),
ControlFlow::Break(_) => None,
})
}
}
}

fn new(inner: PaginationOptionsInner<'a>) -> Self {
Self { inner, wait_for_token: false }
}
}

#[derive(Clone)]
pub enum PaginationOptionsInner<'a> {
SingleRequest {
event_limit_if_first: Option<u16>,
},
UntilNumItems {
event_limit: u16,
items: u16,
},
Custom {
event_limit_if_first: Option<u16>,
strategy: Arc<dyn Fn(PaginationOutcome) -> ControlFlow<(), u16> + Send + Sync + 'a>,
},
}

#[cfg(not(tarpaulin_include))]
impl<'a> fmt::Debug for PaginationOptions<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.inner {
PaginationOptionsInner::SingleRequest { event_limit_if_first } => f
.debug_struct("SingleRequest")
.field("event_limit_if_first", event_limit_if_first)
.finish(),
PaginationOptionsInner::UntilNumItems { items, event_limit } => f
.debug_struct("UntilNumItems")
.field("items", items)
.field("event_limit", event_limit)
.finish(),
PaginationOptionsInner::Custom { event_limit_if_first, .. } => f
.debug_struct("Custom")
.field("event_limit_if_first", event_limit_if_first)
.finish(),
}
}
}

/// The result of a successful pagination request.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct PaginationOutcome {
/// The number of events received in last pagination response.
pub events_received: u64,

/// The number of timeline items added by the last pagination response.
pub items_added: u64,

/// The number of timeline items updated by the last pagination
/// response.
pub items_updated: u64,

/// The number of events received by a `paginate_backwards` call so far.
pub total_events_received: u64,

/// The total number of items added by a `paginate_backwards` call so
/// far.
pub total_items_added: u64,

/// The total number of items updated by a `paginate_backwards` call so
/// far.
pub total_items_updated: u64,
}

#[cfg(test)]
mod tests {
use std::{
ops::ControlFlow,
sync::atomic::{AtomicU8, Ordering},
};

use super::{PaginationOptions, PaginationOutcome};

fn bump_outcome(outcome: &mut PaginationOutcome) {
outcome.events_received = 8;
outcome.items_added = 6;
outcome.items_updated = 1;
outcome.total_events_received += 8;
outcome.total_items_added += 6;
outcome.total_items_updated += 1;
}

#[test]
fn test_simple_request_limits() {
let mut opts = PaginationOptions::simple_request(10);
let mut outcome = PaginationOutcome::default();
assert_eq!(opts.next_event_limit(outcome), Some(10));

bump_outcome(&mut outcome);
assert_eq!(opts.next_event_limit(outcome), None);
}

#[test]
fn test_until_num_items_limits() {
let mut opts = PaginationOptions::until_num_items(10, 10);
let mut outcome = PaginationOutcome::default();
assert_eq!(opts.next_event_limit(outcome), Some(10));

bump_outcome(&mut outcome);
assert_eq!(opts.next_event_limit(outcome), Some(10));

bump_outcome(&mut outcome);
assert_eq!(opts.next_event_limit(outcome), None);
}

#[test]
fn test_custom_limits() {
let num_calls = AtomicU8::new(0);
let mut opts = PaginationOptions::custom(8, |outcome| {
num_calls.fetch_add(1, Ordering::AcqRel);
if outcome.total_items_added - outcome.total_items_updated < 6 {
ControlFlow::Continue(12)
} else {
ControlFlow::Break(())
}
});
let mut outcome = PaginationOutcome::default();
assert_eq!(opts.next_event_limit(outcome), Some(8));

bump_outcome(&mut outcome);
assert_eq!(opts.next_event_limit(outcome), Some(12));

bump_outcome(&mut outcome);
assert_eq!(opts.next_event_limit(outcome), None);

assert_eq!(num_calls.load(Ordering::Acquire), 2);
}
}