diff options
Diffstat (limited to 'melib/src/backends/imap/connection.rs')
-rw-r--r-- | melib/src/backends/imap/connection.rs | 384 |
1 files changed, 121 insertions, 263 deletions
diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 601f39af..c24b8295 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -19,7 +19,7 @@ * along with meli. If not, see <http://www.gnu.org/licenses/>. */ -use super::protocol_parser::ImapLineSplit; +use super::protocol_parser::{ImapLineSplit, ImapResponse}; use crate::email::parser::BytesExt; use crate::error::*; use std::io::Read; @@ -27,6 +27,7 @@ use std::io::Write; extern crate native_tls; use fnv::FnvHashSet; use native_tls::TlsConnector; +use std::borrow::Cow; use std::iter::FromIterator; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -56,96 +57,6 @@ impl Drop for ImapStream { } impl ImapStream { - pub fn read_response(&mut self, ret: &mut String) -> Result<()> { - let id = format!("M{} ", self.cmd_id - 1); - self.read_lines(ret, &id) - } - - pub fn read_lines(&mut self, ret: &mut String, termination_string: &str) -> Result<()> { - let mut buf: [u8; 1024] = [0; 1024]; - ret.clear(); - let mut last_line_idx: usize = 0; - loop { - match self.stream.read(&mut buf) { - Ok(0) => break, - Ok(b) => { - ret.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..b]) }); - if let Some(mut pos) = ret[last_line_idx..].rfind("\r\n") { - if ret[last_line_idx..].starts_with("* BYE") { - return Err(MeliError::new("Disconnected")); - } - if let Some(prev_line) = - ret[last_line_idx..pos + last_line_idx].rfind("\r\n") - { - last_line_idx += prev_line + 2; - pos -= prev_line + 2; - } - if pos + "\r\n".len() == ret[last_line_idx..].len() { - if !termination_string.is_empty() - && ret[last_line_idx..].starts_with(termination_string) - { - debug!(&ret[last_line_idx..]); - ret.replace_range(last_line_idx.., ""); - break; - } else if termination_string.is_empty() { - break; - } - } - last_line_idx += pos + 2; - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(MeliError::from(e)); - } - } - } - Ok(()) - } - - pub fn wait_for_continuation_request(&mut self) -> Result<()> { - let term = "+ ".to_string(); - let mut ret = String::new(); - self.read_lines(&mut ret, &term) - } - - pub fn send_command(&mut self, command: &[u8]) -> Result<usize> { - let command = command.trim(); - self.stream.write_all(b"M")?; - self.stream.write_all(self.cmd_id.to_string().as_bytes())?; - self.stream.write_all(b" ")?; - let ret = self.cmd_id; - self.cmd_id += 1; - self.stream.write_all(command)?; - self.stream.write_all(b"\r\n")?; - debug!("sent: M{} {}", self.cmd_id - 1, unsafe { - std::str::from_utf8_unchecked(command) - }); - Ok(ret) - } - - pub fn send_literal(&mut self, data: &[u8]) -> Result<()> { - self.stream.write_all(data)?; - if !data.ends_with(b"\r\n") { - self.stream.write_all(b"\r\n")?; - } - self.stream.write_all(b"\r\n")?; - Ok(()) - } - - pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> { - self.stream.write_all(raw)?; - self.stream.write_all(b"\r\n")?; - Ok(()) - } - - pub fn set_nonblocking(&mut self, val: bool) -> Result<()> { - self.stream.get_mut().set_nonblocking(val)?; - Ok(()) - } - pub fn new_connection(server_conf: &ImapServerConf) -> Result<(Capabilities, ImapStream)> { use std::io::prelude::*; use std::net::TcpStream; @@ -167,8 +78,8 @@ impl ImapStream { }; let mut socket = TcpStream::connect(&addr)?; - socket.set_read_timeout(Some(std::time::Duration::new(120, 0)))?; - socket.set_write_timeout(Some(std::time::Duration::new(120, 0)))?; + socket.set_read_timeout(Some(std::time::Duration::new(4, 0)))?; + socket.set_write_timeout(Some(std::time::Duration::new(4, 0)))?; let cmd_id = 0; if server_conf.use_starttls { socket.write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())?; @@ -274,7 +185,7 @@ impl ImapStream { let tag_start = format!("M{} ", (ret.cmd_id - 1)); loop { - ret.read_lines(&mut res, &String::new())?; + ret.read_lines(&mut res, &String::new(), false)?; let mut should_break = false; for l in res.split_rn() { if l.starts_with("* CAPABILITY") { @@ -317,6 +228,102 @@ impl ImapStream { Ok((capabilities, ret)) } } + + pub fn read_response(&mut self, ret: &mut String) -> Result<()> { + let id = format!("M{} ", self.cmd_id - 1); + self.read_lines(ret, &id, true) + } + + pub fn read_lines( + &mut self, + ret: &mut String, + termination_string: &str, + keep_termination_string: bool, + ) -> Result<()> { + let mut buf: [u8; 1024] = [0; 1024]; + ret.clear(); + let mut last_line_idx: usize = 0; + loop { + match self.stream.read(&mut buf) { + Ok(0) => break, + Ok(b) => { + ret.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..b]) }); + if let Some(mut pos) = ret[last_line_idx..].rfind("\r\n") { + if ret[last_line_idx..].starts_with("* BYE") { + return Err(MeliError::new("Disconnected")); + } + if let Some(prev_line) = + ret[last_line_idx..pos + last_line_idx].rfind("\r\n") + { + last_line_idx += prev_line + 2; + pos -= prev_line + 2; + } + if pos + "\r\n".len() == ret[last_line_idx..].len() { + if !termination_string.is_empty() + && ret[last_line_idx..].starts_with(termination_string) + { + debug!(&ret[last_line_idx..]); + if !keep_termination_string { + ret.replace_range(last_line_idx.., ""); + } + break; + } else if termination_string.is_empty() { + break; + } + } + last_line_idx += pos + 2; + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(MeliError::from(e)); + } + } + } + Ok(()) + } + + pub fn wait_for_continuation_request(&mut self) -> Result<()> { + let term = "+ ".to_string(); + let mut ret = String::new(); + self.read_lines(&mut ret, &term, false) + } + + pub fn send_command(&mut self, command: &[u8]) -> Result<()> { + let command = command.trim(); + self.stream.write_all(b"M")?; + self.stream.write_all(self.cmd_id.to_string().as_bytes())?; + self.stream.write_all(b" ")?; + self.cmd_id += 1; + self.stream.write_all(command)?; + self.stream.write_all(b"\r\n")?; + debug!("sent: M{} {}", self.cmd_id - 1, unsafe { + std::str::from_utf8_unchecked(command) + }); + Ok(()) + } + + pub fn send_literal(&mut self, data: &[u8]) -> Result<()> { + self.stream.write_all(data)?; + if !data.ends_with(b"\r\n") { + self.stream.write_all(b"\r\n")?; + } + self.stream.write_all(b"\r\n")?; + Ok(()) + } + + pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> { + self.stream.write_all(raw)?; + self.stream.write_all(b"\r\n")?; + Ok(()) + } + + pub fn set_nonblocking(&mut self, val: bool) -> Result<()> { + self.stream.get_mut().set_nonblocking(val)?; + Ok(()) + } } impl ImapConnection { @@ -333,188 +340,39 @@ impl ImapConnection { } pub fn read_response(&mut self, ret: &mut String) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { - if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { - *status = Err(MeliError::new("Connection timed out")); - self.stream = Err(MeliError::new("Connection timed out")); - } - } - - if let Ok(ref mut stream) = self.stream { - if let Ok(_) = stream.read_response(ret) { - return Ok(()); - } - } - let new_stream = ImapStream::new_connection(&self.server_conf); - if new_stream.is_err() { - *self.online.lock().unwrap() = ( - Instant::now(), - Err(new_stream.as_ref().unwrap_err().clone()), - ); - } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); - } - let (capabilities, mut stream) = new_stream?; - let ret = stream.read_response(ret); - if ret.is_ok() { - self.stream = Ok(stream); - self.capabilities = capabilities; - } - ret + let res = self.try_send(|s| s.read_response(ret)); + let r: Result<()> = ImapResponse::from(&ret).into(); + r } pub fn read_lines(&mut self, ret: &mut String, termination_string: String) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { - if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { - *status = Err(MeliError::new("Connection timed out")); - self.stream = Err(MeliError::new("Connection timed out")); - } - } - - if let Ok(ref mut stream) = self.stream { - if let Ok(_) = stream.read_lines(ret, &termination_string) { - return Ok(()); - } - } - let new_stream = ImapStream::new_connection(&self.server_conf); - if new_stream.is_err() { - *self.online.lock().unwrap() = ( - Instant::now(), - Err(new_stream.as_ref().unwrap_err().clone()), - ); - } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); - } - let (capabilities, mut stream) = new_stream?; - let ret = stream.read_lines(ret, &termination_string); - if ret.is_ok() { - self.stream = Ok(stream); - self.capabilities = capabilities; - } - ret + self.try_send(|s| s.read_lines(ret, &termination_string, false)) } pub fn wait_for_continuation_request(&mut self) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { - if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { - *status = Err(MeliError::new("Connection timed out")); - self.stream = Err(MeliError::new("Connection timed out")); - } - } - if let Ok(ref mut stream) = self.stream { - if let Ok(_) = stream.wait_for_continuation_request() { - return Ok(()); - } - } - let new_stream = ImapStream::new_connection(&self.server_conf); - if new_stream.is_err() { - *self.online.lock().unwrap() = ( - Instant::now(), - Err(new_stream.as_ref().unwrap_err().clone()), - ); - } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); - } - let (capabilities, mut stream) = new_stream?; - let ret = stream.wait_for_continuation_request(); - if ret.is_ok() { - self.stream = Ok(stream); - self.capabilities = capabilities; - } - ret + self.try_send(|s| s.wait_for_continuation_request()) } - pub fn send_command(&mut self, command: &[u8]) -> Result<usize> { - if let (instant, ref mut status) = *self.online.lock().unwrap() { - if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { - *status = Err(MeliError::new("Connection timed out")); - self.stream = Err(MeliError::new("Connection timed out")); - } - } - if let Ok(ref mut stream) = self.stream { - if let Ok(ret) = stream.send_command(command) { - return Ok(ret); - } - } - let new_stream = ImapStream::new_connection(&self.server_conf); - if new_stream.is_err() { - *self.online.lock().unwrap() = ( - Instant::now(), - Err(new_stream.as_ref().unwrap_err().clone()), - ); - } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); - } - let (capabilities, mut stream) = new_stream?; - let ret = stream.send_command(command); - if ret.is_ok() { - self.stream = Ok(stream); - self.capabilities = capabilities; - } - ret + pub fn send_command(&mut self, command: &[u8]) -> Result<()> { + self.try_send(|s| s.send_command(command)) } pub fn send_literal(&mut self, data: &[u8]) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { - if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { - *status = Err(MeliError::new("Connection timed out")); - self.stream = Err(MeliError::new("Connection timed out")); - } - } - if let Ok(ref mut stream) = self.stream { - if let Ok(_) = stream.send_literal(data) { - return Ok(()); - } - } - let new_stream = ImapStream::new_connection(&self.server_conf); - if new_stream.is_err() { - *self.online.lock().unwrap() = ( - Instant::now(), - Err(new_stream.as_ref().unwrap_err().clone()), - ); - } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); - } - let (capabilities, mut stream) = new_stream?; - let ret = stream.send_literal(data); - if ret.is_ok() { - self.stream = Ok(stream); - self.capabilities = capabilities; - } - ret + self.try_send(|s| s.send_literal(data)) } pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { - if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { - *status = Err(MeliError::new("Connection timed out")); - self.stream = Err(MeliError::new("Connection timed out")); - } - } - if let Ok(ref mut stream) = self.stream { - if let Ok(_) = stream.send_raw(raw) { - return Ok(()); - } - } - let new_stream = ImapStream::new_connection(&self.server_conf); - if new_stream.is_err() { - *self.online.lock().unwrap() = ( - Instant::now(), - Err(new_stream.as_ref().unwrap_err().clone()), - ); - } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); - } - let (capabilities, mut stream) = new_stream?; - let ret = stream.send_raw(raw); - if ret.is_ok() { - self.stream = Ok(stream); - self.capabilities = capabilities; - } - ret + self.try_send(|s| s.send_raw(raw)) } pub fn set_nonblocking(&mut self, val: bool) -> Result<()> { + self.try_send(|s| s.set_nonblocking(val)) + } + + pub fn try_send( + &mut self, + mut action: impl FnMut(&mut ImapStream) -> Result<()>, + ) -> Result<()> { if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { *status = Err(MeliError::new("Connection timed out")); @@ -522,7 +380,7 @@ impl ImapConnection { } } if let Ok(ref mut stream) = self.stream { - if let Ok(_) = stream.set_nonblocking(val) { + if let Ok(_) = action(stream) { return Ok(()); } } @@ -536,7 +394,7 @@ impl ImapConnection { *self.online.lock().unwrap() = (Instant::now(), Ok(())); } let (capabilities, mut stream) = new_stream?; - let ret = stream.set_nonblocking(val); + let ret = action(&mut stream); if ret.is_ok() { self.stream = Ok(stream); self.capabilities = capabilities; @@ -562,7 +420,7 @@ impl From<ImapConnection> for ImapBlockingConnection { .map(|s| { s.stream .get_mut() - .set_write_timeout(Some(std::time::Duration::new(5 * 60, 0))) + .set_write_timeout(Some(std::time::Duration::new(30, 0))) .expect("set_write_timeout call failed") }) .expect("set_write_timeout call failed"); @@ -571,7 +429,7 @@ impl From<ImapConnection> for ImapBlockingConnection { .map(|s| { s.stream .get_mut() - .set_read_timeout(Some(std::time::Duration::new(5 * 60, 0))) + .set_read_timeout(Some(std::time::Duration::new(30, 0))) .expect("set_read_timeout call failed") }) .expect("set_read_timeout call failed"); |