summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPhilipp Korber <philippkorber@gmail.com>2018-05-11 14:56:04 +0200
committerPhilipp Korber <philippkorber@gmail.com>2018-05-11 14:56:04 +0200
commita95f660d7b20b40158f23465f3e8f583480e5f83 (patch)
tree67bf333fb3f422c67bf30111e7fd9ad0ac267d59
parent760a42222cde5ef3a0e26af4174149838ef54d7b (diff)
chore(api+deps): strongly simplified api and uses changed deps
- the api was stringly simplified - uses new-tokio-smtp instead of tokio-smtp - inital commit of the changed api, some parts are not commit yet as they still have to be changed including e.g. the integration tests
-rw-r--r--Cargo.toml6
-rw-r--r--src/common.rs141
-rw-r--r--src/encode.rs236
-rw-r--r--src/error.rs91
-rw-r--r--src/handle.rs256
-rw-r--r--src/lib.rs19
-rw-r--r--src/mpsc_ext.rs34
-rw-r--r--src/resolve_all.rs81
-rw-r--r--src/service.rs413
-rw-r--r--src/smtp_wrapper.rs202
-rw-r--r--src/stop_handle.rs18
-rw-r--r--src/test.rs270
12 files changed, 394 insertions, 1373 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 1cfdb3a..e343bdd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,5 +11,9 @@ readme = "./README.md"
repository = "https://github.com/1aim/mail-smtp"
[dependencies]
-mail = { git="https://github.com/1aim/mail" }
+futures = "0.1"
+failure = "0.1.1"
+mail-types = { git="https://github.com/1aim/mail-types" }
+mail-common = { git="https://github.com/1aim/mail-common" }
+mail-headers = { git="https://github.com/1aim/mail-headers" }
new-tokio-smtp = { git="https://github.com/1aim/new-tokio-smtp" }
diff --git a/src/common.rs b/src/common.rs
index bccf869..e9b9493 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -1,76 +1,32 @@
//! This modules contains some of the data types used, like e.g. Response, Request, Envelop etc.
+use std::mem;
-use vec1::Vec1;
+use new_tokio_smtp::send_mail::{
+ self as smtp,
+ MailAddress,
+ EnvelopData
+};
-use futures::sync::oneshot;
-use new_tokio_smtp::{ForwardPath, ReversePath};
-use mail::headers::{Sender, From as _From, To};
-use mail::headers::components::Mailbox;
+use mail_common::MailType;
+use mail_common::encoder::{EncodingBuffer, EncodableInHeader};
+use mail_common::error::EncodingError;
+use headers::{Sender, _From, _To};
+use headers::components::Mailbox;
+use headers::error::BuildInValidationError;
use mail::Mail;
-use super::error::{MailSendError, EnvelopFromMailError};
+use mail::error::MailError;
-#[derive(Debug)]
-pub struct EnvelopData {
- from: ReversePath,
- to: Vec1<ForwardPath>
-}
-
-impl EnvelopData {
- pub fn new(from: SmtpMailbox, to: Vec1<SmtpMailbox>) -> Self {
- EnvelopData { from, to }
- }
- pub fn split(self) -> (SmtpMailbox, Vec1<SmtpMailbox>) {
- let EnvelopData { from, to } = self;
- (from, to)
- }
- pub fn from_mail(mail: &Mail) -> Result<Self, EnvelopFromMailError> {
-
- let headers = mail.headers();
- let smtp_from =
- if let Some(sender) = headers.get_single(Sender) {
- let sender = sender.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?;
- //TODO double check with from field
- mailbox2smtp_mailbox(sender)
- } else {
- let from = headers.get_single(_From)
- .ok_or(EnvelopFromMailError::NeitherSenderNorFrom)?
- .map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?;
-
- if from.len() > 1 {
- return Err(EnvelopFromMailError::NoSenderAndMoreThanOneFrom);
- }
-
- mailbox2smtp_mailbox(from.first())
- };
-
- let smtp_to =
- if let Some(to) = headers.get_single(To) {
- let to = to.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?;
- to.mapped_ref(mailbox2smtp_mailbox)
- } else {
- return Err(EnvelopFromMailError::NoToHeaderField);
- };
-
- //TODO Cc, Bcc
-
- Ok(EnvelopData {
- from: smtp_from,
- to: smtp_to
- })
- }
-}
-pub type MailSendResult = Result<MailResponse, MailSendError>;
-pub(crate) type Handle2ServiceMsg = (MailRequest, oneshot::Sender<MailSendResult>);
+// pub type MailSendResult = Result<MailResponse, MailSendError>;
+// pub(crate) type Handle2ServiceMsg = (MailRequest, oneshot::Sender<MailSendResult>);
-#[derive(Debug, Clone)]
-pub struct MailResponse;
+// #[derive(Debug, Clone)]
+// pub struct MailResponse;
-//TODO derive(Clone): requires clone for Box<EncodableMail+'static>
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct MailRequest {
mail: Mail,
envelop_data: Option<EnvelopData>
@@ -94,19 +50,64 @@ impl MailRequest {
MailRequest { mail, envelop_data: Some(envelop) }
}
- pub fn into_mail_with_envelop(self) -> Result<(Mail, EnvelopData), EnvelopFromMailError> {
+ pub fn override_envelop(&mut self, envelop: EnvelopData) -> Option<EnvelopData> {
+ mem::replace(&mut self.envelop_data, Some(envelop))
+ }
+
+ pub fn into_mail_with_envelop(self) -> Result<(Mail, EnvelopData), MailError> {
let envelop =
if let Some(envelop) = self.envelop_data { envelop }
- else { EnvelopData::from_mail(&self.mail)? };
+ else { derive_envelop_data_from_mail(&self.mail)? };
Ok((self.mail, envelop))
}
}
-fn mailbox2smtp_mailbox(mailbox: &Mailbox) -> SmtpMailbox {
- use emailaddress::EmailAddress;
- SmtpMailbox(Some(EmailAddress {
- local: mailbox.email.local_part.as_str().to_owned(),
- domain: mailbox.email.domain.as_str().to_owned(),
- }))
+fn mailaddress_from_mailbox(mailbox: &Mailbox) -> Result<MailAddress, EncodingError> {
+ let email = &mailbox.email;
+ let needs_smtputf8 = email.check_if_internationalized();
+ let mt = if needs_smtputf8 { MailType::Internationalized } else { MailType::Ascii };
+ let mut buffer = EncodingBuffer::new(mt);
+ {
+ email.encode(&mut buffer.writer())?;
+ }
+ let raw: Vec<u8> = buffer.into();
+ let address = String::from_utf8(raw).expect("[BUG] encoding Email produced non utf8 data");
+ Ok(MailAddress::new_unchecked(address, needs_smtputf8))
+}
+
+pub fn derive_envelop_data_from_mail(mail: &Mail)
+ -> Result<smtp::EnvelopData, MailError>
+{
+ let headers = mail.headers();
+ let smtp_from =
+ if let Some(sender) = headers.get_single(Sender) {
+ let sender = sender?;
+ //TODO double check with from field
+ mailaddress_from_mailbox(sender)?
+ } else {
+ let from = headers.get_single(_From)
+ .ok_or(BuildInValidationError::NoFrom)??;
+
+ if from.len() > 1 {
+ return Err(BuildInValidationError::MultiMailboxFromWithoutSender.into());
+ }
+
+ mailaddress_from_mailbox(from.first())?
+ };
+
+ let smtp_to =
+ if let Some(to) = headers.get_single(_To) {
+ let to = to?;
+ to.try_mapped_ref(mailaddress_from_mailbox)?
+ } else {
+ return Err(BuildInValidationError::NoTo.into());
+ };
+
+ //TODO Cc, Bcc
+
+ Ok(EnvelopData {
+ from: Some(smtp_from),
+ to: smtp_to
+ })
} \ No newline at end of file
diff --git a/src/encode.rs b/src/encode.rs
index c5274bd..d6a92c2 100644
--- a/src/encode.rs
+++ b/src/encode.rs
@@ -1,98 +1,174 @@
-use std::io::{Error as IoError};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
-use std::mem;
-use futures::sync::{mpsc, oneshot};
-use futures::{future, Future, Stream, Poll, Async};
-use futures::stream::BufferUnordered;
+/*
-use new_tokio_smtp::Connection;
-use mail::prelude::{Encoder, Encodable, MailType};
-use mail::utils::SendBoxFuture;
+ ↓
+ ---[][][]-------------------------- \
+ | ↓ <<sequential>> | \
+ | ↓ | \
+ | (create envelop) | \ |
+ | ↓ | | 1. func |
+ | (into encodable mail) | / |
+ | ↓ [envelop, encodable mail] | > 3. encode_mails
+ | (offload encode mail) | > 2. func |
+ | ↓ [envelop, future->Vec<u8>] | |
+ ---[][][]-------------------------- /
+ ↓
+ [Future<Vec<Result<>>]............/ we don't want a stream here all encoding should \
+ ↓ | be already done, so that there are no large periods |
+ ↓ \ where the smtp connection is open and pending /
+ ↓
+ ---[][][]-------------------------- \
+ | ↓ <<async/sequential>> | \
+ | ↓ | |
+ | (send mail) | > send_mails
+ | ↓ | |
+ | <ok?> →→no→→ (add to failures)| |
+ | ↓ yes | /
+ ---[][][]-------------------------- /
+ ↓
+*/
+use std::iter::FromIterator;
+use std::vec;
+
+use futures::future::{self, Either, Loop, Future};
+use new_tokio_smtp::{Cmd, Connection, ConnectionConfig, SetupTls};
+use new_tokio_smtp::send_mail::{self as smtp, ConSendMailExt};
+
+use mail_common::MailType;
+use mail_common::encoder::EncodingBuffer;
use mail::context::BuilderContext;
+use mail::error::MailError;
-use super::smtp_wrapper::{send_mail, close_smtp_conn};
-use super::common::{
- MailSendResult, MailResponse, MailRequest, EnvelopData,
- Handle2ServiceMsg
-};
-use super::handle::MailServiceHandle;
-use super::error::MailSendError;
+use ::resolve_all::ResolveAll;
+use ::common::MailRequest;
+use ::error::{MailSendError, TransportError};
-pub(crate) type MailEncodingResult = Result<(Vec<u8>, EnvelopData), MailSendError>;
-/// encodes the mails in the input stream in a thread poll returning the results out of order
-///
-/// The `max_concurrent` parameter determines how many mails are encoded concurrently.
-///
-/// Note that it uses `ctx.offload` to offload work, i.e. send it to a thread pool,
-/// if `ctx.offload` is not implemented in terms of a thread pool this will not encode
-/// mail in a thread poll but with whatever mechanism `ctx.offload` uses to resolve
-/// futures.
-///
-pub(crate) fn stream_encode_mail<S, CTX>(steam: S, ctx: CTX, max_concurrent: usize)
- //FIXME[rust/impl Trait]: use impl Trait instead of boxing
- -> Box<Stream<
- //FIXME[futures >= 0.2]: replace () with Never
- Item=SendBoxFuture<(MailEncodingResult, oneshot::Sender<MailSendResult>), ()>,
- //FIXME[futures >= 0.2]: replace () with Never
- Error=()>>
- //FIXME[futures >= 0.2]: replace () with Never
- where S: Stream<Item=Handle2ServiceMsg, Error=()>, CTX: BuilderContext
+pub type EncodeMailResult = Result<smtp::MailEnvelop, MailError>;
+
+// errors:
+// - EnvelopFromMailError
+// - MailError
+pub fn encode_mails<I, C>(requests: I, ctx: &C)
+ //TODO[futures/v>=0.2 | rust/! type]: use Never or !
+ -> impl Future<Item=Vec<EncodeMailResult>, Error=()> + Send
+ where I: IntoIterator<Item=MailRequest>, C: BuilderContext
{
- let _ctx = ctx;
- let fut_stream = stream.map(move |(req, tx)| {
- //clone ctx to move it into the operation chain
- let ctx = _ctx.clone();
- let operation = future
- //use lazy to make sure it's run in the thread pool
- ::lazy(move || mail_request.into_mail_with_envelop())
- .then(move |result| match result {
- Ok((mail, envelop)) => Ok((mail, envelop, tx)),
- Err(err) => Err((MailSendError::CreatingEnvelop(err), tx))
- })
- .and_then(move |(mail, envelop, tx)| {
- mail.into_encodeable_mail(&ctx)
- .then(move |result| match result {
- Ok(enc_mail) => Ok((enc_mail, envelop, tx)),
- Err(err) => Err((MailSendError::Encoding(err), tx))
- })
- })
- .and_then(move |(encodable_mail, envelop, tx)| {
- //TODO we need to feed in the MailType (and get it from tokio smtp)
- let mut encoder = Encoder::new( MailType::Ascii );
- match encodable_mail.encode(&mut encoder) {
- Ok(()) => {},
- Err(err) => return Err((MailSendError::Encoding(err), tx))
- }
-
- let bytes = match encoder.to_vec() {
- Ok(bytes) => bytes,
- Err(err) => return Err((MailSendError::Encoding(err), tx))
+ let pending = requests
+ .into_iter()
+ .map(|request| {
+ let (mail, envelop_data) =
+ match request.into_mail_with_envelop() {
+ Ok(pair) => pair,
+ Err(e) => return Either::A(future::err(e.into()))
+ };
+
+ let _ctx = ctx.clone();
+ let fut = mail
+ .into_encodeable_mail(ctx)
+ .and_then(move |enc_mail| _ctx.offload_fn(move || {
+ let (mail_type, requirement) =
+ if envelop_data.needs_smtputf8() {
+ (MailType::Internationalized, smtp::EncodingRequirement::Smtputf8)
+ } else {
+ (MailType::Ascii, smtp::EncodingRequirement::None)
};
- //TODO we also need to return SmtpEnvelop<Vec<u8>>
- let enc_result = Ok((bytes, envelop));
- Ok((enc_result, tx))
- })
- .or_else(move |(err, tx)| {
- let enc_result = Err(err);
- Ok((enc_result, tx))
- });
+ let mut buffer = EncodingBuffer::new(mail_type);
+ enc_mail.encode(&mut buffer)?;
- // offload work to thread pool
- let fut = _ctx.offload(operation);
+ let smtp_mail = smtp::Mail::new(requirement, buffer.into());
+ Ok(smtp::MailEnvelop::from((smtp_mail, envelop_data)))
+ }));
- // return future as new item
- fut
+ Either::B(fut)
});
- // buffer
- let buffered = fut_stream.buffer_unordered(max_concurrent);
+ ResolveAll::from_iter(pending)
+}
+
+pub type SendMailResult = Result<(), MailSendError>;
+
+
+pub fn send_encoded_mails<I>(con: Connection, mails: I)
+ -> impl Future<
+ Item=(Connection, Vec<SendMailResult>),
+ Error=(TransportError, Vec<SendMailResult>, I::IntoIter)>
+ where I: IntoIterator<Item=EncodeMailResult>, I::IntoIter: 'static
+{
+ let iter = mails.into_iter();
+ let results = Vec::new();
+ let fut = future::loop_fn((con, iter, results), |(con, mut iter, mut results)| match iter.next() {
+ None => Either::A(future::ok(Loop::Break((con, results)))),
+ Some(Err(err)) => {
+ results.push(Err(MailSendError::from(err)));
+ Either::A(future::ok(Loop::Continue((con, iter, results))))
+ },
+ Some(Ok(envelop)) => Either::B(con
+ .send_mail(envelop)
+ .then(move |res| match res {
+ Ok((con, logic_result)) => {
+ results.push(logic_result.map_err(|(_idx, err)| MailSendError::from(err)));
+ Ok(Loop::Continue((con, iter, results)))
+ },
+ Err(err) => {
+ Err((TransportError::Io(err), results, iter))
+ }
+ }))
+ });
- Box::new(buffered)
+ fut
}
+/// Send mails _to a specific mail server_
+///
+/// This encodes the mails, opens a connection, sends the mails over and
+/// closes the connection again.
+///
+/// While this uses the `To` field of a mail to determine the smtp reveiver
+/// it does not resolve the server based on the mail address domain. This
+/// means it's best suite for sending to a Mail Submission Agent (MSA), but
+/// less for sending to a Mail Exchanger (MX).
+///
+/// Automatically handling Bcc/Cc is _not yet_ implemented.
+///
+pub fn send_mails<S, A, I, C>(config: ConnectionConfig<A, S>, requests: I, ctx: &C)
+ -> impl Future<
+ Item=Vec<SendMailResult>,
+ Error=(TransportError, Vec<SendMailResult>, vec::IntoIter<EncodeMailResult>)>
+ where I: IntoIterator<Item=MailRequest>,
+ C: BuilderContext,
+ S: SetupTls,
+ A: Cmd
+{
+
+ let fut = encode_mails(requests, ctx)
+ .map_err(|_| unreachable!())
+ .and_then(|mails| {
+ if mails.iter().all(|r| r.is_err()) {
+ let send_skipped = mails
+ .into_iter()
+ .map(|result| match result {
+ Ok(_) => unreachable!(),
+ Err(err) => Err(MailSendError::Mail(err))
+ })
+ .collect();
+
+ Either::A(future::ok(send_skipped))
+ } else {
+ let fut = Connection
+ ::connect(config)
+ .then(|result| match result {
+ Err(err) => Either::A(future::err((TransportError::Connecting(err), Vec::new(), mails.into_iter()))),
+ Ok(con) => Either::B(send_encoded_mails(con, mails))
+ })
+ .and_then(|(con, results)| con.quit().then(|_| Ok(results)));
+
+ Either::B(fut)
+ }
+ });
+
+ fut
+}
diff --git a/src/error.rs b/src/error.rs
index 48d4690..cf51918 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,24 +1,79 @@
-use std::io::{Error as IoError};
-use ::error::Error;
-use new_tokio_smtp::error::LogicError;
+use std::{io as std_io};
-#[derive(Debug)]
+use new_tokio_smtp::error::{ConnectingFailed, LogicError};
+use mail::error::MailError;
+
+
+#[derive(Debug, Fail)]
pub enum MailSendError {
- CreatingEnvelop(EnvelopFromMailError),
- Composition(Error),
- Encoding(Error),
- //Note with pipelining this will change to Vec<LogicError>
- Smtp(LogicError),
- Io(IoError),
- DriverDropped,
- CanceledByDriver
+ /// creating the mail failed
+ ///
+ /// This can happen because of a number of reasons including:
+ ///
+ /// 1. missing header fields
+ /// 2. invalid header fields
+ /// 2. encoding header fields fails
+ /// 3. loading resources failed
+ /// (resources like e.g. appendix, logo embedded in html mail, etc.)
+ ///
+ #[fail(display = "{}", _0)]
+ Mail(MailError),
+
+ /// sending the mail failed
+ ///
+ /// This can happen because of a number of reasons including:
+ /// 1. server rejects mail transaction because of send or receiver
+ /// address or body data (e.g. body to long).
+ /// 2. mail address requires smtputf8 support, which is not given
+ /// 3. server rejects sending the mail for other reasons (it's
+ /// closing, overloaded etc.)
+ #[fail(display = "{}", _0)]
+ Smtp(LogicError)
+}
+
+impl From<MailError> for MailSendError {
+ fn from(err: MailError) -> Self {
+ MailSendError::Mail(err)
+ }
+}
+
+impl From<LogicError> for MailSendError {
+ fn from(err: LogicError) -> Self {
+ MailSendError::Smtp(err)
+ }
+}
+
+#[derive(Debug, Fail)]
+pub enum TransportError {
+
+ // Setting up the connection failed
+ //
+ // Failures can include but are not limited to:
+ //
+ // - connecting with tcp failed
+ // - starting tls failed
+ // - server does not want to be used (e.g. failure on sending EHLO)
+ // - authentication failed
+ #[fail(display = "{}", _0)]
+ Connecting(ConnectingFailed),
+
+ // An I/O-Error happened while using the connection
+ //
+ // This is mainly for I/O-Error after the setup of the connection
+ // was successful, which normally sending includes Ehlo and Auth
+ // commands
+ #[fail(display = "{}", _0)]
+ Io(std_io::Error)
}
+impl From<std_io::Error> for TransportError {
+ fn from(err: std_io::Error) -> Self {
+ TransportError::Io(err)
+ }
+}
-#[derive(Debug)]
-pub enum EnvelopFromMailError {
- NeitherSenderNorFrom,
- TypeError(Error),
- NoSenderAndMoreThanOneFrom,
- NoToHeaderField
+impl From<ConnectingFailed> for TransportError {
+ fn from(err: ConnectingFailed) -> Self {
+ TransportError::Connecting(err)
+ }
}
diff --git a/src/handle.rs b/src/handle.rs
deleted file mode 100644
index 5f351e8..0000000
--- a/src/handle.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-use futures::sync::mpsc;
-use futures::sync::oneshot;
-use futures::{sink, Sink, Future, Poll, Async};
-
-use super::common::{MailRequest, MailResponse};
-use super::error::MailSendError;
-
-type InnerChannel = mpsc::Sender<(MailRequest, oneshot::Sender<Result<MailResponse, MailSendError>>)>;
-
-#[derive(Debug, Clone)]
-pub struct MailServiceHandle {
- channel: InnerChannel
-}
-
-
-impl MailServiceHandle {
-
- pub(crate) fn new(sender: InnerChannel) -> Self {
- MailServiceHandle { channel: sender }
- }
-
- pub fn send_mail(self, mail_request: MailRequest) -> MailEnqueueFuture {
- let (sender, rx) = oneshot::channel();
-
- let send = self.channel.send((mail_request, sender));
-
- MailEnqueueFuture { send, rx: Some(rx) }
- }
-
-// pub fn map_request_stream<S>(self, stream: S, max_buffer: Option<usize>) -> SmtpMailStream<RQ, S>
-// where S: Stream<Item = RQ>,
-// S::Error: Into<MailSendError>
-// {
-// SmtpMailStream::new(self.channel, stream, max_buffer)
-// }
-
- pub fn into_inner(self) -> InnerChannel {
- self.channel
- }
-
-}
-
-pub struct MailEnqueueFuture {
- send: sink::Send<InnerChannel>,
- rx: Option<oneshot::Receiver<Result<MailResponse, MailSendError>>>
-}
-
-impl Future for MailEnqueueFuture {
-
- type Item = (MailServiceHandle, MailResponseFuture);
- type Error = MailSendError;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let channel = match self.send.poll() {
- Ok(Async::Ready(channel)) => channel,
- Ok(Async::NotReady) => return Ok(Async::NotReady),
- Err(_cancel_err) => return Err(MailSendError::DriverDropped),
- };
-
- let rx = self.rx.take().expect("called poll after polling completed");
- Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx })))
- }
-}
-
-
-pub struct MailResponseFuture {
- rx: oneshot::Receiver<Result<MailResponse, MailSendError>>
-}
-
-impl Future for MailResponseFuture {
- type Item = MailResponse;
- type Error = MailSendError;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let res = match self.rx.poll() {
- Ok(Async::Ready(res)) => res,
- Ok(Async::NotReady) => return Ok(Async::NotReady),
- Err(_cancel_err) => return Err(MailSendError::CanceledByDriver),
- };
-
- match res {
- Ok(resp) => Ok(Async::Ready(resp)),
- Err(err) => Err(err)
- }
- }
-}
-
-
-// use a closure to setup smtp Clone + FnMut() -> R, R: Future<Item=...,> or similar
-//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) {
-// let (send, resc) = mpcs::channel(XX);
-// let shared_composition_base = smtp_setup.composition_base;
-// let driver = smtp_connect().and_then(|service| {
-// let service = shared_composition_base.wrap_smtp_service(service);
-//
-// let pipe_in = resc;;
-//
-// pipe_in.for_each(|(cmd, pipe_out)| {
-// service.call(cmd).and_then(|res| {
-// let _ = pipe_out.send(res);
-// Ok(())
-// })
-// })
-// });
-// (send, driver)
-//}
-//
-//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> {
-// let driver = setup_smtp(setup.clone());
-//
-// future::loop_fn(driver, move |driver| {
-// driver.or_else(|err| {
-// if err.was_canceled() {
-// Ok(Loop::Break(()))
-// } else if err.can_recover() {
-// let new_driver = setup_smtp(setup.clone());
-// Ok(Loop::Continue(new_driver))
-// } else {
-// Err(err.into())
-// }
-// })
-// })
-//}
-//
-
-
-
-// TODO: This links into a stream mapping a stream of requests to a stream of responses,
-// but it needs more though into it, i.e. it decouples the sending of a mail, with
-// the response of getting one. So we need to at last add some mail Id to Req (we
-// might want to see if we can generally do so using MessageId).
-//
-//pub struct SmtpMailStream<RQ, S>
-// where RQ: MailSendRequest,
-// S: Stream<Item = RQ>,
-// S::Error: Into<MailSendError>
-//{
-// channel: Option<InnerChannel<RQ>>,
-// stream: Option<S>,
-// res_buffer: FuturesUnordered<oneshot::Receiver<Result<MailResponse, MailSendError>>>,
-// send_buffer: Option<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>,
-// max_buffer: Option<usize>
-//}
-//
-//
-//impl<RQ, S> SmtpMailStream<RQ, S>
-// where RQ: MailSendRequest,
-// S: Stream<Item = RQ>,
-// S::Error: Into<MailSendError>
-//{
-// pub(crate) fn new(channel: InnerChannel<RQ>, stream: S, max_buffer: Option<usize>) -> Self {
-// SmtpMailStream {
-// channel, stream, max_buffer,
-// send_buffer: None,
-// res_buffer: FuturesUnordered::new(),
-// }
-// }
-//
-// fn channel_mut(&mut self) -> &mut InnerChannel<RQ> {
-// self.channel.as_mut().unwrap()
-// }
-//
-// fn stream_mut(&mut self) -> &mut S {
-// self.stream.as_mut().unwrap()
-// }
-//
-// fn try_enqueue_mail(
-// &mut self,
-// msg: (RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)
-// ) -> Poll<(), DriverDropped<S>>
-// {
-// debug_assert!(self.send_buffer.is_none());
-// debug_assert!(self.stream.is_some() && self.channel.is_some());
-// match self.channel_mut().start_send(msg) {
-// Ok(AsyncSink::Ready) => Ok(Async::Ready(())),
-// Ok(AsyncSink::NotReady(msg)) => {
-// self.send_buffer = Some(msg);
-// Ok(Async::NotReady)
-// },
-// Err(_) => {
-// mem::drop(self.channel.take());
-// Err(DriverDropped::Stream(self.stream.take()))
-// }
-// }
-// }
-//
-// fn close_channel(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-// try_ready!(self.channel_mut().close());
-// return Ok(Async::Ready(None));
-// }
-//
-// fn poll_stream(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-// match self.stream_mut().poll()? {
-// Async::Ready(Some(item)) => Async::Ready(Some(item)),
-// Async::Ready(None) => {
-// self.stream.take();
-// return self.close_channel();
-// }
-// Async::NotReady => {
-// try_ready!(self.channel_mut().poll_complete());
-// return Ok(Async::NotRead);
-// }
-// }
-// }
-//}
-//
-//pub enum DriverDropped<S> {
-// Stream(S),
-// StreamTakenOnPreviousError
-//}
-//
-//impl<RQ, S> Stream for SmtpMailStream<RQ, S>
-// where RQ: MailSendRequest,
-// S: Stream<Item = RQ>,
-// //FIXME[tokio 0.2] use error Never??
-// S::Error: Into<MailSendError>
-//{
-// type Item = Result<MailResponse, MailSendError>;
-// type Error = DriverDropped<S>;
-//
-// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-// if self.channel.is_none() {
-// return Err(DriverDropped::StreamTakenOnPreviousError);
-// }
-//
-// if self.stream.is_none() {
-// return self.close_channel();
-// }
-//
-// if let Some(msg) = self.send_buffer {
-// try_ready!(self.try_enqueue_mail(msg));
-// }
-//
-// loop {
-//
-// match self.res_buffer.poll() {
-// Ok(Async::NotReady) => {},
-// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))),
-// Err(err) => return Ok(AsyncReady(Err(e)))
-// }
-//
-// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() {
-// return Ok(Async::NotReady);
-// }
-//
-//
-// let item = try_some_ready!(self.poll_stream());
-//
-// let (tx, rx) = oneshot::channel();
-//
-// self.res_buffer.push(rx);
-//
-// try_ready!(self.try_enqueue_mail((item, tx)));
-// }
-// }
-//} \ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index 1ae530d..83f5dba 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,19 +1,16 @@
+extern crate futures;
extern crate new_tokio_smtp;
+extern crate mail_types as mail;
+extern crate mail_common;
+extern crate mail_headers as headers;
+#[macro_use]
+extern crate failure;
-mod stop_handle;
-mod mpsc_ext;
+mod resolve_all;
pub mod error;
mod common;
-mod smtp_wrapper;
mod encode;
-mod handle;
-mod service;
-pub use self::stop_handle::StopServiceHandle;
pub use self::common::*;
-pub use self::handle::*;
-pub use self::service::*;
-
-#[cfg(test)]
-mod test; \ No newline at end of file
+pub use self::encode::*; \ No newline at end of file
diff --git a/src/mpsc_ext.rs b/src/mpsc_ext.rs
deleted file mode 100644
index 79e6fef..0000000
--- a/src/mpsc_ext.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use super::stop_handle::StopServiceHandle;
-
-use futures::sync::mpsc;
-use futures::{Poll, Async, Stream};
-
-pub struct AutoClose<I> {
- inner: mpsc::Receiver<I>,
- stop_handle: StopServiceHandle,
-}
-
-impl<I> AutoClose<I>
- //FIXME[tokio >= 0.2]: use Never
- where I: Stream<Error=()>
-{
- pub fn new(inner: mpsc::Receiver<I>, stop_handle: StopServiceHandle) -> Self {
- AutoClose { inner, stop_handle }
- }
-}
-
-impl<I> Stream for AutoClose<I>
- where I: Stream<Error=()>
-{
-
- type Item = I::Item;
- //FIXME[tokio >= 0.2]: use Never
- type Error = ();
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- if stop_handle.should_stop() {
- self.inner.close()
- }
- self.inner.poll()
- }
-} \ No newline at end of file
diff --git a/src/resolve_all.rs b/src/resolve_all.rs
new file mode 100644
index 0000000..69d3c8b
--- /dev/null
+++ b/src/resolve_all.rs
@@ -0,0 +1,81 @@
+use std::mem;
+use std::iter::FromIterator;
+
+use futures::{Future, Async, Poll};
+
+pub enum AltFuse<F: Future> {
+ Future(F),
+ Resolved(Result<F::Item, F::Error>)
+}
+
+impl<F> Future for AltFuse<F>
+ where F: Future
+{
+ type Item = ();
+ //TODO[futures/v>=0.2 |rust/! type]: use Never or !
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let result = match *self {
+ AltFuse::Resolved(_) => return Ok(Async::Ready(())),
+ AltFuse::Future(ref mut fut) => match fut.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Ok(Async::Ready(val)) => Ok(val),
+ Err(err) => Err(err)
+ }
+ };
+
+ *self = AltFuse::Resolved(result);
+ Ok(Async::Ready(()))
+ }
+}
+
+
+pub struct ResolveAll<F>
+ where F: Future
+{
+ all: Vec<AltFuse<F>>
+}
+
+impl<F> Future for ResolveAll<F>
+ where F: Future
+{
+ type Item = Vec<Result<F::Item, F::Error>>;
+ //TODO[futures >= 0.2/rust ! type]: use Never or !
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let mut any_not_ready = false;
+ for fut in self.all.iter_mut() {
+ if let Ok(Async::NotReady) = fut.poll() {
+ any_not_ready = true;
+ }
+ }
+ if any_not_ready {
+ Ok(Async::NotReady)
+ } else {
+ let