From 65b5f0f5f40057009d4624d9604d7e3688737270 Mon Sep 17 00:00:00 2001 From: torrybr <16907963+torrybr@users.noreply.github.com> Date: Sun, 22 Sep 2024 11:24:46 -0400 Subject: [PATCH 1/2] feat(sdk): support for task listening to beacon events --- crates/matrix-sdk/src/room/mod.rs | 73 ++++++++++-- .../tests/integration/room/beacon/mod.rs | 110 +++++++++++++++++- 2 files changed, 173 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 9e4a94de6c..b764940403 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -53,10 +53,11 @@ use ruma::{ }, assign, events::{ - beacon::BeaconEventContent, + beacon::{BeaconEventContent, OriginalSyncBeaconEvent}, beacon_info::BeaconInfoEventContent, call::notify::{ApplicationType, CallNotifyEventContent, NotifyType}, direct::DirectEventContent, + location::LocationContent, marked_unread::MarkedUnreadEventContent, receipt::{Receipt, ReceiptThread, ReceiptType}, room::{ @@ -82,12 +83,13 @@ use ruma::{ push::{Action, PushConditionRoomCtx}, serde::Raw, time::Instant, - EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName, - OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId, + EventId, Int, MatrixToUri, MatrixUri, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, + OwnedRoomId, OwnedServerName, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, + UserId, }; use serde::de::DeserializeOwned; use thiserror::Error; -use tokio::sync::broadcast; +use tokio::{sync::broadcast, task::JoinHandle}; use tracing::{debug, info, instrument, warn}; use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent}; @@ -2826,7 +2828,7 @@ impl Room { Ok(()) } - /// Get the beacon information event in the room for the current user. + /// Get the beacon information event in the room for the `user_id`. /// /// # Errors /// @@ -2834,9 +2836,10 @@ impl Room { /// not be deserialized. async fn get_user_beacon_info( &self, + user_id: &UserId, ) -> Result, BeaconError> { let raw_event = self - .get_state_event_static_for_key::(self.own_user_id()) + .get_state_event_static_for_key::(user_id) .await? .ok_or(BeaconError::NotFound)?; @@ -2889,7 +2892,7 @@ impl Room { ) -> Result { self.ensure_room_joined()?; - let mut beacon_info_event = self.get_user_beacon_info().await?; + let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?; beacon_info_event.content.stop(); Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?) } @@ -2911,7 +2914,7 @@ impl Room { ) -> Result { self.ensure_room_joined()?; - let beacon_info_event = self.get_user_beacon_info().await?; + let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?; if beacon_info_event.content.is_live() { let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None); @@ -2976,6 +2979,60 @@ impl Room { .await?; Ok(()) } + + /// Subscribe to live location sharing events for this room. + /// + /// The returned receiver will receive a new event for each sync response + /// that contains a 'm.beacon' event. + pub async fn subscribe_to_live_location_shares( + &self, + ) -> (JoinHandle<()>, broadcast::Receiver) { + let (sender, receiver) = broadcast::channel(16); + + let client = self.client.clone(); + let room_id = self.room_id().to_owned(); + + let handle: JoinHandle<()> = tokio::spawn(async move { + let beacon_event_handler_handle = client.add_room_event_handler(&room_id, { + move |event: OriginalSyncBeaconEvent| async move { + let live_location_share = LiveLocationShare { + user_id: event.sender, + last_location: LastLocation { + location: event.content.location, + ts: event.content.ts, + }, + }; + + // Send the live location update to all subscribers. + let _ = sender.send(live_location_share); + } + }); + + let _ = beacon_event_handler_handle; + }); + + (handle, receiver) + } +} + +/// Details of the last known location beacon. +#[derive(Clone, Debug)] +pub struct LastLocation { + /// The most recent location content of the user. + pub location: LocationContent, + /// The timestamp of when the location was updated + pub ts: MilliSecondsSinceUnixEpoch, +} + +/// Details of a users live location share. +#[derive(Clone, Debug)] +pub struct LiveLocationShare { + /// The user's last known location. + pub last_location: LastLocation, + // /// Information about the associated beacon event (currently commented out). + // pub beacon_info: BeaconInfoEventContent, + /// The user ID of the person sharing their live location. + pub user_id: OwnedUserId, } /// A wrapper for a weak client and a room id that allows to lazily retrieve a diff --git a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs index d83bfda7c7..e1030d9052 100644 --- a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs +++ b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs @@ -1,8 +1,12 @@ use std::time::{Duration, UNIX_EPOCH}; +use js_int::uint; use matrix_sdk::config::SyncSettings; -use matrix_sdk_test::{async_test, mocks::mock_encryption_state, test_json, DEFAULT_TEST_ROOM_ID}; -use ruma::{event_id, time::SystemTime}; +use matrix_sdk_test::{ + async_test, mocks::mock_encryption_state, sync_timeline_event, test_json, JoinedRoomBuilder, + SyncResponseBuilder, DEFAULT_TEST_ROOM_ID, +}; +use ruma::{event_id, time::SystemTime, MilliSecondsSinceUnixEpoch}; use serde_json::json; use wiremock::{ matchers::{body_partial_json, header, method, path_regex}, @@ -153,3 +157,105 @@ async fn test_send_location_beacon_with_expired_live_share() { assert!(response.is_err()); } + +#[async_test] +async fn test_subscribe_to_live_location_shares() { + let (client, server) = logged_in_client_with_server().await; + + // let live_location_shares: Arc>> = + // Arc::new(Mutex::new(Vec::new())); + + let mut sync_builder = SyncResponseBuilder::new(); + + // Get the current timestamp for the `beacon_info` event. + let current_timestamp = + SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis() + as u64; + + mock_sync( + &server, + json!({ + "next_batch": "s526_47314_0_7_1_1_1_1_1", + "rooms": { + "join": { + *DEFAULT_TEST_ROOM_ID: { + "state": { + "events": [ + { + "content": { + "description": "Live Share", + "live": true, + "org.matrix.msc3488.ts": current_timestamp, + "timeout": 600_000, + "org.matrix.msc3488.asset": { "type": "m.self" } + }, + "event_id": "$15139375514XsgmR:localhost", + "origin_server_ts": 1_636_829_458, + "sender": "@example:localhost", + "state_key": "@example:localhost", + "type": "org.matrix.msc3672.beacon_info", + "unsigned": { + "age": 7034220 + } + }, + ] + } + } + } + } + + }), + None, + ) + .await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + + let (task_handle, mut subscriber) = room.subscribe_to_live_location_shares().await; + + sync_builder.add_joined_room(JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( + sync_timeline_event!({ + "content": { + "m.relates_to": { + "event_id": "$TlS7h0NHzBdZIccsSspF5CMpQE8YMT0stRern0nXscI", + "rel_type": "m.reference" + }, + "org.matrix.msc3488.location": { + "uri": "geo:8.95752746197222,12.494122581370175;u=10" + }, + "org.matrix.msc3488.ts": 1_636_829_458 + }, + "event_id": "$152037280074GZeOm:localhost", + "origin_server_ts": 1_636_829_458, + "sender": "@example:localhost", + "type": "org.matrix.msc3672.beacon", + "unsigned": { + "age": 598971 + } + }), + )); + mock_sync(&server, sync_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let live_location_share = + subscriber.recv().await.expect("Failed to receive live location share"); + + assert_eq!(live_location_share.user_id.to_string(), "@example:localhost"); + + assert_eq!( + live_location_share.last_location.location.uri, + "geo:8.95752746197222,12.494122581370175;u=10" + ); + assert!(live_location_share.last_location.location.description.is_none()); + assert!(live_location_share.last_location.location.zoom_level.is_none()); + assert_eq!( + live_location_share.last_location.ts, + MilliSecondsSinceUnixEpoch(uint!(1_636_829_458)) + ); + + task_handle.await.unwrap(); +} From 03d498d8bdaabee94743ccf44bfd4d79cc4b94d2 Mon Sep 17 00:00:00 2001 From: torrybr <16907963+torrybr@users.noreply.github.com> Date: Sun, 22 Sep 2024 11:47:21 -0400 Subject: [PATCH 2/2] fix: clippy --- crates/matrix-sdk/src/room/mod.rs | 2 +- crates/matrix-sdk/tests/integration/room/beacon/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index b764940403..f4e19ce155 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -2984,7 +2984,7 @@ impl Room { /// /// The returned receiver will receive a new event for each sync response /// that contains a 'm.beacon' event. - pub async fn subscribe_to_live_location_shares( + pub fn subscribe_to_live_location_shares( &self, ) -> (JoinHandle<()>, broadcast::Receiver) { let (sender, receiver) = broadcast::channel(16); diff --git a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs index e1030d9052..cf0e98ea41 100644 --- a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs +++ b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs @@ -214,7 +214,7 @@ async fn test_subscribe_to_live_location_shares() { let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); - let (task_handle, mut subscriber) = room.subscribe_to_live_location_shares().await; + let (task_handle, mut subscriber) = room.subscribe_to_live_location_shares(); sync_builder.add_joined_room(JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( sync_timeline_event!({