summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPhilipp Korber <philippkorber@gmail.com>2018-03-01 18:23:38 +0100
committerPhilipp Korber <philippkorber@gmail.com>2018-03-05 17:38:02 +0100
commit9119a9bb4b6d8043e321ec8b9aa767fbdb5aefa3 (patch)
tree418fccd5d9349bb99c792a1f34ec92f045d37fd4
parent0625db709c551486d267f837709d4ce86588b3bd (diff)
chore: initial tokio_smtp mail binding implementation
-rw-r--r--Cargo.toml6
-rw-r--r--examples/composition.rs2
-rw-r--r--src/compositor/_impl.rs16
-rw-r--r--src/compositor/mail_send_data.rs16
-rw-r--r--src/compositor/mod.rs45
-rw-r--r--src/lib.rs14
-rw-r--r--src/macros.rs40
-rw-r--r--src/smtp/handle.rs265
-rw-r--r--src/smtp/mod.rs13
-rw-r--r--src/smtp/service.rs251
-rw-r--r--src/smtp/smtp_wrapper.rs209
-rw-r--r--test_resources/template_a.out.regex2
-rw-r--r--tests/tera/main.rs2
13 files changed, 850 insertions, 31 deletions
diff --git a/Cargo.toml b/Cargo.toml
index a14972c..9491cac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,10 @@ tera = { version = "0.11.1", optional = true }
conduit-mime-types = { version = "0.7.3", optional = true}
lazy_static = { version = "1.0.0", optional = true }
tokio-smtp = { path="../depends/tokio-smtp", optional=true }
+tokio-service = { version = "0.1", optional=true }
+tokio-proto = { version = "0.1", optional=true }
+# this is used by tokio-smtp, we need it to convert our Mailbox to their Mailbox struct
+emailaddress = { version = "0.4", optional=true }
[dependencies.mime]
git="https://github.com/1aim/mime"
@@ -47,7 +51,7 @@ futures-cpupool="0.1.5"
debug_trace_tokens = []
render-template-engine = ["conduit-mime-types", "lazy_static"]
tera-bindings = ["tera", "render-template-engine"]
-smtp = ['tokio-smtp']
+smtp = ['tokio-smtp', 'tokio-service', 'tokio-proto', 'emailaddress']
default = ["default_impl"]
default_impl = [
"default_impl_name_composer", "default_impl_component_id",
diff --git a/examples/composition.rs b/examples/composition.rs
index 58d5677..b9a0528 100644
--- a/examples/composition.rs
+++ b/examples/composition.rs
@@ -74,7 +74,7 @@ fn _main() -> Result<(), Error> {
//this doesn't realy do anything as the NoNameComposer is used
send_data.auto_gen_display_names(NoNameComposer)?;
- let mail = (&context, &template_engine).compose_mail(send_data)?;
+ let (mail, _envelop) = (&context, &template_engine).compose_mail(send_data)?;
let mut encoder = Encoder::new( MailType::Ascii );
let encodable_mail = mail.into_encodeable_mail( &context ).wait().unwrap();
diff --git a/src/compositor/_impl.rs b/src/compositor/_impl.rs
index 9e835c1..8bb83bf 100644
--- a/src/compositor/_impl.rs
+++ b/src/compositor/_impl.rs
@@ -21,7 +21,7 @@ use template::{
};
use super::mail_send_data::MailSendData;
-use super::CompositionBase;
+use super::{CompositionBase, EnvelopData};
pub(crate) trait InnerCompositionBaseExt: CompositionBase {
@@ -29,25 +29,27 @@ pub(crate) trait InnerCompositionBaseExt: CompositionBase {
&self,
send_data:
MailSendData<<Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D>
- ) -> Result<Mail>
+ ) -> Result<(Mail, EnvelopData)>
where D: Serialize
{
-
+ let envelop = EnvelopData::from(&send_data);
//compose display name => create Address with display name;
let (core_headers, data, template_id) = self.process_mail_send_data(send_data)?;
let MailParts { alternative_bodies, shared_embeddings, attachments }
- = self.use_template_engine(&*template_id, data)?;
+ = self.use_template_engine(&*template_id, data)?;
+
+ let mail = self.build_mail( alternative_bodies, shared_embeddings.into_iter(),
+ attachments, core_headers )?;
- self.build_mail( alternative_bodies, shared_embeddings.into_iter(), attachments,
- core_headers )
+ Ok((mail, envelop))
}
fn process_mail_send_data<'a, D>(
&self,
send_data:
MailSendData<'a, <Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D>
- ) -> Result<(
+ ) -> Result<(
HeaderMap,
D,
Cow<'a, <Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId>
diff --git a/src/compositor/mail_send_data.rs b/src/compositor/mail_send_data.rs
index 2608e3d..9ff9d04 100644
--- a/src/compositor/mail_send_data.rs
+++ b/src/compositor/mail_send_data.rs
@@ -113,15 +113,13 @@ impl<'a, TId: ?Sized + 'a, D> MailSendData<'a, TId, D>
}
}
- pub fn sender(&self) -> Option<&Mailbox> {
- self.sender.as_ref()
+ /// returns a reference to a explicity set sender or else the first (and only) from mailbox
+ pub fn sender(&self) -> &Mailbox {
+ self.sender.as_ref().unwrap_or_else(|| self.from.first())
}
- pub fn sender_mut(&mut self) -> Option<&mut Mailbox> {
- self.sender.as_mut()
- }
- pub fn from(&self) -> &MailboxList {
+ pub fn _from(&self) -> &MailboxList {
&self.from
}
@@ -129,7 +127,7 @@ impl<'a, TId: ?Sized + 'a, D> MailSendData<'a, TId, D>
///
/// this does only expose a &mut Slice of Mailboxes, instead of a &mut MailboxList
/// to make sure that no from mailbox can be added as sender might be empty
- pub fn from_mut(&mut self) -> &mut [Mailbox] {
+ pub fn _from_mut(&mut self) -> &mut [Mailbox] {
&mut self.from
}
@@ -137,11 +135,11 @@ impl<'a, TId: ?Sized + 'a, D> MailSendData<'a, TId, D>
//TODO add try_add_from method failing if sender is None
//TODO maybe add a try_set_from(MailboxList) too
- pub fn to(&self) -> &MailboxList {
+ pub fn _to(&self) -> &MailboxList {
&self.to
}
- pub fn to_mut(&mut self) -> &mut MailboxList {
+ pub fn _to_mut(&mut self) -> &mut MailboxList {
&mut self.to
}
diff --git a/src/compositor/mod.rs b/src/compositor/mod.rs
index d1af8f4..8123f4a 100644
--- a/src/compositor/mod.rs
+++ b/src/compositor/mod.rs
@@ -1,9 +1,12 @@
+use std::borrow::ToOwned;
+
use serde::Serialize;
use error::Result;
use context::Context;
use template::TemplateEngine;
use mail::Mail;
+use mail::headers::components::{MailboxList, Mailbox};
//TODO make sure Box/Arc auto wrapping is impl for all parts
use self::_impl::InnerCompositionBaseExt;
@@ -31,9 +34,9 @@ pub trait CompositionBase {
/// composes a mail based on the given MailSendData
fn compose_mail<D>(
&self,
- send_data:
- MailSendData<<Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D>
- ) -> Result<Mail>
+ send_data: MailSendData<
+ <Self::TemplateEngine as TemplateEngine<Self::Context>>::TemplateId, D>
+ ) -> Result<(Mail, EnvelopData)>
where D: Serialize
{
InnerCompositionBaseExt::_compose_mail(self, send_data)
@@ -41,4 +44,40 @@ pub trait CompositionBase {
fn template_engine(&self) -> &Self::TemplateEngine;
fn context(&self) -> &Self::Context;
+}
+
+
+//NOTE: this might get more complex at some point, wrt. e.g. cc, bcc, resent etc.
+pub struct EnvelopData {
+ sender: Mailbox,
+ to: MailboxList
+ //cc: MailboxList, //add if added to MailSendData
+ //bcc: MailboxList, //add if added to MailSendData
+}
+
+impl EnvelopData {
+
+ pub fn new(sender: Mailbox, to: MailboxList) -> Self {
+ EnvelopData {
+ sender, to
+ }
+ }
+
+ pub fn sender(&self) -> &Mailbox {
+ &self.sender
+ }
+
+ pub fn _to(&self) -> &MailboxList {
+ &self.to
+ }
+}
+
+impl<'a, T: ?Sized, D> From<&'a MailSendData<'a, T, D>> for EnvelopData
+ where T: ToOwned, D: Serialize
+{
+ fn from(msd: &'a MailSendData<'a, T, D>) -> Self {
+ let sender = msd.sender().clone();
+ let to = msd._to().clone();
+ EnvelopData::new(sender, to)
+ }
} \ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index 1085b0b..67d9a2a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -8,6 +8,8 @@ extern crate mail_codec_headers as headers;
extern crate error_chain;
extern crate log;
extern crate mime as media_type;
+//#[cfg_attr(feature="smtp", macro_use)]
+#[macro_use]
extern crate futures;
extern crate serde;
extern crate rand;
@@ -31,6 +33,15 @@ extern crate lazy_static;
extern crate tera as tera_crate;
#[cfg(feature="smtp")]
extern crate tokio_smtp;
+#[cfg(feature="smtp")]
+extern crate tokio_service;
+#[cfg(feature="smtp")]
+extern crate tokio_proto;
+#[cfg(feature="smtp")]
+extern crate emailaddress;
+
+#[macro_use]
+mod macros;
pub mod error;
mod builder_extension;
@@ -42,7 +53,8 @@ mod compositor;
pub use self::compositor::{
CompositionBase, NameComposer,
MailSendData, MailSendDataBuilder,
- SharedCompositionBase, SimpleCompositionBase
+ SharedCompositionBase, SimpleCompositionBase,
+ EnvelopData
};
mod utils;
diff --git a/src/macros.rs b/src/macros.rs
new file mode 100644
index 0000000..ffe37ce
--- /dev/null
+++ b/src/macros.rs
@@ -0,0 +1,40 @@
+
+/// like try on a result but converts the error to a boxed error-future before returning it
+#[cfg(feature="smtp")]
+macro_rules! r2f_try {
+ ($code:expr) => ({
+ use futures::future;
+ match $code {
+ Ok(val) => val,
+ Err(error) => return Box::new(future::err(error))
+ }
+ });
+}
+
+///
+/// ```
+/// cloned!([service] => move |name| {
+/// drop(service)
+/// })
+/// ```
+#[cfg(feature="smtp")]
+macro_rules! cloned {
+ ([$($toclo:ident),*] => $doit:expr) => ({
+ $(
+ let $toclo = $toclo.clone();
+ )*
+ $doit
+ });
+}
+
+/// like try_ready but for streams also checking it's some
+#[cfg(features="smtp")]
+macro_rules! try_some_ready {
+ ($e:expr) => ({
+ match $e {
+ Ok(Async::Ready(Some(item))) => item,
+ Ok(other) => return Ok(other),
+ Err(err) => return Err(From::from(err))
+ }
+ });
+} \ No newline at end of file
diff --git a/src/smtp/handle.rs b/src/smtp/handle.rs
new file mode 100644
index 0000000..149a861
--- /dev/null
+++ b/src/smtp/handle.rs
@@ -0,0 +1,265 @@
+use futures::sync::mpsc;
+use futures::sync::oneshot;
+use futures::{sink, Sink, Future, Poll, Async};
+
+use super::smtp_wrapper::{MailSendRequest, MailResponse, MailSendError};
+
+
+type InnerChannel<RQ> = mpsc::Sender<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>;
+
+#[derive(Debug, Clone)]
+pub struct MailServiceHandle<RQ>
+ where RQ: MailSendRequest
+{
+ channel: InnerChannel<RQ>
+}
+
+
+impl<RQ> MailServiceHandle<RQ>
+ where RQ: MailSendRequest
+{
+
+ pub(crate) fn new(sender: InnerChannel<RQ>) -> Self {
+ MailServiceHandle { channel: sender }
+ }
+
+ pub fn send_mail(self, mail_request: RQ) -> MailEnqueueFuture<RQ> {
+ let (sender, rx) = oneshot::channel();
+
+ let send = self.channel.send((mail_request, sender));
+
+ MailEnqueueFuture { send, rx: Some(rx) }
+ }
+
+// pub fn map_request_stream<S>(self, stream: S, max_buffer: Option<usize>) -> SmtpMailStream<RQ, S>
+// where S: Stream<Item = RQ>,
+// S::Error: Into<MailSendError>
+// {
+// SmtpMailStream::new(self.channel, stream, max_buffer)
+// }
+
+ pub fn into_inner(self)
+ -> mpsc::Sender<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>
+ {
+ self.channel
+ }
+
+}
+
+pub struct MailEnqueueFuture<RQ>
+ where RQ: MailSendRequest
+{
+ send: sink::Send<InnerChannel<RQ>>,
+ rx: Option<oneshot::Receiver<Result<MailResponse, MailSendError>>>
+}
+
+impl<RQ> Future for MailEnqueueFuture<RQ>
+ where RQ: MailSendRequest
+{
+ type Item = (MailServiceHandle<RQ>, MailResponseFuture);
+ type Error = MailSendError;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let channel = match self.send.poll() {
+ Ok(Async::Ready(channel)) => channel,
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(_cancel_err) => return Err(MailSendError::DriverDropped),
+ };
+
+ let rx = self.rx.take().expect("called poll after polling completed");
+ Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx })))
+ }
+}
+
+
+pub struct MailResponseFuture {
+ rx: oneshot::Receiver<Result<MailResponse, MailSendError>>
+}
+
+impl Future for MailResponseFuture {
+ type Item = MailResponse;
+ type Error = MailSendError;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let res = match self.rx.poll() {
+ Ok(Async::Ready(res)) => res,
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(_cancel_err) => return Err(MailSendError::CanceledByDriver),
+ };
+
+ match res {
+ Ok(resp) => Ok(Async::Ready(resp)),
+ Err(err) => Err(err)
+ }
+ }
+}
+
+
+// use a closure to setup smtp Clone + FnMut() -> R, R: Future<Item=...,> or similar
+//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) {
+// let (send, resc) = mpcs::channel(XX);
+// let shared_composition_base = smtp_setup.composition_base;
+// let driver = smtp_connect().and_then(|service| {
+// let service = shared_composition_base.wrap_smtp_service(service);
+//
+// let pipe_in = resc;;
+//
+// pipe_in.for_each(|(cmd, pipe_out)| {
+// service.call(cmd).and_then(|res| {
+// let _ = pipe_out.send(res);
+// Ok(())
+// })
+// })
+// });
+// (send, driver)
+//}
+//
+//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> {
+// let driver = setup_smtp(setup.clone());
+//
+// future::loop_fn(driver, move |driver| {
+// driver.or_else(|err| {
+// if err.was_canceled() {
+// Ok(Loop::Break(()))
+// } else if err.can_recover() {
+// let new_driver = setup_smtp(setup.clone());
+// Ok(Loop::Continue(new_driver))
+// } else {
+// Err(err.into())
+// }
+// })
+// })
+//}
+//
+
+
+
+// TODO: This links into a stream mapping a stream of requests to a stream of responses,
+// but it needs more though into it, i.e. it decouples the sending of a mail, with
+// the response of getting one. So we need to at last add some mail Id to Req (we
+// might want to see if we can generally do so using MessageId).
+//
+//pub struct SmtpMailStream<RQ, S>
+// where RQ: MailSendRequest,
+// S: Stream<Item = RQ>,
+// S::Error: Into<MailSendError>
+//{
+// channel: Option<InnerChannel<RQ>>,
+// stream: Option<S>,
+// res_buffer: FuturesUnordered<oneshot::Receiver<Result<MailResponse, MailSendError>>>,
+// send_buffer: Option<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>,
+// max_buffer: Option<usize>
+//}
+//
+//
+//impl<RQ, S> SmtpMailStream<RQ, S>
+// where RQ: MailSendRequest,
+// S: Stream<Item = RQ>,
+// S::Error: Into<MailSendError>
+//{
+// pub(crate) fn new(channel: InnerChannel<RQ>, stream: S, max_buffer: Option<usize>) -> Self {
+// SmtpMailStream {
+// channel, stream, max_buffer,
+// send_buffer: None,
+// res_buffer: FuturesUnordered::new(),
+// }
+// }
+//
+// fn channel_mut(&mut self) -> &mut InnerChannel<RQ> {
+// self.channel.as_mut().unwrap()
+// }
+//
+// fn stream_mut(&mut self) -> &mut S {
+// self.stream.as_mut().unwrap()
+// }
+//
+// fn try_enqueue_mail(
+// &mut self,
+// msg: (RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)
+// ) -> Poll<(), DriverDropped<S>>
+// {
+// debug_assert!(self.send_buffer.is_none());
+// debug_assert!(self.stream.is_some() && self.channel.is_some());
+// match self.channel_mut().start_send(msg) {
+// Ok(AsyncSink::Ready) => Ok(Async::Ready(())),
+// Ok(AsyncSink::NotReady(msg)) => {
+// self.send_buffer = Some(msg);
+// Ok(Async::NotReady)
+// },
+// Err(_) => {
+// mem::drop(self.channel.take());
+// Err(DriverDropped::Stream(self.stream.take()))
+// }
+// }
+// }
+//
+// fn close_channel(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+// try_ready!(self.channel_mut().close());
+// return Ok(Async::Ready(None));
+// }
+//
+// fn poll_stream(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+// match self.stream_mut().poll()? {
+// Async::Ready(Some(item)) => Async::Ready(Some(item)),
+// Async::Ready(None) => {
+// self.stream.take();
+// return self.close_channel();
+// }
+// Async::NotReady => {
+// try_ready!(self.channel_mut().poll_complete());
+// return Ok(Async::NotRead);
+// }
+// }
+// }
+//}
+//
+//pub enum DriverDropped<S> {
+// Stream(S),
+// StreamTakenOnPreviousError
+//}
+//
+//impl<RQ, S> Stream for SmtpMailStream<RQ, S>
+// where RQ: MailSendRequest,
+// S: Stream<Item = RQ>,
+// //FIXME(tokio 0.2) use error Never??
+// S::Error: Into<MailSendError>
+//{
+// type Item = Result<MailResponse, MailSendError>;
+// type Error = DriverDropped<S>;
+//
+// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+// if self.channel.is_none() {
+// return Err(DriverDropped::StreamTakenOnPreviousError);
+// }
+//
+// if self.stream.is_none() {
+// return self.close_channel();
+// }
+//
+// if let Some(msg) = self.send_buffer {
+// try_ready!(self.try_enqueue_mail(msg));
+// }
+//
+// loop {
+//
+// match self.res_buffer.poll() {
+// Ok(Async::NotReady) => {},
+// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))),
+// Err(err) => return Ok(AsyncReady(Err(e)))
+// }
+//
+// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() {
+// return Ok(Async::NotReady);
+// }
+//
+//
+// let item = try_some_ready!(self.poll_stream());
+//
+// let (tx, rx) = oneshot::channel();
+//
+// self.res_buffer.push(rx);
+//
+// try_ready!(self.try_enqueue_mail((item, tx)));
+// }
+// }
+//} \ No newline at end of file
diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs
index 816b2f6..f0c938c 100644
--- a/src/smtp/mod.rs
+++ b/src/smtp/mod.rs
@@ -1,10 +1,9 @@
-use std::net::SocketAddr;
-use serde::Serialize;
-use tokio_smtp;
-use tokio_smtp::request::{ClientId as SmtpClientId};
-use tokio_smtp::client::ClientParams;
+mod smtp_wrapper;
+pub use self::smtp_wrapper::*;
-use ::context::BuilderContext;
+mod handle;
+pub use self::handle::*;
-//tmp empty \ No newline at end of file
+mod service;
+pub use self::service::*; \ No newline at end of file
diff --git a/src/smtp/service.rs b/src/smtp/service.rs
new file mode 100644
index 0000000..95f3699
--- /dev/null
+++ b/src/smtp/service.rs
@@ -0,0 +1,251 @@
+use std::io::{Error as IoError};
+use std::mem;
+
+use futures::sync::{mpsc, oneshot};
+use futures::{Future, Stream, Poll, Async};
+
+use tokio_proto::util::client_proxy::ClientProxy;
+use tokio_service::{Service as TokioService};
+use tokio_proto::streaming::{Body, Message};
+
+use tokio_smtp::request::{
+ Request as SmtpRequest,
+};
+use tokio_smtp::response::{Response as SmtpResponse};
+
+use ::CompositionBase;
+use super::smtp_wrapper::{WrappedService, MailSendRequest, MailSendError, MailResponse};
+use super::handle::MailServiceHandle;
+
+//Next steps:
+// make the service accept encodable mails nothing else
+// do the bla -> encodable in the setup part check for canceleation before sending
+
+pub type TokioSmtpService = ClientProxy<
+ Message<SmtpRequest, Body<Vec<u8>, IoError>>,
+ SmtpResponse, IoError>;
+
+trait SmtpSetup: Send {
+
+ /// The future returned which returns a Smtp connection,
+ ///
+ /// as this smtp mail bindings are writting for `tokio_smtp`
+ /// the Item is fixed to `ClientProxy`, (This might change
+ /// in future versions)
+ type ConnectFuture: 'static + Future<
+ Item=TokioSmtpService,
+ Error=Self::NotConnectingError>;
+
+ /// The error returned if it is not possible to connect,
+ /// this might represent a direct connection failure or
+ /// one involfing multiple retries or similar aspects.
+ type NotConnectingError;
+
+ /// The impl of the CompositionBAse
+ type CompositionBase: CompositionBase;
+
+ // this future can contain all kind of retry connection handling etc.
+ /// This method is called to connect with an SMTP server.
+ ///
+ /// It is called whenever connecting to a SMTP server is necessary,
+ /// this includes the initial connection as well as reconnecting after
+ /// the connection might no longer be usable.
+ ///
+ /// As it returns a future with it's own error it can be used to
+ /// handle automatically retrying failed connections and limiting
+ /// the amount of retries or having a timeout before retrying to
+ /// connect.
+ ///
+ //TODO
+ /// Currently it is not implemented to retry sending failed mails, even
+ /// if it reconnects after e.g. an IO error
+ fn connect(&mut self) -> Self::ConnectFuture;
+
+ /// return the buffer size for the mpsc channel between the service and it's handles
+ fn driver_input_buffer_size(&self) -> usize { 16 }
+
+ /// Return a `CompositionBase` to use, this will only be called once for
+ /// the lifetime of a service.
+ fn composition_base(&self) -> Self::CompositionBase;
+}
+
+
+
+struct MailService<SUP, RQ>
+ where SUP: SmtpSetup, RQ: MailSendRequest,
+{
+ setup: SUP,
+ rx: mpsc::Receiver<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>,
+ service: ServiceState<SUP::CompositionBase, SUP::ConnectFuture, RQ>,
+ pending: Option<(
+ Box<Future<Item=MailResponse, Error=MailSendError>>,
+ oneshot::Sender<Result<MailResponse, MailSendError>>
+ )>
+}
+
+enum ServiceState<CB, F, RQ> {
+ Initial(CB),
+ Connecting(CB, F),
+ Connected(WrappedService<TokioSmtpService, CB, RQ>),
+ Dead
+}
+
+impl<CB, F, RQ> ServiceState<CB, F, RQ> {
+
+ fn reset(&mut self) {
+ use self::ServiceState::*;
+ let state = mem::replace(self, ServiceState::Dead);
+ *self = match state {
+ Initial(cb) => Initial(cb),
+ Connecting(cb, _f) => Initial(cb),
+ Connected(ws) => {
+ let (_, cb) = ws.destruct();
+ Initial(cb)
+ },
+ Dead => Dead
+ }
+ }
+}
+
+impl<SUP, RQ> MailService<SUP, RQ>
+ where SUP: SmtpSetup,
+ RQ: MailSendRequest
+{
+
+ pub fn new(setup: SUP) -> (Self, MailServiceHandle<RQ>) {
+ let cb = setup.composition_base();
+ let (tx, rx) = mpsc::channel(setup.driver_input_buffer_size());
+
+ let driver = MailService {
+ setup, rx,
+ service: ServiceState::Initial(cb),
+ pending: None,
+ };
+
+ let handle = MailServiceHandle::new(tx);
+ (driver, handle)
+ }
+
+ fn poll_connect(&mut self) -> Poll<(), SUP::NotConnectingError> {
+ use self::ServiceState::*;
+ let mut state = mem::replace(&mut self.service, ServiceState::Dead);
+ let mut result = None;
+ while result.is_none() {
+ state = match state {
+ Initial(cb) => {
+ Connecting(cb, self.setup.connect())
+ },
+ Connecting(cb, mut fut) => {
+ match fut.poll() {
+ Ok(Async::Ready(service)) => {
+ result = Some(Ok(Async::Ready(())));
+ Connected(WrappedService::new(service, cb))
+ },
+ Ok(Async::NotReady) => {
+ result = Some(Ok(Async::NotReady));
+ Connecting(cb, fut)
+ }
+ Err(err) => {
+ result = Some(Err(err));
+ Dead
+ }
+ }
+ },
+ Connected(service) => {
+ result = Some(Ok(Async::Ready(())));
+ Connected(service)
+ },
+ Dead => {
+ panic!("polled Service after completion through Err or Panic+catch_unwing")
+ }
+ }
+ }
+ self.service = state;
+ result.unwrap()
+ }
+
+ fn service_mut(&mut self)
+ -> &mut WrappedService<TokioSmtpService, SUP::CompositionBase, RQ>
+ {
+ use self::ServiceState::*;
+ match &mut self.service {
+ &mut Connected(ref mut service) => service,
+ _ => panic!("[BUG] service_mut can only be called if we are connected")
+ }
+ }
+
+ fn poll_pending_complete(&mut self) -> bool {
+ let res =
+ if let Some(&mut (ref mut pending, _)) = self.pending.as_mut() {
+ match pending.poll() {
+ Ok(Async::Ready(res)) => Ok(res),
+ Ok(Async::NotReady) => return false,
+ Err(err) => {
+ // if an error happend on smtp RSET we can just reconnect
+ let reset_conn =
+ match &err {
+ &MailSendError::OnReset(_) => true,
+ &MailSendError::Io(_) => true,
+ // we should not reach here but intentionally just
+ // ignore the fact that someone pretents to be us
+ // and generates this errors
+ &MailSendError::DriverDropped |
+ &MailSendError::CanceledByDriver => false,
+ _ => false
+ };
+
+ if reset_conn {
+ self.service.reset();
+ }
+ Err(err)
+ }
+ }
+ } else {
+ return true;
+ };
+
+ //UNWRAP_SAFE: we can only be here if aboves `if let Some` succeeded
+ let (_pending, req_rx) = self.pending.take().unwrap();
+ // we do not care about
+ // cancellation at this point
+ let _ = req_rx.send(res);
+ true
+ }
+}
+
+impl<SUP, RQ> Future for MailService<SUP, RQ>
+ where RQ: MailSendRequest,
+ SUP: SmtpSetup
+{
+ type Item = ();
+ type Error = SUP::NotConnectingError;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ loop {
+ // 1. complete the "current"/pending request (if there is any)
+ if !self.poll_pending_complete() {
+ return Ok(Async::NotReady)
+ }
+
+ // 2. make sure we are connected, the current request might have "broken" the connection
+ try_ready!(self.poll_connect());
+
+ // 3. try to get a new request
+ let item = match self.rx.poll() {
+ Ok(Async::Ready(item)) => item,
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(_) => unreachable!("mpsc::Receiver.poll does not error")
+ };
+
+ // stop driver if all channels to it got closed
+ // with the current mpsc impl. (tokio 0.1.14) the receiver won't
+ // know about it but future impl. might be more clever
+ let (request, req_rx) =
+ if let Some(ele) = item { ele }
+ else { return Ok(Async::Ready(())) };
+
+ // 4. set call the service with the new request and set it to pending
+ self.pending = Some((self.service_mut().call(request), req_rx));
+ }
+ }
+} \ No newline at end of file
diff --git a/src/smtp/smtp_wrapper.rs b/src/smtp/smtp_wrapper.rs
new file mode 100644
index 0000000..efc642d
--- /dev/null
+++ b/src/smtp/smtp_wrapper.rs
@@ -0,0 +1,209 @@
+use std::marker::PhantomData;
+use std::io::{Error as IoError};
+
+use futures::future::{self, Either, Future};
+
+use tokio_service::{Service as TokioService};
+
+use tokio_smtp::request::{
+ Request as SmtpRequest,
+ Mailbox as SmtpMailbox
+};
+use tokio_smtp::response::{Response as SmtpResponse};
+use tokio_proto::streaming::{Body, Message};
+
+use mail::Mail;
+use mail::headers::components::Mailbox;
+use ::error::Error;
+use ::{CompositionBase, EnvelopData};
+
+
+#[derive(Clone, Debug)]
+pub struct MailResponse;
+
+///
+/// # Example (Double Dispatch to allow multiple MailSendData Data types)
+///
+/// ```
+//TODO imports etc.
+// # type UserData = ();
+// # type OtherData = ();
+// enum Request {
+// UserData(UserData),
+// OtherData(OtherData)
+// }
+//
+// impl MailSendRequest for Request {
+// fn call_compose_mail<CB>(&self, cb: &CB) -> Result<(Mail, EnvelopData), Error>
+// where CB: CompositionBase
+// {
+// use self::Request::*;
+// match *self {
+// UserData(ref data) => cb.compose_mail(data),
+// OtherData(ref data) => cb.compose_mail(data)
+// }
+// }
+// }
+/// ```
+///
+///
+pub trait MailSendRequest {
+
+ /// Call the composition bases `compose_mail` method with the contained requests SendMailData
+ ///
+ /// If all teplates/send mail use the same type of Data this would just be implemented one the
+ /// data with `cb.compose_mail(self)` but if there are multiple different type for e.g.
+ /// different templates you can wrapp them in a enum implement `MailSendRequest` on the enum
+ /// and then match on the enum and calling `compose_mail` depending on the actual used type.
+ ///
+ fn call_compose_mail<CB>(&self, cb: &CB) -> Result<(Mail, EnvelopData), Error>
+ where CB: CompositionBase;
+}
+
+pub struct WrappedService<I, CB, R> {
+ inner: I,
+ composition_base: CB,
+ _limiter: PhantomData<R>
+}
+
+impl<I, CB, R> WrappedService<I, CB, R> {
+ pub fn destruct(self) -> (I, CB) {
+ (self.inner, self.composition_base)
+ }
+}
+
+impl<I, CB, R> WrappedService<I, CB, R>
+ where I: TokioService<
+ Request=Message<SmtpRequest, Body<Vec<u8>, IoError>>,
+ Response=SmtpResponse
+ >,
+ CB: CompositionBase,
+ R: MailSendRequest
+{
+
+ pub fn new(service: I, composition_base: CB) -> WrappedService<I, CB, R> {