Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ThreadMetadata store #189

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
65 changes: 63 additions & 2 deletions presage-store-sled/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};

use crate::protobuf::ContentProto;
use async_trait::async_trait;
use log::{debug, error, trace, warn};
use presage::libsignal_service::zkgroup::GroupMasterKeyBytes;
Expand All @@ -26,14 +27,14 @@ use presage::libsignal_service::{
Profile, ServiceAddress,
};
use presage::store::{ContentExt, ContentsStore, StateStore, Store, Thread};
use presage::ThreadMetadata;
use presage::{manager::RegistrationData, proto::verified};
use presage_store_cipher::StoreCipher;
use prost::Message;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sha2::{Digest, Sha256};
use sled::{Batch, IVec};

use crate::protobuf::ContentProto;

mod error;
mod protobuf;

Expand All @@ -49,6 +50,7 @@ const SLED_TREE_SIGNED_PRE_KEYS: &str = "signed_pre_keys";
const SLED_TREE_KYBER_PRE_KEYS: &str = "kyber_pre_keys";
const SLED_TREE_STATE: &str = "state";
const SLED_TREE_THREADS_PREFIX: &str = "threads";
const SLED_TREE_THREADS_METADATA: &str = "threads_metadata";
const SLED_TREE_PROFILES: &str = "profiles";
const SLED_TREE_PROFILE_KEYS: &str = "profile_keys";

Expand Down Expand Up @@ -305,6 +307,13 @@ impl SledStore {
hasher.update(key.collect::<Vec<_>>());
format!("{:x}", hasher.finalize())
}

fn thread_metadata_key(&self, thread: Thread) -> Vec<u8> {
match thread {
Thread::Contact(contact) => contact.to_string().into_bytes(),
Thread::Group(group) => group.to_vec(),
}
}
}

fn migrate(
Expand Down Expand Up @@ -385,6 +394,8 @@ impl ProtocolStore for SledStore {}
impl StateStore for SledStore {
type StateStoreError = SledStoreError;

/// State

fn load_registration_data(&self) -> Result<Option<RegistrationData>, SledStoreError> {
self.get(SLED_TREE_STATE, SLED_KEY_REGISTRATION)
}
Expand Down Expand Up @@ -424,6 +435,7 @@ impl ContentsStore for SledStore {
type ContactsIter = SledContactsIter;
type GroupsIter = SledGroupsIter;
type MessagesIter = SledMessagesIter;
type ThreadMetadataIter = SledThreadMetadataIter;

fn clear_contacts(&mut self) -> Result<(), SledStoreError> {
self.write().drop_tree(SLED_TREE_CONTACTS)?;
Expand Down Expand Up @@ -603,6 +615,33 @@ impl ContentsStore for SledStore {
let key = self.profile_key_for_uuid(uuid, key);
self.get(SLED_TREE_PROFILES, key)
}
/// Thread metadata

fn save_thread_metadata(
&mut self,
metadata: ThreadMetadata,
) -> Result<(), Self::ContentsStoreError> {
let key = self.thread_metadata_key(metadata.thread.clone());
self.insert(SLED_TREE_THREADS_METADATA, key, metadata)?;
Ok(())
}

fn thread_metadata(
&self,
thread: Thread,
) -> Result<Option<ThreadMetadata>, Self::ContentsStoreError> {
let key = self.thread_metadata_key(thread);
self.get(SLED_TREE_THREADS_METADATA, key)
}

fn thread_metadatas(&self) -> Result<Self::ThreadMetadataIter, SledStoreError> {
let tree = self.read().open_tree(SLED_TREE_THREADS_METADATA)?;
let iter = tree.iter();
Ok(SledThreadMetadataIter {
cipher: self.cipher.clone(),
iter,
})
}
}

impl PreKeysStore for SledStore {
Expand Down Expand Up @@ -1140,6 +1179,28 @@ impl DoubleEndedIterator for SledMessagesIter {
}
}

pub struct SledThreadMetadataIter {
cipher: Option<Arc<StoreCipher>>,
iter: sled::Iter,
}

impl Iterator for SledThreadMetadataIter {
type Item = Result<ThreadMetadata, SledStoreError>;

fn next(&mut self) -> Option<Self::Item> {
self.iter
.next()?
.map_err(SledStoreError::from)
.and_then(|(_key, value)| {
self.cipher.as_ref().map_or_else(
|| serde_json::from_slice(&value).map_err(SledStoreError::from),
|c| c.decrypt_value(&value).map_err(SledStoreError::from),
)
})
.into()
}
}

#[cfg(test)]
mod tests {
use core::fmt;
Expand Down
2 changes: 1 addition & 1 deletion presage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ base64 = "0.21"
futures = "0.3"
log = "0.4.8"
rand = "0.8"
serde = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
url = "2.2"
Expand Down
2 changes: 2 additions & 0 deletions presage/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum Error<S: std::error::Error> {
AttachmentCipherError(#[from] libsignal_service::attachment_cipher::AttachmentCipherError),
#[error("unknown group")]
UnknownGroup,
#[error("unknown contact")]
UnknownContact,
#[error("unknown recipient")]
UnknownRecipient,
#[error("timeout: {0}")]
Expand Down
28 changes: 25 additions & 3 deletions presage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,36 @@
mod cache;
mod errors;
mod serializers;
use serde::{Deserialize, Serialize};
pub mod manager;
mod serde;
pub mod store;

pub use crate::libsignal_service::prelude;
pub use errors::Error;
pub use libsignal_service;
/// Protobufs used in Signal protocol and service communication
pub use libsignal_service::proto;

pub use errors::Error;
pub use manager::Manager;
pub use store::Thread;

const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "-rs-", env!("CARGO_PKG_VERSION"));

// TODO: open a PR in libsignal and make sure the bytes can be read from `GroupMasterKey` instead of using this type
pub type GroupMasterKeyBytes = [u8; 32];

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ThreadMetadata {
pub thread: Thread,
pub last_message: Option<ThreadMetadataMessageContent>,
pub unread_messages_count: usize,
pub title: Option<String>,
pub archived: bool,
pub muted: bool,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ThreadMetadataMessageContent {
pub sender: prelude::Uuid,
pub timestamp: u64,
pub message: Option<String>,
}
7 changes: 5 additions & 2 deletions presage/src/manager/linking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ impl<S: Store> Manager<S, Linking> {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let store =
/// SledStore::open("/tmp/presage-example", MigrationConflictStrategy::Drop, OnNewIdentity::Trust)?;
/// let store = SledStore::open(
/// "/tmp/presage-example",
/// MigrationConflictStrategy::Drop,
/// OnNewIdentity::Trust,
/// )?;
///
/// let (mut tx, mut rx) = oneshot::channel();
/// let (manager, err) = future::join(
Expand Down
2 changes: 1 addition & 1 deletion presage/src/manager/registered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use sha2::Digest;
use tokio::sync::Mutex;

use crate::cache::CacheCell;
use crate::serde::serde_profile_key;
use crate::serializers::serde_profile_key;
use crate::store::{Store, Thread};
use crate::{Error, Manager};

Expand Down
7 changes: 5 additions & 2 deletions presage/src/manager/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ impl<S: Store> Manager<S, Registration> {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let store =
/// SledStore::open("/tmp/presage-example", MigrationConflictStrategy::Drop, OnNewIdentity::Trust)?;
/// let store = SledStore::open(
/// "/tmp/presage-example",
/// MigrationConflictStrategy::Drop,
/// OnNewIdentity::Trust,
/// )?;
///
/// let manager = Manager::register(
/// store,
Expand Down
File renamed without changes.
35 changes: 31 additions & 4 deletions presage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{fmt, ops::RangeBounds, time::SystemTime};

use crate::{manager::RegistrationData, GroupMasterKeyBytes, ThreadMetadata};
use libsignal_service::{
content::{ContentBody, Metadata},
groups_v2::{Group, Timer},
Expand All @@ -14,17 +15,23 @@ use libsignal_service::{
},
protocol::{IdentityKey, ProtocolAddress, ProtocolStore, SenderKeyStore},
session_store::SessionStoreExt,
zkgroup::GroupMasterKeyBytes,
Profile,
};
use log::error;
use serde::{Deserialize, Serialize};

use crate::manager::RegistrationData;

/// An error trait implemented by store error types
pub trait StoreError: std::error::Error + Sync + Send + 'static {}

// pub trait Store: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone {
// type Error: StoreError;
//
// type ContactsIter: Iterator<Item = Result<Contact, Self::Error>>;
// type GroupsIter: Iterator<Item = Result<(GroupMasterKeyBytes, Group), Self::Error>>;
// type MessagesIter: Iterator<Item = Result<Content, Self::Error>>;
// type ThreadMetadataIter: Iterator<Item = Result<ThreadMetadata, Self::Error>>;
//
/// State
/// Stores the registered state of the manager
pub trait StateStore {
type StateStoreError: StoreError;
Expand Down Expand Up @@ -57,6 +64,11 @@ pub trait ContentsStore {
/// Each items is a tuple consisting of the group master key and its corresponding data.
type GroupsIter: Iterator<Item = Result<(GroupMasterKeyBytes, Group), Self::ContentsStoreError>>;

/// Iterator over all stored thread metadata
///
/// Each item is a tuple consisting of the thread and its corresponding metadata.
type ThreadMetadataIter: Iterator<Item = Result<ThreadMetadata, Self::ContentsStoreError>>;

/// Iterator over all stored messages
type MessagesIter: Iterator<Item = Result<Content, Self::ContentsStoreError>>;

Expand Down Expand Up @@ -233,7 +245,22 @@ pub trait ContentsStore {
profile: Profile,
) -> Result<(), Self::ContentsStoreError>;

/// Retrieve a profile by [Uuid] and [ProfileKey].
/// Retrieve ThereadMetadata for all threads.
fn thread_metadatas(&self) -> Result<Self::ThreadMetadataIter, Self::ContentsStoreError>;

/// Retrieve ThereadMetadata for a single thread.
fn thread_metadata(
&self,
thread: Thread,
) -> Result<Option<ThreadMetadata>, Self::ContentsStoreError>;

/// Save ThereadMetadata for a single thread.
/// This will overwrite any existing metadata for the thread.
fn save_thread_metadata(
&mut self,
metadata: ThreadMetadata,
) -> Result<(), Self::ContentsStoreError>;

fn profile(
&self,
uuid: Uuid,
Expand Down
Loading