diff options
author | Philipp Korber <philippkorber@gmail.com> | 2018-03-01 18:23:38 +0100 |
---|---|---|
committer | Philipp Korber <philippkorber@gmail.com> | 2018-03-05 17:38:02 +0100 |
commit | 9119a9bb4b6d8043e321ec8b9aa767fbdb5aefa3 (patch) | |
tree | 418fccd5d9349bb99c792a1f34ec92f045d37fd4 | |
parent | 0625db709c551486d267f837709d4ce86588b3bd (diff) |
chore: initial tokio_smtp mail binding implementation
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | examples/composition.rs | 2 | ||||
-rw-r--r-- | src/compositor/_impl.rs | 16 | ||||
-rw-r--r-- | src/compositor/mail_send_data.rs | 16 | ||||
-rw-r--r-- | src/compositor/mod.rs | 45 | ||||
-rw-r--r-- | src/lib.rs | 14 | ||||
-rw-r--r-- | src/macros.rs | 40 | ||||
-rw-r--r-- | src/smtp/handle.rs | 265 | ||||
-rw-r--r-- | src/smtp/mod.rs | 13 | ||||
-rw-r--r-- | src/smtp/service.rs | 251 | ||||
-rw-r--r-- | src/smtp/smtp_wrapper.rs | 209 | ||||
-rw-r--r-- | test_resources/template_a.out.regex | 2 | ||||
-rw-r--r-- | tests/tera/main.rs | 2 |
13 files changed, 850 insertions, 31 deletions
@@ -31,6 +31,10 @@ tera = { version = "0.11.1", optional = true } conduit-mime-types = { version = "0.7.3", optional = true} lazy_static = { version = "1.0.0", optional = true } tokio-smtp = { path="../depends/tokio-smtp", optional=true } +tokio-service = { version = "0.1", optional=true } +tokio-proto = { version = "0.1", optional=true } +# this is used by tokio-smtp, we need it to convert our Mailbox to their Mailbox struct +emailaddress = { version = "0.4", optional=true } [dependencies.mime] git="https://github.com/1aim/mime" @@ -47,7 +51,7 @@ futures-cpupool="0.1.5" debug_trace_tokens = [] render-template-engine = ["conduit-mime-types", "lazy_static"] tera-bindings = ["tera", "render-template-engine"] -smtp = ['tokio-smtp'] +smtp = ['tokio-smtp', 'tokio-service', 'tokio-proto', 'emailaddress'] default = ["default_impl"] default_impl = [ "default_impl_name_composer", "default_impl_component_id", diff --git a/examples/composition.rs b/examples/composition.rs index 58d5677..b9a0528 100644 --- a/examples/composition.rs +++ b/examples/composition.rs @@ -74,7 +74,7 @@ fn _main() -> Result<(), Error> { //this doesn't realy do anything as the NoNameComposer is used send_data.auto_gen_display_names(NoNameComposer)?; - let mail = (&context, &template_engine).compose_mail(send_data)?; + let (mail, _envelop) = (&context, &template_engine).compose_mail(send_data)?; let mut encoder = Encoder::new( MailType::Ascii ); let encodable_mail = mail.into_encodeable_mail( &context ).wait().unwrap(); diff --git a/src/compositor/_impl.rs b/src/compositor/_impl.rs index 9e835c1..8bb83bf 100644 --- a/src/compositor/_impl.rs +++ b/src/compositor/_impl.rs @@ -21,7 +21,7 @@ use template::{ }; use super::mail_send_data::MailSendData; -use super::CompositionBase; +use super::{CompositionBase, EnvelopData}; pub(crate) trait InnerCompositionBaseExt: CompositionBase { @@ -29,25 +29,27 @@ pub(crate) trait InnerCompositionBaseExt: CompositionBase { &self, send_data: MailSendData<<Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D> - ) -> Result<Mail> + ) -> Result<(Mail, EnvelopData)> where D: Serialize { - + let envelop = EnvelopData::from(&send_data); //compose display name => create Address with display name; let (core_headers, data, template_id) = self.process_mail_send_data(send_data)?; let MailParts { alternative_bodies, shared_embeddings, attachments } - = self.use_template_engine(&*template_id, data)?; + = self.use_template_engine(&*template_id, data)?; + + let mail = self.build_mail( alternative_bodies, shared_embeddings.into_iter(), + attachments, core_headers )?; - self.build_mail( alternative_bodies, shared_embeddings.into_iter(), attachments, - core_headers ) + Ok((mail, envelop)) } fn process_mail_send_data<'a, D>( &self, send_data: MailSendData<'a, <Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D> - ) -> Result<( + ) -> Result<( HeaderMap, D, Cow<'a, <Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId> diff --git a/src/compositor/mail_send_data.rs b/src/compositor/mail_send_data.rs index 2608e3d..9ff9d04 100644 --- a/src/compositor/mail_send_data.rs +++ b/src/compositor/mail_send_data.rs @@ -113,15 +113,13 @@ impl<'a, TId: ?Sized + 'a, D> MailSendData<'a, TId, D> } } - pub fn sender(&self) -> Option<&Mailbox> { - self.sender.as_ref() + /// returns a reference to a explicity set sender or else the first (and only) from mailbox + pub fn sender(&self) -> &Mailbox { + self.sender.as_ref().unwrap_or_else(|| self.from.first()) } - pub fn sender_mut(&mut self) -> Option<&mut Mailbox> { - self.sender.as_mut() - } - pub fn from(&self) -> &MailboxList { + pub fn _from(&self) -> &MailboxList { &self.from } @@ -129,7 +127,7 @@ impl<'a, TId: ?Sized + 'a, D> MailSendData<'a, TId, D> /// /// this does only expose a &mut Slice of Mailboxes, instead of a &mut MailboxList /// to make sure that no from mailbox can be added as sender might be empty - pub fn from_mut(&mut self) -> &mut [Mailbox] { + pub fn _from_mut(&mut self) -> &mut [Mailbox] { &mut self.from } @@ -137,11 +135,11 @@ impl<'a, TId: ?Sized + 'a, D> MailSendData<'a, TId, D> //TODO add try_add_from method failing if sender is None //TODO maybe add a try_set_from(MailboxList) too - pub fn to(&self) -> &MailboxList { + pub fn _to(&self) -> &MailboxList { &self.to } - pub fn to_mut(&mut self) -> &mut MailboxList { + pub fn _to_mut(&mut self) -> &mut MailboxList { &mut self.to } diff --git a/src/compositor/mod.rs b/src/compositor/mod.rs index d1af8f4..8123f4a 100644 --- a/src/compositor/mod.rs +++ b/src/compositor/mod.rs @@ -1,9 +1,12 @@ +use std::borrow::ToOwned; + use serde::Serialize; use error::Result; use context::Context; use template::TemplateEngine; use mail::Mail; +use mail::headers::components::{MailboxList, Mailbox}; //TODO make sure Box/Arc auto wrapping is impl for all parts use self::_impl::InnerCompositionBaseExt; @@ -31,9 +34,9 @@ pub trait CompositionBase { /// composes a mail based on the given MailSendData fn compose_mail<D>( &self, - send_data: - MailSendData<<Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D> - ) -> Result<Mail> + send_data: MailSendData< + <Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D> + ) -> Result<(Mail, EnvelopData)> where D: Serialize { InnerCompositionBaseExt::_compose_mail(self, send_data) @@ -41,4 +44,40 @@ pub trait CompositionBase { fn template_engine(&self) -> &Self::TemplateEngine; fn context(&self) -> &Self::Context; +} + + +//NOTE: this might get more complex at some point, wrt. e.g. cc, bcc, resent etc. +pub struct EnvelopData { + sender: Mailbox, + to: MailboxList + //cc: MailboxList, //add if added to MailSendData + //bcc: MailboxList, //add if added to MailSendData +} + +impl EnvelopData { + + pub fn new(sender: Mailbox, to: MailboxList) -> Self { + EnvelopData { + sender, to + } + } + + pub fn sender(&self) -> &Mailbox { + &self.sender + } + + pub fn _to(&self) -> &MailboxList { + &self.to + } +} + +impl<'a, T: ?Sized, D> From<&'a MailSendData<'a, T, D>> for EnvelopData + where T: ToOwned, D: Serialize +{ + fn from(msd: &'a MailSendData<'a, T, D>) -> Self { + let sender = msd.sender().clone(); + let to = msd._to().clone(); + EnvelopData::new(sender, to) + } }
\ No newline at end of file @@ -8,6 +8,8 @@ extern crate mail_codec_headers as headers; extern crate error_chain; extern crate log; extern crate mime as media_type; +//#[cfg_attr(feature="smtp", macro_use)] +#[macro_use] extern crate futures; extern crate serde; extern crate rand; @@ -31,6 +33,15 @@ extern crate lazy_static; extern crate tera as tera_crate; #[cfg(feature="smtp")] extern crate tokio_smtp; +#[cfg(feature="smtp")] +extern crate tokio_service; +#[cfg(feature="smtp")] +extern crate tokio_proto; +#[cfg(feature="smtp")] +extern crate emailaddress; + +#[macro_use] +mod macros; pub mod error; mod builder_extension; @@ -42,7 +53,8 @@ mod compositor; pub use self::compositor::{ CompositionBase, NameComposer, MailSendData, MailSendDataBuilder, - SharedCompositionBase, SimpleCompositionBase + SharedCompositionBase, SimpleCompositionBase, + EnvelopData }; mod utils; diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 0000000..ffe37ce --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,40 @@ + +/// like try on a result but converts the error to a boxed error-future before returning it +#[cfg(feature="smtp")] +macro_rules! r2f_try { + ($code:expr) => ({ + use futures::future; + match $code { + Ok(val) => val, + Err(error) => return Box::new(future::err(error)) + } + }); +} + +/// +/// ``` +/// cloned!([service] => move |name| { +/// drop(service) +/// }) +/// ``` +#[cfg(feature="smtp")] +macro_rules! cloned { + ([$($toclo:ident),*] => $doit:expr) => ({ + $( + let $toclo = $toclo.clone(); + )* + $doit + }); +} + +/// like try_ready but for streams also checking it's some +#[cfg(features="smtp")] +macro_rules! try_some_ready { + ($e:expr) => ({ + match $e { + Ok(Async::Ready(Some(item))) => item, + Ok(other) => return Ok(other), + Err(err) => return Err(From::from(err)) + } + }); +}
\ No newline at end of file diff --git a/src/smtp/handle.rs b/src/smtp/handle.rs new file mode 100644 index 0000000..149a861 --- /dev/null +++ b/src/smtp/handle.rs @@ -0,0 +1,265 @@ +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::{sink, Sink, Future, Poll, Async}; + +use super::smtp_wrapper::{MailSendRequest, MailResponse, MailSendError}; + + +type InnerChannel<RQ> = mpsc::Sender<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>; + +#[derive(Debug, Clone)] +pub struct MailServiceHandle<RQ> + where RQ: MailSendRequest +{ + channel: InnerChannel<RQ> +} + + +impl<RQ> MailServiceHandle<RQ> + where RQ: MailSendRequest +{ + + pub(crate) fn new(sender: InnerChannel<RQ>) -> Self { + MailServiceHandle { channel: sender } + } + + pub fn send_mail(self, mail_request: RQ) -> MailEnqueueFuture<RQ> { + let (sender, rx) = oneshot::channel(); + + let send = self.channel.send((mail_request, sender)); + + MailEnqueueFuture { send, rx: Some(rx) } + } + +// pub fn map_request_stream<S>(self, stream: S, max_buffer: Option<usize>) -> SmtpMailStream<RQ, S> +// where S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +// { +// SmtpMailStream::new(self.channel, stream, max_buffer) +// } + + pub fn into_inner(self) + -> mpsc::Sender<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)> + { + self.channel + } + +} + +pub struct MailEnqueueFuture<RQ> + where RQ: MailSendRequest +{ + send: sink::Send<InnerChannel<RQ>>, + rx: Option<oneshot::Receiver<Result<MailResponse, MailSendError>>> +} + +impl<RQ> Future for MailEnqueueFuture<RQ> + where RQ: MailSendRequest +{ + type Item = (MailServiceHandle<RQ>, MailResponseFuture); + type Error = MailSendError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let channel = match self.send.poll() { + Ok(Async::Ready(channel)) => channel, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::DriverDropped), + }; + + let rx = self.rx.take().expect("called poll after polling completed"); + Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx }))) + } +} + + +pub struct MailResponseFuture { + rx: oneshot::Receiver<Result<MailResponse, MailSendError>> +} + +impl Future for MailResponseFuture { + type Item = MailResponse; + type Error = MailSendError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let res = match self.rx.poll() { + Ok(Async::Ready(res)) => res, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::CanceledByDriver), + }; + + match res { + Ok(resp) => Ok(Async::Ready(resp)), + Err(err) => Err(err) + } + } +} + + +// use a closure to setup smtp Clone + FnMut() -> R, R: Future<Item=...,> or similar +//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) { +// let (send, resc) = mpcs::channel(XX); +// let shared_composition_base = smtp_setup.composition_base; +// let driver = smtp_connect().and_then(|service| { +// let service = shared_composition_base.wrap_smtp_service(service); +// +// let pipe_in = resc;; +// +// pipe_in.for_each(|(cmd, pipe_out)| { +// service.call(cmd).and_then(|res| { +// let _ = pipe_out.send(res); +// Ok(()) +// }) +// }) +// }); +// (send, driver) +//} +// +//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> { +// let driver = setup_smtp(setup.clone()); +// +// future::loop_fn(driver, move |driver| { +// driver.or_else(|err| { +// if err.was_canceled() { +// Ok(Loop::Break(())) +// } else if err.can_recover() { +// let new_driver = setup_smtp(setup.clone()); +// Ok(Loop::Continue(new_driver)) +// } else { +// Err(err.into()) +// } +// }) +// }) +//} +// + + + +// TODO: This links into a stream mapping a stream of requests to a stream of responses, +// but it needs more though into it, i.e. it decouples the sending of a mail, with +// the response of getting one. So we need to at last add some mail Id to Req (we +// might want to see if we can generally do so using MessageId). +// +//pub struct SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +//{ +// channel: Option<InnerChannel<RQ>>, +// stream: Option<S>, +// res_buffer: FuturesUnordered<oneshot::Receiver<Result<MailResponse, MailSendError>>>, +// send_buffer: Option<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>, +// max_buffer: Option<usize> +//} +// +// +//impl<RQ, S> SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +//{ +// pub(crate) fn new(channel: InnerChannel<RQ>, stream: S, max_buffer: Option<usize>) -> Self { +// SmtpMailStream { +// channel, stream, max_buffer, +// send_buffer: None, +// res_buffer: FuturesUnordered::new(), +// } +// } +// +// fn channel_mut(&mut self) -> &mut InnerChannel<RQ> { +// self.channel.as_mut().unwrap() +// } +// +// fn stream_mut(&mut self) -> &mut S { +// self.stream.as_mut().unwrap() +// } +// +// fn try_enqueue_mail( +// &mut self, +// msg: (RQ, oneshot::Sender<Result<MailResponse, MailSendError>>) +// ) -> Poll<(), DriverDropped<S>> +// { +// debug_assert!(self.send_buffer.is_none()); +// debug_assert!(self.stream.is_some() && self.channel.is_some()); +// match self.channel_mut().start_send(msg) { +// Ok(AsyncSink::Ready) => Ok(Async::Ready(())), +// Ok(AsyncSink::NotReady(msg)) => { +// self.send_buffer = Some(msg); +// Ok(Async::NotReady) +// }, +// Err(_) => { +// mem::drop(self.channel.take()); +// Err(DriverDropped::Stream(self.stream.take())) +// } +// } +// } +// +// fn close_channel(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// try_ready!(self.channel_mut().close()); +// return Ok(Async::Ready(None)); +// } +// +// fn poll_stream(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// match self.stream_mut().poll()? { +// Async::Ready(Some(item)) => Async::Ready(Some(item)), +// Async::Ready(None) => { +// self.stream.take(); +// return self.close_channel(); +// } +// Async::NotReady => { +// try_ready!(self.channel_mut().poll_complete()); +// return Ok(Async::NotRead); +// } +// } +// } +//} +// +//pub enum DriverDropped<S> { +// Stream(S), +// StreamTakenOnPreviousError +//} +// +//impl<RQ, S> Stream for SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// //FIXME(tokio 0.2) use error Never?? +// S::Error: Into<MailSendError> +//{ +// type Item = Result<MailResponse, MailSendError>; +// type Error = DriverDropped<S>; +// +// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// if self.channel.is_none() { +// return Err(DriverDropped::StreamTakenOnPreviousError); +// } +// +// if self.stream.is_none() { +// return self.close_channel(); +// } +// +// if let Some(msg) = self.send_buffer { +// try_ready!(self.try_enqueue_mail(msg)); +// } +// +// loop { +// +// match self.res_buffer.poll() { +// Ok(Async::NotReady) => {}, +// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))), +// Err(err) => return Ok(AsyncReady(Err(e))) +// } +// +// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() { +// return Ok(Async::NotReady); +// } +// +// +// let item = try_some_ready!(self.poll_stream()); +// +// let (tx, rx) = oneshot::channel(); +// +// self.res_buffer.push(rx); +// +// try_ready!(self.try_enqueue_mail((item, tx))); +// } +// } +//}
\ No newline at end of file diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs index 816b2f6..f0c938c 100644 --- a/src/smtp/mod.rs +++ b/src/smtp/mod.rs @@ -1,10 +1,9 @@ -use std::net::SocketAddr; -use serde::Serialize; -use tokio_smtp; -use tokio_smtp::request::{ClientId as SmtpClientId}; -use tokio_smtp::client::ClientParams; +mod smtp_wrapper; +pub use self::smtp_wrapper::*; -use ::context::BuilderContext; +mod handle; +pub use self::handle::*; -//tmp empty
\ No newline at end of file +mod service; +pub use self::service::*;
\ No newline at end of file diff --git a/src/smtp/service.rs b/src/smtp/service.rs new file mode 100644 index 0000000..95f3699 --- /dev/null +++ b/src/smtp/service.rs @@ -0,0 +1,251 @@ +use std::io::{Error as IoError}; +use std::mem; + +use futures::sync::{mpsc, oneshot}; +use futures::{Future, Stream, Poll, Async}; + +use tokio_proto::util::client_proxy::ClientProxy; +use tokio_service::{Service as TokioService}; +use tokio_proto::streaming::{Body, Message}; + +use tokio_smtp::request::{ + Request as SmtpRequest, +}; +use tokio_smtp::response::{Response as SmtpResponse}; + +use ::CompositionBase; +use super::smtp_wrapper::{WrappedService, MailSendRequest, MailSendError, MailResponse}; +use super::handle::MailServiceHandle; + +//Next steps: +// make the service accept encodable mails nothing else +// do the bla -> encodable in the setup part check for canceleation before sending + +pub type TokioSmtpService = ClientProxy< + Message<SmtpRequest, Body<Vec<u8>, IoError>>, + SmtpResponse, IoError>; + +trait SmtpSetup: Send { + + /// The future returned which returns a Smtp connection, + /// + /// as this smtp mail bindings are writting for `tokio_smtp` + /// the Item is fixed to `ClientProxy`, (This might change + /// in future versions) + type ConnectFuture: 'static + Future< + Item=TokioSmtpService, + Error=Self::NotConnectingError>; + + /// The error returned if it is not possible to connect, + /// this might represent a direct connection failure or + /// one involfing multiple retries or similar aspects. + type NotConnectingError; + + /// The impl of the CompositionBAse + type CompositionBase: CompositionBase; + + // this future can contain all kind of retry connection handling etc. + /// This method is called to connect with an SMTP server. + /// + /// It is called whenever connecting to a SMTP server is necessary, + /// this includes the initial connection as well as reconnecting after + /// the connection might no longer be usable. + /// + /// As it returns a future with it's own error it can be used to + /// handle automatically retrying failed connections and limiting + /// the amount of retries or having a timeout before retrying to + /// connect. + /// + //TODO + /// Currently it is not implemented to retry sending failed mails, even + /// if it reconnects after e.g. an IO error + fn connect(&mut self) -> Self::ConnectFuture; + + /// return the buffer size for the mpsc channel between the service and it's handles + fn driver_input_buffer_size(&self) -> usize { 16 } + + /// Return a `CompositionBase` to use, this will only be called once for + /// the lifetime of a service. + fn composition_base(&self) -> Self::CompositionBase; +} + + + +struct MailService<SUP, RQ> + where SUP: SmtpSetup, RQ: MailSendRequest, +{ + setup: SUP, + rx: mpsc::Receiver<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>, + service: ServiceState<SUP::CompositionBase, SUP::ConnectFuture, RQ>, + pending: Option<( + Box<Future<Item=MailResponse, Error=MailSendError>>, + oneshot::Sender<Result<MailResponse, MailSendError>> + )> +} + +enum ServiceState<CB, F, RQ> { + Initial(CB), + Connecting(CB, F), + Connected(WrappedService<TokioSmtpService, CB, RQ>), + Dead +} + +impl<CB, F, RQ> ServiceState<CB, F, RQ> { + + fn reset(&mut self) { + use self::ServiceState::*; + let state = mem::replace(self, ServiceState::Dead); + *self = match state { + Initial(cb) => Initial(cb), + Connecting(cb, _f) => Initial(cb), + Connected(ws) => { + let (_, cb) = ws.destruct(); + Initial(cb) + }, + Dead => Dead + } + } +} + +impl<SUP, RQ> MailService<SUP, RQ> + where SUP: SmtpSetup, + RQ: MailSendRequest +{ + + pub fn new(setup: SUP) -> (Self, MailServiceHandle<RQ>) { + let cb = setup.composition_base(); + let (tx, rx) = mpsc::channel(setup.driver_input_buffer_size()); + + let driver = MailService { + setup, rx, + service: ServiceState::Initial(cb), + pending: None, + }; + + let handle = MailServiceHandle::new(tx); + (driver, handle) + } + + fn poll_connect(&mut self) -> Poll<(), SUP::NotConnectingError> { + use self::ServiceState::*; + let mut state = mem::replace(&mut self.service, ServiceState::Dead); + let mut result = None; + while result.is_none() { + state = match state { + Initial(cb) => { + Connecting(cb, self.setup.connect()) + }, + Connecting(cb, mut fut) => { + match fut.poll() { + Ok(Async::Ready(service)) => { + result = Some(Ok(Async::Ready(()))); + Connected(WrappedService::new(service, cb)) + }, + Ok(Async::NotReady) => { + result = Some(Ok(Async::NotReady)); + Connecting(cb, fut) + } + Err(err) => { + result = Some(Err(err)); + Dead + } + } + }, + Connected(service) => { + result = Some(Ok(Async::Ready(()))); + Connected(service) + }, + Dead => { + panic!("polled Service after completion through Err or Panic+catch_unwing") + } + } + } + self.service = state; + result.unwrap() + } + + fn service_mut(&mut self) + -> &mut WrappedService<TokioSmtpService, SUP::CompositionBase, RQ> + { + use self::ServiceState::*; + match &mut self.service { + &mut Connected(ref mut service) => service, + _ => panic!("[BUG] service_mut can only be called if we are connected") + } + } + + fn poll_pending_complete(&mut self) -> bool { + let res = + if let Some(&mut (ref mut pending, _)) = self.pending.as_mut() { + match pending.poll() { + Ok(Async::Ready(res)) => Ok(res), + Ok(Async::NotReady) => return false, + Err(err) => { + // if an error happend on smtp RSET we can just reconnect + let reset_conn = + match &err { + &MailSendError::OnReset(_) => true, + &MailSendError::Io(_) => true, + // we should not reach here but intentionally just + // ignore the fact that someone pretents to be us + // and generates this errors + &MailSendError::DriverDropped | + &MailSendError::CanceledByDriver => false, + _ => false + }; + + if reset_conn { + self.service.reset(); + } + Err(err) + } + } + } else { + return true; + }; + + //UNWRAP_SAFE: we can only be here if aboves `if let Some` succeeded + let (_pending, req_rx) = self.pending.take().unwrap(); + // we do not care about + // cancellation at this point + let _ = req_rx.send(res); + true + } +} + +impl<SUP, RQ> Future for MailService<SUP, RQ> + where RQ: MailSendRequest, + SUP: SmtpSetup +{ + type Item = (); + type Error = SUP::NotConnectingError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + loop { + // 1. complete the "current"/pending request (if there is any) + if !self.poll_pending_complete() { + return Ok(Async::NotReady) + } + + // 2. make sure we are connected, the current request might have "broken" the connection + try_ready!(self.poll_connect()); + + // 3. try to get a new request + let item = match self.rx.poll() { + Ok(Async::Ready(item)) => item, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => unreachable!("mpsc::Receiver.poll does not error") + }; + + // stop driver if all channels to it got closed + // with the current mpsc impl. (tokio 0.1.14) the receiver won't + // know about it but future impl. might be more clever + let (request, req_rx) = + if let Some(ele) = item { ele } + else { return Ok(Async::Ready(())) }; + + // 4. set call the service with the new request and set it to pending + self.pending = Some((self.service_mut().call(request), req_rx)); + } + } +}
\ No newline at end of file diff --git a/src/smtp/smtp_wrapper.rs b/src/smtp/smtp_wrapper.rs new file mode 100644 index 0000000..efc642d --- /dev/null +++ b/src/smtp/smtp_wrapper.rs @@ -0,0 +1,209 @@ +use std::marker::PhantomData; +use std::io::{Error as IoError}; + +use futures::future::{self, Either, Future}; + +use tokio_service::{Service as TokioService}; + +use tokio_smtp::request::{ + Request as SmtpRequest, + Mailbox as SmtpMailbox +}; +use tokio_smtp::response::{Response as SmtpResponse}; +use tokio_proto::streaming::{Body, Message}; + +use mail::Mail; +use mail::headers::components::Mailbox; +use ::error::Error; +use ::{CompositionBase, EnvelopData}; + + +#[derive(Clone, Debug)] +pub struct MailResponse; + +/// +/// # Example (Double Dispatch to allow multiple MailSendData Data types) +/// +/// ``` +//TODO imports etc. +// # type UserData = (); +// # type OtherData = (); +// enum Request { +// UserData(UserData), +// OtherData(OtherData) +// } +// +// impl MailSendRequest for Request { +// fn call_compose_mail<CB>(&self, cb: &CB) -> Result<(Mail, EnvelopData), Error> +// where CB: CompositionBase +// { +// use self::Request::*; +// match *self { +// UserData(ref data) => cb.compose_mail(data), +// OtherData(ref data) => cb.compose_mail(data) +// } +// } +// } +/// ``` +/// +/// +pub trait MailSendRequest { + + /// Call the composition bases `compose_mail` method with the contained requests SendMailData + /// + /// If all teplates/send mail use the same type of Data this would just be implemented one the + /// data with `cb.compose_mail(self)` but if there are multiple different type for e.g. + /// different templates you can wrapp them in a enum implement `MailSendRequest` on the enum + /// and then match on the enum and calling `compose_mail` depending on the actual used type. + /// + fn call_compose_mail<CB>(&self, cb: &CB) -> Result<(Mail, EnvelopData), Error> + where CB: CompositionBase; +} + +pub struct WrappedService<I, CB, R> { + inner: I, + composition_base: CB, + _limiter: PhantomData<R> +} + +impl<I, CB, R> WrappedService<I, CB, R> { + pub fn destruct(self) -> (I, CB) { + (self.inner, self.composition_base) + } +} + +impl<I, CB, R> WrappedService<I, CB, R> + where I: TokioService< + Request=Message<SmtpRequest, Body<Vec<u8>, IoError>>, + Response=SmtpResponse + >, + CB: CompositionBase, + R: MailSendRequest +{ + + pub fn new(service: I, composition_base: CB) -> WrappedService<I, CB, R> { |