diff options
author | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2020-09-22 14:14:11 +0300 |
---|---|---|
committer | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2020-09-23 10:52:19 +0300 |
commit | 36cc0d4212d62841d9145e57fdbfa243447b5440 (patch) | |
tree | 62f30d21eb51524221b4d2cbbfb6508425039eb5 /melib | |
parent | 425f4b9930048b1a1b6e8f240a4196e2aab82fc0 (diff) |
melib/jmap: implement refresh()
Closes #77
Diffstat (limited to 'melib')
-rw-r--r-- | melib/src/backends.rs | 69 | ||||
-rw-r--r-- | melib/src/backends/imap/mailbox.rs | 71 | ||||
-rw-r--r-- | melib/src/backends/jmap.rs | 34 | ||||
-rw-r--r-- | melib/src/backends/jmap/connection.rs | 242 | ||||
-rw-r--r-- | melib/src/backends/jmap/mailbox.rs | 12 | ||||
-rw-r--r-- | melib/src/backends/jmap/objects/email.rs | 53 | ||||
-rw-r--r-- | melib/src/backends/jmap/protocol.rs | 56 | ||||
-rw-r--r-- | melib/src/backends/jmap/rfc8620.rs | 158 | ||||
-rw-r--r-- | melib/src/backends/nntp/mailbox.rs | 3 |
9 files changed, 523 insertions, 175 deletions
diff --git a/melib/src/backends.rs b/melib/src/backends.rs index 695fa599..7d4dda40 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -58,7 +58,7 @@ use self::maildir::MaildirType; use self::mbox::MboxType; use super::email::{Envelope, EnvelopeHash, Flag}; use std::any::Any; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::fmt::Debug; use std::ops::Deref; @@ -617,3 +617,70 @@ impl EnvelopeHashBatch { 1 + self.rest.len() } } + +#[derive(Debug, Default, Clone)] +pub struct LazyCountSet { + not_yet_seen: usize, + set: BTreeSet<EnvelopeHash>, +} + +impl LazyCountSet { + pub fn set_not_yet_seen(&mut self, new_val: usize) { + self.not_yet_seen = new_val; + } + + pub fn insert_existing(&mut self, new_val: EnvelopeHash) -> bool { + if self.not_yet_seen == 0 { + false + } else { + self.not_yet_seen -= 1; + self.set.insert(new_val); + true + } + } + + pub fn insert_existing_set(&mut self, set: BTreeSet<EnvelopeHash>) -> bool { + debug!("insert_existing_set {:?}", &set); + if self.not_yet_seen < set.len() { + false + } else { + self.not_yet_seen -= set.len(); + self.set.extend(set.into_iter()); + true + } + } + + #[inline(always)] + pub fn len(&self) -> usize { + self.set.len() + self.not_yet_seen + } + + #[inline(always)] + pub fn clear(&mut self) { + self.set.clear(); + self.not_yet_seen = 0; + } + + pub fn insert_new(&mut self, new_val: EnvelopeHash) { + self.set.insert(new_val); + } + + pub fn insert_set(&mut self, set: BTreeSet<EnvelopeHash>) { + debug!("insert__set {:?}", &set); + self.set.extend(set.into_iter()); + } + + pub fn remove(&mut self, new_val: EnvelopeHash) -> bool { + self.set.remove(&new_val) + } +} + +#[test] +fn test_lazy_count_set() { + let mut new = LazyCountSet::default(); + new.set_not_yet_seen(10); + for i in 0..10 { + assert!(new.insert_existing(i)); + } + assert!(!new.insert_existing(10)); +} diff --git a/melib/src/backends/imap/mailbox.rs b/melib/src/backends/imap/mailbox.rs index abe95839..7461e814 100644 --- a/melib/src/backends/imap/mailbox.rs +++ b/melib/src/backends/imap/mailbox.rs @@ -21,81 +21,12 @@ use super::protocol_parser::SelectResponse; use crate::backends::{ - BackendMailbox, Mailbox, MailboxHash, MailboxPermissions, SpecialUsageMailbox, + BackendMailbox, LazyCountSet, Mailbox, MailboxHash, MailboxPermissions, SpecialUsageMailbox, }; -use crate::email::EnvelopeHash; use crate::error::*; -use std::collections::BTreeSet; use std::sync::{Arc, Mutex, RwLock}; #[derive(Debug, Default, Clone)] -pub struct LazyCountSet { - not_yet_seen: usize, - set: BTreeSet<EnvelopeHash>, -} - -impl LazyCountSet { - pub fn set_not_yet_seen(&mut self, new_val: usize) { - self.not_yet_seen = new_val; - } - - pub fn insert_existing(&mut self, new_val: EnvelopeHash) -> bool { - if self.not_yet_seen == 0 { - false - } else { - self.not_yet_seen -= 1; - self.set.insert(new_val); - true - } - } - - pub fn insert_existing_set(&mut self, set: BTreeSet<EnvelopeHash>) -> bool { - debug!("insert_existing_set {:?}", &set); - if self.not_yet_seen < set.len() { - false - } else { - self.not_yet_seen -= set.len(); - self.set.extend(set.into_iter()); - true - } - } - - #[inline(always)] - pub fn len(&self) -> usize { - self.set.len() + self.not_yet_seen - } - - #[inline(always)] - pub fn clear(&mut self) { - self.set.clear(); - self.not_yet_seen = 0; - } - - pub fn insert_new(&mut self, new_val: EnvelopeHash) { - self.set.insert(new_val); - } - - pub fn insert_set(&mut self, set: BTreeSet<EnvelopeHash>) { - debug!("insert__set {:?}", &set); - self.set.extend(set.into_iter()); - } - - pub fn remove(&mut self, new_val: EnvelopeHash) -> bool { - self.set.remove(&new_val) - } -} - -#[test] -fn test_lazy_count_set() { - let mut new = LazyCountSet::default(); - new.set_not_yet_seen(10); - for i in 0..10 { - assert!(new.insert_existing(i)); - } - assert!(!new.insert_existing(10)); -} - -#[derive(Debug, Default, Clone)] pub struct ImapMailbox { pub hash: MailboxHash, pub imap_path: String, diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs index cf1f7c3a..2ee314a5 100644 --- a/melib/src/backends/jmap.rs +++ b/melib/src/backends/jmap.rs @@ -199,7 +199,6 @@ pub struct Store { pub tag_index: Arc<RwLock<BTreeMap<u64, String>>>, pub mailboxes: Arc<RwLock<HashMap<MailboxHash, JmapMailbox>>>, pub mailboxes_index: Arc<RwLock<HashMap<MailboxHash, HashSet<EnvelopeHash>>>>, - pub email_state: Arc<Mutex<State<EmailObject>>>, pub mailbox_state: Arc<Mutex<State<MailboxObject>>>, pub online_status: Arc<FutureMutex<(Instant, Result<()>)>>, pub is_subscribed: Arc<IsSubscribedFn>, @@ -283,7 +282,6 @@ impl Store { self.blob_id_store.lock().unwrap().remove(&env_hash); self.byte_cache.lock().unwrap().remove(&env_hash); let mut mailbox_hashes = SmallVec::new(); - let mailboxes_lck = self.mailboxes.read().unwrap(); for (k, set) in self.mailboxes_index.write().unwrap().iter_mut() { if set.remove(&env_hash) { mailbox_hashes.push(*k); @@ -345,12 +343,12 @@ impl MailBackend for JmapType { })) } - fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { + fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> { let connection = self.connection.clone(); Ok(Box::pin(async move { let mut conn = connection.lock().await; conn.connect().await?; - conn.email_changes().await?; + conn.email_changes(mailbox_hash).await?; Ok(()) })) } @@ -646,7 +644,6 @@ impl MailBackend for JmapType { let store = self.store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { - let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone(); let mut update_map: HashMap<Id<EmailObject>, Value> = HashMap::default(); let mut ids: Vec<Id<EmailObject>> = Vec::with_capacity(env_hashes.rest.len() + 1); let mut id_map: HashMap<Id<EmailObject>, EnvelopeHash> = HashMap::default(); @@ -695,7 +692,7 @@ impl MailBackend for JmapType { } } } - let mut conn = connection.lock().await; + let conn = connection.lock().await; let email_set_call: EmailSet = EmailSet::new( Set::<EmailObject>::new() @@ -704,7 +701,7 @@ impl MailBackend for JmapType { ); let mut req = Request::new(conn.request_no.clone()); - let prev_seq = req.add_call(&email_set_call); + req.add_call(&email_set_call); let email_call: EmailGet = EmailGet::new( Get::new() .ids(Some(JmapArgument::Value(ids))) @@ -740,7 +737,7 @@ impl MailBackend for JmapType { let mut tag_index_lck = store.tag_index.write().unwrap(); for (flag, value) in flags.iter() { match flag { - Ok(f) => {} + Ok(_) => {} Err(t) => { if *value { tag_index_lck.insert(tag_hash!(t), t.clone()); @@ -754,14 +751,26 @@ impl MailBackend for JmapType { let GetResponse::<EmailObject> { list, state, .. } = e; { let (is_empty, is_equal) = { - let current_state_lck = conn.store.email_state.lock().unwrap(); - (current_state_lck.is_empty(), *current_state_lck != state) + let mailboxes_lck = conn.store.mailboxes.read().unwrap(); + mailboxes_lck + .get(&mailbox_hash) + .map(|mbox| { + let current_state_lck = mbox.email_state.lock().unwrap(); + ( + current_state_lck.is_some(), + current_state_lck.as_ref() != Some(&state), + ) + }) + .unwrap_or((true, true)) }; if is_empty { + let mut mailboxes_lck = conn.store.mailboxes.write().unwrap(); debug!("{:?}: inserting state {}", EmailObject::NAME, &state); - *conn.store.email_state.lock().unwrap() = state; + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + *mbox.email_state.lock().unwrap() = Some(state); + }); } else if !is_equal { - conn.email_changes().await?; + conn.email_changes(mailbox_hash).await?; } } debug!(&list); @@ -813,7 +822,6 @@ impl JmapType { tag_index: Default::default(), mailboxes: Default::default(), mailboxes_index: Default::default(), - email_state: Default::default(), mailbox_state: Default::default(), }); diff --git a/melib/src/backends/jmap/connection.rs b/melib/src/backends/jmap/connection.rs index 97ad15c6..bd254851 100644 --- a/melib/src/backends/jmap/connection.rs +++ b/melib/src/backends/jmap/connection.rs @@ -108,12 +108,19 @@ impl JmapConnection { (self.store.event_consumer)(self.store.account_hash, BackendEvent::Refresh(event)); } - pub async fn email_changes(&self) -> Result<()> { - let mut current_state: State<EmailObject> = self.store.email_state.lock().unwrap().clone(); - if current_state.is_empty() { - debug!("{:?}: has no saved state", EmailObject::NAME); + pub async fn email_changes(&self, mailbox_hash: MailboxHash) -> Result<()> { + let mut current_state: State<EmailObject> = if let Some(s) = self + .store + .mailboxes + .read() + .unwrap() + .get(&mailbox_hash) + .and_then(|mbox| mbox.email_state.lock().unwrap().clone()) + { + s + } else { return Ok(()); - } + }; loop { let email_changes_call: EmailChanges = EmailChanges::new( @@ -134,7 +141,36 @@ impl JmapConnection { ); req.add_call(&email_get_call); - + if let Some(mailbox) = self.store.mailboxes.read().unwrap().get(&mailbox_hash) { + if let Some(email_query_state) = mailbox.email_query_state.lock().unwrap().clone() { + let email_query_changes_call = EmailQueryChanges::new( + QueryChanges::new(self.mail_account_id().clone(), email_query_state) + .filter(Some(Filter::Condition( + EmailFilterCondition::new() + .in_mailbox(Some(mailbox.id.clone())) + .into(), + ))), + ); + let seq_no = req.add_call(&email_query_changes_call); + let email_get_call: EmailGet = EmailGet::new( + Get::new() + .ids(Some(JmapArgument::reference( + seq_no, + ResultField::<EmailQueryChanges, EmailObject>::new("removed"), + ))) + .account_id(self.mail_account_id().clone()) + .properties(Some(vec![ + "keywords".to_string(), + "mailboxIds".to_string(), + ])), + ); + req.add_call(&email_get_call); + } else { + return Ok(()); + } + } else { + return Ok(()); + } let mut res = self .client .post_async(&self.session.api_url, serde_json::to_string(&req)?) @@ -143,79 +179,159 @@ impl JmapConnection { let res_text = res.text_async().await?; debug!(&res_text); let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); - let get_response = - GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?; - debug!(&get_response); - let GetResponse::<EmailObject> { list, .. } = get_response; let changes_response = - ChangesResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?; + ChangesResponse::<EmailObject>::try_from(v.method_responses.remove(0))?; if changes_response.new_state == current_state { return Ok(()); } + let get_response = GetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?; - let mut mailbox_hashes: Vec<SmallVec<[MailboxHash; 8]>> = - Vec::with_capacity(list.len()); - for envobj in &list { - let v = self - .store - .mailboxes - .read() - .unwrap() - .iter() - .filter(|(_, m)| envobj.mailbox_ids.contains_key(&m.id)) - .map(|(k, _)| *k) - .collect::<SmallVec<[MailboxHash; 8]>>(); - mailbox_hashes.push(v); - } - - for (env, mailbox_hashes) in list - .into_iter() - .map(|obj| self.store.add_envelope(obj)) - .zip(mailbox_hashes) { - for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() { - self.add_refresh_event(RefreshEvent { - account_hash: self.store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Create(Box::new(env.clone())), - }); + /* process get response */ + let GetResponse::<EmailObject> { list, .. } = get_response; + + let mut mailbox_hashes: Vec<SmallVec<[MailboxHash; 8]>> = + Vec::with_capacity(list.len()); + for envobj in &list { + let v = self + .store + .mailboxes + .read() + .unwrap() + .iter() + .filter(|(_, m)| envobj.mailbox_ids.contains_key(&m.id)) + .map(|(k, _)| *k) + .collect::<SmallVec<[MailboxHash; 8]>>(); + mailbox_hashes.push(v); } - if let Some(mailbox_hash) = mailbox_hashes.first().cloned() { - self.add_refresh_event(RefreshEvent { - account_hash: self.store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Create(Box::new(env)), - }); + for (env, mailbox_hashes) in list + .into_iter() + .map(|obj| self.store.add_envelope(obj)) + .zip(mailbox_hashes) + { + for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() { + let mut mailboxes_lck = self.store.mailboxes.write().unwrap(); + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + if !env.is_seen() { + mbox.unread_emails.lock().unwrap().insert_new(env.hash()); + } + mbox.total_emails.lock().unwrap().insert_new(env.hash()); + }); + self.add_refresh_event(RefreshEvent { + account_hash: self.store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env.clone())), + }); + } + if let Some(mailbox_hash) = mailbox_hashes.first().cloned() { + let mut mailboxes_lck = self.store.mailboxes.write().unwrap(); + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + if !env.is_seen() { + mbox.unread_emails.lock().unwrap().insert_new(env.hash()); + } + mbox.total_emails.lock().unwrap().insert_new(env.hash()); + }); + self.add_refresh_event(RefreshEvent { + account_hash: self.store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }); + } } } - - let ChangesResponse::<EmailObject> { - account_id: _, - new_state, - old_state: _, - has_more_changes, - created: _, - updated, - destroyed, - _ph: _, - } = changes_response; - for (env_hash, mailbox_hashes) in destroyed - .into_iter() - .filter_map(|obj_id| self.store.remove_envelope(obj_id)) - { - for mailbox_hash in mailbox_hashes { + let reverse_id_store_lck = self.store.reverse_id_store.lock().unwrap(); + let response = v.method_responses.remove(0); + match EmailQueryChangesResponse::try_from(response) { + Ok(EmailQueryChangesResponse { + collapse_threads: _, + query_changes_response: + QueryChangesResponse { + account_id: _, + old_query_state, + new_query_state, + total: _, + removed, + added, + }, + }) if old_query_state != new_query_state => { + self.store + .mailboxes + .write() + .unwrap() + .entry(mailbox_hash) + .and_modify(|mbox| { + *mbox.email_query_state.lock().unwrap() = Some(new_query_state); + }); + /* If the "filter" or "sort" includes a mutable property, the server + MUST include all Foos in the current results for which this + property may have changed. The position of these may have moved + in the results, so they must be reinserted by the client to ensure + its query cache is correct. */ + for email_obj_id in removed + .into_iter() + .filter(|id| !added.iter().any(|item| item.id == *id)) + { + if let Some(env_hash) = reverse_id_store_lck.get(&email_obj_id) { + let mut mailboxes_lck = self.store.mailboxes.write().unwrap(); + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + mbox.unread_emails.lock().unwrap().remove(*env_hash); + mbox.total_emails.lock().unwrap().insert_new(*env_hash); + }); + self.add_refresh_event(RefreshEvent { + account_hash: self.store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Remove(*env_hash), + }); + } + } + for AddedItem { + id: _email_obj_id, + index: _, + } in added + { + // FIXME + } + } + Ok(_) => {} + Err(err) => { + debug!(mailbox_hash); + debug!(err); + } + } + let GetResponse::<EmailObject> { list, .. } = + GetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?; + let mut mailboxes_lck = self.store.mailboxes.write().unwrap(); + for envobj in list { + if let Some(env_hash) = reverse_id_store_lck.get(&envobj.id) { + let new_flags = + protocol::keywords_to_flags(envobj.keywords().keys().cloned().collect()); + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + if new_flags.0.contains(Flag::SEEN) { + mbox.unread_emails.lock().unwrap().remove(*env_hash); + } else { + mbox.unread_emails.lock().unwrap().insert_new(*env_hash); + } + }); self.add_refresh_event(RefreshEvent { account_hash: self.store.account_hash, mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), + kind: RefreshEventKind::NewFlags(*env_hash, new_flags), }); } } - - if has_more_changes { - current_state = new_state; + drop(mailboxes_lck); + if changes_response.has_more_changes { + current_state = changes_response.new_state; } else { - *self.store.email_state.lock().unwrap() = new_state; + self.store + .mailboxes + .write() + .unwrap() + .entry(mailbox_hash) + .and_modify(|mbox| { + *mbox.email_state.lock().unwrap() = Some(changes_response.new_state); + }); + break; } } diff --git a/melib/src/backends/jmap/mailbox.rs b/melib/src/backends/jmap/mailbox.rs index 33e87637..f20b2ed7 100644 --- a/melib/src/backends/jmap/mailbox.rs +++ b/melib/src/backends/jmap/mailbox.rs @@ -20,7 +20,7 @@ */ use super::*; -use crate::backends::{MailboxPermissions, SpecialUsageMailbox}; +use crate::backends::{LazyCountSet, MailboxPermissions, SpecialUsageMailbox}; use std::sync::{Arc, Mutex, RwLock}; #[derive(Debug, Clone)] @@ -36,11 +36,13 @@ pub struct JmapMailbox { pub parent_hash: Option<MailboxHash>, pub role: Option<String>, pub sort_order: u64, - pub total_emails: Arc<Mutex<u64>>, + pub total_emails: Arc<Mutex<LazyCountSet>>, pub total_threads: u64, - pub unread_emails: Arc<Mutex<u64>>, + pub unread_emails: Arc<Mutex<LazyCountSet>>, pub unread_threads: u64, pub usage: Arc<RwLock<SpecialUsageMailbox>>, + pub email_state: Arc<Mutex<Option<State<EmailObject>>>>, + pub email_query_state: Arc<Mutex<Option<String>>>, } impl BackendMailbox for JmapMailbox { @@ -109,8 +111,8 @@ impl BackendMailbox for JmapMailbox { fn count(&self) -> Result<(usize, usize)> { Ok(( - *self.unread_emails.lock()? as usize, - *self.total_emails.lock()? as usize, + self.unread_emails.lock()?.len(), + self.total_emails.lock()?.len(), )) } } diff --git a/melib/src/backends/jmap/objects/email.rs b/melib/src/backends/jmap/objects/email.rs index 6e2feda4..d2537b3f 100644 --- a/melib/src/backends/jmap/objects/email.rs +++ b/melib/src/backends/jmap/objects/email.rs @@ -24,6 +24,7 @@ use crate::backends::jmap::rfc8620::bool_false; use crate::email::address::{Address, MailboxAddress}; use core::marker::PhantomData; use serde::de::{Deserialize, Deserializer}; +use serde_json::value::RawValue; use serde_json::Value; use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; @@ -391,21 +392,6 @@ impl Object for EmailObject { const NAME: &'static str = "Email"; } -#[derive(Deserialize, Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct EmailQueryResponse { - pub account_id: Id<Account>, - pub can_calculate_changes: bool, - pub collapse_threads: bool, - // FIXME - pub filter: String, - pub ids: Vec<Id<EmailObject>>, - pub position: u64, - pub query_state: String, - pub sort: Option<String>, - pub total: usize, -} - #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct EmailQuery { @@ -800,3 +786,40 @@ impl EmailChanges { EmailChanges { changes_call } } } + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct EmailQueryChanges { + #[serde(flatten)] + pub query_changes_call: QueryChanges<Filter<EmailFilterCondition, EmailObject>, EmailObject>, +} + +impl Method<EmailObject> for EmailQueryChanges { + const NAME: &'static str = "Email/queryChanges"; +} + +impl EmailQueryChanges { + pub fn new( + query_changes_call: QueryChanges<Filter<EmailFilterCondition, EmailObject>, EmailObject>, + ) -> Self { + EmailQueryChanges { query_changes_call } + } +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct EmailQueryChangesResponse { + ///o The "collapseThreads" argument that was used with "Email/query". + #[serde(default = "bool_false")] + pub collapse_threads: bool, + #[serde(flatten)] + pub query_changes_response: QueryChangesResponse<EmailObject>, +} + +impl std::convert::TryFrom<&RawValue> for EmailQueryChangesResponse { + type Error = crate::error::MeliError; + fn try_from(t: &RawValue) -> Result<EmailQueryChangesResponse> { + let res: (String, EmailQueryChangesResponse, String) = serde_json::from_str(t.get())?; + assert_eq!(&res.0, "Email/queryChanges"); + Ok(res.1) + } +} diff --git a/melib/src/backends/jmap/protocol.rs b/melib/src/backends/jmap/protocol.rs index 382a56e2..93a2c4f6 100644 --- a/melib/src/backends/jmap/protocol.rs +++ b/melib/src/backends/jmap/protocol.rs @@ -23,7 +23,7 @@ use super::mailbox::JmapMailbox; use super::*; use serde::Serialize; use serde_json::{json, Value}; -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; pub type UtcDate = String; @@ -124,6 +124,12 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash, unread_emails, unread_threads, } = r; + let mut total_emails_set = LazyCountSet::default(); + total_emails_set.set_not_yet_seen(total_emails.try_into().unwrap_or(0)); + let total_emails = total_emails_set; + let mut unread_emails_set = LazyCountSet::default(); + unread_emails_set.set_not_yet_seen(unread_emails.try_into().unwrap_or(0)); + let unread_emails = unread_emails_set; let hash = id.into_hash(); let parent_hash = parent_id.clone().map(|id| id.into_hash()); ( @@ -145,6 +151,8 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash, total_threads, unread_emails: Arc::new(Mutex::new(unread_emails)), unread_threads, + email_state: Arc::new(Mutex::new(None)), + email_query_state: Arc::new(Mutex::new(None)), }, ) }) @@ -250,23 +258,59 @@ pub async fn fetch( let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?; + let query_response = QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?; + store + .mailboxes + .write() + .unwrap() + .entry(mailbox_hash) + .and_modify(|mbox| { + *mbox.email_query_state.lock().unwrap() = Some(query_response.query_state); + }); let GetResponse::<EmailObject> { list, state, .. } = e; { let (is_empty, is_equal) = { - let current_state_lck = conn.store.email_state.lock().unwrap(); - (current_state_lck.is_empty(), *current_state_lck != state) + let mailboxes_lck = conn.store.mailboxes.read().unwrap(); + mailboxes_lck + .get(&mailbox_hash) + .map(|mbox| { + let current_state_lck = mbox.email_state.lock().unwrap(); + ( + current_state_lck.is_none(), + current_state_lck.as_ref() != Some(&state), + ) + }) + .unwrap_or((true, true)) }; if is_empty { + let mut mailboxes_lck = conn.store.mailboxes.write().unwrap(); debug!("{:?}: inserting state {}", EmailObject::NAME, &state); - *conn.store.email_state.lock().unwrap() = state; + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + *mbox.email_state.lock().unwrap() = Some(state); + }); } else if !is_equal { - conn.email_changes().await?; + conn.email_changes(mailbox_hash).await?; } } + let mut total = BTreeSet::default(); + let mut unread = BTreeSet::default(); let mut ret = Vec::with_capacity(list.len()); for obj in list { - ret.push(store.add_envelope(obj)); + let env = store.add_envelope(obj); + total.insert(env.hash()); + if !env.is_seen() { + unread.insert(env.hash()); + } + ret.push(env); } + let mut mailboxes_lck = store.mailboxes.write().unwrap(); + mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| { + mbox.total_emails.lock().unwrap().insert_existing_set(total); + mbox.unread_emails + .lock() + .unwrap() + .insert_existing_set(unread); + }); Ok(ret) } diff --git a/melib/src/backends/jmap/rfc8620.rs b/melib/src/backends/jmap/rfc8620.rs index d49bb593..91b87919 100644 --- a/melib/src/backends/jmap/rfc8620.rs +++ b/melib/src/backends/jmap/rfc8620.rs @@ -1013,3 +1013,161 @@ pub struct UploadResponse { /// The size of the file in octets. pub size: usize, } + +/// #`queryChanges` +/// +/// The "Foo/queryChanges" method allows a client to efficiently update +/// the state of a cached query to match the new state on the server. It +/// takes the following arguments: +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct QueryChanges<F: FilterTrait<OBJ>, OBJ: Object> +where + OBJ: std::fmt::Debug + Serialize, +{ + pub account_id: Id<Account>, + pub filter: Option<F>, + pub sort: Option<Comparator<OBJ>>, + ///sinceQueryState: "String" + /// + ///The current state of the query in the client. This is the string + ///that was returned as the "queryState" argument in the "Foo/query" + ///response with the same sort/filter. The server will return the + ///changes made to the query since this state. + pub since_query_state: String, + ///o maxChanges: "UnsignedInt|null" + /// + ///The maximum number of changes to return in the response. See + ///error descriptions below for more details. + pub max_changes: Option<usize>, + ///o upToId: "Id|null" + /// + ///The last (highest-index) id the client currently has cached from + ///the query results. When there are a large number of results, in a + ///common case, the client may have only downloaded and cached a + ///small subset from the beginning of the results. If the sort and + ///filter are both only on immutable properties, this allows the + ///server to omit changes after this point in the results, which can + ///significantly increase efficiency. If they are not immutable, + ///this argument is ignored. + pub up_to_id: Option<Id<OBJ>>, + + ///o calculateTotal: "Boolean" (default: false) + /// + ///Does the client wish to know the total number of results now in + ///the query? This may be slow and expensive for servers to + ///calculate, particularly with complex filters, so clients should + ///take care to only request the total when needed. + #[serde(default = "bool_false")] + pub calculate_total: bool, + + #[serde(skip)] + _ph: PhantomData<fn() -> OBJ>, +} + +impl<F: FilterTrait<OBJ>, OBJ: Object> QueryChanges<F, OBJ> +where + OBJ: std::fmt::Debug + Serialize, +{ + pub fn new(account_id: Id<Account>, since_query_state: String) -> Self { + Self { + account_id, + filter: None, + sort: None, + since_query_state, + max_changes: None, + up_to_id: None, + calculate_total: false, + _ph: PhantomData, + } + } + _impl!(filter: Option<F>); + _impl!(sort: Option<Comparator<OBJ>>); + _impl!(max_changes: Option<usize>); + _impl!(up_to_id: Option<Id<OBJ>>); + _impl!(calculate_total: bool); +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct QueryChangesResponse<OBJ: Object> { + /// The id of the account used for the call. + pub account_id: Id<Account>, + /// This is the "sinceQueryState" argument echoed back; that is, the stat |