summaryrefslogtreecommitdiffstats
path: root/melib/src/backends/imap/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'melib/src/backends/imap/connection.rs')
-rw-r--r--melib/src/backends/imap/connection.rs384
1 files changed, 121 insertions, 263 deletions
diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs
index 601f39af..c24b8295 100644
--- a/melib/src/backends/imap/connection.rs
+++ b/melib/src/backends/imap/connection.rs
@@ -19,7 +19,7 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
-use super::protocol_parser::ImapLineSplit;
+use super::protocol_parser::{ImapLineSplit, ImapResponse};
use crate::email::parser::BytesExt;
use crate::error::*;
use std::io::Read;
@@ -27,6 +27,7 @@ use std::io::Write;
extern crate native_tls;
use fnv::FnvHashSet;
use native_tls::TlsConnector;
+use std::borrow::Cow;
use std::iter::FromIterator;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
@@ -56,96 +57,6 @@ impl Drop for ImapStream {
}
impl ImapStream {
- pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
- let id = format!("M{} ", self.cmd_id - 1);
- self.read_lines(ret, &id)
- }
-
- pub fn read_lines(&mut self, ret: &mut String, termination_string: &str) -> Result<()> {
- let mut buf: [u8; 1024] = [0; 1024];
- ret.clear();
- let mut last_line_idx: usize = 0;
- loop {
- match self.stream.read(&mut buf) {
- Ok(0) => break,
- Ok(b) => {
- ret.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..b]) });
- if let Some(mut pos) = ret[last_line_idx..].rfind("\r\n") {
- if ret[last_line_idx..].starts_with("* BYE") {
- return Err(MeliError::new("Disconnected"));
- }
- if let Some(prev_line) =
- ret[last_line_idx..pos + last_line_idx].rfind("\r\n")
- {
- last_line_idx += prev_line + 2;
- pos -= prev_line + 2;
- }
- if pos + "\r\n".len() == ret[last_line_idx..].len() {
- if !termination_string.is_empty()
- && ret[last_line_idx..].starts_with(termination_string)
- {
- debug!(&ret[last_line_idx..]);
- ret.replace_range(last_line_idx.., "");
- break;
- } else if termination_string.is_empty() {
- break;
- }
- }
- last_line_idx += pos + 2;
- }
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- continue;
- }
- Err(e) => {
- return Err(MeliError::from(e));
- }
- }
- }
- Ok(())
- }
-
- pub fn wait_for_continuation_request(&mut self) -> Result<()> {
- let term = "+ ".to_string();
- let mut ret = String::new();
- self.read_lines(&mut ret, &term)
- }
-
- pub fn send_command(&mut self, command: &[u8]) -> Result<usize> {
- let command = command.trim();
- self.stream.write_all(b"M")?;
- self.stream.write_all(self.cmd_id.to_string().as_bytes())?;
- self.stream.write_all(b" ")?;
- let ret = self.cmd_id;
- self.cmd_id += 1;
- self.stream.write_all(command)?;
- self.stream.write_all(b"\r\n")?;
- debug!("sent: M{} {}", self.cmd_id - 1, unsafe {
- std::str::from_utf8_unchecked(command)
- });
- Ok(ret)
- }
-
- pub fn send_literal(&mut self, data: &[u8]) -> Result<()> {
- self.stream.write_all(data)?;
- if !data.ends_with(b"\r\n") {
- self.stream.write_all(b"\r\n")?;
- }
- self.stream.write_all(b"\r\n")?;
- Ok(())
- }
-
- pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> {
- self.stream.write_all(raw)?;
- self.stream.write_all(b"\r\n")?;
- Ok(())
- }
-
- pub fn set_nonblocking(&mut self, val: bool) -> Result<()> {
- self.stream.get_mut().set_nonblocking(val)?;
- Ok(())
- }
-
pub fn new_connection(server_conf: &ImapServerConf) -> Result<(Capabilities, ImapStream)> {
use std::io::prelude::*;
use std::net::TcpStream;
@@ -167,8 +78,8 @@ impl ImapStream {
};
let mut socket = TcpStream::connect(&addr)?;
- socket.set_read_timeout(Some(std::time::Duration::new(120, 0)))?;
- socket.set_write_timeout(Some(std::time::Duration::new(120, 0)))?;
+ socket.set_read_timeout(Some(std::time::Duration::new(4, 0)))?;
+ socket.set_write_timeout(Some(std::time::Duration::new(4, 0)))?;
let cmd_id = 0;
if server_conf.use_starttls {
socket.write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())?;
@@ -274,7 +185,7 @@ impl ImapStream {
let tag_start = format!("M{} ", (ret.cmd_id - 1));
loop {
- ret.read_lines(&mut res, &String::new())?;
+ ret.read_lines(&mut res, &String::new(), false)?;
let mut should_break = false;
for l in res.split_rn() {
if l.starts_with("* CAPABILITY") {
@@ -317,6 +228,102 @@ impl ImapStream {
Ok((capabilities, ret))
}
}
+
+ pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
+ let id = format!("M{} ", self.cmd_id - 1);
+ self.read_lines(ret, &id, true)
+ }
+
+ pub fn read_lines(
+ &mut self,
+ ret: &mut String,
+ termination_string: &str,
+ keep_termination_string: bool,
+ ) -> Result<()> {
+ let mut buf: [u8; 1024] = [0; 1024];
+ ret.clear();
+ let mut last_line_idx: usize = 0;
+ loop {
+ match self.stream.read(&mut buf) {
+ Ok(0) => break,
+ Ok(b) => {
+ ret.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..b]) });
+ if let Some(mut pos) = ret[last_line_idx..].rfind("\r\n") {
+ if ret[last_line_idx..].starts_with("* BYE") {
+ return Err(MeliError::new("Disconnected"));
+ }
+ if let Some(prev_line) =
+ ret[last_line_idx..pos + last_line_idx].rfind("\r\n")
+ {
+ last_line_idx += prev_line + 2;
+ pos -= prev_line + 2;
+ }
+ if pos + "\r\n".len() == ret[last_line_idx..].len() {
+ if !termination_string.is_empty()
+ && ret[last_line_idx..].starts_with(termination_string)
+ {
+ debug!(&ret[last_line_idx..]);
+ if !keep_termination_string {
+ ret.replace_range(last_line_idx.., "");
+ }
+ break;
+ } else if termination_string.is_empty() {
+ break;
+ }
+ }
+ last_line_idx += pos + 2;
+ }
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ continue;
+ }
+ Err(e) => {
+ return Err(MeliError::from(e));
+ }
+ }
+ }
+ Ok(())
+ }
+
+ pub fn wait_for_continuation_request(&mut self) -> Result<()> {
+ let term = "+ ".to_string();
+ let mut ret = String::new();
+ self.read_lines(&mut ret, &term, false)
+ }
+
+ pub fn send_command(&mut self, command: &[u8]) -> Result<()> {
+ let command = command.trim();
+ self.stream.write_all(b"M")?;
+ self.stream.write_all(self.cmd_id.to_string().as_bytes())?;
+ self.stream.write_all(b" ")?;
+ self.cmd_id += 1;
+ self.stream.write_all(command)?;
+ self.stream.write_all(b"\r\n")?;
+ debug!("sent: M{} {}", self.cmd_id - 1, unsafe {
+ std::str::from_utf8_unchecked(command)
+ });
+ Ok(())
+ }
+
+ pub fn send_literal(&mut self, data: &[u8]) -> Result<()> {
+ self.stream.write_all(data)?;
+ if !data.ends_with(b"\r\n") {
+ self.stream.write_all(b"\r\n")?;
+ }
+ self.stream.write_all(b"\r\n")?;
+ Ok(())
+ }
+
+ pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> {
+ self.stream.write_all(raw)?;
+ self.stream.write_all(b"\r\n")?;
+ Ok(())
+ }
+
+ pub fn set_nonblocking(&mut self, val: bool) -> Result<()> {
+ self.stream.get_mut().set_nonblocking(val)?;
+ Ok(())
+ }
}
impl ImapConnection {
@@ -333,188 +340,39 @@ impl ImapConnection {
}
pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
- if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
- if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
- *status = Err(MeliError::new("Connection timed out"));
- self.stream = Err(MeliError::new("Connection timed out"));
- }
- }
-
- if let Ok(ref mut stream) = self.stream {
- if let Ok(_) = stream.read_response(ret) {
- return Ok(());
- }
- }
- let new_stream = ImapStream::new_connection(&self.server_conf);
- if new_stream.is_err() {
- *self.online.lock().unwrap() = (
- Instant::now(),
- Err(new_stream.as_ref().unwrap_err().clone()),
- );
- } else {
- *self.online.lock().unwrap() = (Instant::now(), Ok(()));
- }
- let (capabilities, mut stream) = new_stream?;
- let ret = stream.read_response(ret);
- if ret.is_ok() {
- self.stream = Ok(stream);
- self.capabilities = capabilities;
- }
- ret
+ let res = self.try_send(|s| s.read_response(ret));
+ let r: Result<()> = ImapResponse::from(&ret).into();
+ r
}
pub fn read_lines(&mut self, ret: &mut String, termination_string: String) -> Result<()> {
- if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
- if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
- *status = Err(MeliError::new("Connection timed out"));
- self.stream = Err(MeliError::new("Connection timed out"));
- }
- }
-
- if let Ok(ref mut stream) = self.stream {
- if let Ok(_) = stream.read_lines(ret, &termination_string) {
- return Ok(());
- }
- }
- let new_stream = ImapStream::new_connection(&self.server_conf);
- if new_stream.is_err() {
- *self.online.lock().unwrap() = (
- Instant::now(),
- Err(new_stream.as_ref().unwrap_err().clone()),
- );
- } else {
- *self.online.lock().unwrap() = (Instant::now(), Ok(()));
- }
- let (capabilities, mut stream) = new_stream?;
- let ret = stream.read_lines(ret, &termination_string);
- if ret.is_ok() {
- self.stream = Ok(stream);
- self.capabilities = capabilities;
- }
- ret
+ self.try_send(|s| s.read_lines(ret, &termination_string, false))
}
pub fn wait_for_continuation_request(&mut self) -> Result<()> {
- if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
- if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
- *status = Err(MeliError::new("Connection timed out"));
- self.stream = Err(MeliError::new("Connection timed out"));
- }
- }
- if let Ok(ref mut stream) = self.stream {
- if let Ok(_) = stream.wait_for_continuation_request() {
- return Ok(());
- }
- }
- let new_stream = ImapStream::new_connection(&self.server_conf);
- if new_stream.is_err() {
- *self.online.lock().unwrap() = (
- Instant::now(),
- Err(new_stream.as_ref().unwrap_err().clone()),
- );
- } else {
- *self.online.lock().unwrap() = (Instant::now(), Ok(()));
- }
- let (capabilities, mut stream) = new_stream?;
- let ret = stream.wait_for_continuation_request();
- if ret.is_ok() {
- self.stream = Ok(stream);
- self.capabilities = capabilities;
- }
- ret
+ self.try_send(|s| s.wait_for_continuation_request())
}
- pub fn send_command(&mut self, command: &[u8]) -> Result<usize> {
- if let (instant, ref mut status) = *self.online.lock().unwrap() {
- if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
- *status = Err(MeliError::new("Connection timed out"));
- self.stream = Err(MeliError::new("Connection timed out"));
- }
- }
- if let Ok(ref mut stream) = self.stream {
- if let Ok(ret) = stream.send_command(command) {
- return Ok(ret);
- }
- }
- let new_stream = ImapStream::new_connection(&self.server_conf);
- if new_stream.is_err() {
- *self.online.lock().unwrap() = (
- Instant::now(),
- Err(new_stream.as_ref().unwrap_err().clone()),
- );
- } else {
- *self.online.lock().unwrap() = (Instant::now(), Ok(()));
- }
- let (capabilities, mut stream) = new_stream?;
- let ret = stream.send_command(command);
- if ret.is_ok() {
- self.stream = Ok(stream);
- self.capabilities = capabilities;
- }
- ret
+ pub fn send_command(&mut self, command: &[u8]) -> Result<()> {
+ self.try_send(|s| s.send_command(command))
}
pub fn send_literal(&mut self, data: &[u8]) -> Result<()> {
- if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
- if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
- *status = Err(MeliError::new("Connection timed out"));
- self.stream = Err(MeliError::new("Connection timed out"));
- }
- }
- if let Ok(ref mut stream) = self.stream {
- if let Ok(_) = stream.send_literal(data) {
- return Ok(());
- }
- }
- let new_stream = ImapStream::new_connection(&self.server_conf);
- if new_stream.is_err() {
- *self.online.lock().unwrap() = (
- Instant::now(),
- Err(new_stream.as_ref().unwrap_err().clone()),
- );
- } else {
- *self.online.lock().unwrap() = (Instant::now(), Ok(()));
- }
- let (capabilities, mut stream) = new_stream?;
- let ret = stream.send_literal(data);
- if ret.is_ok() {
- self.stream = Ok(stream);
- self.capabilities = capabilities;
- }
- ret
+ self.try_send(|s| s.send_literal(data))
}
pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> {
- if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
- if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
- *status = Err(MeliError::new("Connection timed out"));
- self.stream = Err(MeliError::new("Connection timed out"));
- }
- }
- if let Ok(ref mut stream) = self.stream {
- if let Ok(_) = stream.send_raw(raw) {
- return Ok(());
- }
- }
- let new_stream = ImapStream::new_connection(&self.server_conf);
- if new_stream.is_err() {
- *self.online.lock().unwrap() = (
- Instant::now(),
- Err(new_stream.as_ref().unwrap_err().clone()),
- );
- } else {
- *self.online.lock().unwrap() = (Instant::now(), Ok(()));
- }
- let (capabilities, mut stream) = new_stream?;
- let ret = stream.send_raw(raw);
- if ret.is_ok() {
- self.stream = Ok(stream);
- self.capabilities = capabilities;
- }
- ret
+ self.try_send(|s| s.send_raw(raw))
}
pub fn set_nonblocking(&mut self, val: bool) -> Result<()> {
+ self.try_send(|s| s.set_nonblocking(val))
+ }
+
+ pub fn try_send(
+ &mut self,
+ mut action: impl FnMut(&mut ImapStream) -> Result<()>,
+ ) -> Result<()> {
if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() {
if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
*status = Err(MeliError::new("Connection timed out"));
@@ -522,7 +380,7 @@ impl ImapConnection {
}
}
if let Ok(ref mut stream) = self.stream {
- if let Ok(_) = stream.set_nonblocking(val) {
+ if let Ok(_) = action(stream) {
return Ok(());
}
}
@@ -536,7 +394,7 @@ impl ImapConnection {
*self.online.lock().unwrap() = (Instant::now(), Ok(()));
}
let (capabilities, mut stream) = new_stream?;
- let ret = stream.set_nonblocking(val);
+ let ret = action(&mut stream);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
@@ -562,7 +420,7 @@ impl From<ImapConnection> for ImapBlockingConnection {
.map(|s| {
s.stream
.get_mut()
- .set_write_timeout(Some(std::time::Duration::new(5 * 60, 0)))
+ .set_write_timeout(Some(std::time::Duration::new(30, 0)))
.expect("set_write_timeout call failed")
})
.expect("set_write_timeout call failed");
@@ -571,7 +429,7 @@ impl From<ImapConnection> for ImapBlockingConnection {
.map(|s| {
s.stream
.get_mut()
- .set_read_timeout(Some(std::time::Duration::new(5 * 60, 0)))
+ .set_read_timeout(Some(std::time::Duration::new(30, 0)))
.expect("set_read_timeout call failed")
})
.expect("set_read_timeout call failed");