summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorManos Pitsidianakis <el13635@mail.ntua.gr>2020-09-21 16:17:37 +0300
committerManos Pitsidianakis <el13635@mail.ntua.gr>2020-09-21 16:17:37 +0300
commit19d4a191d81c2f2225d8825d6645f061eda84468 (patch)
treefd06889939abff5b4824e5c7131e255a0f7611df
parent20dd4cfaf6171f3bd5155982b1227e00c9445d5b (diff)
melib/jmap: add email state sync
-rw-r--r--melib/src/backends/jmap.rs261
-rw-r--r--melib/src/backends/jmap/connection.rs153
-rw-r--r--melib/src/backends/jmap/mailbox.rs4
-rw-r--r--melib/src/backends/jmap/objects/email.rs17
-rw-r--r--melib/src/backends/jmap/operations.rs20
-rw-r--r--melib/src/backends/jmap/protocol.rs118
-rw-r--r--melib/src/backends/jmap/rfc8620.rs2
7 files changed, 383 insertions, 192 deletions
diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs
index a16838c0..31e58ab3 100644
--- a/melib/src/backends/jmap.rs
+++ b/melib/src/backends/jmap.rs
@@ -27,12 +27,26 @@ use futures::lock::Mutex as FutureMutex;
use isahc::prelude::HttpClient;
use isahc::ResponseExt;
use serde_json::Value;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{hash_map::DefaultHasher, BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
+use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
+macro_rules! tag_hash {
+ ($t:ident) => {{
+ let mut hasher = DefaultHasher::default();
+ $t.hash(&mut hasher);
+ hasher.finish()
+ }};
+ ($t:literal) => {{
+ let mut hasher = DefaultHasher::default();
+ $t.hash(&mut hasher);
+ hasher.finish()
+ }};
+}
+
#[macro_export]
macro_rules! _impl {
($(#[$outer:meta])*$field:ident : $t:ty) => {
@@ -131,7 +145,7 @@ impl JmapServerConf {
}
}
-struct IsSubscribedFn(Box<dyn Fn(&str) -> bool + Send + Sync>);
+pub struct IsSubscribedFn(Box<dyn Fn(&str) -> bool + Send + Sync>);
impl std::fmt::Debug for IsSubscribedFn {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -173,24 +187,116 @@ macro_rules! get_conf_val {
};
}
-#[derive(Debug, Default)]
+#[derive(Debug)]
pub struct Store {
- byte_cache: HashMap<EnvelopeHash, EnvelopeCache>,
- id_store: HashMap<EnvelopeHash, Id>,
- blob_id_store: HashMap<EnvelopeHash, Id>,
+ pub account_name: Arc<String>,
+ pub account_hash: AccountHash,
+ pub account_id: Arc<Mutex<String>>,
+ pub byte_cache: Arc<Mutex<HashMap<EnvelopeHash, EnvelopeCache>>>,
+ pub id_store: Arc<Mutex<HashMap<EnvelopeHash, Id>>>,
+ pub reverse_id_store: Arc<Mutex<HashMap<Id, EnvelopeHash>>>,
+ pub blob_id_store: Arc<Mutex<HashMap<EnvelopeHash, Id>>>,
+ pub tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
+ pub mailboxes: Arc<RwLock<HashMap<MailboxHash, JmapMailbox>>>,
+ pub mailboxes_index: Arc<RwLock<HashMap<MailboxHash, HashSet<EnvelopeHash>>>>,
+ pub object_set_states: Arc<Mutex<HashMap<&'static str, String>>>,
+ pub online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
+ pub is_subscribed: Arc<IsSubscribedFn>,
+ pub event_consumer: BackendEventConsumer,
+}
+
+impl Store {
+ pub fn add_envelope(&self, obj: EmailObject) -> Envelope {
+ let mut tag_lck = self.tag_index.write().unwrap();
+ let tags = obj
+ .keywords()
+ .keys()
+ .map(|tag| {
+ let tag_hash = {
+ let mut hasher = DefaultHasher::default();
+ tag.hash(&mut hasher);
+ hasher.finish()
+ };
+ if !tag_lck.contains_key(&tag_hash) {
+ tag_lck.insert(tag_hash, tag.to_string());
+ }
+ tag_hash
+ })
+ .collect::<SmallVec<[u64; 1024]>>();
+ let id = obj.id.clone();
+ let mailbox_ids = obj.mailbox_ids.clone();
+ let blob_id = obj.blob_id.clone();
+ drop(tag_lck);
+ let mut ret: Envelope = obj.into();
+
+ debug_assert_eq!(tag_hash!("$draft"), 6613915297903591176);
+ debug_assert_eq!(tag_hash!("$seen"), 1683863812294339685);
+ debug_assert_eq!(tag_hash!("$flagged"), 2714010747478170100);
+ debug_assert_eq!(tag_hash!("$answered"), 8940855303929342213);
+ debug_assert_eq!(tag_hash!("$junk"), 2656839745430720464);
+ debug_assert_eq!(tag_hash!("$notjunk"), 4091323799684325059);
+ let mut id_store_lck = self.id_store.lock().unwrap();
+ let mut reverse_id_store_lck = self.reverse_id_store.lock().unwrap();
+ let mut blob_id_store_lck = self.blob_id_store.lock().unwrap();
+ let mailboxes_lck = self.mailboxes.read().unwrap();
+ let mut mailboxes_index_lck = self.mailboxes_index.write().unwrap();
+ for (mailbox_id, _) in mailbox_ids {
+ if let Some((mailbox_hash, _)) = mailboxes_lck.iter().find(|(_, m)| m.id == mailbox_id)
+ {
+ mailboxes_index_lck
+ .entry(*mailbox_hash)
+ .or_default()
+ .insert(ret.hash());
+ }
+ }
+ reverse_id_store_lck.insert(id.clone(), ret.hash());
+ id_store_lck.insert(ret.hash(), id);
+ blob_id_store_lck.insert(ret.hash(), blob_id);
+ for t in tags {
+ match t {
+ 6613915297903591176 => {
+ ret.set_flags(ret.flags() | Flag::DRAFT);
+ }
+ 1683863812294339685 => {
+ ret.set_flags(ret.flags() | Flag::SEEN);
+ }
+ 2714010747478170100 => {
+ ret.set_flags(ret.flags() | Flag::FLAGGED);
+ }
+ 8940855303929342213 => {
+ ret.set_flags(ret.flags() | Flag::REPLIED);
+ }
+ 2656839745430720464 | 4091323799684325059 => { /* ignore */ }
+ _ => ret.labels_mut().push(t),
+ }
+ }
+ ret
+ }
+
+ pub fn remove_envelope(
+ &self,
+ obj_id: Id,
+ ) -> Option<(EnvelopeHash, SmallVec<[MailboxHash; 8]>)> {
+ let env_hash = self.reverse_id_store.lock().unwrap().remove(&obj_id)?;
+ self.id_store.lock().unwrap().remove(&env_hash);
+ self.blob_id_store.lock().unwrap().remove(&env_hash);
+ self.byte_cache.lock().unwrap().remove(&env_hash);
+ let mut mailbox_hashes = SmallVec::new();
+ let mailboxes_lck = self.mailboxes.read().unwrap();
+ for (k, set) in self.mailboxes_index.write().unwrap().iter_mut() {
+ if set.remove(&env_hash) {
+ mailbox_hashes.push(*k);
+ }
+ }
+ Some((env_hash, mailbox_hashes))
+ }
}
#[derive(Debug)]
pub struct JmapType {
- account_name: String,
- account_hash: AccountHash,
- online: Arc<FutureMutex<(Instant, Result<()>)>>,
- is_subscribed: Arc<IsSubscribedFn>,
server_conf: JmapServerConf,
connection: Arc<FutureMutex<JmapConnection>>,
- store: Arc<RwLock<Store>>,
- tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
- mailboxes: Arc<RwLock<HashMap<MailboxHash, JmapMailbox>>>,
+ store: Arc<Store>,
}
impl MailBackend for JmapType {
@@ -207,7 +313,7 @@ impl MailBackend for JmapType {
}
fn is_online(&self) -> ResultFuture<()> {
- let online = self.online.clone();
+ let online = self.store.online_status.clone();
Ok(Box::pin(async move {
//match timeout(std::time::Duration::from_secs(3), connection.lock()).await {
let online_lck = online.lock().await;
@@ -224,9 +330,7 @@ impl MailBackend for JmapType {
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
- let mailboxes = self.mailboxes.clone();
let store = self.store.clone();
- let tag_index = self.tag_index.clone();
let connection = self.connection.clone();
Ok(Box::pin(async_stream::try_stream! {
let mut conn = connection.lock().await;
@@ -234,8 +338,6 @@ impl MailBackend for JmapType {
let res = protocol::fetch(
&conn,
&store,
- &tag_index,
- &mailboxes,
mailbox_hash,
).await?;
yield res;
@@ -243,7 +345,13 @@ impl MailBackend for JmapType {
}
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
- Err(MeliError::new("Unimplemented."))
+ let connection = self.connection.clone();
+ Ok(Box::pin(async move {
+ let mut conn = connection.lock().await;
+ conn.connect().await?;
+ conn.email_changes().await?;
+ Ok(())
+ }))
}
fn watch(&self) -> ResultFuture<()> {
@@ -251,17 +359,18 @@ impl MailBackend for JmapType {
}
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
- let mailboxes = self.mailboxes.clone();
+ let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let mut conn = connection.lock().await;
conn.connect().await?;
- if mailboxes.read().unwrap().is_empty() {
+ if store.mailboxes.read().unwrap().is_empty() {
let new_mailboxes = debug!(protocol::get_mailboxes(&conn).await)?;
- *mailboxes.write().unwrap() = new_mailboxes;
+ *store.mailboxes.write().unwrap() = new_mailboxes;
}
- let ret = mailboxes
+ let ret = store
+ .mailboxes
.read()
.unwrap()
.iter()
@@ -287,7 +396,7 @@ impl MailBackend for JmapType {
mailbox_hash: MailboxHash,
_flags: Option<Flag>,
) -> ResultFuture<()> {
- let mailboxes = self.mailboxes.clone();
+ let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let mut conn = connection.lock().await;
@@ -305,7 +414,7 @@ impl MailBackend for JmapType {
.await?;
let mailbox_id: String = {
- let mailboxes_lck = mailboxes.read().unwrap();
+ let mailboxes_lck = store.mailboxes.read().unwrap();
if let Some(mailbox) = mailboxes_lck.get(&mailbox_hash) {
mailbox.id.clone()
} else {
@@ -360,7 +469,7 @@ impl MailBackend for JmapType {
}
fn tags(&self) -> Option<Arc<RwLock<BTreeMap<u64, String>>>> {
- Some(self.tag_index.clone())
+ Some(self.store.tag_index.clone())
}
fn as_any(&self) -> &dyn Any {
@@ -376,9 +485,12 @@ impl MailBackend for JmapType {
q: crate::search::Query,
mailbox_hash: Option<MailboxHash>,
) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> {
+ let store = self.store.clone();
let connection = self.connection.clone();
let filter = if let Some(mailbox_hash) = mailbox_hash {
- let mailbox_id = self.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
+ let mailbox_id = self.store.mailboxes.read().unwrap()[&mailbox_hash]
+ .id
+ .clone();
let mut f = Filter::Condition(
EmailFilterCondition::new()
@@ -412,14 +524,13 @@ impl MailBackend for JmapType {
let res_text = res.text_async().await?;
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
- *conn.online_status.lock().await = (std::time::Instant::now(), Ok(()));
+ *store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
let m = QueryResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
let QueryResponse::<EmailObject> { ids, .. } = m;
let ret = ids
.into_iter()
.map(|id| {
- use std::hash::Hasher;
- let mut h = std::collections::hash_map::DefaultHasher::new();
+ let mut h = DefaultHasher::new();
h.write(id.as_bytes());
h.finish()
})
@@ -450,12 +561,11 @@ impl MailBackend for JmapType {
destination_mailbox_hash: MailboxHash,
move_: bool,
) -> ResultFuture<()> {
- let mailboxes = self.mailboxes.clone();
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let (source_mailbox_id, destination_mailbox_id) = {
- let mailboxes_lck = mailboxes.read().unwrap();
+ let mailboxes_lck = store.mailboxes.read().unwrap();
if !mailboxes_lck.contains_key(&source_mailbox_hash) {
return Err(MeliError::new(format!(
"Could not find source mailbox with hash {}",
@@ -489,9 +599,8 @@ impl MailBackend for JmapType {
);
}
{
- let store_lck = store.read().unwrap();
for env_hash in env_hashes.iter() {
- if let Some(id) = store_lck.id_store.get(&env_hash) {
+ if let Some(id) = store.id_store.lock().unwrap().get(&env_hash) {
ids.push(id.clone());
id_map.insert(id.clone(), env_hash);
update_map.insert(id.clone(), serde_json::json!(update_keywords.clone()));
@@ -517,7 +626,7 @@ impl MailBackend for JmapType {
let res_text = res.text_async().await?;
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
- *conn.online_status.lock().await = (std::time::Instant::now(), Ok(()));
+ *store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
let m = SetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
if let Some(ids) = m.not_updated {
if !ids.is_empty() {
@@ -540,13 +649,10 @@ impl MailBackend for JmapType {
mailbox_hash: MailboxHash,
flags: SmallVec<[(std::result::Result<Flag, String>, bool); 8]>,
) -> ResultFuture<()> {
- let mailboxes = self.mailboxes.clone();
let store = self.store.clone();
- let account_hash = self.account_hash;
- let tag_index = self.tag_index.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
- let mailbox_id = mailboxes.read().unwrap()[&mailbox_hash].id.clone();
+ let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
let mut update_map: HashMap<String, Value> = HashMap::default();
let mut ids: Vec<Id> = Vec::with_capacity(env_hashes.rest.len() + 1);
let mut id_map: HashMap<Id, EnvelopeHash> = HashMap::default();
@@ -587,9 +693,8 @@ impl MailBackend for JmapType {
}
}
{
- let store_lck = store.read().unwrap();
for hash in env_hashes.iter() {
- if let Some(id) = store_lck.id_store.get(&hash) {
+ if let Some(id) = store.id_store.lock().unwrap().get(&hash) {
ids.push(id.clone());
id_map.insert(id.clone(), hash);
update_map.insert(id.clone(), serde_json::json!(update_keywords.clone()));
@@ -626,7 +731,7 @@ impl MailBackend for JmapType {
*/
//debug!("res_text = {}", &res_text);
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
- *conn.online_status.lock().await = (std::time::Instant::now(), Ok(()));
+ *store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
let m = SetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
if let Some(ids) = m.not_updated {
return Err(MeliError::new(
@@ -637,24 +742,45 @@ impl MailBackend for JmapType {
));
}
- let mut tag_index_lck = tag_index.write().unwrap();
- for (flag, value) in flags.iter() {
- match flag {
- Ok(f) => {}
- Err(t) => {
- if *value {
- tag_index_lck.insert(tag_hash!(t), t.clone());
+ {
+ let mut tag_index_lck = store.tag_index.write().unwrap();
+ for (flag, value) in flags.iter() {
+ match flag {
+ Ok(f) => {}
+ Err(t) => {
+ if *value {
+ tag_index_lck.insert(tag_hash!(t), t.clone());
+ }
}
}
}
+ drop(tag_index_lck);
}
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
let GetResponse::<EmailObject> { list, state, .. } = e;
- //debug!(&list);
+ {
+ let c = store
+ .object_set_states
+ .lock()
+ .unwrap()
+ .get(&EmailObject::NAME)
+ .map(|prev_state| *prev_state == state);
+ if let Some(false) = c {
+ conn.email_changes().await?;
+ } else {
+ debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
+ store
+ .object_set_states
+ .lock()
+ .unwrap()
+ .insert(EmailObject::NAME, state);
+ }
+ }
+ debug!(&list);
for envobj in list {
let env_hash = id_map[&envobj.id];
conn.add_refresh_event(RefreshEvent {
- account_hash,
+ account_hash: store.account_hash,
mailbox_hash,
kind: RefreshEventKind::NewFlags(
env_hash,
@@ -673,34 +799,41 @@ impl JmapType {
is_subscribed: Box<dyn Fn(&str) -> bool + Send + Sync>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
- let online = Arc::new(FutureMutex::new((
+ let online_status = Arc::new(FutureMutex::new((
std::time::Instant::now(),
Err(MeliError::new("Account is uninitialised.")),
)));
let server_conf = JmapServerConf::new(s)?;
let account_hash = {
- use std::collections::hash_map::DefaultHasher;
- use std::hash::Hasher;
let mut hasher = DefaultHasher::new();
hasher.write(s.name.as_bytes());
hasher.finish()
};
+ let store = Arc::new(Store {
+ account_name: Arc::new(s.name.clone()),
+ account_hash,
+ account_id: Arc::new(Mutex::new(String::new())),
+ online_status,
+ event_consumer,
+ is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)),
+
+ byte_cache: Default::default(),
+ id_store: Default::default(),
+ reverse_id_store: Default::default(),
+ blob_id_store: Default::default(),
+ tag_index: Default::default(),
+ mailboxes: Default::default(),
+ mailboxes_index: Default::default(),
+ object_set_states: Default::default(),
+ });
Ok(Box::new(JmapType {
connection: Arc::new(FutureMutex::new(JmapConnection::new(
&server_conf,
- account_hash,
- event_consumer,
- online.clone(),
+ store.clone(),
)?)),
- store: Arc::new(RwLock::new(Store::default())),
- tag_index: Arc::new(RwLock::new(Default::default())),
- mailboxes: Arc::new(RwLock::new(HashMap::default())),
- account_name: s.name.clone(),
- account_hash,
- online,
- is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)),
+ store,
server_conf,
}))
}
diff --git a/melib/src/backends/jmap/connection.rs b/melib/src/backends/jmap/connection.rs
index 415795eb..af46bba6 100644
--- a/melib/src/backends/jmap/connection.rs
+++ b/melib/src/backends/jmap/connection.rs
@@ -27,21 +27,12 @@ pub struct JmapConnection {
pub session: JmapSession,
pub request_no: Arc<Mutex<usize>>,
pub client: Arc<HttpClient>,
- pub online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
pub server_conf: JmapServerConf,
- pub account_id: Arc<Mutex<String>>,
- pub account_hash: AccountHash,
- pub method_call_states: Arc<Mutex<HashMap<&'static str, String>>>,
- pub event_consumer: BackendEventConsumer,
+ pub store: Arc<Store>,
}
impl JmapConnection {
- pub fn new(
- server_conf: &JmapServerConf,
- account_hash: AccountHash,
- event_consumer: BackendEventConsumer,
- online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
- ) -> Result<Self> {
+ pub fn new(server_conf: &JmapServerConf, store: Arc<Store>) -> Result<Self> {
let client = HttpClient::builder()
.timeout(std::time::Duration::from_secs(10))
.authentication(isahc::auth::Authentication::basic())
@@ -55,17 +46,13 @@ impl JmapConnection {
session: Default::default(),
request_no: Arc::new(Mutex::new(0)),
client: Arc::new(client),
- online_status,
server_conf,
- account_id: Arc::new(Mutex::new(String::new())),
- account_hash,
- event_consumer,
- method_call_states: Arc::new(Mutex::new(Default::default())),
+ store,
})
}
pub async fn connect(&mut self) -> Result<()> {
- if self.online_status.lock().await.1.is_ok() {
+ if self.store.online_status.lock().await.1.is_ok() {
return Ok(());
}
let mut jmap_session_resource_url =
@@ -86,7 +73,7 @@ impl JmapConnection {
let session: JmapSession = match serde_json::from_str(&res_text) {
Err(err) => {
let err = MeliError::new(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nReply from server: {}", &self.server_conf.server_hostname, &res_text)).set_source(Some(Arc::new(err)));
- *self.online_status.lock().await = (Instant::now(), Err(err.clone()));
+ *self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
return Err(err);
}
Ok(s) => s,
@@ -96,7 +83,7 @@ impl JmapConnection {
.contains_key("urn:ietf:params:jmap:core")
{
let err = MeliError::new(format!("Server {} did not return JMAP Core capability (urn:ietf:params:jmap:core). Returned capabilities were: {}", &self.server_conf.server_hostname, session.capabilities.keys().map(String::as_str).collect::<Vec<&str>>().join(", ")));
- *self.online_status.lock().await = (Instant::now(), Err(err.clone()));
+ *self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
return Err(err);
}
if !session
@@ -104,11 +91,11 @@ impl JmapConnection {
.contains_key("urn:ietf:params:jmap:mail")
{
let err = MeliError::new(format!("Server {} does not support JMAP Mail capability (urn:ietf:params:jmap:mail). Returned capabilities were: {}", &self.server_conf.server_hostname, session.capabilities.keys().map(String::as_str).collect::<Vec<&str>>().join(", ")));
- *self.online_status.lock().await = (Instant::now(), Err(err.clone()));
+ *self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
return Err(err);
}
- *self.online_status.lock().await = (Instant::now(), Ok(()));
+ *self.store.online_status.lock().await = (Instant::now(), Ok(()));
self.session = session;
Ok(())
}
@@ -118,6 +105,128 @@ impl JmapConnection {
}
pub fn add_refresh_event(&self, event: RefreshEvent) {
- (self.event_consumer)(self.account_hash, BackendEvent::Refresh(event));
+ (self.store.event_consumer)(self.store.account_hash, BackendEvent::Refresh(event));
+ }
+
+ pub async fn email_changes(&self) -> Result<()> {
+ let mut current_state: String = {
+ let object_set_states_lck = self.store.object_set_states.lock().unwrap();
+ let v = if let Some(prev_state) = debug!(object_set_states_lck.get(&EmailObject::NAME))
+ {
+ prev_state.clone()
+ } else {
+ return Ok(());
+ };
+ drop(object_set_states_lck);
+ v
+ };
+ loop {
+
+ let email_changes_call: EmailChanges = EmailChanges::new(
+ Changes::<EmailObject>::new()
+ .account_id(self.mail_account_id().to_string())
+ .since_state(current_state.clone()),
+ );
+
+ let mut req = Request::new(self.request_no.clone());
+ let prev_seq = req.add_call(&email_changes_call);
+ let email_get_call: EmailGet = EmailGet::new(
+ Get::new()
+ .ids(Some(JmapArgument::reference(
+ prev_seq,
+ ResultField::<EmailChanges, EmailObject>::new("created"),
+ )))
+ .account_id(self.mail_account_id().to_string()),
+ );
+
+ req.add_call(&email_get_call);
+
+ let mut res = self
+ .client
+ .post_async(&self.session.api_url, serde_json::to_string(&req)?)
+ .await?;
+
+ let res_text = res.text_async().await?;
+ debug!(&res_text);
+ let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
+ let get_response =
+ GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
+ debug!(&get_response);
+ let GetResponse::<EmailObject> { list, .. } = get_response;
+ let mut mailbox_hashes: Vec<SmallVec<[MailboxHash; 8]>> =
+ Vec::with_capacity(list.len());
+ for envobj in &list {
+ let v = self
+ .store
+ .mailboxes
+ .read()
+ .unwrap()
+ .iter()
+ .filter(|(_, m)| envobj.mailbox_ids.contains_key(&m.id))
+ .map(|(k, _)| *k)
+ .collect::<SmallVec<[MailboxHash; 8]>>();
+ mailbox_hashes.push(v);
+ }
+
+ for (env, mailbox_hashes) in list
+ .into_iter()
+ .map(|obj| self.store.add_envelope(obj))
+ .zip(mailbox_hashes)
+ {
+ for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() {
+ self.add_refresh_event(RefreshEvent {
+ account_hash: self.store.account_hash,
+ mailbox_hash,
+ kind: RefreshEventKind::Create(Box::new(env.clone())),
+ });
+ }
+ if let Some(mailbox_hash) = mailbox_hashes.first().cloned() {
+ self.add_refresh_event(RefreshEvent {
+ account_hash: self.store.account_hash,
+ mailbox_hash,
+ kind: RefreshEventKind::Create(Box::new(env)),
+ });
+ }
+ }
+
+ let changes_response =
+ ChangesResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
+
+ let ChangesResponse::<EmailObject> {
+ account_id: _,
+ new_state,
+ old_state: _,
+ has_more_changes,
+ created: _,
+ updated,
+ destroyed,
+ _ph: _,
+ } = changes_response;
+ for (env_hash, mailbox_hashes) in destroyed
+ .into_iter()
+ .filter_map(|obj_id| self.store.remove_envelope(obj_id))
+ {
+ for mailbox_hash in mailbox_hashes {
+ self.add_refresh_event(RefreshEvent {
+ account_hash: self.store.account_hash,
+ mailbox_hash,
+ kind: RefreshEventKind::Remove(env_hash),
+ });
+ }
+ }
+
+ if has_more_changes {
+ current_state = new_state;
+ } else {
+ self.store
+ .object_set_states
+ .lock()
+ .unwrap()
+ .insert(EmailObject::NAME, new_state);
+ break;
+ }
+ }
+
+ Ok(())
}
}
diff --git a/melib/src/backends/jmap/mailbox.rs b/melib/src/backends/jmap/mailbox.rs
index cbed47e4..cf1987a4 100644
--- a/melib/src/backends/jmap/mailbox.rs
+++ b/melib/src/backends/jmap/mailbox.rs
@@ -28,7 +28,7 @@ pub struct JmapMailbox {
pub name: String,
pub path: String,
pub hash: MailboxHash,
- pub v: Vec<MailboxHash>,
+ pub children: Vec<MailboxHash>,
pub id: String,
pub is_subscribed: bool,
pub my_rights: JmapRights,
@@ -62,7 +62,7 @@ impl BackendMailbox for JmapMailbox {
}
fn children(&self) -> &[MailboxHash] {
- &self.v
+ &self.children
}
fn parent(&self) -> Option<MailboxHash> {
diff --git a/melib/src/backends/jmap/objects/email.rs b/melib/src/backends/jmap/objects/email.rs
index bb9f8fb0..415b62bc 100644
--- a/melib/src/backends/jmap/objects/email.rs
+++ b/melib/src/backends/jmap/objects/email.rs
@@ -770,3 +770,20 @@ impl EmailSet {
EmailSet { set_call }
}
}
+
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct EmailChanges {
+ #[serde(flatten)]
+ pub changes_call: Changes<EmailObject>,
+}
+
+impl Method<EmailObject> for EmailChanges {
+ const NAME: &'static str = "Email/changes";
+}
+
+impl EmailChanges {
+ pub fn new(changes_call: Changes<EmailObject>) -> Self {
+ EmailChanges { changes_call }
+ }
+}
diff --git a/melib/src/backends/jmap/operations.rs b/melib/src/backends/jmap/operations.rs
index 0605b484..6358c71e 100644
--- a/melib/src/backends/jmap/operations.rs
+++ b/melib/src/backends/jmap/operations.rs
@@ -20,21 +20,21 @@
*/
use super::*;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
/// `BackendOp` implementor for Imap
#[derive(Debug, Clone)]
pub struct JmapOp {
hash: EnvelopeHash,
connection: Arc<FutureMutex<JmapConnection>>,
- store: Arc<RwLock<Store>>,
+ store: Arc<Store>,
}
impl JmapOp {
pub fn new(
hash: EnvelopeHash,
connection: Arc<FutureMutex<JmapConnection>>,
- store: Arc<RwLock<Store>>,
+ store: Arc<Store>,
) -> Self {
JmapOp {
hash,
@@ -47,11 +47,9 @@ impl JmapOp {
impl BackendOp for JmapOp {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
{
- let store_lck = self.store.read().unwrap();
- if store_lck.byte_cache.contains_key(&self.hash)
- && store_lck.byte_cache[&self.hash].bytes.is_some()
- {
- let ret = store_lck.byte_cache[&self.hash].bytes.clone().unwrap();
+ let byte_lck = self.store.byte_cache.lock().unwrap();
+ if byte_lck.contains_key(&self.hash) && byte_lck[&self.hash].bytes.is_some() {
+ let ret = byte_lck[&self.hash].bytes.clone().unwrap();
return Ok(Box::pin(async move { Ok(ret.into_bytes()) }));
}
}
@@ -59,7 +57,7 @@ impl BackendOp for JmapOp {
let hash = self.hash;
let connection = self.connection.clone();
Ok(Box::pin(async move {
- let blob_id = store.read().unwrap().blob_id_store[&hash].clone();
+ let blob_id = store.blob_id_store.lock().unwrap()[&hash].clone();
let mut conn = connection.lock().await;
conn.connect().await?;
let mut res = conn
@@ -75,9 +73,9 @@ impl BackendOp for JmapOp {
let res_text = res.text_async().await?;
store
- .write()
- .unwrap()
.byte_cache
+ .lock()
+ .unwrap()
.entry(hash)
.or_default()
.bytes = Some(res_text.clone());
diff --git a/melib/src/backends/jmap/protocol.rs b/melib/src/backends/jmap/protocol.rs
index 87e33b04..cf6b3d0c 100644
--- a/melib/src/backends/jmap/protocol.rs
+++ b/melib/src/backends/jmap/protocol.rs
@@ -23,11 +23,7 @@ use super::mailbox::JmapMailbox;
use super::*;
use serde::Serialize;
use serde_json::{json, Value};
-use smallvec::SmallVec;
-use std::collections::hash_map::DefaultHasher;
-use std::collections::HashMap;
use std::convert::TryFrom;
-use std::hash::{Hash, Hasher};
pub type Id = String;
pub type UtcDate = String;
@@ -43,19 +39,6 @@ macro_rules! get_request_no {
}};
}
-macro_rules! tag_hash {
- ($t:ident) => {{
- let mut hasher = DefaultHasher::default();
- $t.hash(&mut hasher);
- hasher.finish()
- }};
- ($t:literal) => {{
- let mut hasher = DefaultHasher::default();
- $t.hash(&mut hasher);
- hasher.finish()
- }};
-}
-
pub trait Response<OBJ: Object> {
const NAME: &'static str;
}
@@ -120,12 +103,12 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash,
let res_text = res.text_async().await?;
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
- *conn.online_status.lock().await = (std::time::Instant::now(), Ok(()));
+ *conn.store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
let m = GetResponse::<MailboxObject>::try_from(v.method_responses.remove(0))?;
let GetResponse::<MailboxObject> {
list, account_id, ..
} = m;
- *conn.account_id.lock().unwrap() = account_id;
+ *conn.store.account_id.lock().unwrap() = account_id;
Ok(list
.into_iter()
.map(|r| {
@@ -149,7 +132,7 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash,
name: name.clone(),
hash,
path: name,
- v: Vec::new(),
+ children: Vec::new(),
id,
is_subscribed,
my_rights,
@@ -190,12 +173,13 @@ pub async fn get_message_list(conn: &JmapConnection, mailbox: &JmapMailbox) -> R
let res_text = res.text_async().await?;
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
- *conn.online_status.lock().await = (std::time::Instant::now(), Ok(()));
+ *conn.store.online_status.lock().await = (std::time::Instant::now(), Ok(()));
let m = QueryResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
let QueryResponse::<EmailObject> { ids, .. } = m;
Ok(ids)
}
+/*
pub async fn get_message(conn: &JmapConnection, ids: &[String]) -> Result<Vec<Envelope>> {
let email_call: EmailGet = EmailGet::new(
Get::new()
@@ -219,15 +203,14 @@ pub async fn get_message(conn: &JmapConnection, ids: &[String]) -> Res