diff options
author | Philipp Korber <philippkorber@gmail.com> | 2018-03-13 18:46:35 +0100 |
---|---|---|
committer | Philipp Korber <philippkorber@gmail.com> | 2018-03-13 18:46:35 +0100 |
commit | 3c6d62e94ee3d6e16e679209530bf15b646f8769 (patch) | |
tree | 2f083457ccd775c0aa353ed739032b4d3cb3b0a8 | |
parent | 132d76ebcabe6f5d728f856e83e7f3c74b993636 (diff) |
chore(smtp): smtp Quit on when stopping the service
-rw-r--r-- | src/smtp/error.rs | 2 | ||||
-rw-r--r-- | src/smtp/service.rs | 134 | ||||
-rw-r--r-- | src/smtp/smtp_wrapper.rs | 21 |
3 files changed, 116 insertions, 41 deletions
diff --git a/src/smtp/error.rs b/src/smtp/error.rs index 058f3a1..ad471c2 100644 --- a/src/smtp/error.rs +++ b/src/smtp/error.rs @@ -20,7 +20,7 @@ pub enum MailSendError { /// error on RSET, this should be fine, while it's not perfect it's not worth /// the additional impl. cost for servers which are not RFC conform in a strange /// way, as there is no reason for a server to behave this way). - OnReset(SmtpResponse), + OnRSET(SmtpResponse), DriverDropped, CanceledByDriver } diff --git a/src/smtp/service.rs b/src/smtp/service.rs index 7ea4332..44c54db 100644 --- a/src/smtp/service.rs +++ b/src/smtp/service.rs @@ -17,7 +17,7 @@ use mail::prelude::{Encoder, Encodable, MailType}; use mail::utils::SendBoxFuture; use mail::context::BuilderContext; -use super::smtp_wrapper::send_mail; +use super::smtp_wrapper::{send_mail, close_smtp_conn}; use super::common::{MailResponse, MailRequest, EnvelopData}; use super::handle::MailServiceHandle; use super::error::MailSendError; @@ -129,7 +129,7 @@ impl<SUP> MailService<SUP> self.stop_handle.clone() } - fn poll_connect(&mut self) -> Poll<(), SUP::NotConnectingError> { + fn poll_state(&mut self) -> Poll<bool, SUP::NotConnectingError> { use self::ServiceState::*; let mut state = mem::replace(&mut self.service, ServiceState::Dead); let mut result = None; @@ -139,7 +139,6 @@ impl<SUP> MailService<SUP> Connecting(mut fut) => { match fut.poll() { Ok(Async::Ready(service)) => { - result = Some(Ok(Async::Ready(()))); Connected(service) }, Ok(Async::NotReady) => { @@ -153,9 +152,26 @@ impl<SUP> MailService<SUP> } }, Connected(service) => { - result = Some(Ok(Async::Ready(()))); + result = Some(Ok(Async::Ready(true))); Connected(service) }, + Closing(mut fut) => { + match fut.poll() { + Ok(Async::Ready(())) => { + result = Some(Ok(Async::Ready(false))); + Initial + }, + Ok(Async::NotReady) => { + result = Some(Ok(Async::NotReady)); + Closing(fut) + }, + Err(_) => { + // sadly the can not fail is not yet encoded in the type system + // waiting for tokio v0.2's `Never` and rusts `!` type + unreachable!("closing future can not fail") + } + } + } Dead => { panic!("polled Service after completion through Err or Panic+catch_unwind") } @@ -183,24 +199,17 @@ impl<SUP> MailService<SUP> Ok(Async::NotReady) => return false, Err(err) => { // if an error happend on smtp RSET we can just reconnect - let reset_conn = - match &err { - &MailSendError::OnReset(_) => true, - &MailSendError::Io(_) => true, - // we should not reach here but intentionally just - // ignore the fact that someone pretents to be us - // and generates this errors - &MailSendError::DriverDropped | - &MailSendError::CanceledByDriver => false, - _ => false - }; - - if reset_conn { - //IMPROVE: we might want consider sending Quit, through it's not needed - // - one way to land here is a IoError in which we can't even do it - // - the other is if - self.service.reset(); - } + + match &err { + &MailSendError::OnRSET(_) => { + self.service.close(); + }, + &MailSendError::Io(_) => { + self.service.reset(); + }, + _ => {} + }; + Err(err) } } @@ -224,24 +233,45 @@ impl<SUP> Future for MailService<SUP> type Error = SUP::NotConnectingError; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.stop_handle.should_stop() { - // close the underlying streams ability to receive new messages, - // but the buffer still contains messages so continue with the rest - self.rx.get_mut().get_mut().close() - } loop { - // 1. complete the "current"/pending request (if there is any) + // 1. close rx, if we should_stop + let should_stop = self.stop_handle.should_stop(); + if should_stop { + // close the underlying streams ability to receive new messages, + // but the buffer still contains messages so continue with the rest + self.rx.get_mut().get_mut().close() + } + + // 2. complete the "current"/pending request (if there is any) if !self.poll_pending_complete() { return Ok(Async::NotReady) } - // 2. make sure we are connected, the current request might have "broken" the connection - try_ready!(self.poll_connect()); + // TODO we only want to open the connection on the first request + // TODO and close it with a timeout if not needed + // 3. make sure we are connected, the current request might have "broken" the connection + let is_connected = try_ready!(self.poll_state()); + + // 4. if we arn't connected but also not in any state tranformation and should_stop + // then we well do stop + if !is_connected { + if should_stop { + return Ok(Async::Ready(())); + } else { + // we closed it for some other reason so continue to loop + // to start reopening the connection + continue; + } + } - // 3. try to get a new request + // 4. try to get a new request let item = match self.rx.poll() { Ok(Async::Ready(Some(item))) => item, - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) => { + self.stop_handle().stop(); + self.service.close(); + continue + }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(_) => unreachable!("mpsc::Receiver.poll does not error") }; @@ -350,19 +380,45 @@ enum ServiceState<F> { Initial, Connecting(F), Connected(TokioSmtpService), + + //CONSTRAINT: Error can never be reached + //FIXME[tokio v0.2]: Error=() => Error=Never + Closing(Box<Future<Item=(), Error=()>>), Dead } -impl<F> ServiceState<F> { +impl<F: 'static> ServiceState<F> + where F: Future<Item=TokioSmtpService> +{ - fn reset(&mut self) { + fn close(&mut self) { use self::ServiceState::*; let state = mem::replace(self, ServiceState::Dead); *self = match state { + Initial => Initial, + Connecting(fut) => { + Closing(Box::new(fut.then(|res| match res { + Ok(mut service) => close_smtp_conn(&mut service), + Err(_) => Box::new(future::ok(())) + }))) + }, + Connected(mut service) => { + Closing(close_smtp_conn(&mut service)) + }, + Closing(fut) => Closing(fut), Dead => Dead, - _ => Initial } } + + fn reset(&mut self) { + if let &mut ServiceState::Dead = self {} + else { *self = ServiceState::Initial } + } + + +// fn force_stop(&mut self) { +// mem::replace(self, ServiceState::Dead); +// } } @@ -476,12 +532,14 @@ mod test { from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }), Normal(SmtpRequest::Rcpt { to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }), - Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()) + Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()), + Normal(SmtpRequest::Quit) ], vec![ Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), ] ); @@ -511,7 +569,8 @@ mod test { from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }), Normal(SmtpRequest::Rcpt { to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }), - Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()) + Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()), + Normal(SmtpRequest::Quit) ], vec![ Err(example_io_error()), @@ -519,6 +578,7 @@ mod test { Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), ] ); diff --git a/src/smtp/smtp_wrapper.rs b/src/smtp/smtp_wrapper.rs index beaf16f..83ab9ab 100644 --- a/src/smtp/smtp_wrapper.rs +++ b/src/smtp/smtp_wrapper.rs @@ -52,6 +52,21 @@ use super::common::{EnvelopData, MailResponse}; // where CB: CompositionBase; //} +//FIXME[impl Trait] +pub(crate) fn close_smtp_conn<I: 'static>(service: &mut I) -> Box<Future<Item=(), Error=()>> + where I: Clone + TokioService< + Request=Message<SmtpRequest, Body<Vec<u8>, IoError>>, + Response=SmtpResponse, + Error=IoError + > +{ + let fut = service.call(Message::WithoutBody(SmtpRequest::Quit)) + // we are already quiting there is no reason to bother with + // handling io errors or strange server responses + .then(|_| Ok(())); + + Box::new(fut) +} pub(crate) fn send_mail<I: 'static>( service: &mut I, @@ -109,7 +124,7 @@ pub(crate) fn send_mail<I: 'static>( if response.code.severity.is_positive() { Err(MailSendError::Smtp(errors)) } else { - Err(MailSendError::OnReset(response)) + Err(MailSendError::OnRSET(response)) } }); Either::B(fut) @@ -247,12 +262,12 @@ mod test { .then(|res| match res { Ok(MailResponse) => Err(TestError("unexpectadly no error".to_owned())), Err(err) => { - if let MailSendError::OnReset(resp) = err { + if let MailSendError::OnRSET(resp) = err { if resp == worse_response { Ok(()) } else { Err(TestError(format!("unexpected error kind {:?}", - MailSendError::OnReset(resp)))) + MailSendError::OnRSET(resp)))) } } else { Err(TestError(format!("unexpected error kind {:?}", err))) |