diff options
author | Philipp Korber <philippkorber@gmail.com> | 2018-05-11 14:56:04 +0200 |
---|---|---|
committer | Philipp Korber <philippkorber@gmail.com> | 2018-05-11 14:56:04 +0200 |
commit | a95f660d7b20b40158f23465f3e8f583480e5f83 (patch) | |
tree | 67bf333fb3f422c67bf30111e7fd9ad0ac267d59 | |
parent | 760a42222cde5ef3a0e26af4174149838ef54d7b (diff) |
chore(api+deps): strongly simplified api and uses changed deps
- the api was stringly simplified
- uses new-tokio-smtp instead of tokio-smtp
- inital commit of the changed api, some parts
are not commit yet as they still have to be changed
including e.g. the integration tests
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | src/common.rs | 141 | ||||
-rw-r--r-- | src/encode.rs | 236 | ||||
-rw-r--r-- | src/error.rs | 91 | ||||
-rw-r--r-- | src/handle.rs | 256 | ||||
-rw-r--r-- | src/lib.rs | 19 | ||||
-rw-r--r-- | src/mpsc_ext.rs | 34 | ||||
-rw-r--r-- | src/resolve_all.rs | 81 | ||||
-rw-r--r-- | src/service.rs | 413 | ||||
-rw-r--r-- | src/smtp_wrapper.rs | 202 | ||||
-rw-r--r-- | src/stop_handle.rs | 18 | ||||
-rw-r--r-- | src/test.rs | 270 |
12 files changed, 394 insertions, 1373 deletions
@@ -11,5 +11,9 @@ readme = "./README.md" repository = "https://github.com/1aim/mail-smtp" [dependencies] -mail = { git="https://github.com/1aim/mail" } +futures = "0.1" +failure = "0.1.1" +mail-types = { git="https://github.com/1aim/mail-types" } +mail-common = { git="https://github.com/1aim/mail-common" } +mail-headers = { git="https://github.com/1aim/mail-headers" } new-tokio-smtp = { git="https://github.com/1aim/new-tokio-smtp" } diff --git a/src/common.rs b/src/common.rs index bccf869..e9b9493 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,76 +1,32 @@ //! This modules contains some of the data types used, like e.g. Response, Request, Envelop etc. +use std::mem; -use vec1::Vec1; +use new_tokio_smtp::send_mail::{ + self as smtp, + MailAddress, + EnvelopData +}; -use futures::sync::oneshot; -use new_tokio_smtp::{ForwardPath, ReversePath}; -use mail::headers::{Sender, From as _From, To}; -use mail::headers::components::Mailbox; +use mail_common::MailType; +use mail_common::encoder::{EncodingBuffer, EncodableInHeader}; +use mail_common::error::EncodingError; +use headers::{Sender, _From, _To}; +use headers::components::Mailbox; +use headers::error::BuildInValidationError; use mail::Mail; -use super::error::{MailSendError, EnvelopFromMailError}; +use mail::error::MailError; -#[derive(Debug)] -pub struct EnvelopData { - from: ReversePath, - to: Vec1<ForwardPath> -} - -impl EnvelopData { - pub fn new(from: SmtpMailbox, to: Vec1<SmtpMailbox>) -> Self { - EnvelopData { from, to } - } - pub fn split(self) -> (SmtpMailbox, Vec1<SmtpMailbox>) { - let EnvelopData { from, to } = self; - (from, to) - } - pub fn from_mail(mail: &Mail) -> Result<Self, EnvelopFromMailError> { - - let headers = mail.headers(); - let smtp_from = - if let Some(sender) = headers.get_single(Sender) { - let sender = sender.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; - //TODO double check with from field - mailbox2smtp_mailbox(sender) - } else { - let from = headers.get_single(_From) - .ok_or(EnvelopFromMailError::NeitherSenderNorFrom)? - .map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; - - if from.len() > 1 { - return Err(EnvelopFromMailError::NoSenderAndMoreThanOneFrom); - } - - mailbox2smtp_mailbox(from.first()) - }; - - let smtp_to = - if let Some(to) = headers.get_single(To) { - let to = to.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; - to.mapped_ref(mailbox2smtp_mailbox) - } else { - return Err(EnvelopFromMailError::NoToHeaderField); - }; - - //TODO Cc, Bcc - - Ok(EnvelopData { - from: smtp_from, - to: smtp_to - }) - } -} -pub type MailSendResult = Result<MailResponse, MailSendError>; -pub(crate) type Handle2ServiceMsg = (MailRequest, oneshot::Sender<MailSendResult>); +// pub type MailSendResult = Result<MailResponse, MailSendError>; +// pub(crate) type Handle2ServiceMsg = (MailRequest, oneshot::Sender<MailSendResult>); -#[derive(Debug, Clone)] -pub struct MailResponse; +// #[derive(Debug, Clone)] +// pub struct MailResponse; -//TODO derive(Clone): requires clone for Box<EncodableMail+'static> -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct MailRequest { mail: Mail, envelop_data: Option<EnvelopData> @@ -94,19 +50,64 @@ impl MailRequest { MailRequest { mail, envelop_data: Some(envelop) } } - pub fn into_mail_with_envelop(self) -> Result<(Mail, EnvelopData), EnvelopFromMailError> { + pub fn override_envelop(&mut self, envelop: EnvelopData) -> Option<EnvelopData> { + mem::replace(&mut self.envelop_data, Some(envelop)) + } + + pub fn into_mail_with_envelop(self) -> Result<(Mail, EnvelopData), MailError> { let envelop = if let Some(envelop) = self.envelop_data { envelop } - else { EnvelopData::from_mail(&self.mail)? }; + else { derive_envelop_data_from_mail(&self.mail)? }; Ok((self.mail, envelop)) } } -fn mailbox2smtp_mailbox(mailbox: &Mailbox) -> SmtpMailbox { - use emailaddress::EmailAddress; - SmtpMailbox(Some(EmailAddress { - local: mailbox.email.local_part.as_str().to_owned(), - domain: mailbox.email.domain.as_str().to_owned(), - })) +fn mailaddress_from_mailbox(mailbox: &Mailbox) -> Result<MailAddress, EncodingError> { + let email = &mailbox.email; + let needs_smtputf8 = email.check_if_internationalized(); + let mt = if needs_smtputf8 { MailType::Internationalized } else { MailType::Ascii }; + let mut buffer = EncodingBuffer::new(mt); + { + email.encode(&mut buffer.writer())?; + } + let raw: Vec<u8> = buffer.into(); + let address = String::from_utf8(raw).expect("[BUG] encoding Email produced non utf8 data"); + Ok(MailAddress::new_unchecked(address, needs_smtputf8)) +} + +pub fn derive_envelop_data_from_mail(mail: &Mail) + -> Result<smtp::EnvelopData, MailError> +{ + let headers = mail.headers(); + let smtp_from = + if let Some(sender) = headers.get_single(Sender) { + let sender = sender?; + //TODO double check with from field + mailaddress_from_mailbox(sender)? + } else { + let from = headers.get_single(_From) + .ok_or(BuildInValidationError::NoFrom)??; + + if from.len() > 1 { + return Err(BuildInValidationError::MultiMailboxFromWithoutSender.into()); + } + + mailaddress_from_mailbox(from.first())? + }; + + let smtp_to = + if let Some(to) = headers.get_single(_To) { + let to = to?; + to.try_mapped_ref(mailaddress_from_mailbox)? + } else { + return Err(BuildInValidationError::NoTo.into()); + }; + + //TODO Cc, Bcc + + Ok(EnvelopData { + from: Some(smtp_from), + to: smtp_to + }) }
\ No newline at end of file diff --git a/src/encode.rs b/src/encode.rs index c5274bd..d6a92c2 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -1,98 +1,174 @@ -use std::io::{Error as IoError}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::mem; -use futures::sync::{mpsc, oneshot}; -use futures::{future, Future, Stream, Poll, Async}; -use futures::stream::BufferUnordered; +/* -use new_tokio_smtp::Connection; -use mail::prelude::{Encoder, Encodable, MailType}; -use mail::utils::SendBoxFuture; + ↓ + ---[][][]-------------------------- \ + | ↓ <<sequential>> | \ + | ↓ | \ + | (create envelop) | \ | + | ↓ | | 1. func | + | (into encodable mail) | / | + | ↓ [envelop, encodable mail] | > 3. encode_mails + | (offload encode mail) | > 2. func | + | ↓ [envelop, future->Vec<u8>] | | + ---[][][]-------------------------- / + ↓ + [Future<Vec<Result<>>]............/ we don't want a stream here all encoding should \ + ↓ | be already done, so that there are no large periods | + ↓ \ where the smtp connection is open and pending / + ↓ + ---[][][]-------------------------- \ + | ↓ <<async/sequential>> | \ + | ↓ | | + | (send mail) | > send_mails + | ↓ | | + | <ok?> →→no→→ (add to failures)| | + | ↓ yes | / + ---[][][]-------------------------- / + ↓ +*/ +use std::iter::FromIterator; +use std::vec; + +use futures::future::{self, Either, Loop, Future}; +use new_tokio_smtp::{Cmd, Connection, ConnectionConfig, SetupTls}; +use new_tokio_smtp::send_mail::{self as smtp, ConSendMailExt}; + +use mail_common::MailType; +use mail_common::encoder::EncodingBuffer; use mail::context::BuilderContext; +use mail::error::MailError; -use super::smtp_wrapper::{send_mail, close_smtp_conn}; -use super::common::{ - MailSendResult, MailResponse, MailRequest, EnvelopData, - Handle2ServiceMsg -}; -use super::handle::MailServiceHandle; -use super::error::MailSendError; +use ::resolve_all::ResolveAll; +use ::common::MailRequest; +use ::error::{MailSendError, TransportError}; -pub(crate) type MailEncodingResult = Result<(Vec<u8>, EnvelopData), MailSendError>; -/// encodes the mails in the input stream in a thread poll returning the results out of order -/// -/// The `max_concurrent` parameter determines how many mails are encoded concurrently. -/// -/// Note that it uses `ctx.offload` to offload work, i.e. send it to a thread pool, -/// if `ctx.offload` is not implemented in terms of a thread pool this will not encode -/// mail in a thread poll but with whatever mechanism `ctx.offload` uses to resolve -/// futures. -/// -pub(crate) fn stream_encode_mail<S, CTX>(steam: S, ctx: CTX, max_concurrent: usize) - //FIXME[rust/impl Trait]: use impl Trait instead of boxing - -> Box<Stream< - //FIXME[futures >= 0.2]: replace () with Never - Item=SendBoxFuture<(MailEncodingResult, oneshot::Sender<MailSendResult>), ()>, - //FIXME[futures >= 0.2]: replace () with Never - Error=()>> - //FIXME[futures >= 0.2]: replace () with Never - where S: Stream<Item=Handle2ServiceMsg, Error=()>, CTX: BuilderContext +pub type EncodeMailResult = Result<smtp::MailEnvelop, MailError>; + +// errors: +// - EnvelopFromMailError +// - MailError +pub fn encode_mails<I, C>(requests: I, ctx: &C) + //TODO[futures/v>=0.2 | rust/! type]: use Never or ! + -> impl Future<Item=Vec<EncodeMailResult>, Error=()> + Send + where I: IntoIterator<Item=MailRequest>, C: BuilderContext { - let _ctx = ctx; - let fut_stream = stream.map(move |(req, tx)| { - //clone ctx to move it into the operation chain - let ctx = _ctx.clone(); - let operation = future - //use lazy to make sure it's run in the thread pool - ::lazy(move || mail_request.into_mail_with_envelop()) - .then(move |result| match result { - Ok((mail, envelop)) => Ok((mail, envelop, tx)), - Err(err) => Err((MailSendError::CreatingEnvelop(err), tx)) - }) - .and_then(move |(mail, envelop, tx)| { - mail.into_encodeable_mail(&ctx) - .then(move |result| match result { - Ok(enc_mail) => Ok((enc_mail, envelop, tx)), - Err(err) => Err((MailSendError::Encoding(err), tx)) - }) - }) - .and_then(move |(encodable_mail, envelop, tx)| { - //TODO we need to feed in the MailType (and get it from tokio smtp) - let mut encoder = Encoder::new( MailType::Ascii ); - match encodable_mail.encode(&mut encoder) { - Ok(()) => {}, - Err(err) => return Err((MailSendError::Encoding(err), tx)) - } - - let bytes = match encoder.to_vec() { - Ok(bytes) => bytes, - Err(err) => return Err((MailSendError::Encoding(err), tx)) + let pending = requests + .into_iter() + .map(|request| { + let (mail, envelop_data) = + match request.into_mail_with_envelop() { + Ok(pair) => pair, + Err(e) => return Either::A(future::err(e.into())) + }; + + let _ctx = ctx.clone(); + let fut = mail + .into_encodeable_mail(ctx) + .and_then(move |enc_mail| _ctx.offload_fn(move || { + let (mail_type, requirement) = + if envelop_data.needs_smtputf8() { + (MailType::Internationalized, smtp::EncodingRequirement::Smtputf8) + } else { + (MailType::Ascii, smtp::EncodingRequirement::None) }; - //TODO we also need to return SmtpEnvelop<Vec<u8>> - let enc_result = Ok((bytes, envelop)); - Ok((enc_result, tx)) - }) - .or_else(move |(err, tx)| { - let enc_result = Err(err); - Ok((enc_result, tx)) - }); + let mut buffer = EncodingBuffer::new(mail_type); + enc_mail.encode(&mut buffer)?; - // offload work to thread pool - let fut = _ctx.offload(operation); + let smtp_mail = smtp::Mail::new(requirement, buffer.into()); + Ok(smtp::MailEnvelop::from((smtp_mail, envelop_data))) + })); - // return future as new item - fut + Either::B(fut) }); - // buffer - let buffered = fut_stream.buffer_unordered(max_concurrent); + ResolveAll::from_iter(pending) +} + +pub type SendMailResult = Result<(), MailSendError>; + + +pub fn send_encoded_mails<I>(con: Connection, mails: I) + -> impl Future< + Item=(Connection, Vec<SendMailResult>), + Error=(TransportError, Vec<SendMailResult>, I::IntoIter)> + where I: IntoIterator<Item=EncodeMailResult>, I::IntoIter: 'static +{ + let iter = mails.into_iter(); + let results = Vec::new(); + let fut = future::loop_fn((con, iter, results), |(con, mut iter, mut results)| match iter.next() { + None => Either::A(future::ok(Loop::Break((con, results)))), + Some(Err(err)) => { + results.push(Err(MailSendError::from(err))); + Either::A(future::ok(Loop::Continue((con, iter, results)))) + }, + Some(Ok(envelop)) => Either::B(con + .send_mail(envelop) + .then(move |res| match res { + Ok((con, logic_result)) => { + results.push(logic_result.map_err(|(_idx, err)| MailSendError::from(err))); + Ok(Loop::Continue((con, iter, results))) + }, + Err(err) => { + Err((TransportError::Io(err), results, iter)) + } + })) + }); - Box::new(buffered) + fut } +/// Send mails _to a specific mail server_ +/// +/// This encodes the mails, opens a connection, sends the mails over and +/// closes the connection again. +/// +/// While this uses the `To` field of a mail to determine the smtp reveiver +/// it does not resolve the server based on the mail address domain. This +/// means it's best suite for sending to a Mail Submission Agent (MSA), but +/// less for sending to a Mail Exchanger (MX). +/// +/// Automatically handling Bcc/Cc is _not yet_ implemented. +/// +pub fn send_mails<S, A, I, C>(config: ConnectionConfig<A, S>, requests: I, ctx: &C) + -> impl Future< + Item=Vec<SendMailResult>, + Error=(TransportError, Vec<SendMailResult>, vec::IntoIter<EncodeMailResult>)> + where I: IntoIterator<Item=MailRequest>, + C: BuilderContext, + S: SetupTls, + A: Cmd +{ + + let fut = encode_mails(requests, ctx) + .map_err(|_| unreachable!()) + .and_then(|mails| { + if mails.iter().all(|r| r.is_err()) { + let send_skipped = mails + .into_iter() + .map(|result| match result { + Ok(_) => unreachable!(), + Err(err) => Err(MailSendError::Mail(err)) + }) + .collect(); + + Either::A(future::ok(send_skipped)) + } else { + let fut = Connection + ::connect(config) + .then(|result| match result { + Err(err) => Either::A(future::err((TransportError::Connecting(err), Vec::new(), mails.into_iter()))), + Ok(con) => Either::B(send_encoded_mails(con, mails)) + }) + .and_then(|(con, results)| con.quit().then(|_| Ok(results))); + + Either::B(fut) + } + }); + + fut +} diff --git a/src/error.rs b/src/error.rs index 48d4690..cf51918 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,24 +1,79 @@ -use std::io::{Error as IoError}; -use ::error::Error; -use new_tokio_smtp::error::LogicError; +use std::{io as std_io}; -#[derive(Debug)] +use new_tokio_smtp::error::{ConnectingFailed, LogicError}; +use mail::error::MailError; + + +#[derive(Debug, Fail)] pub enum MailSendError { - CreatingEnvelop(EnvelopFromMailError), - Composition(Error), - Encoding(Error), - //Note with pipelining this will change to Vec<LogicError> - Smtp(LogicError), - Io(IoError), - DriverDropped, - CanceledByDriver + /// creating the mail failed + /// + /// This can happen because of a number of reasons including: + /// + /// 1. missing header fields + /// 2. invalid header fields + /// 2. encoding header fields fails + /// 3. loading resources failed + /// (resources like e.g. appendix, logo embedded in html mail, etc.) + /// + #[fail(display = "{}", _0)] + Mail(MailError), + + /// sending the mail failed + /// + /// This can happen because of a number of reasons including: + /// 1. server rejects mail transaction because of send or receiver + /// address or body data (e.g. body to long). + /// 2. mail address requires smtputf8 support, which is not given + /// 3. server rejects sending the mail for other reasons (it's + /// closing, overloaded etc.) + #[fail(display = "{}", _0)] + Smtp(LogicError) +} + +impl From<MailError> for MailSendError { + fn from(err: MailError) -> Self { + MailSendError::Mail(err) + } +} + +impl From<LogicError> for MailSendError { + fn from(err: LogicError) -> Self { + MailSendError::Smtp(err) + } +} + +#[derive(Debug, Fail)] +pub enum TransportError { + + // Setting up the connection failed + // + // Failures can include but are not limited to: + // + // - connecting with tcp failed + // - starting tls failed + // - server does not want to be used (e.g. failure on sending EHLO) + // - authentication failed + #[fail(display = "{}", _0)] + Connecting(ConnectingFailed), + + // An I/O-Error happened while using the connection + // + // This is mainly for I/O-Error after the setup of the connection + // was successful, which normally sending includes Ehlo and Auth + // commands + #[fail(display = "{}", _0)] + Io(std_io::Error) } +impl From<std_io::Error> for TransportError { + fn from(err: std_io::Error) -> Self { + TransportError::Io(err) + } +} -#[derive(Debug)] -pub enum EnvelopFromMailError { - NeitherSenderNorFrom, - TypeError(Error), - NoSenderAndMoreThanOneFrom, - NoToHeaderField +impl From<ConnectingFailed> for TransportError { + fn from(err: ConnectingFailed) -> Self { + TransportError::Connecting(err) + } } diff --git a/src/handle.rs b/src/handle.rs deleted file mode 100644 index 5f351e8..0000000 --- a/src/handle.rs +++ /dev/null @@ -1,256 +0,0 @@ -use futures::sync::mpsc; -use futures::sync::oneshot; -use futures::{sink, Sink, Future, Poll, Async}; - -use super::common::{MailRequest, MailResponse}; -use super::error::MailSendError; - -type InnerChannel = mpsc::Sender<(MailRequest, oneshot::Sender<Result<MailResponse, MailSendError>>)>; - -#[derive(Debug, Clone)] -pub struct MailServiceHandle { - channel: InnerChannel -} - - -impl MailServiceHandle { - - pub(crate) fn new(sender: InnerChannel) -> Self { - MailServiceHandle { channel: sender } - } - - pub fn send_mail(self, mail_request: MailRequest) -> MailEnqueueFuture { - let (sender, rx) = oneshot::channel(); - - let send = self.channel.send((mail_request, sender)); - - MailEnqueueFuture { send, rx: Some(rx) } - } - -// pub fn map_request_stream<S>(self, stream: S, max_buffer: Option<usize>) -> SmtpMailStream<RQ, S> -// where S: Stream<Item = RQ>, -// S::Error: Into<MailSendError> -// { -// SmtpMailStream::new(self.channel, stream, max_buffer) -// } - - pub fn into_inner(self) -> InnerChannel { - self.channel - } - -} - -pub struct MailEnqueueFuture { - send: sink::Send<InnerChannel>, - rx: Option<oneshot::Receiver<Result<MailResponse, MailSendError>>> -} - -impl Future for MailEnqueueFuture { - - type Item = (MailServiceHandle, MailResponseFuture); - type Error = MailSendError; - - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let channel = match self.send.poll() { - Ok(Async::Ready(channel)) => channel, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_cancel_err) => return Err(MailSendError::DriverDropped), - }; - - let rx = self.rx.take().expect("called poll after polling completed"); - Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx }))) - } -} - - -pub struct MailResponseFuture { - rx: oneshot::Receiver<Result<MailResponse, MailSendError>> -} - -impl Future for MailResponseFuture { - type Item = MailResponse; - type Error = MailSendError; - - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let res = match self.rx.poll() { - Ok(Async::Ready(res)) => res, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_cancel_err) => return Err(MailSendError::CanceledByDriver), - }; - - match res { - Ok(resp) => Ok(Async::Ready(resp)), - Err(err) => Err(err) - } - } -} - - -// use a closure to setup smtp Clone + FnMut() -> R, R: Future<Item=...,> or similar -//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) { -// let (send, resc) = mpcs::channel(XX); -// let shared_composition_base = smtp_setup.composition_base; -// let driver = smtp_connect().and_then(|service| { -// let service = shared_composition_base.wrap_smtp_service(service); -// -// let pipe_in = resc;; -// -// pipe_in.for_each(|(cmd, pipe_out)| { -// service.call(cmd).and_then(|res| { -// let _ = pipe_out.send(res); -// Ok(()) -// }) -// }) -// }); -// (send, driver) -//} -// -//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> { -// let driver = setup_smtp(setup.clone()); -// -// future::loop_fn(driver, move |driver| { -// driver.or_else(|err| { -// if err.was_canceled() { -// Ok(Loop::Break(())) -// } else if err.can_recover() { -// let new_driver = setup_smtp(setup.clone()); -// Ok(Loop::Continue(new_driver)) -// } else { -// Err(err.into()) -// } -// }) -// }) -//} -// - - - -// TODO: This links into a stream mapping a stream of requests to a stream of responses, -// but it needs more though into it, i.e. it decouples the sending of a mail, with -// the response of getting one. So we need to at last add some mail Id to Req (we -// might want to see if we can generally do so using MessageId). -// -//pub struct SmtpMailStream<RQ, S> -// where RQ: MailSendRequest, -// S: Stream<Item = RQ>, -// S::Error: Into<MailSendError> -//{ -// channel: Option<InnerChannel<RQ>>, -// stream: Option<S>, -// res_buffer: FuturesUnordered<oneshot::Receiver<Result<MailResponse, MailSendError>>>, -// send_buffer: Option<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>, -// max_buffer: Option<usize> -//} -// -// -//impl<RQ, S> SmtpMailStream<RQ, S> -// where RQ: MailSendRequest, -// S: Stream<Item = RQ>, -// S::Error: Into<MailSendError> -//{ -// pub(crate) fn new(channel: InnerChannel<RQ>, stream: S, max_buffer: Option<usize>) -> Self { -// SmtpMailStream { -// channel, stream, max_buffer, -// send_buffer: None, -// res_buffer: FuturesUnordered::new(), -// } -// } -// -// fn channel_mut(&mut self) -> &mut InnerChannel<RQ> { -// self.channel.as_mut().unwrap() -// } -// -// fn stream_mut(&mut self) -> &mut S { -// self.stream.as_mut().unwrap() -// } -// -// fn try_enqueue_mail( -// &mut self, -// msg: (RQ, oneshot::Sender<Result<MailResponse, MailSendError>>) -// ) -> Poll<(), DriverDropped<S>> -// { -// debug_assert!(self.send_buffer.is_none()); -// debug_assert!(self.stream.is_some() && self.channel.is_some()); -// match self.channel_mut().start_send(msg) { -// Ok(AsyncSink::Ready) => Ok(Async::Ready(())), -// Ok(AsyncSink::NotReady(msg)) => { -// self.send_buffer = Some(msg); -// Ok(Async::NotReady) -// }, -// Err(_) => { -// mem::drop(self.channel.take()); -// Err(DriverDropped::Stream(self.stream.take())) -// } -// } -// } -// -// fn close_channel(&mut self) -> Poll<Option<Self::Item>, Self::Error> { -// try_ready!(self.channel_mut().close()); -// return Ok(Async::Ready(None)); -// } -// -// fn poll_stream(&mut self) -> Poll<Option<Self::Item>, Self::Error> { -// match self.stream_mut().poll()? { -// Async::Ready(Some(item)) => Async::Ready(Some(item)), -// Async::Ready(None) => { -// self.stream.take(); -// return self.close_channel(); -// } -// Async::NotReady => { -// try_ready!(self.channel_mut().poll_complete()); -// return Ok(Async::NotRead); -// } -// } -// } -//} -// -//pub enum DriverDropped<S> { -// Stream(S), -// StreamTakenOnPreviousError -//} -// -//impl<RQ, S> Stream for SmtpMailStream<RQ, S> -// where RQ: MailSendRequest, -// S: Stream<Item = RQ>, -// //FIXME[tokio 0.2] use error Never?? -// S::Error: Into<MailSendError> -//{ -// type Item = Result<MailResponse, MailSendError>; -// type Error = DriverDropped<S>; -// -// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { -// if self.channel.is_none() { -// return Err(DriverDropped::StreamTakenOnPreviousError); -// } -// -// if self.stream.is_none() { -// return self.close_channel(); -// } -// -// if let Some(msg) = self.send_buffer { -// try_ready!(self.try_enqueue_mail(msg)); -// } -// -// loop { -// -// match self.res_buffer.poll() { -// Ok(Async::NotReady) => {}, -// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))), -// Err(err) => return Ok(AsyncReady(Err(e))) -// } -// -// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() { -// return Ok(Async::NotReady); -// } -// -// -// let item = try_some_ready!(self.poll_stream()); -// -// let (tx, rx) = oneshot::channel(); -// -// self.res_buffer.push(rx); -// -// try_ready!(self.try_enqueue_mail((item, tx))); -// } -// } -//}
\ No newline at end of file @@ -1,19 +1,16 @@ +extern crate futures; extern crate new_tokio_smtp; +extern crate mail_types as mail; +extern crate mail_common; +extern crate mail_headers as headers; +#[macro_use] +extern crate failure; -mod stop_handle; -mod mpsc_ext; +mod resolve_all; pub mod error; mod common; -mod smtp_wrapper; mod encode; -mod handle; -mod service; -pub use self::stop_handle::StopServiceHandle; pub use self::common::*; -pub use self::handle::*; -pub use self::service::*; - -#[cfg(test)] -mod test;
\ No newline at end of file +pub use self::encode::*;
\ No newline at end of file diff --git a/src/mpsc_ext.rs b/src/mpsc_ext.rs deleted file mode 100644 index 79e6fef..0000000 --- a/src/mpsc_ext.rs +++ /dev/null @@ -1,34 +0,0 @@ -use super::stop_handle::StopServiceHandle; - -use futures::sync::mpsc; -use futures::{Poll, Async, Stream}; - -pub struct AutoClose<I> { - inner: mpsc::Receiver<I>, - stop_handle: StopServiceHandle, -} - -impl<I> AutoClose<I> - //FIXME[tokio >= 0.2]: use Never - where I: Stream<Error=()> -{ - pub fn new(inner: mpsc::Receiver<I>, stop_handle: StopServiceHandle) -> Self { - AutoClose { inner, stop_handle } - } -} - -impl<I> Stream for AutoClose<I> - where I: Stream<Error=()> -{ - - type Item = I::Item; - //FIXME[tokio >= 0.2]: use Never - type Error = (); - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - if stop_handle.should_stop() { - self.inner.close() - } - self.inner.poll() - } -}
\ No newline at end of file diff --git a/src/resolve_all.rs b/src/resolve_all.rs new file mode 100644 index 0000000..69d3c8b --- /dev/null +++ b/src/resolve_all.rs @@ -0,0 +1,81 @@ +use std::mem; +use std::iter::FromIterator; + +use futures::{Future, Async, Poll}; + +pub enum AltFuse<F: Future> { + Future(F), + Resolved(Result<F::Item, F::Error>) +} + +impl<F> Future for AltFuse<F> + where F: Future +{ + type Item = (); + //TODO[futures/v>=0.2 |rust/! type]: use Never or ! + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let result = match *self { + AltFuse::Resolved(_) => return Ok(Async::Ready(())), + AltFuse::Future(ref mut fut) => match fut.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(val)) => Ok(val), + Err(err) => Err(err) + } + }; + + *self = AltFuse::Resolved(result); + Ok(Async::Ready(())) + } +} + + +pub struct ResolveAll<F> + where F: Future +{ + all: Vec<AltFuse<F>> +} + +impl<F> Future for ResolveAll<F> + where F: Future +{ + type Item = Vec<Result<F::Item, F::Error>>; + //TODO[futures >= 0.2/rust ! type]: use Never or ! + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let mut any_not_ready = false; + for fut in self.all.iter_mut() { + if let Ok(Async::NotReady) = fut.poll() { + any_not_ready = true; + } + } + if any_not_ready { + Ok(Async::NotReady) + } else { + let |