diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 67274ae179a..dd42d1373b1 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -981,32 +981,6 @@ impl SendAttachmentJoinHandle { } } -#[derive(uniffi::Enum)] -pub enum PaginationOptions { - SimpleRequest { event_limit: u16, wait_for_token: bool }, - UntilNumItems { event_limit: u16, items: u16, wait_for_token: bool }, -} - -impl From for matrix_sdk_ui::timeline::PaginationOptions<'static> { - fn from(value: PaginationOptions) -> Self { - use matrix_sdk_ui::timeline::PaginationOptions as Opts; - let (wait_for_token, mut opts) = match value { - PaginationOptions::SimpleRequest { event_limit, wait_for_token } => { - (wait_for_token, Opts::simple_request(event_limit)) - } - PaginationOptions::UntilNumItems { event_limit, items, wait_for_token } => { - (wait_for_token, Opts::until_num_items(event_limit, items)) - } - }; - - if wait_for_token { - opts = opts.wait_for_token(); - } - - opts - } -} - /// A [`TimelineItem`](super::TimelineItem) that doesn't correspond to an event. #[derive(uniffi::Enum)] pub enum VirtualTimelineItem { diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 3b17b771a77..09d9c7e138d 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -95,7 +95,6 @@ pub use self::{ event_type_filter::TimelineEventTypeFilter, inner::default_event_filter, item::{TimelineItem, TimelineItemKind}, - pagination::{PaginationOptions, PaginationOutcome}, polls::PollResult, reactions::ReactionSenderData, sliding_sync_ext::SlidingSyncRoomExt, diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index 61faf4c2639..95508507126 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, ops::ControlFlow, sync::Arc}; - use eyeball::Subscriber; use matrix_sdk::event_cache::{ self, @@ -32,9 +30,7 @@ impl super::Timeline { #[instrument(skip_all, fields(room_id = ?self.room().room_id()))] pub async fn paginate_backwards(&self, num_events: u16) -> Result { if self.inner.is_live().await { - Ok(self - .live_paginate_backwards(PaginationOptions::until_num_items(num_events, 20)) - .await?) + Ok(self.live_paginate_backwards(num_events).await?) } else { Ok(self.focused_paginate_backwards(num_events).await?) } @@ -64,79 +60,50 @@ impl super::Timeline { /// on a specific event. /// /// Returns whether we hit the start of the timeline. - #[instrument(skip_all, fields(room_id = ?self.room().room_id(), ?options))] - pub async fn live_paginate_backwards( - &self, - mut options: PaginationOptions<'_>, - ) -> event_cache::Result { - let initial_options = options.clone(); - let mut outcome = PaginationOutcome::default(); - + #[instrument(skip_all, fields(room_id = ?self.room().room_id()))] + pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result { let pagination = self.event_cache.pagination(); - while let Some(batch_size) = options.next_event_limit(outcome) { - loop { - let result = pagination.run_backwards(batch_size).await; - - let event_cache_outcome = match result { - Ok(outcome) => outcome, + loop { + let result = pagination.run_backwards(batch_size).await; - Err(EventCacheError::BackpaginationError( - PaginatorError::InvalidPreviousState { actual, .. }, - )) if actual == PaginatorState::Paginating => { - warn!("Another pagination request is already happening, returning early"); - return Ok(false); - } + let event_cache_outcome = match result { + Ok(outcome) => outcome, - Err(err) => return Err(err), - }; - - match event_cache_outcome { - BackPaginationOutcome::Success { events, reached_start } => { - let num_events = events.len(); - trace!("Back-pagination succeeded with {num_events} events"); - - let handle_many_res = self - .inner - .add_events_at( - events, - TimelineEnd::Front, - RemoteEventOrigin::Pagination, - ) - .await; + Err(EventCacheError::BackpaginationError( + PaginatorError::InvalidPreviousState { + actual: PaginatorState::Paginating, .. + }, + )) => { + warn!("Another pagination request is already happening, returning early"); + return Ok(false); + } - if reached_start { - return Ok(true); - } + Err(err) => return Err(err), + }; - outcome.events_received = num_events as u64; - outcome.total_events_received += outcome.events_received; + let BackPaginationOutcome { events, reached_start } = event_cache_outcome; - outcome.items_added = handle_many_res.items_added; - outcome.items_updated = handle_many_res.items_updated; - outcome.total_items_added += outcome.items_added; - outcome.total_items_updated += outcome.items_updated; + let num_events = events.len(); + trace!("Back-pagination succeeded with {num_events} events"); - if num_events == 0 { - // As an exceptional contract: if there were no events in the response, - // and we've not hit the start of the timeline, so retry until we get - // some events. - continue; - } - } + self.inner + .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) + .await; - BackPaginationOutcome::UnknownBackpaginationToken => { - // The token has been lost. - // It's possible the timeline has been cleared; restart the whole - // back-pagination. - outcome = Default::default(); - options = initial_options.clone(); - } - } + if reached_start { + return Ok(true); + } - // Exit the inner loop, and ask for another limit. - break; + if num_events == 0 { + // As an exceptional contract: if there were no events in the response, + // and we've not hit the start of the timeline, retry until we get + // some events or reach the start of the timeline. + continue; } + + // Exit the inner loop, and ask for another limit. + break; } Ok(false) @@ -150,216 +117,3 @@ impl super::Timeline { self.event_cache.pagination().status() } } - -/// Options for pagination. -#[derive(Clone)] -pub struct PaginationOptions<'a> { - inner: PaginationOptionsInner<'a>, - pub(super) wait_for_token: bool, -} - -impl<'a> PaginationOptions<'a> { - /// Do pagination requests until we receive some events, asking the server - /// for the given maximum number of events. - /// - /// The server may choose to return fewer events, even if the start or end - /// of the visible timeline is not yet reached. - pub fn simple_request(event_limit: u16) -> Self { - Self::new(PaginationOptionsInner::SingleRequest { event_limit_if_first: Some(event_limit) }) - } - - /// Continually paginate with a fixed `limit` until at least the given - /// amount of timeline items have been added (unless the start or end of the - /// visible timeline is reached). - /// - /// The `event_limit` represents the maximum number of events the server - /// should return in one batch. It may choose to return fewer events per - /// response. - pub fn until_num_items(event_limit: u16, items: u16) -> Self { - Self::new(PaginationOptionsInner::UntilNumItems { event_limit, items }) - } - - /// Paginate once with the given initial maximum number of events, then - /// do more requests based on the user-provided strategy - /// callback. - /// - /// The callback is given numbers on the events and resulting timeline - /// items for the last request as well as summed over all - /// requests in a `paginate_backwards` call, and can decide - /// whether to do another request (by returning - /// `ControlFlow::Continue(next_event_limit)`) or not (by returning - /// `ControlFlow::Break(())`). - pub fn custom( - initial_event_limit: u16, - pagination_strategy: impl Fn(PaginationOutcome) -> ControlFlow<(), u16> + Send + Sync + 'a, - ) -> Self { - Self::new(PaginationOptionsInner::Custom { - event_limit_if_first: Some(initial_event_limit), - strategy: Arc::new(pagination_strategy), - }) - } - - /// Whether to wait for a pagination token to be set before starting. - /// - /// This is not something you should normally do since it can lead to very - /// long wait times, however in the specific case of using sliding sync with - /// the current proxy and subscribing to the room in a way that you know a - /// sync will be coming in soon, it can be useful to reduce unnecessary - /// traffic from duplicated events and avoid ordering issues from the sync - /// proxy returning older data than pagination. - pub fn wait_for_token(mut self) -> Self { - self.wait_for_token = true; - self - } - - pub(super) fn next_event_limit( - &mut self, - pagination_outcome: PaginationOutcome, - ) -> Option { - match &mut self.inner { - PaginationOptionsInner::SingleRequest { event_limit_if_first } => { - event_limit_if_first.take() - } - PaginationOptionsInner::UntilNumItems { items, event_limit } => { - (pagination_outcome.total_items_added < *items as u64).then_some(*event_limit) - } - PaginationOptionsInner::Custom { event_limit_if_first, strategy } => { - event_limit_if_first.take().or_else(|| match strategy(pagination_outcome) { - ControlFlow::Continue(event_limit) => Some(event_limit), - ControlFlow::Break(_) => None, - }) - } - } - } - - fn new(inner: PaginationOptionsInner<'a>) -> Self { - Self { inner, wait_for_token: false } - } -} - -#[derive(Clone)] -pub enum PaginationOptionsInner<'a> { - SingleRequest { - event_limit_if_first: Option, - }, - UntilNumItems { - event_limit: u16, - items: u16, - }, - Custom { - event_limit_if_first: Option, - strategy: Arc ControlFlow<(), u16> + Send + Sync + 'a>, - }, -} - -#[cfg(not(tarpaulin_include))] -impl<'a> fmt::Debug for PaginationOptions<'a> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.inner { - PaginationOptionsInner::SingleRequest { event_limit_if_first } => f - .debug_struct("SingleRequest") - .field("event_limit_if_first", event_limit_if_first) - .finish(), - PaginationOptionsInner::UntilNumItems { items, event_limit } => f - .debug_struct("UntilNumItems") - .field("items", items) - .field("event_limit", event_limit) - .finish(), - PaginationOptionsInner::Custom { event_limit_if_first, .. } => f - .debug_struct("Custom") - .field("event_limit_if_first", event_limit_if_first) - .finish(), - } - } -} - -/// The result of a successful pagination request. -#[derive(Clone, Copy, Debug, Default)] -#[non_exhaustive] -pub struct PaginationOutcome { - /// The number of events received in last pagination response. - pub events_received: u64, - - /// The number of timeline items added by the last pagination response. - pub items_added: u64, - - /// The number of timeline items updated by the last pagination - /// response. - pub items_updated: u64, - - /// The number of events received by a `paginate_backwards` call so far. - pub total_events_received: u64, - - /// The total number of items added by a `paginate_backwards` call so - /// far. - pub total_items_added: u64, - - /// The total number of items updated by a `paginate_backwards` call so - /// far. - pub total_items_updated: u64, -} - -#[cfg(test)] -mod tests { - use std::{ - ops::ControlFlow, - sync::atomic::{AtomicU8, Ordering}, - }; - - use super::{PaginationOptions, PaginationOutcome}; - - fn bump_outcome(outcome: &mut PaginationOutcome) { - outcome.events_received = 8; - outcome.items_added = 6; - outcome.items_updated = 1; - outcome.total_events_received += 8; - outcome.total_items_added += 6; - outcome.total_items_updated += 1; - } - - #[test] - fn test_simple_request_limits() { - let mut opts = PaginationOptions::simple_request(10); - let mut outcome = PaginationOutcome::default(); - assert_eq!(opts.next_event_limit(outcome), Some(10)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), None); - } - - #[test] - fn test_until_num_items_limits() { - let mut opts = PaginationOptions::until_num_items(10, 10); - let mut outcome = PaginationOutcome::default(); - assert_eq!(opts.next_event_limit(outcome), Some(10)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), Some(10)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), None); - } - - #[test] - fn test_custom_limits() { - let num_calls = AtomicU8::new(0); - let mut opts = PaginationOptions::custom(8, |outcome| { - num_calls.fetch_add(1, Ordering::AcqRel); - if outcome.total_items_added - outcome.total_items_updated < 6 { - ControlFlow::Continue(12) - } else { - ControlFlow::Break(()) - } - }); - let mut outcome = PaginationOutcome::default(); - assert_eq!(opts.next_event_limit(outcome), Some(8)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), Some(12)); - - bump_outcome(&mut outcome); - assert_eq!(opts.next_event_limit(outcome), None); - - assert_eq!(num_calls.load(Ordering::Acquire), 2); - } -} diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs index dd01ee671a9..86d1ab9b56c 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs @@ -16,7 +16,10 @@ use std::{sync::Arc, time::Duration}; use assert_matches2::assert_let; use eyeball_im::VectorDiff; -use futures_util::future::{join, join3}; +use futures_util::{ + future::{join, join3}, + FutureExt as _, +}; use matrix_sdk::{ config::SyncSettings, event_cache::paginator::PaginatorState, test_utils::logged_in_client_with_server, @@ -24,9 +27,7 @@ use matrix_sdk::{ use matrix_sdk_test::{ async_test, EventBuilder, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, ALICE, BOB, }; -use matrix_sdk_ui::timeline::{ - AnyOtherFullStateEventContent, PaginationOptions, RoomExt, TimelineItemContent, -}; +use matrix_sdk_ui::timeline::{AnyOtherFullStateEventContent, RoomExt, TimelineItemContent}; use once_cell::sync::Lazy; use ruma::{ events::{ @@ -76,7 +77,7 @@ async fn test_back_pagination() { .await; let paginate = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; }; let observe_paginating = async { @@ -135,8 +136,7 @@ async fn test_back_pagination() { .mount(&server) .await; - let hit_start = - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + let hit_start = timeline.live_paginate_backwards(10).await.unwrap(); assert!(hit_start); assert_next_eq!(back_pagination_status, PaginatorState::Idle); } @@ -202,7 +202,7 @@ async fn test_back_pagination_highlighted() { .mount(&server) .await; - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; let first = assert_next_matches!( @@ -269,10 +269,7 @@ async fn test_wait_for_token() { mock_sync(&server, sync_builder.build_json_sync_response(), None).await; let paginate = async { - timeline - .live_paginate_backwards(PaginationOptions::simple_request(10).wait_for_token()) - .await - .unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); }; let observe_paginating = async { assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); @@ -290,7 +287,7 @@ async fn test_wait_for_token() { } #[async_test] -async fn test_dedup() { +async fn test_dedup_pagination() { let room_id = room_id!("!a98sd12bjh:example.org"); let (client, server) = logged_in_client_with_server().await; let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); @@ -335,10 +332,10 @@ async fn test_dedup() { // If I try to paginate twice at the same time, let paginate_1 = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); }; let paginate_2 = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); }; timeout(Duration::from_secs(5), join(paginate_1, paginate_2)).await.unwrap(); @@ -427,12 +424,7 @@ async fn test_timeline_reset_while_paginating() { let mut back_pagination_status = timeline.back_pagination_status(); - let paginate = async { - timeline - .live_paginate_backwards(PaginationOptions::simple_request(10).wait_for_token()) - .await - .unwrap() - }; + let paginate = async { timeline.live_paginate_backwards(10).await.unwrap() }; let observe_paginating = async { let mut seen_paginating = false; @@ -590,7 +582,7 @@ async fn test_empty_chunk() { .await; let paginate = async { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); server.reset().await; }; let observe_paginating = async { @@ -689,8 +681,7 @@ async fn test_until_num_items_with_empty_chunk() { .await; let paginate = async { - timeline.live_paginate_backwards(PaginationOptions::until_num_items(4, 4)).await.unwrap(); - server.reset().await; + timeline.live_paginate_backwards(10).await.unwrap(); }; let observe_paginating = async { assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); @@ -734,6 +725,8 @@ async fn test_until_num_items_with_empty_chunk() { ); assert!(day_divider.is_day_divider()); + timeline.live_paginate_backwards(10).await.unwrap(); + let message = assert_next_matches!( timeline_stream, VectorDiff::PushFront { value } => value @@ -782,11 +775,11 @@ async fn test_back_pagination_aborted() { let paginate = spawn({ let timeline = timeline.clone(); async move { - timeline.live_paginate_backwards(PaginationOptions::simple_request(10)).await.unwrap(); + timeline.live_paginate_backwards(10).await.unwrap(); } }); - assert_eq!(back_pagination_status.next().await, Some(PaginationStatus::Paginating)); + assert_eq!(back_pagination_status.next().await, Some(PaginatorState::Paginating)); // Abort the pagination! paginate.abort(); @@ -795,7 +788,7 @@ async fn test_back_pagination_aborted() { assert!(paginate.await.unwrap_err().is_cancelled()); // The timeline should automatically reset to idle. - assert_next_eq!(back_pagination_status, PaginationStatus::Idle); + assert_next_eq!(back_pagination_status, PaginatorState::Idle); // And there should be no other pending pagination status updates. assert!(back_pagination_status.next().now_or_never().is_none()); diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c785e6ae332..e482b3a29e7 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -642,26 +642,19 @@ impl RoomEventCacheInner { /// The result of a single back-pagination request. #[derive(Debug)] -pub enum BackPaginationOutcome { - /// The back-pagination succeeded, and new events have been found. - Success { - /// Did the back-pagination reach the start of the timeline? - reached_start: bool, - - /// All the events that have been returned in the back-pagination - /// request. - /// - /// Events are presented in reverse order: the first element of the vec, - /// if present, is the most "recent" event from the chunk (or - /// technically, the last one in the topological ordering). - /// - /// Note: they're not deduplicated (TODO: smart reconciliation). - events: Vec, - }, +pub struct BackPaginationOutcome { + /// Did the back-pagination reach the start of the timeline? + pub reached_start: bool, - /// The back-pagination token was unknown to the event cache, and the caller - /// must retry after obtaining a new back-pagination token. - UnknownBackpaginationToken, + /// All the events that have been returned in the back-pagination + /// request. + /// + /// Events are presented in reverse order: the first element of the vec, + /// if present, is the most "recent" event from the chunk (or + /// technically, the last one in the topological ordering). + /// + /// Note: they're not deduplicated (TODO: smart reconciliation). + pub events: Vec, } /// An update related to events happened in a room. diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index b8b079a53c5..64d7b4ba947 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -22,7 +22,7 @@ use tokio::{ sync::{Mutex, Notify, RwLockReadGuard}, time::timeout, }; -use tracing::{instrument, trace}; +use tracing::{debug, instrument, trace}; use super::{ paginator::{PaginationResult, Paginator, PaginatorState}, @@ -64,9 +64,17 @@ impl RoomPagination { /// It may return an error if the pagination token used during /// back-pagination has disappeared while we started the pagination. In /// that case, it's desirable to call the method again. - // TODO(bnjbvr): that's bad API, this API should restart for us! #[instrument(skip(self))] pub async fn run_backwards(&self, batch_size: u16) -> Result { + loop { + if let Some(result) = self.run_backwards_impl(batch_size).await? { + return Ok(result); + } + debug!("back-pagination has been internally restarted because of a timeline reset."); + } + } + + async fn run_backwards_impl(&self, batch_size: u16) -> Result> { // Make sure there's at most one back-pagination request. let prev_token = self.get_or_wait_for_token().await; @@ -92,7 +100,7 @@ impl RoomPagination { // The method has been called with `token` but it doesn't exist in `RoomEvents`, // it's an error. if gap_identifier.is_none() { - return Ok(BackPaginationOutcome::UnknownBackpaginationToken); + return Ok(None); } gap_identifier @@ -142,7 +150,7 @@ impl RoomPagination { // TODO: implement smarter reconciliation later //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); - return Ok(BackPaginationOutcome::Success { events, reached_start }); + return Ok(Some(BackPaginationOutcome { events, reached_start })); } // There is no `token`/gap identifier. Let's assume we must prepend the new @@ -179,7 +187,7 @@ impl RoomPagination { } } - Ok(BackPaginationOutcome::Success { events, reached_start }) + Ok(Some(BackPaginationOutcome { events, reached_start })) } /// Test-only function to get the latest pagination token, as stored in the diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index c37f797f910..62b2cb63c9d 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -3,7 +3,7 @@ use std::time::Duration; use assert_matches2::{assert_let, assert_matches}; use matrix_sdk::{ event_cache::{BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate}, - test_utils::{assert_event_matches_msg, logged_in_client_with_server}, + test_utils::{assert_event_matches_msg, events::EventFactory, logged_in_client_with_server}, }; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; use matrix_sdk_test::{ @@ -271,7 +271,7 @@ async fn test_backpaginate_once() { }; // I'll get all the previous events, in "reverse" order (same as the response). - assert_let!(BackPaginationOutcome::Success { events, reached_start } = outcome); + let BackPaginationOutcome { events, reached_start } = outcome; assert!(reached_start); assert_event_matches_msg(&events[0], "world"); @@ -354,17 +354,13 @@ async fn test_backpaginate_multiple_iterations() { // Then if I backpaginate in a loop, let pagination = room_event_cache.pagination(); while pagination.get_or_wait_for_token().await.is_some() { - match pagination.run_backwards(20).await.unwrap() { - BackPaginationOutcome::Success { reached_start, events } => { - if !global_reached_start { - global_reached_start = reached_start; - } - global_events.extend(events); - } - BackPaginationOutcome::UnknownBackpaginationToken => { - panic!("shouldn't run into unknown backpagination error") - } + let BackPaginationOutcome { reached_start, events } = + pagination.run_backwards(20).await.unwrap(); + + if !global_reached_start { + global_reached_start = reached_start; } + global_events.extend(events); num_iterations += 1; } @@ -403,7 +399,7 @@ async fn test_reset_while_backpaginating() { // token, let room_id = room_id!("!omelette:fromage.fr"); - let event_builder = EventBuilder::new(); + let ev_factory = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); let mut sync_builder = SyncResponseBuilder::new(); { @@ -411,10 +407,7 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event( - user_id!("@a:b.c"), - RoomMessageEventContent::text_plain("heyo"), - )) + .add_timeline_event(ev_factory.text_msg("heyo").into_raw_sync()) .set_timeline_prev_batch("first_backpagination".to_owned()), ); let response_body = sync_builder.build_json_sync_response(); @@ -454,17 +447,14 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event( - user_id!("@a:b.c"), - RoomMessageEventContent::text_plain("heyo"), - )) + .add_timeline_event(ev_factory.text_msg("heyo").into_raw_sync()) .set_timeline_prev_batch("second_backpagination".to_owned()) .set_timeline_limited(), ); let sync_response_body = sync_builder.build_json_sync_response(); - // First back-pagination request: - let chunk = non_sync_events!(event_builder, [ (room_id, "$2": "lalala") ]); + // Mock the first back-pagination request: + let chunk = vec![ev_factory.text_msg("lalala").into_raw_timeline()]; let response_json = json!({ "chunk": chunk, "start": "t392-516_47314_0_7_1_1_1_11444_1", @@ -483,7 +473,19 @@ async fn test_reset_while_backpaginating() { .mount(&server) .await; + // Mock the second back-pagination request, that will be hit after the reset + // caused by the sync. + mock_messages( + &server, + "second_backpagination", + Some("third_backpagination"), + vec![ev_factory.text_msg("finally!").into_raw_timeline()], + ) + .await; + + // Run the pagination! let pagination = room_event_cache.pagination(); + let first_token = pagination.get_or_wait_for_token().await; assert!(first_token.is_some()); @@ -498,14 +500,14 @@ async fn test_reset_while_backpaginating() { let outcome = backpagination.await.expect("join failed").unwrap(); - // Backpagination should be confused, and the operation should result in an - // unknown token. - assert_matches!(outcome, BackPaginationOutcome::UnknownBackpaginationToken); + // Backpagination will automatically restart, so eventually we get the events. + let BackPaginationOutcome { events, .. } = outcome; + assert!(!events.is_empty()); - // Now if we retrieve the earliest token, it's not the one we had before. + // Now if we retrieve the oldest token, it's set to something else. let second_token = pagination.get_or_wait_for_token().await.unwrap(); assert!(first_token.unwrap() != second_token); - assert_eq!(second_token, "second_backpagination"); + assert_eq!(second_token, "third_backpagination"); } #[async_test] @@ -558,8 +560,8 @@ async fn test_backpaginating_without_token() { // If we try to back-paginate with a token, it will hit the end of the timeline // and give us the resulting event. - let outcome = pagination.run_backwards(20).await.unwrap(); - assert_let!(BackPaginationOutcome::Success { events, reached_start } = outcome); + let BackPaginationOutcome { events, reached_start } = + pagination.run_backwards(20).await.unwrap(); assert!(reached_start); diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index f2539eb3d25..a010887b647 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -29,9 +29,7 @@ use matrix_sdk::{ use matrix_sdk_ui::{ room_list_service, sync_service::{self, SyncService}, - timeline::{ - PaginationOptions, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem, - }, + timeline::{TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem}, Timeline as SdkTimeline, }; use ratatui::{prelude::*, style::palette::tailwind, widgets::*}; @@ -343,10 +341,7 @@ impl App { // Start a new one, request batches of 20 events, stop after 10 timeline items // have been added. *pagination = Some(spawn(async move { - if let Err(err) = sdk_timeline - .live_paginate_backwards(PaginationOptions::until_num_items(20, 10)) - .await - { + if let Err(err) = sdk_timeline.live_paginate_backwards(20).await { // TODO: would be nice to be able to set the status // message remotely? //self.set_status_message(format!(