Skip to content

Commit

Permalink
sending queue: do a few renamings after the live review
Browse files Browse the repository at this point in the history
Thanks @Hywan for the review comments!
  • Loading branch information
bnjbvr committed Jun 3, 2024
1 parent 75aba1d commit 78608a1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 44 deletions.
10 changes: 6 additions & 4 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,10 @@ pub(crate) struct ClientInner {
#[cfg(feature = "e2e-encryption")]
pub(crate) verification_state: SharedObservable<VerificationState>,

/// Data related to the sending queue.
pub(crate) sending_queue: Arc<SendingQueueData>,
/// Data related to the [`SendingQueue`].
///
/// [`SendingQueue`]: crate::send_queue::SendingQueue
pub(crate) sending_queue_data: Arc<SendingQueueData>,
}

impl ClientInner {
Expand Down Expand Up @@ -328,7 +330,7 @@ impl ClientInner {
respect_login_well_known,
sync_beat: event_listener::Event::new(),
event_cache,
sending_queue,
sending_queue_data: sending_queue,
#[cfg(feature = "e2e-encryption")]
e2ee: EncryptionData::new(encryption_settings),
#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -2108,7 +2110,7 @@ impl Client {
self.inner.unstable_features.get().cloned(),
self.inner.respect_login_well_known,
self.inner.event_cache.clone(),
self.inner.sending_queue.clone(),
self.inner.sending_queue_data.clone(),
#[cfg(feature = "e2e-encryption")]
self.inner.e2ee.encryption_settings,
)
Expand Down
92 changes: 52 additions & 40 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl SendingQueue {
}

fn for_room(&self, room: Room) -> RoomSendingQueue {
let data = &self.client.inner.sending_queue;
let data = &self.client.inner.sending_queue_data;

let mut map = data.rooms.write().unwrap();

Expand All @@ -63,31 +63,31 @@ impl SendingQueue {

let owned_room_id = room_id.to_owned();
let room_q = RoomSendingQueue::new(
data.enabled.clone(),
data.shutting_down.clone(),
data.globally_enabled.clone(),
data.is_dropping.clone(),
&self.client,
owned_room_id.clone(),
);
map.insert(owned_room_id, room_q.clone());
room_q
}

/// Enables the sending queue for the entire client, i.e. all rooms.
/// Enable the sending queue for the entire client, i.e. all rooms.
///
/// This may wake up backgrounds tasks and resume sending of events in the
/// This may wake up background tasks and resume sending of events in the
/// background.
pub fn enable(&self) {
if self.client.inner.sending_queue.enabled.set_if_not_eq(true).is_some() {
if self.client.inner.sending_queue_data.globally_enabled.set_if_not_eq(true).is_some() {
debug!("globally enabling sending queue");
let rooms = self.client.inner.sending_queue.rooms.read().unwrap();
let rooms = self.client.inner.sending_queue_data.rooms.read().unwrap();
// Wake up the rooms, in case events have been queued in the meanwhile.
for room in rooms.values() {
room.inner.notifier.notify_one();
}
}
}

/// Disables the sending queue for the entire client, i.e. all rooms.
/// Disable the sending queue for the entire client, i.e. all rooms.
///
/// If requests were being sent, they're not aborted, and will continue
/// until a status resolves (error responses will keep the events in the
Expand All @@ -101,19 +101,19 @@ impl SendingQueue {
// - or they were not, and it's not worth it waking them to let them they're
// disabled, which causes them to go to sleep again.
debug!("globally disabling sending queue");
self.client.inner.sending_queue.enabled.set(false);
self.client.inner.sending_queue_data.globally_enabled.set(false);
}

/// Returns whether the sending queue is enabled, at a client-wide
/// granularity.
pub fn is_enabled(&self) -> bool {
self.client.inner.sending_queue.enabled.get()
self.client.inner.sending_queue_data.globally_enabled.get()
}

/// A subscriber to the enablement status (enabled or disabled) of the
/// sending queue.
pub fn subscribe_status(&self) -> Subscriber<bool> {
self.client.inner.sending_queue.enabled.subscribe()
self.client.inner.sending_queue_data.globally_enabled.subscribe()
}
}

Expand All @@ -130,19 +130,19 @@ pub(super) struct SendingQueueData {
rooms: SyncRwLock<BTreeMap<OwnedRoomId, RoomSendingQueue>>,

/// Is the whole mechanism enabled or disabled?
enabled: SharedObservable<bool>,
globally_enabled: SharedObservable<bool>,

/// Are we shutting down the entire queue?
shutting_down: Arc<AtomicBool>,
/// Are we currently dropping the Client?
is_dropping: Arc<AtomicBool>,
}

impl SendingQueueData {
/// Create the data for a sending queue, in the given enabled state.
pub fn new(enabled: bool) -> Self {
pub fn new(globally_enabled: bool) -> Self {
Self {
rooms: Default::default(),
enabled: SharedObservable::new(enabled),
shutting_down: Arc::new(false.into()),
globally_enabled: SharedObservable::new(globally_enabled),
is_dropping: Arc::new(false.into()),
}
}
}
Expand All @@ -151,8 +151,8 @@ impl Drop for SendingQueueData {
fn drop(&mut self) {
// Mark the whole sending queue as shutting down, then wake up all the room
// queues so they're stopped too.
debug!("globally shutting down the sending queue");
self.shutting_down.store(true, Ordering::SeqCst);
debug!("globally dropping the sending queue");
self.is_dropping.store(true, Ordering::SeqCst);

let rooms = self.rooms.read().unwrap();
for room in rooms.values() {
Expand Down Expand Up @@ -184,14 +184,14 @@ impl std::fmt::Debug for RoomSendingQueue {

impl RoomSendingQueue {
fn new(
enabled: SharedObservable<bool>,
shutting_down: Arc<AtomicBool>,
globally_enabled: SharedObservable<bool>,
is_dropping: Arc<AtomicBool>,
client: &Client,
room_id: OwnedRoomId,
) -> Self {
let (updates_sender, _) = broadcast::channel(32);

let queue = SharedQueue::new();
let queue = QueueStorage::new();
let notifier = Arc::new(Notify::new());

let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
Expand All @@ -201,8 +201,8 @@ impl RoomSendingQueue {
queue.clone(),
notifier.clone(),
updates_sender.clone(),
enabled,
shutting_down,
globally_enabled,
is_dropping,
));

Self {
Expand Down Expand Up @@ -242,6 +242,7 @@ impl RoomSendingQueue {

let transaction_id = self.inner.queue.push(content.clone()).await;
trace!(%transaction_id, "manager sends an event to the background task");

self.inner.notifier.notify_one();

let _ = self.inner.updates.send(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho {
Expand All @@ -261,29 +262,29 @@ impl RoomSendingQueue {
#[instrument(skip_all, fields(room_id = %room.room_id()))]
async fn sending_task(
room: WeakRoom,
queue: SharedQueue,
queue: QueueStorage,
notifier: Arc<Notify>,
updates: broadcast::Sender<RoomSendingQueueUpdate>,
enabled: SharedObservable<bool>,
shutting_down: Arc<AtomicBool>,
globally_enabled: SharedObservable<bool>,
is_dropping: Arc<AtomicBool>,
) {
info!("spawned the sending task");

loop {
// A request to shut down should be preferred above everything else.
if shutting_down.load(Ordering::SeqCst) {
if is_dropping.load(Ordering::SeqCst) {
trace!("shutting down!");
break;
}

if !enabled.get() {
if !globally_enabled.get() {
trace!("not enabled, sleeping");
// Wait for an explicit wakeup.
notifier.notified().await;
continue;
}

let Some(queued_event) = queue.pop_next_to_send().await else {
let Some(queued_event) = queue.peek_next_to_send().await else {
trace!("queue is empty, sleeping");
// Wait for an explicit wakeup.
notifier.notified().await;
Expand All @@ -293,7 +294,7 @@ impl RoomSendingQueue {
trace!("received an event to send!");

let Some(room) = room.get() else {
if shutting_down.load(Ordering::SeqCst) {
if is_dropping.load(Ordering::SeqCst) {
break;
}
error!("the weak room couldn't be upgraded but we're not shutting down?");
Expand Down Expand Up @@ -322,7 +323,7 @@ impl RoomSendingQueue {

// Disable the queue after an error.
// See comment in [`SendingQueue::disable()`].
enabled.set(false);
globally_enabled.set(false);

// In this case, we intentionally keep the event in the queue, but mark it as
// not being sent anymore.
Expand Down Expand Up @@ -360,10 +361,10 @@ struct RoomSendingQueueInner {
/// content / deleting entries, all that will be required will be to
/// manipulate the on-disk storage. In other words, the storage will become
/// the one source of truth.
queue: SharedQueue,
queue: QueueStorage,

/// A notifier that's updated any time common data is touched (stopped or
/// enabled statuses), or the associated room [`SharedQueue`].
/// enabled statuses), or the associated room [`QueueStorage`].
notifier: Arc<Notify>,

/// Handle to the actual sending task. Unused, but kept alive along this
Expand All @@ -375,13 +376,18 @@ struct RoomSendingQueueInner {
struct QueuedEvent {
event: AnyMessageLikeEventContent,
transaction_id: OwnedTransactionId,

/// Flag to indicate if an event has been scheduled for sending.
///
/// Useful to indicate if cancelling could happen or if it was too late and
/// the event had already been sent.
is_being_sent: bool,
}

#[derive(Clone)]
struct SharedQueue(Arc<RwLock<VecDeque<QueuedEvent>>>);
struct QueueStorage(Arc<RwLock<VecDeque<QueuedEvent>>>);

impl SharedQueue {
impl QueueStorage {
/// Create a new synchronized queue for queuing events to be sent later.
fn new() -> Self {
Self(Arc::new(RwLock::new(VecDeque::with_capacity(16))))
Expand All @@ -402,21 +408,23 @@ impl SharedQueue {
transaction_id
}

/// Pops the next event to be sent, marking it as being sent.
/// Peeks the next event to be sent, marking it as being sent.
///
/// It is required to call [`Self::mark_as_sent`] after it's been
/// effectively sent.
async fn pop_next_to_send(&self) -> Option<QueuedEvent> {
async fn peek_next_to_send(&self) -> Option<QueuedEvent> {
let mut q = self.0.write().await;
if let Some(event) = q.front_mut() {
// TODO: This flag should probably live in memory when we have an actual
// storage.
event.is_being_sent = true;
Some(event.clone())
} else {
None
}
}

/// Marks an event popped with [`Self::pop_next_to_send`] and identified
/// Marks an event popped with [`Self::peek_next_to_send`] and identified
/// with the given transaction id as not being sent anymore, so it can
/// be removed from the queue later.
async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
Expand All @@ -432,7 +440,11 @@ impl SharedQueue {
/// transaction id as sent by removing it from the local queue.
async fn mark_as_sent(&self, transaction_id: &TransactionId) {
let mut q = self.0.write().await;
q.retain(|item| item.transaction_id != transaction_id);
if let Some(index) = q.iter().position(|item| item.transaction_id == transaction_id) {
q.remove(index);
} else {
warn!("couldn't find item to mark as sent with transaction id {transaction_id}");
}
}

/// Cancel a sending command for an event that has been sent with
Expand Down

0 comments on commit 78608a1

Please sign in to comment.