summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPhilipp Korber <philippkorber@gmail.com>2018-03-13 18:46:35 +0100
committerPhilipp Korber <philippkorber@gmail.com>2018-03-13 18:46:35 +0100
commit3c6d62e94ee3d6e16e679209530bf15b646f8769 (patch)
tree2f083457ccd775c0aa353ed739032b4d3cb3b0a8
parent132d76ebcabe6f5d728f856e83e7f3c74b993636 (diff)
chore(smtp): smtp Quit on when stopping the service
-rw-r--r--src/smtp/error.rs2
-rw-r--r--src/smtp/service.rs134
-rw-r--r--src/smtp/smtp_wrapper.rs21
3 files changed, 116 insertions, 41 deletions
diff --git a/src/smtp/error.rs b/src/smtp/error.rs
index 058f3a1..ad471c2 100644
--- a/src/smtp/error.rs
+++ b/src/smtp/error.rs
@@ -20,7 +20,7 @@ pub enum MailSendError {
/// error on RSET, this should be fine, while it's not perfect it's not worth
/// the additional impl. cost for servers which are not RFC conform in a strange
/// way, as there is no reason for a server to behave this way).
- OnReset(SmtpResponse),
+ OnRSET(SmtpResponse),
DriverDropped,
CanceledByDriver
}
diff --git a/src/smtp/service.rs b/src/smtp/service.rs
index 7ea4332..44c54db 100644
--- a/src/smtp/service.rs
+++ b/src/smtp/service.rs
@@ -17,7 +17,7 @@ use mail::prelude::{Encoder, Encodable, MailType};
use mail::utils::SendBoxFuture;
use mail::context::BuilderContext;
-use super::smtp_wrapper::send_mail;
+use super::smtp_wrapper::{send_mail, close_smtp_conn};
use super::common::{MailResponse, MailRequest, EnvelopData};
use super::handle::MailServiceHandle;
use super::error::MailSendError;
@@ -129,7 +129,7 @@ impl<SUP> MailService<SUP>
self.stop_handle.clone()
}
- fn poll_connect(&mut self) -> Poll<(), SUP::NotConnectingError> {
+ fn poll_state(&mut self) -> Poll<bool, SUP::NotConnectingError> {
use self::ServiceState::*;
let mut state = mem::replace(&mut self.service, ServiceState::Dead);
let mut result = None;
@@ -139,7 +139,6 @@ impl<SUP> MailService<SUP>
Connecting(mut fut) => {
match fut.poll() {
Ok(Async::Ready(service)) => {
- result = Some(Ok(Async::Ready(())));
Connected(service)
},
Ok(Async::NotReady) => {
@@ -153,9 +152,26 @@ impl<SUP> MailService<SUP>
}
},
Connected(service) => {
- result = Some(Ok(Async::Ready(())));
+ result = Some(Ok(Async::Ready(true)));
Connected(service)
},
+ Closing(mut fut) => {
+ match fut.poll() {
+ Ok(Async::Ready(())) => {
+ result = Some(Ok(Async::Ready(false)));
+ Initial
+ },
+ Ok(Async::NotReady) => {
+ result = Some(Ok(Async::NotReady));
+ Closing(fut)
+ },
+ Err(_) => {
+ // sadly the can not fail is not yet encoded in the type system
+ // waiting for tokio v0.2's `Never` and rusts `!` type
+ unreachable!("closing future can not fail")
+ }
+ }
+ }
Dead => {
panic!("polled Service after completion through Err or Panic+catch_unwind")
}
@@ -183,24 +199,17 @@ impl<SUP> MailService<SUP>
Ok(Async::NotReady) => return false,
Err(err) => {
// if an error happend on smtp RSET we can just reconnect
- let reset_conn =
- match &err {
- &MailSendError::OnReset(_) => true,
- &MailSendError::Io(_) => true,
- // we should not reach here but intentionally just
- // ignore the fact that someone pretents to be us
- // and generates this errors
- &MailSendError::DriverDropped |
- &MailSendError::CanceledByDriver => false,
- _ => false
- };
-
- if reset_conn {
- //IMPROVE: we might want consider sending Quit, through it's not needed
- // - one way to land here is a IoError in which we can't even do it
- // - the other is if
- self.service.reset();
- }
+
+ match &err {
+ &MailSendError::OnRSET(_) => {
+ self.service.close();
+ },
+ &MailSendError::Io(_) => {
+ self.service.reset();
+ },
+ _ => {}
+ };
+
Err(err)
}
}
@@ -224,24 +233,45 @@ impl<SUP> Future for MailService<SUP>
type Error = SUP::NotConnectingError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.stop_handle.should_stop() {
- // close the underlying streams ability to receive new messages,
- // but the buffer still contains messages so continue with the rest
- self.rx.get_mut().get_mut().close()
- }
loop {
- // 1. complete the "current"/pending request (if there is any)
+ // 1. close rx, if we should_stop
+ let should_stop = self.stop_handle.should_stop();
+ if should_stop {
+ // close the underlying streams ability to receive new messages,
+ // but the buffer still contains messages so continue with the rest
+ self.rx.get_mut().get_mut().close()
+ }
+
+ // 2. complete the "current"/pending request (if there is any)
if !self.poll_pending_complete() {
return Ok(Async::NotReady)
}
- // 2. make sure we are connected, the current request might have "broken" the connection
- try_ready!(self.poll_connect());
+ // TODO we only want to open the connection on the first request
+ // TODO and close it with a timeout if not needed
+ // 3. make sure we are connected, the current request might have "broken" the connection
+ let is_connected = try_ready!(self.poll_state());
+
+ // 4. if we arn't connected but also not in any state tranformation and should_stop
+ // then we well do stop
+ if !is_connected {
+ if should_stop {
+ return Ok(Async::Ready(()));
+ } else {
+ // we closed it for some other reason so continue to loop
+ // to start reopening the connection
+ continue;
+ }
+ }
- // 3. try to get a new request
+ // 4. try to get a new request
let item = match self.rx.poll() {
Ok(Async::Ready(Some(item))) => item,
- Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
+ Ok(Async::Ready(None)) => {
+ self.stop_handle().stop();
+ self.service.close();
+ continue
+ },
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => unreachable!("mpsc::Receiver.poll does not error")
};
@@ -350,19 +380,45 @@ enum ServiceState<F> {
Initial,
Connecting(F),
Connected(TokioSmtpService),
+
+ //CONSTRAINT: Error can never be reached
+ //FIXME[tokio v0.2]: Error=() => Error=Never
+ Closing(Box<Future<Item=(), Error=()>>),
Dead
}
-impl<F> ServiceState<F> {
+impl<F: 'static> ServiceState<F>
+ where F: Future<Item=TokioSmtpService>
+{
- fn reset(&mut self) {
+ fn close(&mut self) {
use self::ServiceState::*;
let state = mem::replace(self, ServiceState::Dead);
*self = match state {
+ Initial => Initial,
+ Connecting(fut) => {
+ Closing(Box::new(fut.then(|res| match res {
+ Ok(mut service) => close_smtp_conn(&mut service),
+ Err(_) => Box::new(future::ok(()))
+ })))
+ },
+ Connected(mut service) => {
+ Closing(close_smtp_conn(&mut service))
+ },
+ Closing(fut) => Closing(fut),
Dead => Dead,
- _ => Initial
}
}
+
+ fn reset(&mut self) {
+ if let &mut ServiceState::Dead = self {}
+ else { *self = ServiceState::Initial }
+ }
+
+
+// fn force_stop(&mut self) {
+// mem::replace(self, ServiceState::Dead);
+// }
}
@@ -476,12 +532,14 @@ mod test {
from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }),
Normal(SmtpRequest::Rcpt {
to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }),
- Body(SmtpRequest::Data, expected_body.to_owned().into_bytes())
+ Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()),
+ Normal(SmtpRequest::Quit)
],
vec![
Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
+ Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
]
);
@@ -511,7 +569,8 @@ mod test {
from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }),
Normal(SmtpRequest::Rcpt {
to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }),
- Body(SmtpRequest::Data, expected_body.to_owned().into_bytes())
+ Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()),
+ Normal(SmtpRequest::Quit)
],
vec![
Err(example_io_error()),
@@ -519,6 +578,7 @@ mod test {
Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
+ Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1),
]
);
diff --git a/src/smtp/smtp_wrapper.rs b/src/smtp/smtp_wrapper.rs
index beaf16f..83ab9ab 100644
--- a/src/smtp/smtp_wrapper.rs
+++ b/src/smtp/smtp_wrapper.rs
@@ -52,6 +52,21 @@ use super::common::{EnvelopData, MailResponse};
// where CB: CompositionBase;
//}
+//FIXME[impl Trait]
+pub(crate) fn close_smtp_conn<I: 'static>(service: &mut I) -> Box<Future<Item=(), Error=()>>
+ where I: Clone + TokioService<
+ Request=Message<SmtpRequest, Body<Vec<u8>, IoError>>,
+ Response=SmtpResponse,
+ Error=IoError
+ >
+{
+ let fut = service.call(Message::WithoutBody(SmtpRequest::Quit))
+ // we are already quiting there is no reason to bother with
+ // handling io errors or strange server responses
+ .then(|_| Ok(()));
+
+ Box::new(fut)
+}
pub(crate) fn send_mail<I: 'static>(
service: &mut I,
@@ -109,7 +124,7 @@ pub(crate) fn send_mail<I: 'static>(
if response.code.severity.is_positive() {
Err(MailSendError::Smtp(errors))
} else {
- Err(MailSendError::OnReset(response))
+ Err(MailSendError::OnRSET(response))
}
});
Either::B(fut)
@@ -247,12 +262,12 @@ mod test {
.then(|res| match res {
Ok(MailResponse) => Err(TestError("unexpectadly no error".to_owned())),
Err(err) => {
- if let MailSendError::OnReset(resp) = err {
+ if let MailSendError::OnRSET(resp) = err {
if resp == worse_response {
Ok(())
} else {
Err(TestError(format!("unexpected error kind {:?}",
- MailSendError::OnReset(resp))))
+ MailSendError::OnRSET(resp))))
}
} else {
Err(TestError(format!("unexpected error kind {:?}", err)))