diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index ffd3a7b10e8..0b74477caf7 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -17,12 +17,15 @@ use std::{collections::HashMap, fmt::Write as _, fs, sync::Arc}; use anyhow::{Context, Result}; use as_variant::as_variant; use eyeball_im::VectorDiff; -use futures_util::{pin_mut, StreamExt}; -use matrix_sdk::attachment::{ - AttachmentConfig, AttachmentInfo, BaseAudioInfo, BaseFileInfo, BaseImageInfo, - BaseThumbnailInfo, BaseVideoInfo, Thumbnail, +use futures_util::{pin_mut, StreamExt as _}; +use matrix_sdk::{ + attachment::{ + AttachmentConfig, AttachmentInfo, BaseAudioInfo, BaseFileInfo, BaseImageInfo, + BaseThumbnailInfo, BaseVideoInfo, Thumbnail, + }, + event_cache::paginator::PaginatorState, }; -use matrix_sdk_ui::timeline::{EventItemOrigin, PaginationStatus, Profile, TimelineDetails}; +use matrix_sdk_ui::timeline::{EventItemOrigin, Profile, TimelineDetails}; use mime::Mime; use ruma::{ events::{ @@ -164,11 +167,11 @@ impl Timeline { &self, listener: Box, ) -> Result, ClientError> { - let mut subscriber = self.inner.back_pagination_status(); + let (initial, mut subscriber) = self.inner.back_pagination_status(); Ok(Arc::new(TaskHandle::new(RUNTIME.spawn(async move { // Send the current state even if it hasn't changed right away. 
- listener.on_update(subscriber.next_now()); + listener.on_update(initial); while let Some(status) = subscriber.next().await { listener.on_update(status); @@ -588,7 +591,7 @@ pub trait TimelineListener: Sync + Send { #[uniffi::export(callback_interface)] pub trait PaginationStatusListener: Sync + Send { - fn on_update(&self, status: PaginationStatus); + fn on_update(&self, status: PaginatorState); } #[derive(Clone, uniffi::Object)] @@ -978,32 +981,6 @@ impl SendAttachmentJoinHandle { } } -#[derive(uniffi::Enum)] -pub enum PaginationOptions { - SimpleRequest { event_limit: u16, wait_for_token: bool }, - UntilNumItems { event_limit: u16, items: u16, wait_for_token: bool }, -} - -impl From for matrix_sdk_ui::timeline::PaginationOptions<'static> { - fn from(value: PaginationOptions) -> Self { - use matrix_sdk_ui::timeline::PaginationOptions as Opts; - let (wait_for_token, mut opts) = match value { - PaginationOptions::SimpleRequest { event_limit, wait_for_token } => { - (wait_for_token, Opts::simple_request(event_limit)) - } - PaginationOptions::UntilNumItems { event_limit, items, wait_for_token } => { - (wait_for_token, Opts::until_num_items(event_limit, items)) - } - }; - - if wait_for_token { - opts = opts.wait_for_token(); - } - - opts - } -} - /// A [`TimelineItem`](super::TimelineItem) that doesn't correspond to an event. 
#[derive(uniffi::Enum)] pub enum VirtualTimelineItem { diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index a123ad26838..f5dc0caa242 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -14,7 +14,6 @@ use std::{collections::BTreeSet, sync::Arc}; -use eyeball::SharedObservable; use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{event_cache::RoomEventCacheUpdate, executor::spawn, Room}; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; @@ -28,10 +27,7 @@ use super::{ queue::send_queued_messages, Error, Timeline, TimelineDropHandle, TimelineFocus, }; -use crate::{ - timeline::{event_item::RemoteEventOrigin, PaginationStatus}, - unable_to_decrypt_hook::UtdHookManager, -}; +use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager}; /// Builder that allows creating and configuring various parts of a /// [`Timeline`]. @@ -299,7 +295,6 @@ impl TimelineBuilder { let timeline = Timeline { inner, - back_pagination_status: SharedObservable::new(PaginationStatus::Idle), msg_sender, event_cache: room_event_cache, drop_handle: Arc::new(TimelineDropHandle { diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 2f0eb016b18..084c929db5c 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -18,7 +18,6 @@ use std::{pin::Pin, sync::Arc, task::Poll}; -use eyeball::SharedObservable; use eyeball_im::VectorDiff; use futures_core::Stream; use imbl::Vector; @@ -96,7 +95,6 @@ pub use self::{ event_type_filter::TimelineEventTypeFilter, inner::default_event_filter, item::{TimelineItem, TimelineItemKind}, - pagination::{PaginationOptions, PaginationOutcome, PaginationStatus}, polls::PollResult, reactions::ReactionSenderData, sliding_sync_ext::SlidingSyncRoomExt, @@ -130,9 +128,6 @@ pub struct Timeline { /// References to long-running 
tasks held by the timeline. drop_handle: Arc, - - /// Observable for whether a backward pagination is currently running. - pub(crate) back_pagination_status: SharedObservable, } // Implements hash etc diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index a1a2584bfbc..deb3b3ea9c9 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -12,38 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, ops::ControlFlow, sync::Arc, time::Duration}; - -use eyeball::{SharedObservable, Subscriber}; -use matrix_sdk::event_cache::{self, BackPaginationOutcome}; +use async_rx::StreamExt as _; +use futures_core::Stream; +use matrix_sdk::event_cache::{ + self, + paginator::{PaginatorError, PaginatorState}, + BackPaginationOutcome, EventCacheError, +}; use tracing::{instrument, trace, warn}; use super::Error; use crate::timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd}; -struct ResetStatusGuard { - status: SharedObservable, - target: Option, -} - -impl ResetStatusGuard { - fn new(status: SharedObservable, target: PaginationStatus) -> Self { - Self { status, target: Some(target) } - } - - fn disarm(mut self) { - self.target = None; - } -} - -impl Drop for ResetStatusGuard { - fn drop(&mut self) { - if let Some(target) = self.target.take() { - self.status.set_if_not_eq(target); - } - } -} - impl super::Timeline { /// Add more events to the start of the timeline. /// @@ -51,9 +31,7 @@ impl super::Timeline { #[instrument(skip_all, fields(room_id = ?self.room().room_id()))] pub async fn paginate_backwards(&self, num_events: u16) -> Result { if self.inner.is_live().await { - Ok(self - .live_paginate_backwards(PaginationOptions::until_num_items(num_events, 20)) - .await?) + Ok(self.live_paginate_backwards(num_events).await?) 
} else { Ok(self.focused_paginate_backwards(num_events).await?) } @@ -83,327 +61,61 @@ impl super::Timeline { /// on a specific event. /// /// Returns whether we hit the start of the timeline. - #[instrument(skip_all, fields(room_id = ?self.room().room_id(), ?options))] - pub async fn live_paginate_backwards( - &self, - mut options: PaginationOptions<'_>, - ) -> event_cache::Result { - let back_pagination_status = &self.back_pagination_status; - - if back_pagination_status.get() == PaginationStatus::TimelineEndReached { - warn!("Start of timeline reached, ignoring backwards-pagination request"); - return Ok(true); - } - - if back_pagination_status.set_if_not_eq(PaginationStatus::Paginating).is_none() { - warn!("Another back-pagination is already running in the background"); - return Ok(false); - } - - let reset_status_guard = - ResetStatusGuard::new(back_pagination_status.clone(), PaginationStatus::Idle); - - // The first time, we allow to wait a bit for *a* back-pagination token to come - // over via sync. - const WAIT_FOR_TOKEN_TIMEOUT: Duration = Duration::from_secs(3); - - let mut token = - self.event_cache.oldest_backpagination_token(Some(WAIT_FOR_TOKEN_TIMEOUT)).await?; - - let initial_options = options.clone(); - let mut outcome = PaginationOutcome::default(); - - while let Some(batch_size) = options.next_event_limit(outcome) { - loop { - match self.event_cache.backpaginate(batch_size, token).await? { - BackPaginationOutcome::Success { events, reached_start } => { - let num_events = events.len(); - trace!("Back-pagination succeeded with {num_events} events"); - - let handle_many_res = self - .inner - .add_events_at( - events, - TimelineEnd::Front, - RemoteEventOrigin::Pagination, - ) - .await; - - if reached_start { - // Don't reset the status to `Idle`… - reset_status_guard.disarm(); - // …and set it to `TimelineEndReached` instead. 
- back_pagination_status - .set_if_not_eq(PaginationStatus::TimelineEndReached); - return Ok(true); - } + #[instrument(skip_all, fields(room_id = ?self.room().room_id()))] + pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result { + let pagination = self.event_cache.pagination(); + + loop { + let result = pagination.run_backwards(batch_size).await; + + let event_cache_outcome = match result { + Ok(outcome) => outcome, + + Err(EventCacheError::BackpaginationError( + PaginatorError::InvalidPreviousState { + actual: PaginatorState::Paginating, .. + }, + )) => { + warn!("Another pagination request is already happening, returning early"); + return Ok(false); + } - outcome.events_received = num_events as u64; - outcome.total_events_received += outcome.events_received; + Err(err) => return Err(err), + }; - outcome.items_added = handle_many_res.items_added; - outcome.items_updated = handle_many_res.items_updated; - outcome.total_items_added += outcome.items_added; - outcome.total_items_updated += outcome.items_updated; + let BackPaginationOutcome { events, reached_start } = event_cache_outcome; - if num_events == 0 { - // As an exceptional contract: if there were no events in the response, - // see if we had another back-pagination token, and retry the request. - token = self.event_cache.oldest_backpagination_token(None).await?; - continue; - } - } + let num_events = events.len(); + trace!("Back-pagination succeeded with {num_events} events"); - BackPaginationOutcome::UnknownBackpaginationToken => { - // The token has been lost. - // It's possible the timeline has been cleared; restart the whole - // back-pagination. - outcome = Default::default(); - options = initial_options.clone(); - } - } + self.inner + .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) + .await; - // Retrieve the next earliest back-pagination token. 
- token = self.event_cache.oldest_backpagination_token(None).await?; + if reached_start { + return Ok(true); + } - // Exit the inner loop, and ask for another limit. - break; + if num_events == 0 { + // As an exceptional contract: if there were no events in the response, + // and we've not hit the start of the timeline, retry until we get + // some events or reach the start of the timeline. + continue; } - } - // The status is automatically reset to idle by `reset_status_guard`. + // Exit the inner loop, and ask for another limit. + break; + } Ok(false) } /// Subscribe to the back-pagination status of the timeline. - pub fn back_pagination_status(&self) -> Subscriber { - self.back_pagination_status.subscribe() - } -} - -/// Options for pagination. -#[derive(Clone)] -pub struct PaginationOptions<'a> { - inner: PaginationOptionsInner<'a>, - pub(super) wait_for_token: bool, -} - -impl<'a> PaginationOptions<'a> { - /// Do pagination requests until we receive some events, asking the server - /// for the given maximum number of events. - /// - /// The server may choose to return fewer events, even if the start or end - /// of the visible timeline is not yet reached. - pub fn simple_request(event_limit: u16) -> Self { - Self::new(PaginationOptionsInner::SingleRequest { event_limit_if_first: Some(event_limit) }) - } - - /// Continually paginate with a fixed `limit` until at least the given - /// amount of timeline items have been added (unless the start or end of the - /// visible timeline is reached). - /// - /// The `event_limit` represents the maximum number of events the server - /// should return in one batch. It may choose to return fewer events per - /// response. - pub fn until_num_items(event_limit: u16, items: u16) -> Self { - Self::new(PaginationOptionsInner::UntilNumItems { event_limit, items }) - } - - /// Paginate once with the given initial maximum number of events, then - /// do more requests based on the user-provided strategy - /// callback. 
- /// - /// The callback is given numbers on the events and resulting timeline - /// items for the last request as well as summed over all - /// requests in a `paginate_backwards` call, and can decide - /// whether to do another request (by returning - /// `ControlFlow::Continue(next_event_limit)`) or not (by returning - /// `ControlFlow::Break(())`). - pub fn custom( - initial_event_limit: u16, - pagination_strategy: impl Fn(PaginationOutcome) -> ControlFlow<(), u16> + Send + Sync + 'a, - ) -> Self { - Self::new(PaginationOptionsInner::Custom { - event_limit_if_first: Some(initial_event_limit), - strategy: Arc::new(pagination_strategy), - }) - } - - /// Whether to wait for a pagination token to be set before starting. /// - /// This is not something you should normally do since it can lead to very - /// long wait times, however in the specific case of using sliding sync with - /// the current proxy and subscribing to the room in a way that you know a - /// sync will be coming in soon, it can be useful to reduce unnecessary - /// traffic from duplicated events and avoid ordering issues from the sync - /// proxy returning older data than pagination. 
- pub fn wait_for_token(mut self) -> Self { - self.wait_for_token = true; - self - } - - pub(super) fn next_event_limit( - &mut self, - pagination_outcome: PaginationOutcome, - ) -> Option { - match &mut self.inner { - PaginationOptionsInner::SingleRequest { event_limit_if_first } => { - event_limit_if_first.take() - } - PaginationOptionsInner::UntilNumItems { items, event_limit } => { - (pagination_outcome.total_items_added < *items as u64).then_some(*event_limit) - } - PaginationOptionsInner::Custom { event_limit_if_first, strategy } => { - event_limit_if_first.take().or_else(|| match strategy(pagination_outcome) { - ControlFlow::Continue(event_limit) => Some(event_limit), - ControlFlow::Break(_) => None, - }) - } - } - } - - fn new(inner: PaginationOptionsInner<'a>) -> Self { - Self { inner, wait_for_token: false } - } -} - -#[derive(Clone)] -pub enum PaginationOptionsInner<'a> { - SingleRequest { - event_limit_if_first: Option, - }, - UntilNumItems { - event_limit: u16, - items: u16, - }, - Custom { - event_limit_if_first: Option, - strategy: Arc ControlFlow<(), u16> + Send + Sync + 'a>, - }, -} - -#[cfg(not(tarpaulin_include))] -impl<'a> fmt::Debug for PaginationOptions<'a> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.inner { - PaginationOptionsInner::SingleRequest { event_limit_if_first } => f - .debug_struct("SingleRequest") - .field("event_limit_if_first", event_limit_if_first) - .finish(), - PaginationOptionsInner::UntilNumItems { items, event_limit } => f - .debug_struct("UntilNumItems") - .field("items", items) - .field("event_limit", event_limit) - .finish(), - PaginationOptionsInner::Custom { event_limit_if_first, .. } => f - .debug_struct("Custom") - .field("event_limit_if_first", event_limit_if_first) - .finish(), - } - } -} - -/// The result of a successful pagination request. 
-#[derive(Clone, Copy, Debug, Default)] -#[non_exhaustive] -pub struct PaginationOutcome { - /// The number of events received in last pagination response. - pub events_received: u64, - - /// The number of timeline items added by the last pagination response. - pub items_added: u64, - - /// The number of timeline items updated by the last pagination - /// response. - pub items_updated: u64, - - /// The number of events received by a `paginate_backwards` call so far. - pub total_events_received: u64, - - /// The total number of items added by a `paginate_backwards` call so - /// far. - pub total_items_added: u64, - - /// The total number of items updated by a `paginate_backwards` call so - /// far. - pub total_items_updated: u64, -} - -/// The status of a pagination. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))] -pub enum PaginationStatus { - /// No pagination happening. - Idle, - /// Timeline is paginating for this end. - Paginating, - /// An end of the timeline (front or back) has been reached by this - /// pagination. 
- TimelineEndReached, -} - -#[cfg(test)] -mod tests { - use std::{ - ops::ControlFlow, - sync::atomic::{AtomicU8, Ordering}, - }; - - use super::{PaginationOptions, PaginationOutcome}; - - fn bump_outcome(outcome: &mut PaginationOutcome) { - outcome.events_received = 8; - outcome.items_added = 6; - outcome.items_updated = 1; - outcome.total_events_received += 8; - outcome.total_items_added += 6; - outcome.total_items_updated += 1; - } - - #[test] - fn test_simple_request_limits() { - let mut opts = PaginationOptions::simple_request(10); - let mut outcome = PaginationOutcome::default(); - assert_eq!(opts.next_event_limit(outcome), Some(10)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), None); - } - - #[test] - fn test_until_num_items_limits() { - let mut opts = PaginationOptions::until_num_items(10, 10); - let mut outcome = PaginationOutcome::default(); - assert_eq!(opts.next_event_limit(outcome), Some(10)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), Some(10)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), None); - } - - #[test] - fn test_custom_limits() { - let num_calls = AtomicU8::new(0); - let mut opts = PaginationOptions::custom(8, |outcome| { - num_calls.fetch_add(1, Ordering::AcqRel); - if outcome.total_items_added - outcome.total_items_updated < 6 { - ControlFlow::Continue(12) - } else { - ControlFlow::Break(()) - } - }); - let mut outcome = PaginationOutcome::default(); - assert_eq!(opts.next_event_limit(outcome), Some(8)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), Some(12)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), None); - - assert_eq!(num_calls.load(Ordering::Acquire), 2); + /// Note: this may send multiple Paginating/Idle sequences during a single + /// call to [`Self::paginate_backwards()`]. 
+ pub fn back_pagination_status(&self) -> (PaginatorState, impl Stream) { + let mut status = self.event_cache.pagination().status(); + (status.next_now(), status.dedup()) } } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs index 287865f0b8b..564f283b9b6 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs @@ -18,16 +18,16 @@ use assert_matches2::assert_let; use eyeball_im::VectorDiff; use futures_util::{ future::{join, join3}, - FutureExt, + FutureExt, StreamExt as _, +}; +use matrix_sdk::{ + config::SyncSettings, event_cache::paginator::PaginatorState, + test_utils::logged_in_client_with_server, }; -use matrix_sdk::{config::SyncSettings, test_utils::logged_in_client_with_server}; use matrix_sdk_test::{ async_test, EventBuilder, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, ALICE, BOB, }; -use matrix_sdk_ui::timeline::{ - AnyOtherFullStateEventContent, PaginationOptions, PaginationStatus, RoomExt, - TimelineItemContent, -}; +use matrix_sdk_ui::timeline::{AnyOtherFullStateEventContent, RoomExt, TimelineItemContent}; use once_cell::sync::Lazy; use ruma::{ events::{ @@ -65,7 +65,7 @@ async fn test_back_pagination() { let room = client.get_room(room_id).unwrap(); let timeline = Arc::new(room.timeline().await.unwrap()); let (_, mut timeline_stream) = timeline.subscribe().await; - let mut back_pagination_status = timeline.back_pagination_status(); + let (_, mut back_pagination_status) = timeline.back_pagination_status(); Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) @@ -77,11 +77,11 @@ async fn test_back_pagination() { .await; let paginate = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; }; let observe_paginating = 
async { - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); + assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); }; join(paginate, observe_paginating).await; @@ -136,8 +136,9 @@ async fn test_back_pagination() { .mount(&server) .await; - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); - assert_next_eq!(back_pagination_status, PaginationStatus::TimelineEndReached); + let hit_start = timeline.live_paginate_backwards(10).await.unwrap(); + assert!(hit_start); + assert_next_eq!(back_pagination_status, PaginatorState::Idle); } #[async_test] @@ -201,7 +202,7 @@ async fn test_back_pagination_highlighted() { .mount(&server) .await; - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; let first = assert_next_matches!( @@ -245,7 +246,7 @@ async fn test_wait_for_token() { let timeline = Arc::new(room.timeline().await.unwrap()); let from = "t392-516_47314_0_7_1_1_1_11444_1"; - let mut back_pagination_status = timeline.back_pagination_status(); + let (_, mut back_pagination_status) = timeline.back_pagination_status(); Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) @@ -268,14 +269,11 @@ async fn test_wait_for_token() { mock_sync(&server, sync_builder.build_json_sync_response(), None).await; let paginate = async { - timeline - .live_paginate_backwards(PaginationOptions::simple_request(10).wait_for_token()) - .await - .unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); }; let observe_paginating = async { - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Idle)); + assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); + assert_eq!(back_pagination_status.next().await, 
Some(PaginatorState::Idle)); }; let sync = async { // Make sure syncing starts a little bit later than pagination @@ -289,7 +287,7 @@ async fn test_wait_for_token() { } #[async_test] -async fn test_dedup() { +async fn test_dedup_pagination() { let room_id = room_id!("!a98sd12bjh:example.org"); let (client, server) = logged_in_client_with_server().await; let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); @@ -334,10 +332,10 @@ async fn test_dedup() { // If I try to paginate twice at the same time, let paginate_1 = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); }; let paginate_2 = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); }; timeout(Duration::from_secs(5), join(paginate_1, paginate_2)).await.unwrap(); @@ -424,24 +422,44 @@ async fn test_timeline_reset_while_paginating() { .mount(&server) .await; - let mut back_pagination_status = timeline.back_pagination_status(); + let (_, mut back_pagination_status) = timeline.back_pagination_status(); + + let paginate = async { timeline.live_paginate_backwards(10).await.unwrap() }; - let paginate = async { - timeline - .live_paginate_backwards(PaginationOptions::simple_request(10).wait_for_token()) - .await - .unwrap(); - }; let observe_paginating = async { - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); - // timeline start reached because second pagination response contains - // no end field - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::TimelineEndReached)); + let mut seen_paginating = false; + + // Observe paginating updates: we want to make sure we see at least once + // Paginating, and that it settles with Idle. 
+ while let Ok(update) = + timeout(Duration::from_millis(500), back_pagination_status.next()).await + { + match update { + Some(state) => { + if state == PaginatorState::Paginating { + seen_paginating = true; + } + } + None => break, + } + } + + assert!(seen_paginating); + + let (status, _) = timeline.back_pagination_status(); + assert_eq!(status, PaginatorState::Idle); }; + let sync = async { client.sync_once(sync_settings.clone()).await.unwrap(); }; - timeout(Duration::from_secs(2), join3(paginate, observe_paginating, sync)).await.unwrap(); + + let (hit_start, _, _) = + timeout(Duration::from_secs(5), join3(paginate, observe_paginating, sync)).await.unwrap(); + + // timeline start reached because second pagination response contains no end + // field. + assert!(hit_start); // No events in back-pagination responses, day divider + event from latest // sync is present @@ -538,7 +556,7 @@ async fn test_empty_chunk() { let room = client.get_room(room_id).unwrap(); let timeline = Arc::new(room.timeline().await.unwrap()); let (_, mut timeline_stream) = timeline.subscribe().await; - let mut back_pagination_status = timeline.back_pagination_status(); + let (_, mut back_pagination_status) = timeline.back_pagination_status(); // It should try to do another request after the empty chunk. 
Mock::given(method("GET")) @@ -566,11 +584,11 @@ async fn test_empty_chunk() { .await; let paginate = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; }; let observe_paginating = async { - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); + assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); }; join(paginate, observe_paginating).await; @@ -628,7 +646,7 @@ async fn test_until_num_items_with_empty_chunk() { let room = client.get_room(room_id).unwrap(); let timeline = Arc::new(room.timeline().await.unwrap()); let (_, mut timeline_stream) = timeline.subscribe().await; - let mut back_pagination_status = timeline.back_pagination_status(); + let (_, mut back_pagination_status) = timeline.back_pagination_status(); Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) @@ -665,11 +683,10 @@ async fn test_until_num_items_with_empty_chunk() { .await; let paginate = async { - timeline.live_paginate_backwards(PaginationOptions::until_num_items(4, 4)).await.unwrap(); - server.reset().await; + timeline.live_paginate_backwards(10).await.unwrap(); }; let observe_paginating = async { - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); + assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); }; join(paginate, observe_paginating).await; @@ -710,6 +727,8 @@ async fn test_until_num_items_with_empty_chunk() { ); assert!(day_divider.is_day_divider()); + timeline.live_paginate_backwards(10).await.unwrap(); + let message = assert_next_matches!( timeline_stream, VectorDiff::PushFront { value } => value @@ -741,7 +760,7 @@ async fn test_back_pagination_aborted() { let room = client.get_room(room_id).unwrap(); let timeline = Arc::new(room.timeline().await.unwrap()); - let mut back_pagination_status = 
timeline.back_pagination_status(); + let (_, mut back_pagination_status) = timeline.back_pagination_status(); // Delay the server response, so we have time to abort the request. Mock::given(method("GET")) @@ -758,11 +777,11 @@ async fn test_back_pagination_aborted() { let paginate = spawn({ let timeline = timeline.clone(); async move { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); } }); - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); + assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); // Abort the pagination! paginate.abort(); @@ -771,7 +790,7 @@ async fn test_back_pagination_aborted() { assert!(paginate.await.unwrap_err().is_cancelled()); // The timeline should automatically reset to idle. - assert_next_eq!(back_pagination_status, PaginationStatus::Idle); + assert_next_eq!(back_pagination_status, PaginatorState::Idle); // And there should be no other pending pagination status updates. assert!(back_pagination_status.next().now_or_never().is_none()); diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 33c806a8835..9ae2e572707 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -32,7 +32,7 @@ //! `RoomInfo`, and that may update a `RoomListService`. //! - [ ] provide read receipts for each message. //! - [x] backwards pagination -//! - [ ] forward pagination +//! - [~] forward pagination //! - [ ] reconcile results with cached timelines. //! - [ ] retry decryption upon receiving new keys (from an encryption sync //! service or from a key backup). 
@@ -45,7 +45,6 @@ use std::{ collections::BTreeMap, fmt::Debug, sync::{Arc, OnceLock, Weak}, - time::Duration, }; use eyeball::Subscriber; @@ -55,30 +54,30 @@ use matrix_sdk_base::{ }; use matrix_sdk_common::executor::{spawn, JoinHandle}; use ruma::{ - assign, events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent}, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, }; -use tokio::{ - sync::{ - broadcast::{error::RecvError, Receiver, Sender}, - Mutex, Notify, RwLock, RwLockReadGuard, RwLockWriteGuard, - }, - time::timeout, +use tokio::sync::{ + broadcast::{error::RecvError, Receiver, Sender}, + Mutex, RwLock, RwLockWriteGuard, }; use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span}; use self::{ - linked_chunk::ChunkContent, - store::{Gap, PaginationToken, RoomEvents}, + pagination::RoomPaginationData, + paginator::{Paginator, PaginatorError}, + store::{Gap, RoomEvents}, }; -use crate::{client::ClientInner, room::MessagesOptions, Client, Room}; +use crate::{client::ClientInner, Client, Room}; mod linked_chunk; -pub mod paginator; +mod pagination; mod store; +pub mod paginator; +pub use pagination::RoomPagination; + /// An error observed in the [`EventCache`]. #[derive(thiserror::Error, Debug)] pub enum EventCacheError { @@ -100,6 +99,10 @@ pub enum EventCacheError { #[error("The given back-pagination token is unknown to the event cache.")] UnknownBackpaginationToken, + /// An error has been observed while back-paginating. + #[error("Error observed while back-paginating: {0}")] + BackpaginationError(#[from] PaginatorError), + /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains /// to. It's possible this weak reference points to nothing anymore, at /// times where we try to use the client. 
@@ -321,14 +324,15 @@ impl EventCacheInner { // Note: one must NOT clear the `by_room` map, because if something subscribed // to a room update, they would never get any new update for that room, since // re-creating the `RoomEventCache` would create a new unrelated sender. - let rooms = self.by_room.write().await; + let rooms = self.by_room.write().await; for room in rooms.values() { // Notify all the observers that we've lost track of state. (We ignore the // error if there aren't any.) let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear); // Clear all the events in memory. - room.inner.events.write().await.reset(); + let mut events = room.inner.events.write().await; + room.inner.clear(&mut events).await; } } @@ -441,32 +445,10 @@ impl RoomEventCache { Ok((events, self.inner.sender.subscribe())) } - /// Returns the oldest back-pagination token, that is, the one closest to - /// the beginning of the timeline as we know it. - /// - /// Optionally, wait at most for the given duration for a back-pagination - /// token to be returned by a sync. - pub async fn oldest_backpagination_token( - &self, - max_wait: Option, - ) -> Result> { - self.inner.oldest_backpagination_token(max_wait).await - } - - /// Back-paginate with the given token, if provided. - /// - /// If no token has been provided, it will back-paginate from the end of the - /// room. - /// - /// If a token has been provided, but it was unknown to the event cache - /// (i.e. it's not associated to any gap in the timeline stored by the - /// event cache), then an error result will be returned. - pub async fn backpaginate( - &self, - batch_size: u16, - token: Option, - ) -> Result { - self.inner.backpaginate(batch_size, token).await + /// Return a [`RoomPagination`] API object useful for running + /// back-pagination queries in the current room. 
+ pub fn pagination(&self) -> RoomPagination { + RoomPagination { inner: self.inner.clone() } } } @@ -475,18 +457,21 @@ struct RoomEventCacheInner { /// Sender part for subscribers to this room. sender: Sender, - /// The Client [`Room`] this event cache pertains to. - room: Room, - /// The events of the room. events: RwLock, - /// A notifier that we received a new pagination token. - pagination_token_notifier: Notify, - - /// A lock that ensures we don't run multiple pagination queries at the same - /// time. - pagination_lock: Mutex<()>, + /// A paginator instance, that's configured to run back-pagination on our + /// behalf. + /// + /// Note: forward-paginations are still run "out-of-band", that is, + /// disconnected from the event cache, as we don't implement matching + /// events received from those kinds of pagination with the cache. This + /// paginator is only used for queries that interact with the actual event + /// cache. + /// + /// It's protected behind a lock to avoid multiple accesses to the paginator + /// at the same time. + pagination: RoomPaginationData, } impl RoomEventCacheInner { @@ -496,14 +481,23 @@ impl RoomEventCacheInner { let sender = Sender::new(32); Self { - room, events: RwLock::new(RoomEvents::default()), sender, - pagination_lock: Default::default(), - pagination_token_notifier: Default::default(), + pagination: RoomPaginationData { + paginator: Paginator::new(Box::new(room)), + waited_for_initial_prev_token: Mutex::new(false), + token_notifier: Default::default(), + }, } } + async fn clear(&self, room_events: &mut RwLockWriteGuard<'_, RoomEvents>) { + room_events.reset(); + + // Reset the back-pagination state to the initial too. + *self.pagination.waited_for_initial_prev_token.lock().await = false; + } + fn handle_account_data(&self, account_data: Vec>) { let mut handled_read_marker = false; @@ -606,8 +600,8 @@ impl RoomEventCacheInner { // Acquire the lock. let mut room_events = self.events.write().await; - // Reset the events. 
- room_events.reset(); + // Reset the room's state. + self.clear(&mut room_events).await; // Propagate to observers. let _ = self.sender.send(RoomEventCacheUpdate::Clear); @@ -667,7 +661,7 @@ impl RoomEventCacheInner { // events themselves. { if let Some(prev_token) = &prev_batch { - room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) }); + room_events.push_gap(Gap { prev_token: prev_token.clone() }); } room_events.push_events(events.clone().into_iter()); @@ -676,7 +670,7 @@ impl RoomEventCacheInner { // Now that all events have been added, we can trigger the // `pagination_token_notifier`. if prev_batch.is_some() { - self.pagination_token_notifier.notify_one(); + self.pagination.token_notifier.notify_one(); } let _ = @@ -684,199 +678,23 @@ impl RoomEventCacheInner { Ok(()) } - - /// Run a single back-pagination `/messages` request. - /// - /// This will only run one request; since a backpagination may need to - /// continue, it's preferable to use [`Self::backpaginate_until`]. - /// - /// Returns the number of messages received in this chunk. - #[instrument(skip(self))] - async fn backpaginate( - &self, - batch_size: u16, - token: Option, - ) -> Result { - // Make sure there's at most one back-pagination request. - let _guard = self.pagination_lock.lock().await; - - // Get messages. - let messages = self - .room - .messages(assign!(MessagesOptions::backward(), { - from: token.as_ref().map(|token| token.0.clone()), - limit: batch_size.into() - })) - .await - .map_err(EventCacheError::SdkError)?; - - // Make sure the `RoomEvents` isn't updated while we are saving events from - // backpagination. - let mut room_events = self.events.write().await; - - // Check that the `token` exists if any. 
- let gap_identifier = if let Some(token) = token.as_ref() { - let gap_identifier = room_events.chunk_identifier(|chunk| { - matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if prev_token == token) - }); - - // The method has been called with `token` but it doesn't exist in `RoomEvents`, - // it's an error. - if gap_identifier.is_none() { - return Ok(BackPaginationOutcome::UnknownBackpaginationToken); - } - - gap_identifier - } else { - None - }; - - // Would we want to backpaginate again, we'd start from the `end` token as the - // next `from` token. - - let prev_token = - messages.end.map(|prev_token| Gap { prev_token: PaginationToken(prev_token) }); - - // If this token is missing, then we've reached the end of the timeline. - let reached_start = prev_token.is_none(); - - // Note: The chunk could be empty. - // - // If there's any event, they are presented in reverse order (i.e. the first one - // should be prepended first). - let events = messages.chunk; - - let sync_events = events - .iter() - // Reverse the order of the events as `/messages` has been called with `dir=b` - // (backward). The `RoomEvents` API expects the first event to be the oldest. - .rev() - .cloned() - .map(SyncTimelineEvent::from); - - // There is a `token`/gap, let's replace it by new events! - if let Some(gap_identifier) = gap_identifier { - let new_position = { - // Replace the gap by new events. - let new_chunk = room_events - .replace_gap_at(sync_events, gap_identifier) - // SAFETY: we are sure that `gap_identifier` represents a valid - // `ChunkIdentifier` for a `Gap` chunk. - .expect("The `gap_identifier` must represent a `Gap`"); - - new_chunk.first_position() - }; - - // And insert a new gap if there is any `prev_token`. - if let Some(prev_token_gap) = prev_token { - room_events - .insert_gap_at(prev_token_gap, new_position) - // SAFETY: we are sure that `new_position` represents a valid - // `ChunkIdentifier` for an `Item` chunk. 
- .expect("The `new_position` must represent an `Item`"); - } - - trace!("replaced gap with new events from backpagination"); - - // TODO: implement smarter reconciliation later - //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); - - return Ok(BackPaginationOutcome::Success { events, reached_start }); - } - - // There is no `token`/gap identifier. Let's assume we must prepend the new - // events. - let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos); - - match first_event_pos { - // Is there a first item? Insert at this position. - Some(first_event_pos) => { - if let Some(prev_token_gap) = prev_token { - room_events - .insert_gap_at(prev_token_gap, first_event_pos) - // SAFETY: The `first_event_pos` can only be an `Item` chunk, it's - // an invariant of `LinkedChunk`. Also, it can only represent a valid - // `ChunkIdentifier` as the data structure isn't modified yet. - .expect("`first_event_pos` must point to a valid `Item` chunk when inserting a gap"); - } - - room_events - .insert_events_at(sync_events, first_event_pos) - // SAFETY: The `first_event_pos` can only be an `Item` chunk, it's - // an invariant of `LinkedChunk`. The chunk it points to has not been - // removed. - .expect("The `first_event_pos` must point to a valid `Item` chunk when inserting events"); - } - - // There is no first item. Let's simply push. - None => { - if let Some(prev_token_gap) = prev_token { - room_events.push_gap(prev_token_gap); - } - - room_events.push_events(sync_events); - } - } - - Ok(BackPaginationOutcome::Success { events, reached_start }) - } - - /// Returns the oldest back-pagination token, that is, the one closest to - /// the start of the timeline as we know it. - /// - /// Optionally, wait at most for the given duration for a back-pagination - /// token to be returned by a sync. 
- async fn oldest_backpagination_token( - &self, - max_wait: Option, - ) -> Result> { - // Optimistically try to return the backpagination token immediately. - fn get_oldest(room_events: RwLockReadGuard<'_, RoomEvents>) -> Option { - room_events.chunks().find_map(|chunk| match chunk.content() { - ChunkContent::Gap(gap) => Some(gap.prev_token.clone()), - ChunkContent::Items(..) => None, - }) - } - - if let Some(token) = get_oldest(self.events.read().await) { - return Ok(Some(token)); - } - - let Some(max_wait) = max_wait else { - // We had no token and no time to wait, so… no tokens. - return Ok(None); - }; - - // Otherwise wait for a notification that we received a token. - // Timeouts are fine, per this function's contract. - let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await; - - Ok(get_oldest(self.events.read().await)) - } } /// The result of a single back-pagination request. #[derive(Debug)] -pub enum BackPaginationOutcome { - /// The back-pagination succeeded, and new events have been found. - Success { - /// Did the back-pagination reach the start of the timeline? - reached_start: bool, - - /// All the events that have been returned in the back-pagination - /// request. - /// - /// Events are presented in reverse order: the first element of the vec, - /// if present, is the most "recent" event from the chunk (or - /// technically, the last one in the topological ordering). - /// - /// Note: they're not deduplicated (TODO: smart reconciliation). - events: Vec, - }, +pub struct BackPaginationOutcome { + /// Did the back-pagination reach the start of the timeline? + pub reached_start: bool, - /// The back-pagination token was unknown to the event cache, and the caller - /// must retry after obtaining a new back-pagination token. - UnknownBackpaginationToken, + /// All the events that have been returned in the back-pagination + /// request. 
+ /// + /// Events are presented in reverse order: the first element of the vec, + /// if present, is the most "recent" event from the chunk (or + /// technically, the last one in the topological ordering). + /// + /// Note: they're not deduplicated (TODO: smart reconciliation). + pub events: Vec, } /// An update related to events happened in a room. @@ -934,228 +752,6 @@ mod tests { assert_matches!(result, Err(EventCacheError::NotSubscribedYet)); } - // Those tests require time to work, and it does not on wasm32. - #[cfg(not(target_arch = "wasm32"))] - mod time_tests { - use std::time::{Duration, Instant}; - - use matrix_sdk_base::RoomState; - use matrix_sdk_test::sync_timeline_event; - use serde_json::json; - use tokio::{spawn, time::sleep}; - use wiremock::{ - matchers::{header, method, path_regex, query_param}, - Mock, ResponseTemplate, - }; - - use super::{super::store::Gap, *}; - use crate::{ - event_cache::{store::PaginationToken, BackPaginationOutcome}, - test_utils::logged_in_client_with_server, - }; - - #[async_test] - async fn test_unknown_pagination_token() { - let (client, server) = logged_in_client_with_server().await; - - let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - - client.event_cache().subscribe().unwrap(); - - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); - - // If I try to back-paginate with an unknown back-pagination token, - let token_name = "unknown"; - let token = PaginationToken(token_name.to_owned()); - - // Then I run into an error. 
- Mock::given(method("GET")) - .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) - .and(header("authorization", "Bearer 1234")) - .and(query_param("from", token_name)) - .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "start": token_name, - "chunk": [], - }))) - .expect(1) - .mount(&server) - .await; - - let res = room_event_cache.backpaginate(20, Some(token)).await; - assert_matches!(res, Ok(BackPaginationOutcome::UnknownBackpaginationToken)); - - server.verify().await - } - - #[async_test] - async fn test_wait_no_pagination_token() { - let client = logged_in_client(None).await; - let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - - let event_cache = client.event_cache(); - - event_cache.subscribe().unwrap(); - - let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); - - // When I only have events in a room, - { - let mut room_events = room_event_cache.inner.events.write().await; - room_events.push_events([sync_timeline_event!({ - "sender": "b@z.h", - "type": "m.room.message", - "event_id": "$ida", - "origin_server_ts": 12344446, - "content": { "body":"yolo", "msgtype": "m.text" }, - }) - .into()]); - } - - // If I don't wait for the backpagination token, - let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); - // Then I don't find it. - assert!(found.is_none()); - - // If I wait for a back-pagination token for 0 seconds, - let before = Instant::now(); - let found = room_event_cache - .oldest_backpagination_token(Some(Duration::default())) - .await - .unwrap(); - let waited = before.elapsed(); - // then I don't get any, - assert!(found.is_none()); - // and I haven't waited long. 
- assert!(waited.as_secs() < 1); - - // If I wait for a back-pagination token for 1 second, - let before = Instant::now(); - let found = room_event_cache - .oldest_backpagination_token(Some(Duration::from_secs(1))) - .await - .unwrap(); - let waited = before.elapsed(); - // then I still don't get any. - assert!(found.is_none()); - // and I've waited a bit. - assert!(waited.as_secs() < 2); - assert!(waited.as_secs() >= 1); - } - - #[async_test] - async fn test_wait_for_pagination_token_already_present() { - let client = logged_in_client(None).await; - let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - - let event_cache = client.event_cache(); - - event_cache.subscribe().unwrap(); - - let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); - - let expected_token = PaginationToken("old".to_owned()); - - // When I have events and multiple gaps, in a room, - { - let mut room_events = room_event_cache.inner.events.write().await; - room_events.push_gap(Gap { prev_token: expected_token.clone() }); - room_events.push_events([sync_timeline_event!({ - "sender": "b@z.h", - "type": "m.room.message", - "event_id": "$ida", - "origin_server_ts": 12344446, - "content": { "body":"yolo", "msgtype": "m.text" }, - }) - .into()]); - } - - // If I don't wait for a back-pagination token, - let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); - // Then I get it. - assert_eq!(found.as_ref(), Some(&expected_token)); - - // If I wait for a back-pagination token for 0 seconds, - let before = Instant::now(); - let found = room_event_cache - .oldest_backpagination_token(Some(Duration::default())) - .await - .unwrap(); - let waited = before.elapsed(); - // then I do get one. - assert_eq!(found.as_ref(), Some(&expected_token)); - // and I haven't waited long. 
- assert!(waited.as_millis() < 100); - - // If I wait for a back-pagination token for 1 second, - let before = Instant::now(); - let found = room_event_cache - .oldest_backpagination_token(Some(Duration::from_secs(1))) - .await - .unwrap(); - let waited = before.elapsed(); - // then I do get one. - assert_eq!(found.as_ref(), Some(&expected_token)); - // and I haven't waited long. - assert!(waited.as_millis() < 100); - } - - #[async_test] - async fn test_wait_for_late_pagination_token() { - let client = logged_in_client(None).await; - let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - - let event_cache = client.event_cache(); - - event_cache.subscribe().unwrap(); - - let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); - - let expected_token = PaginationToken("old".to_owned()); - - let before = Instant::now(); - let cloned_expected_token = expected_token.clone(); - let cloned_room_event_cache = room_event_cache.clone(); - let insert_token_task = spawn(async move { - // If a backpagination token is inserted after 400 milliseconds, - sleep(Duration::from_millis(400)).await; - - { - let mut room_events = cloned_room_event_cache.inner.events.write().await; - room_events.push_gap(Gap { prev_token: cloned_expected_token }); - } - }); - - // Then first I don't get it (if I'm not waiting,) - let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); - assert!(found.is_none()); - - // And if I wait for the back-pagination token for 600ms, - let found = room_event_cache - .oldest_backpagination_token(Some(Duration::from_millis(600))) - .await - .unwrap(); - let waited = before.elapsed(); - - // then I do get one eventually. - assert_eq!(found.as_ref(), Some(&expected_token)); - // and I have waited between ~400 and ~1000 milliseconds. 
- assert!(waited.as_secs() < 1); - assert!(waited.as_millis() >= 400); - - // The task succeeded. - insert_token_task.await.unwrap(); - } - } - #[async_test] async fn test_uniq_read_marker() { let client = logged_in_client(None).await; diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs new file mode 100644 index 00000000000..eae47454ad0 --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -0,0 +1,416 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A sub-object for running pagination tasks on a given room. + +use std::{sync::Arc, time::Duration}; + +use eyeball::Subscriber; +use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; +use tokio::{ + sync::{Mutex, Notify, RwLockReadGuard}, + time::timeout, +}; +use tracing::{debug, instrument, trace}; + +use super::{ + paginator::{PaginationResult, Paginator, PaginatorState}, + store::Gap, + BackPaginationOutcome, Result, RoomEventCacheInner, +}; +use crate::event_cache::{linked_chunk::ChunkContent, store::RoomEvents}; + +#[derive(Debug)] +pub(super) struct RoomPaginationData { + /// A notifier that we received a new pagination token. + pub token_notifier: Notify, + + /// The stateful paginator instance used for the integrated pagination. + pub paginator: Paginator, + + /// Have we ever waited for a previous-batch-token to come from sync? 
We do + /// this at most once per room, the first time we try to run backward + /// pagination. We reset that upon clearing the timeline events. + pub waited_for_initial_prev_token: Mutex, +} + +/// An API object to run pagination queries on a [`super::RoomEventCache`]. +/// +/// Can be created with [`super::RoomEventCache::pagination()`]. +#[allow(missing_debug_implementations)] +pub struct RoomPagination { + pub(super) inner: Arc, +} + +impl RoomPagination { + /// Starts a back-pagination for the requested number of events. + /// + /// This automatically takes care of waiting for a pagination token from + /// sync, if we haven't done that before. + /// + /// # Errors + /// + /// It may return an error if the pagination token used during + /// back-pagination has disappeared while we started the pagination. In + /// that case, it's desirable to call the method again. + #[instrument(skip(self))] + pub async fn run_backwards(&self, batch_size: u16) -> Result { + loop { + if let Some(result) = self.run_backwards_impl(batch_size).await? { + return Ok(result); + } + debug!("back-pagination has been internally restarted because of a timeline reset."); + } + } + + async fn run_backwards_impl(&self, batch_size: u16) -> Result> { + // Make sure there's at most one back-pagination request. + let prev_token = self.get_or_wait_for_token().await; + + let paginator = &self.inner.pagination.paginator; + + paginator.set_idle_state(prev_token.clone(), None).await?; + + // Run the actual pagination. + let PaginationResult { events, hit_end_of_timeline: reached_start } = + paginator.paginate_backward(batch_size.into()).await?; + + // Make sure the `RoomEvents` isn't updated while we are saving events from + // backpagination. + let mut room_events = self.inner.events.write().await; + + // Check that the previous token still exists; otherwise it's a sign that the + // room's timeline has been cleared. 
+ let gap_identifier = if let Some(token) = prev_token { + let gap_identifier = room_events.chunk_identifier(|chunk| { + matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token) + }); + + // The method has been called with `token` but it doesn't exist in `RoomEvents`, + // it's an error. + if gap_identifier.is_none() { + return Ok(None); + } + + gap_identifier + } else { + None + }; + + let prev_token = paginator.prev_batch_token().map(|prev_token| Gap { prev_token }); + + // Note: The chunk could be empty. + // + // If there's any event, they are presented in reverse order (i.e. the first one + // should be prepended first). + + let sync_events = events + .iter() + // Reverse the order of the events as `/messages` has been called with `dir=b` + // (backward). The `RoomEvents` API expects the first event to be the oldest. + .rev() + .cloned() + .map(SyncTimelineEvent::from); + + // There is a `token`/gap, let's replace it by new events! + if let Some(gap_identifier) = gap_identifier { + let new_position = { + // Replace the gap by new events. + let new_chunk = room_events + .replace_gap_at(sync_events, gap_identifier) + // SAFETY: we are sure that `gap_identifier` represents a valid + // `ChunkIdentifier` for a `Gap` chunk. + .expect("The `gap_identifier` must represent a `Gap`"); + + new_chunk.first_position() + }; + + // And insert a new gap if there is any `prev_token`. + if let Some(prev_token_gap) = prev_token { + room_events + .insert_gap_at(prev_token_gap, new_position) + // SAFETY: we are sure that `new_position` represents a valid + // `ChunkIdentifier` for an `Item` chunk. 
+ .expect("The `new_position` must represent an `Item`"); + } + + trace!("replaced gap with new events from backpagination"); + + // TODO: implement smarter reconciliation later + //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); + + return Ok(Some(BackPaginationOutcome { events, reached_start })); + } + + // There is no `token`/gap identifier. Let's assume we must prepend the new + // events. + let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos); + + match first_event_pos { + // Is there a first item? Insert at this position. + Some(first_event_pos) => { + if let Some(prev_token_gap) = prev_token { + room_events + .insert_gap_at(prev_token_gap, first_event_pos) + // SAFETY: The `first_event_pos` can only be an `Item` chunk, it's + // an invariant of `LinkedChunk`. Also, it can only represent a valid + // `ChunkIdentifier` as the data structure isn't modified yet. + .expect("`first_event_pos` must point to a valid `Item` chunk when inserting a gap"); + } + + room_events + .insert_events_at(sync_events, first_event_pos) + // SAFETY: The `first_event_pos` can only be an `Item` chunk, it's + // an invariant of `LinkedChunk`. The chunk it points to has not been + // removed. + .expect("The `first_event_pos` must point to a valid `Item` chunk when inserting events"); + } + + // There is no first item. Let's simply push. + None => { + if let Some(prev_token_gap) = prev_token { + room_events.push_gap(prev_token_gap); + } + + room_events.push_events(sync_events); + } + } + + Ok(Some(BackPaginationOutcome { events, reached_start })) + } + + /// Test-only function to get the latest pagination token, as stored in the + /// room events linked list. 
+ #[doc(hidden)] + pub async fn get_or_wait_for_token(&self) -> Option { + const DEFAULT_INITIAL_WAIT_DURATION: Duration = Duration::from_secs(3); + + let mut waited = self.inner.pagination.waited_for_initial_prev_token.lock().await; + if *waited { + self.oldest_token(None).await + } else { + let token = self.oldest_token(Some(DEFAULT_INITIAL_WAIT_DURATION)).await; + *waited = true; + token + } + } + + /// Returns the oldest back-pagination token, that is, the one closest to + /// the start of the timeline as we know it. + /// + /// Optionally, wait at most for the given duration for a back-pagination + /// token to be returned by a sync. + async fn oldest_token(&self, max_wait: Option) -> Option { + // Optimistically try to return the backpagination token immediately. + fn get_oldest(room_events: RwLockReadGuard<'_, RoomEvents>) -> Option { + room_events.chunks().find_map(|chunk| match chunk.content() { + ChunkContent::Gap(gap) => Some(gap.prev_token.clone()), + ChunkContent::Items(..) => None, + }) + } + + if let Some(token) = get_oldest(self.inner.events.read().await) { + return Some(token); + } + + let Some(max_wait) = max_wait else { + // We had no token and no time to wait, so… no tokens. + return None; + }; + + // Otherwise wait for a notification that we received a token. + // Timeouts are fine, per this function's contract. + let _ = timeout(max_wait, self.inner.pagination.token_notifier.notified()).await; + + get_oldest(self.inner.events.read().await) + } + + /// Returns a subscriber to the pagination status used for the + /// back-pagination integrated to the event cache. + pub fn status(&self) -> Subscriber { + self.inner.pagination.paginator.state() + } +} + +#[cfg(test)] +mod tests { + // Those tests require time to work, and it does not on wasm32. 
+ #[cfg(not(target_arch = "wasm32"))] + mod time_tests { + use std::time::{Duration, Instant}; + + use matrix_sdk_base::RoomState; + use matrix_sdk_test::{async_test, sync_timeline_event}; + use ruma::room_id; + use tokio::{spawn, time::sleep}; + + use crate::{event_cache::store::Gap, test_utils::logged_in_client}; + + #[async_test] + async fn test_wait_no_pagination_token() { + let client = logged_in_client(None).await; + let room_id = room_id!("!galette:saucisse.bzh"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + let event_cache = client.event_cache(); + + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); + let room_event_cache = room_event_cache.unwrap(); + + // When I only have events in a room, + { + let mut room_events = room_event_cache.inner.events.write().await; + room_events.push_events([sync_timeline_event!({ + "sender": "b@z.h", + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"yolo", "msgtype": "m.text" }, + }) + .into()]); + } + + let pagination = room_event_cache.pagination(); + + // If I don't wait for the backpagination token, + let found = pagination.oldest_token(None).await; + // Then I don't find it. + assert!(found.is_none()); + + // If I wait for a back-pagination token for 0 seconds, + let before = Instant::now(); + let found = pagination.oldest_token(Some(Duration::default())).await; + let waited = before.elapsed(); + // then I don't get any, + assert!(found.is_none()); + // and I haven't waited long. + assert!(waited.as_secs() < 1); + + // If I wait for a back-pagination token for 1 second, + let before = Instant::now(); + let found = pagination.oldest_token(Some(Duration::from_secs(1))).await; + let waited = before.elapsed(); + // then I still don't get any. + assert!(found.is_none()); + // and I've waited a bit. 
+ assert!(waited.as_secs() < 2); + assert!(waited.as_secs() >= 1); + } + + #[async_test] + async fn test_wait_for_pagination_token_already_present() { + let client = logged_in_client(None).await; + let room_id = room_id!("!galette:saucisse.bzh"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + let event_cache = client.event_cache(); + + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); + let room_event_cache = room_event_cache.unwrap(); + + let expected_token = "old".to_owned(); + + // When I have events and multiple gaps, in a room, + { + let mut room_events = room_event_cache.inner.events.write().await; + room_events.push_gap(Gap { prev_token: expected_token.clone() }); + room_events.push_events([sync_timeline_event!({ + "sender": "b@z.h", + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"yolo", "msgtype": "m.text" }, + }) + .into()]); + } + + let paginator = room_event_cache.pagination(); + + // If I don't wait for a back-pagination token, + let found = paginator.oldest_token(None).await; + // Then I get it. + assert_eq!(found.as_ref(), Some(&expected_token)); + + // If I wait for a back-pagination token for 0 seconds, + let before = Instant::now(); + let found = paginator.oldest_token(Some(Duration::default())).await; + let waited = before.elapsed(); + // then I do get one. + assert_eq!(found.as_ref(), Some(&expected_token)); + // and I haven't waited long. + assert!(waited.as_millis() < 100); + + // If I wait for a back-pagination token for 1 second, + let before = Instant::now(); + let found = paginator.oldest_token(Some(Duration::from_secs(1))).await; + let waited = before.elapsed(); + // then I do get one. + assert_eq!(found, Some(expected_token)); + // and I haven't waited long. 
+ assert!(waited.as_millis() < 100); + } + + #[async_test] + async fn test_wait_for_late_pagination_token() { + let client = logged_in_client(None).await; + let room_id = room_id!("!galette:saucisse.bzh"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + let event_cache = client.event_cache(); + + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap(); + let room_event_cache = room_event_cache.unwrap(); + + let expected_token = "old".to_owned(); + + let before = Instant::now(); + let cloned_expected_token = expected_token.clone(); + let cloned_room_event_cache = room_event_cache.clone(); + let insert_token_task = spawn(async move { + // If a backpagination token is inserted after 400 milliseconds, + sleep(Duration::from_millis(400)).await; + + { + let mut room_events = cloned_room_event_cache.inner.events.write().await; + room_events.push_gap(Gap { prev_token: cloned_expected_token }); + } + }); + + let pagination = room_event_cache.pagination(); + + // Then first I don't get it (if I'm not waiting,) + let found = pagination.oldest_token(None).await; + assert!(found.is_none()); + + // And if I wait for the back-pagination token for 600ms, + let found = pagination.oldest_token(Some(Duration::from_millis(600))).await; + let waited = before.elapsed(); + + // then I do get one eventually. + assert_eq!(found, Some(expected_token)); + // and I have waited between ~400 and ~1000 milliseconds. + assert!(waited.as_secs() < 1); + assert!(waited.as_millis() >= 400); + + // The task succeeded. + insert_token_task.await.unwrap(); + } + } +} diff --git a/crates/matrix-sdk/src/event_cache/paginator.rs b/crates/matrix-sdk/src/event_cache/paginator.rs index c4f2ba5a6de..016b1574d97 100644 --- a/crates/matrix-sdk/src/event_cache/paginator.rs +++ b/crates/matrix-sdk/src/event_cache/paginator.rs @@ -30,6 +30,7 @@ use crate::{ /// Current state of a [`Paginator`]. 
 #[derive(Debug, PartialEq, Copy, Clone)]
+#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
 pub enum PaginatorState {
     /// The initial state of the paginator.
     Initial,
@@ -66,6 +67,29 @@ pub enum PaginatorError {
     SdkError(#[source] crate::Error),
 }
 
+/// Pagination token data, indicating in which state is the current pagination.
+#[derive(Clone, Debug)]
+enum PaginationToken {
+    /// We never had a pagination token, so we'll start back-paginating from the
+    /// end, or forward-paginating from the start.
+    None,
+    /// We paginated once before, and we received a prev/next batch token that
+    /// we may reuse for the next query.
+    HasMore(String),
+    /// We've hit one end of the timeline (either the start or the actual end),
+    /// so there's no need to continue paginating.
+    HitEnd,
+}
+
+impl From<Option<String>> for PaginationToken {
+    fn from(token: Option<String>) -> Self {
+        match token {
+            Some(val) => Self::HasMore(val),
+            None => Self::None,
+        }
+    }
+}
+
 /// A stateful object to reach to an event, and then paginate backward and
 /// forward from it.
 ///
@@ -80,12 +104,12 @@ pub struct Paginator {
     /// The token to run the next backward pagination.
     ///
     /// This mutex is only taken for short periods of time, so it's sync.
-    prev_batch_token: Mutex<Option<String>>,
+    prev_batch_token: Mutex<PaginationToken>,
 
     /// The token to run the next forward pagination.
     ///
     /// This mutex is only taken for short periods of time, so it's sync.
-    next_batch_token: Mutex<Option<String>>,
+    next_batch_token: Mutex<PaginationToken>,
 }
 
 #[cfg(not(tarpaulin_include))]
@@ -168,8 +192,8 @@ impl Paginator {
         Self {
             room,
             state: SharedObservable::new(PaginatorState::Initial),
-            prev_batch_token: Mutex::new(None),
-            next_batch_token: Mutex::new(None),
+            prev_batch_token: Mutex::new(None.into()),
+            next_batch_token: Mutex::new(None.into()),
         }
     }
 
@@ -188,6 +212,45 @@ impl Paginator {
         self.state.subscribe()
     }
 
+    /// Prepares the paginator to be in the idle state, ready for backwards- and
+    /// forwards- pagination.
+    ///
+    /// Will return an `InvalidPreviousState` error if the paginator is busy
+    /// (running /context or /messages).
+    pub(super) async fn set_idle_state(
+        &self,
+        prev_batch_token: Option<String>,
+        next_batch_token: Option<String>,
+    ) -> Result<(), PaginatorError> {
+        let prev_state = self.state.get();
+
+        match prev_state {
+            PaginatorState::Initial | PaginatorState::Idle => {}
+            PaginatorState::FetchingTargetEvent | PaginatorState::Paginating => {
+                // The paginator was busy. Don't interrupt it.
+                return Err(PaginatorError::InvalidPreviousState {
+                    // Technically it's initial OR idle, but we don't really care here.
+                    expected: PaginatorState::Idle,
+                    actual: prev_state,
+                });
+            }
+        }
+
+        self.state.set_if_not_eq(PaginatorState::Idle);
+        *self.prev_batch_token.lock().unwrap() = prev_batch_token.into();
+        *self.next_batch_token.lock().unwrap() = next_batch_token.into();
+
+        Ok(())
+    }
+
+    /// Returns the current previous batch token, as stored in this paginator.
+    pub(super) fn prev_batch_token(&self) -> Option<String> {
+        match &*self.prev_batch_token.lock().unwrap() {
+            PaginationToken::HitEnd | PaginationToken::None => None,
+            PaginationToken::HasMore(token) => Some(token.clone()),
+        }
+    }
+
     /// Starts the pagination from the initial event, requesting `num_events`
     /// additional context events.
     ///
@@ -224,8 +287,15 @@ impl Paginator {
         let has_prev = response.prev_batch_token.is_some();
         let has_next = response.next_batch_token.is_some();
-        *self.prev_batch_token.lock().unwrap() = response.prev_batch_token;
-        *self.next_batch_token.lock().unwrap() = response.next_batch_token;
+
+        *self.prev_batch_token.lock().unwrap() = match response.prev_batch_token {
+            Some(token) => PaginationToken::HasMore(token),
+            None => PaginationToken::HitEnd,
+        };
+        *self.next_batch_token.lock().unwrap() = match response.next_batch_token {
+            Some(token) => PaginationToken::HasMore(token),
+            None => PaginationToken::HitEnd,
+        };
 
         // Forget the reset state guard, so its Drop method is not called.
         reset_state_guard.disarm();
@@ -285,16 +355,19 @@ impl Paginator {
         &self,
         dir: Direction,
         num_events: UInt,
-        token_lock: &Mutex<Option<String>>,
+        token_lock: &Mutex<PaginationToken>,
     ) -> Result<PaginationResult, PaginatorError> {
         self.check_state(PaginatorState::Idle)?;
 
         let token = {
             let token = token_lock.lock().unwrap();
-            if token.is_none() {
-                return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
-            };
-            token.clone()
+            match &*token {
+                PaginationToken::None => None,
+                PaginationToken::HasMore(val) => Some(val.clone()),
+                PaginationToken::HitEnd => {
+                    return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
+                }
+            }
         };
 
         // Note: it's possible two callers have checked the state and both figured it's
@@ -321,7 +394,11 @@ impl Paginator {
         // may be incorrect.
         let hit_end_of_timeline = response.end.is_none();
-        *token_lock.lock().unwrap() = response.end;
+
+        *token_lock.lock().unwrap() = match response.end {
+            Some(val) => PaginationToken::HasMore(val),
+            None => PaginationToken::HitEnd,
+        };
 
         // TODO: what to do with state events?
 
@@ -997,9 +1074,10 @@ mod tests {
         // Assuming a paginator ready to back- or forward- paginate,
         let paginator = Paginator::new(room.clone());
-        paginator.state.set(PaginatorState::Idle);
-        *paginator.prev_batch_token.lock().unwrap() = Some("prev".to_owned());
-        *paginator.next_batch_token.lock().unwrap() = Some("next".to_owned());
+        paginator
+            .set_idle_state(Some("prev".to_owned()), Some("next".to_owned()))
+            .await
+            .unwrap();
 
         let paginator = Arc::new(paginator);
 
diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs
index fce7c29709c..4d8ee00f7ff 100644
--- a/crates/matrix-sdk/src/event_cache/store.rs
+++ b/crates/matrix-sdk/src/event_cache/store.rs
@@ -20,15 +20,11 @@ use super::linked_chunk::{
     Chunk, ChunkIdentifier, Error, Iter, IterBackward, LinkedChunk, Position,
 };
 
-/// A newtype wrapper for a pagination token returned by a /messages response.
-#[derive(Clone, Debug, PartialEq)] -pub struct PaginationToken(pub String); - #[derive(Clone, Debug)] pub struct Gap { /// The token to use in the query, extracted from a previous "from" / /// "end" field of a `/messages` response. - pub prev_token: PaginationToken, + pub prev_token: String, } const DEFAULT_CHUNK_CAPACITY: usize = 128; diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 1ce5e9c74fb..72029127f63 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -358,17 +358,15 @@ async fn test_backpaginate_once() { .await; // Then if I backpaginate, - let token = room_event_cache - .oldest_backpagination_token(Some(Duration::from_secs(1))) - .await - .unwrap(); - assert!(token.is_some()); + let pagination = room_event_cache.pagination(); - room_event_cache.backpaginate(20, token).await.unwrap() + assert!(pagination.get_or_wait_for_token().await.is_some()); + + pagination.run_backwards(20).await.unwrap() }; // I'll get all the previous events, in "reverse" order (same as the response). 
- assert_let!(BackPaginationOutcome::Success { events, reached_start } = outcome); + let BackPaginationOutcome { events, reached_start } = outcome; assert!(reached_start); assert_event_matches_msg(&events[0], "world"); @@ -449,20 +447,15 @@ async fn test_backpaginate_multiple_iterations() { .await; // Then if I backpaginate in a loop, - while let Some(token) = - room_event_cache.oldest_backpagination_token(Some(Duration::from_secs(1))).await.unwrap() - { - match room_event_cache.backpaginate(20, Some(token)).await.unwrap() { - BackPaginationOutcome::Success { reached_start, events } => { - if !global_reached_start { - global_reached_start = reached_start; - } - global_events.extend(events); - } - BackPaginationOutcome::UnknownBackpaginationToken => { - panic!("shouldn't run into unknown backpagination error") - } + let pagination = room_event_cache.pagination(); + while pagination.get_or_wait_for_token().await.is_some() { + let BackPaginationOutcome { reached_start, events } = + pagination.run_backwards(20).await.unwrap(); + + if !global_reached_start { + global_reached_start = reached_start; } + global_events.extend(events); num_iterations += 1; } @@ -501,7 +494,7 @@ async fn test_reset_while_backpaginating() { // token, let room_id = room_id!("!omelette:fromage.fr"); - let event_builder = EventBuilder::new(); + let ev_factory = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); let mut sync_builder = SyncResponseBuilder::new(); { @@ -509,10 +502,7 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. 
- .add_timeline_event(event_builder.make_sync_message_event( - user_id!("@a:b.c"), - RoomMessageEventContent::text_plain("heyo"), - )) + .add_timeline_event(ev_factory.text_msg("heyo").into_raw_sync()) .set_timeline_prev_batch("first_backpagination".to_owned()), ); let response_body = sync_builder.build_json_sync_response(); @@ -552,17 +542,14 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event( - user_id!("@a:b.c"), - RoomMessageEventContent::text_plain("heyo"), - )) + .add_timeline_event(ev_factory.text_msg("heyo").into_raw_sync()) .set_timeline_prev_batch("second_backpagination".to_owned()) .set_timeline_limited(), ); let sync_response_body = sync_builder.build_json_sync_response(); - // First back-pagination request: - let chunk = non_sync_events!(event_builder, [ (room_id, "$2": "lalala") ]); + // Mock the first back-pagination request: + let chunk = vec![ev_factory.text_msg("lalala").into_raw_timeline()]; let response_json = json!({ "chunk": chunk, "start": "t392-516_47314_0_7_1_1_1_11444_1", @@ -581,13 +568,26 @@ async fn test_reset_while_backpaginating() { .mount(&server) .await; - let first_token = - room_event_cache.oldest_backpagination_token(Some(Duration::from_secs(1))).await.unwrap(); + // Mock the second back-pagination request, that will be hit after the reset + // caused by the sync. + mock_messages( + &server, + "second_backpagination", + Some("third_backpagination"), + vec![ev_factory.text_msg("finally!").into_raw_timeline()], + ) + .await; + + // Run the pagination! 
+ let pagination = room_event_cache.pagination(); + + let first_token = pagination.get_or_wait_for_token().await; assert!(first_token.is_some()); - let rec = room_event_cache.clone(); - let first_token_clone = first_token.clone(); - let backpagination = spawn(async move { rec.backpaginate(20, first_token_clone).await }); + let backpagination = spawn({ + let pagination = room_event_cache.pagination(); + async move { pagination.run_backwards(20).await } + }); // Receive the sync response (which clears the timeline). mock_sync(&server, sync_response_body, None).await; @@ -595,14 +595,14 @@ async fn test_reset_while_backpaginating() { let outcome = backpagination.await.expect("join failed").unwrap(); - // Backpagination should be confused, and the operation should result in an - // unknown token. - assert_matches!(outcome, BackPaginationOutcome::UnknownBackpaginationToken); + // Backpagination will automatically restart, so eventually we get the events. + let BackPaginationOutcome { events, .. } = outcome; + assert!(!events.is_empty()); - // Now if we retrieve the earliest token, it's not the one we had before. - let second_token = room_event_cache.oldest_backpagination_token(None).await.unwrap().unwrap(); + // Now if we retrieve the oldest token, it's set to something else. + let second_token = pagination.get_or_wait_for_token().await.unwrap(); assert!(first_token.unwrap() != second_token); - assert_eq!(second_token.0, "second_backpagination"); + assert_eq!(second_token, "third_backpagination"); } #[async_test] @@ -650,14 +650,13 @@ async fn test_backpaginating_without_token() { .await; // We don't have a token. 
- let token = - room_event_cache.oldest_backpagination_token(Some(Duration::from_secs(1))).await.unwrap(); - assert!(token.is_none()); + let pagination = room_event_cache.pagination(); + assert!(pagination.get_or_wait_for_token().await.is_none()); // If we try to back-paginate with a token, it will hit the end of the timeline // and give us the resulting event. - let outcome = room_event_cache.backpaginate(20, token).await.unwrap(); - assert_let!(BackPaginationOutcome::Success { events, reached_start } = outcome); + let BackPaginationOutcome { events, reached_start } = + pagination.run_backwards(20).await.unwrap(); assert!(reached_start); diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index 16fa600c266..640f87fbdf9 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -29,9 +29,7 @@ use matrix_sdk::{ use matrix_sdk_ui::{ room_list_service, sync_service::{self, SyncService}, - timeline::{ - PaginationOptions, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem, - }, + timeline::{TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem}, Timeline as SdkTimeline, }; use ratatui::{prelude::*, style::palette::tailwind, widgets::*}; @@ -343,10 +341,7 @@ impl App { // Start a new one, request batches of 20 events, stop after 10 timeline items // have been added. *pagination = Some(spawn(async move { - if let Err(err) = sdk_timeline - .live_paginate_backwards(PaginationOptions::until_num_items(20, 10)) - .await - { + if let Err(err) = sdk_timeline.live_paginate_backwards(20).await { // TODO: would be nice to be able to set the status // message remotely? //self.set_status_message(format!(