From 760a42222cde5ef3a0e26af4174149838ef54d7b Mon Sep 17 00:00:00 2001 From: Philipp Korber Date: Mon, 16 Apr 2018 11:44:08 +0200 Subject: wip: moved files over from mail-codec-composition but no integration - the smtp module in mail-codec-composition was moved to become this crate - but imports and path's where not yet changed in the moved files to represent this change - also dependencies are not yet updated - also some parts where in the process of beeing changed to use `new-tokio-smtp` instead of `tokio-smtp`, to not discard the works the wip files of that where used so it's roughly _just_ moving files over --- .gitignore | 5 + Cargo.toml | 15 ++ LICENSE-APACHE | 201 +++++++++++++++++++++++++ LICENSE-MIT | 25 ++++ src/common.rs | 112 ++++++++++++++ src/encode.rs | 98 +++++++++++++ src/error.rs | 24 +++ src/handle.rs | 256 ++++++++++++++++++++++++++++++++ src/lib.rs | 19 +++ src/mpsc_ext.rs | 34 +++++ src/service.rs | 413 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/smtp_wrapper.rs | 202 +++++++++++++++++++++++++ src/stop_handle.rs | 18 +++ src/test.rs | 270 ++++++++++++++++++++++++++++++++++ 14 files changed, 1692 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 src/common.rs create mode 100644 src/encode.rs create mode 100644 src/error.rs create mode 100644 src/handle.rs create mode 100644 src/lib.rs create mode 100644 src/mpsc_ext.rs create mode 100644 src/service.rs create mode 100644 src/smtp_wrapper.rs create mode 100644 src/stop_handle.rs create mode 100644 src/test.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a10770d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/target +Cargo.lock +.idea/ +.vscode/ +*.iml diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1cfdb3a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +authors = ["Philipp Korber "] +name = "mail-smtp" +version = "0.1.0" +categories = [] +description = "combines mail-codec with new-tokio-smtp" +documentation = "https://docs.rs/mail-smtp" +keywords = ["mail", "smtp", "send" ] +license = "MIT OR Apache-2.0" +readme = "./README.md" +repository = "https://github.com/1aim/mail-smtp" + +[dependencies] +mail = { git="https://github.com/1aim/mail" } +new-tokio-smtp = { git="https://github.com/1aim/new-tokio-smtp" } diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..56434b8 --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..b25ff75 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..bccf869 --- /dev/null +++ b/src/common.rs @@ -0,0 +1,112 @@ +//! This modules contains some of the data types used, like e.g. Response, Request, Envelop etc. + +use vec1::Vec1; + +use futures::sync::oneshot; +use new_tokio_smtp::{ForwardPath, ReversePath}; + +use mail::headers::{Sender, From as _From, To}; +use mail::headers::components::Mailbox; +use mail::Mail; +use super::error::{MailSendError, EnvelopFromMailError}; + +#[derive(Debug)] +pub struct EnvelopData { + from: ReversePath, + to: Vec1 +} + +impl EnvelopData { + pub fn new(from: SmtpMailbox, to: Vec1) -> Self { + EnvelopData { from, to } + } + + pub fn split(self) -> (SmtpMailbox, Vec1) { + let EnvelopData { from, to } = self; + (from, to) + } + + pub fn from_mail(mail: &Mail) -> Result { + + let headers = mail.headers(); + let smtp_from = + if let Some(sender) = headers.get_single(Sender) { + let sender = sender.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; + //TODO double check with from field + mailbox2smtp_mailbox(sender) + } else { + let from = headers.get_single(_From) + .ok_or(EnvelopFromMailError::NeitherSenderNorFrom)? + .map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; + + if from.len() > 1 { + return Err(EnvelopFromMailError::NoSenderAndMoreThanOneFrom); + } + + mailbox2smtp_mailbox(from.first()) + }; + + let smtp_to = + if let Some(to) = headers.get_single(To) { + let to = to.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; + to.mapped_ref(mailbox2smtp_mailbox) + } else { + return Err(EnvelopFromMailError::NoToHeaderField); + }; + + //TODO Cc, Bcc + + Ok(EnvelopData { + from: smtp_from, + to: smtp_to + }) + } +} + +pub type MailSendResult = Result; +pub(crate) type Handle2ServiceMsg = (MailRequest, oneshot::Sender); + +#[derive(Debug, Clone)] +pub struct MailResponse; + +//TODO derive(Clone): requires clone for Box +#[derive(Debug)] +pub struct MailRequest { + mail: Mail, + envelop_data: Option +} + +impl From for MailRequest { + fn from(mail: Mail) -> Self { + MailRequest::new(mail) + } +} + + + +impl MailRequest { + + pub fn new(mail: Mail) -> Self { + MailRequest { mail, envelop_data: None } + } + + pub fn new_with_envelop(mail: Mail, envelop: EnvelopData) -> Self { + MailRequest { mail, envelop_data: Some(envelop) } + } + + pub fn into_mail_with_envelop(self) -> Result<(Mail, EnvelopData), EnvelopFromMailError> { + let envelop = + if let Some(envelop) = self.envelop_data { envelop } + else { EnvelopData::from_mail(&self.mail)? }; + + Ok((self.mail, envelop)) + } +} + +fn mailbox2smtp_mailbox(mailbox: &Mailbox) -> SmtpMailbox { + use emailaddress::EmailAddress; + SmtpMailbox(Some(EmailAddress { + local: mailbox.email.local_part.as_str().to_owned(), + domain: mailbox.email.domain.as_str().to_owned(), + })) +} \ No newline at end of file diff --git a/src/encode.rs b/src/encode.rs new file mode 100644 index 0000000..c5274bd --- /dev/null +++ b/src/encode.rs @@ -0,0 +1,98 @@ +use std::io::{Error as IoError}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::mem; + +use futures::sync::{mpsc, oneshot}; +use futures::{future, Future, Stream, Poll, Async}; +use futures::stream::BufferUnordered; + +use new_tokio_smtp::Connection; + +use mail::prelude::{Encoder, Encodable, MailType}; +use mail::utils::SendBoxFuture; +use mail::context::BuilderContext; + +use super::smtp_wrapper::{send_mail, close_smtp_conn}; +use super::common::{ + MailSendResult, MailResponse, MailRequest, EnvelopData, + Handle2ServiceMsg +}; +use super::handle::MailServiceHandle; +use super::error::MailSendError; + +pub(crate) type MailEncodingResult = Result<(Vec, EnvelopData), MailSendError>; + +/// encodes the mails in the input stream in a thread poll returning the results out of order +/// +/// The `max_concurrent` parameter determines how many mails are encoded concurrently. +/// +/// Note that it uses `ctx.offload` to offload work, i.e. send it to a thread pool, +/// if `ctx.offload` is not implemented in terms of a thread pool this will not encode +/// mail in a thread poll but with whatever mechanism `ctx.offload` uses to resolve +/// futures. +/// +pub(crate) fn stream_encode_mail(steam: S, ctx: CTX, max_concurrent: usize) + //FIXME[rust/impl Trait]: use impl Trait instead of boxing + -> Box= 0.2]: replace () with Never + Item=SendBoxFuture<(MailEncodingResult, oneshot::Sender), ()>, + //FIXME[futures >= 0.2]: replace () with Never + Error=()>> + //FIXME[futures >= 0.2]: replace () with Never + where S: Stream, CTX: BuilderContext +{ + let _ctx = ctx; + let fut_stream = stream.map(move |(req, tx)| { + //clone ctx to move it into the operation chain + let ctx = _ctx.clone(); + let operation = future + //use lazy to make sure it's run in the thread pool + ::lazy(move || mail_request.into_mail_with_envelop()) + .then(move |result| match result { + Ok((mail, envelop)) => Ok((mail, envelop, tx)), + Err(err) => Err((MailSendError::CreatingEnvelop(err), tx)) + }) + .and_then(move |(mail, envelop, tx)| { + mail.into_encodeable_mail(&ctx) + .then(move |result| match result { + Ok(enc_mail) => Ok((enc_mail, envelop, tx)), + Err(err) => Err((MailSendError::Encoding(err), tx)) + }) + }) + .and_then(move |(encodable_mail, envelop, tx)| { + //TODO we need to feed in the MailType (and get it from tokio smtp) + let mut encoder = Encoder::new( MailType::Ascii ); + match encodable_mail.encode(&mut encoder) { + Ok(()) => {}, + Err(err) => return Err((MailSendError::Encoding(err), tx)) + } + + let bytes = match encoder.to_vec() { + Ok(bytes) => bytes, + Err(err) => return Err((MailSendError::Encoding(err), tx)) + }; + + //TODO we also need to return SmtpEnvelop> + let enc_result = Ok((bytes, envelop)); + Ok((enc_result, tx)) + }) + .or_else(move |(err, tx)| { + let enc_result = Err(err); + Ok((enc_result, tx)) + }); + + // offload work to thread pool + let fut = _ctx.offload(operation); + + + // return future as new item + fut + }); + + // buffer + let buffered = fut_stream.buffer_unordered(max_concurrent); + + Box::new(buffered) +} + diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..48d4690 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,24 @@ +use std::io::{Error as IoError}; +use ::error::Error; +use new_tokio_smtp::error::LogicError; + +#[derive(Debug)] +pub enum MailSendError { + CreatingEnvelop(EnvelopFromMailError), + Composition(Error), + Encoding(Error), + //Note with pipelining this will change to Vec + Smtp(LogicError), + Io(IoError), + DriverDropped, + CanceledByDriver +} + + +#[derive(Debug)] +pub enum EnvelopFromMailError { + NeitherSenderNorFrom, + TypeError(Error), + NoSenderAndMoreThanOneFrom, + NoToHeaderField +} diff --git a/src/handle.rs b/src/handle.rs new file mode 100644 index 0000000..5f351e8 --- /dev/null +++ b/src/handle.rs @@ -0,0 +1,256 @@ +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::{sink, Sink, Future, Poll, Async}; + +use super::common::{MailRequest, MailResponse}; +use super::error::MailSendError; + +type InnerChannel = mpsc::Sender<(MailRequest, oneshot::Sender>)>; + +#[derive(Debug, Clone)] +pub struct MailServiceHandle { + channel: InnerChannel +} + + +impl MailServiceHandle { + + pub(crate) fn new(sender: InnerChannel) -> Self { + MailServiceHandle { channel: sender } + } + + pub fn send_mail(self, mail_request: MailRequest) -> MailEnqueueFuture { + let (sender, rx) = oneshot::channel(); + + let send = self.channel.send((mail_request, sender)); + + MailEnqueueFuture { send, rx: Some(rx) } + } + +// pub fn map_request_stream(self, stream: S, max_buffer: Option) -> SmtpMailStream +// where S: Stream, +// S::Error: Into +// { +// SmtpMailStream::new(self.channel, stream, max_buffer) +// } + + pub fn into_inner(self) -> InnerChannel { + self.channel + } + +} + +pub struct MailEnqueueFuture { + send: sink::Send, + rx: Option>> +} + +impl Future for MailEnqueueFuture { + + type Item = (MailServiceHandle, MailResponseFuture); + type Error = MailSendError; + + fn poll(&mut self) -> Poll { + let channel = match self.send.poll() { + Ok(Async::Ready(channel)) => channel, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::DriverDropped), + }; + + let rx = self.rx.take().expect("called poll after polling completed"); + Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx }))) + } +} + + +pub struct MailResponseFuture { + rx: oneshot::Receiver> +} + +impl Future for MailResponseFuture { + type Item = MailResponse; + type Error = MailSendError; + + fn poll(&mut self) -> Poll { + let res = match self.rx.poll() { + Ok(Async::Ready(res)) => res, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::CanceledByDriver), + }; + + match res { + Ok(resp) => Ok(Async::Ready(resp)), + Err(err) => Err(err) + } + } +} + + +// use a closure to setup smtp Clone + FnMut() -> R, R: Future or similar +//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) { +// let (send, resc) = mpcs::channel(XX); +// let shared_composition_base = smtp_setup.composition_base; +// let driver = smtp_connect().and_then(|service| { +// let service = shared_composition_base.wrap_smtp_service(service); +// +// let pipe_in = resc;; +// +// pipe_in.for_each(|(cmd, pipe_out)| { +// service.call(cmd).and_then(|res| { +// let _ = pipe_out.send(res); +// Ok(()) +// }) +// }) +// }); +// (send, driver) +//} +// +//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> { +// let driver = setup_smtp(setup.clone()); +// +// future::loop_fn(driver, move |driver| { +// driver.or_else(|err| { +// if err.was_canceled() { +// Ok(Loop::Break(())) +// } else if err.can_recover() { +// let new_driver = setup_smtp(setup.clone()); +// Ok(Loop::Continue(new_driver)) +// } else { +// Err(err.into()) +// } +// }) +// }) +//} +// + + + +// TODO: This links into a stream mapping a stream of requests to a stream of responses, +// but it needs more though into it, i.e. it decouples the sending of a mail, with +// the response of getting one. So we need to at last add some mail Id to Req (we +// might want to see if we can generally do so using MessageId). +// +//pub struct SmtpMailStream +// where RQ: MailSendRequest, +// S: Stream, +// S::Error: Into +//{ +// channel: Option>, +// stream: Option, +// res_buffer: FuturesUnordered>>, +// send_buffer: Option<(RQ, oneshot::Sender>)>, +// max_buffer: Option +//} +// +// +//impl SmtpMailStream +// where RQ: MailSendRequest, +// S: Stream, +// S::Error: Into +//{ +// pub(crate) fn new(channel: InnerChannel, stream: S, max_buffer: Option) -> Self { +// SmtpMailStream { +// channel, stream, max_buffer, +// send_buffer: None, +// res_buffer: FuturesUnordered::new(), +// } +// } +// +// fn channel_mut(&mut self) -> &mut InnerChannel { +// self.channel.as_mut().unwrap() +// } +// +// fn stream_mut(&mut self) -> &mut S { +// self.stream.as_mut().unwrap() +// } +// +// fn try_enqueue_mail( +// &mut self, +// msg: (RQ, oneshot::Sender>) +// ) -> Poll<(), DriverDropped> +// { +// debug_assert!(self.send_buffer.is_none()); +// debug_assert!(self.stream.is_some() && self.channel.is_some()); +// match self.channel_mut().start_send(msg) { +// Ok(AsyncSink::Ready) => Ok(Async::Ready(())), +// Ok(AsyncSink::NotReady(msg)) => { +// self.send_buffer = Some(msg); +// Ok(Async::NotReady) +// }, +// Err(_) => { +// mem::drop(self.channel.take()); +// Err(DriverDropped::Stream(self.stream.take())) +// } +// } +// } +// +// fn close_channel(&mut self) -> Poll, Self::Error> { +// try_ready!(self.channel_mut().close()); +// return Ok(Async::Ready(None)); +// } +// +// fn poll_stream(&mut self) -> Poll, Self::Error> { +// match self.stream_mut().poll()? { +// Async::Ready(Some(item)) => Async::Ready(Some(item)), +// Async::Ready(None) => { +// self.stream.take(); +// return self.close_channel(); +// } +// Async::NotReady => { +// try_ready!(self.channel_mut().poll_complete()); +// return Ok(Async::NotRead); +// } +// } +// } +//} +// +//pub enum DriverDropped { +// Stream(S), +// StreamTakenOnPreviousError +//} +// +//impl Stream for SmtpMailStream +// where RQ: MailSendRequest, +// S: Stream, +// //FIXME[tokio 0.2] use error Never?? +// S::Error: Into +//{ +// type Item = Result; +// type Error = DriverDropped; +// +// fn poll(&mut self) -> Poll, Self::Error> { +// if self.channel.is_none() { +// return Err(DriverDropped::StreamTakenOnPreviousError); +// } +// +// if self.stream.is_none() { +// return self.close_channel(); +// } +// +// if let Some(msg) = self.send_buffer { +// try_ready!(self.try_enqueue_mail(msg)); +// } +// +// loop { +// +// match self.res_buffer.poll() { +// Ok(Async::NotReady) => {}, +// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))), +// Err(err) => return Ok(AsyncReady(Err(e))) +// } +// +// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() { +// return Ok(Async::NotReady); +// } +// +// +// let item = try_some_ready!(self.poll_stream()); +// +// let (tx, rx) = oneshot::channel(); +// +// self.res_buffer.push(rx); +// +// try_ready!(self.try_enqueue_mail((item, tx))); +// } +// } +//} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..1ae530d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,19 @@ +extern crate new_tokio_smtp; + +mod stop_handle; +mod mpsc_ext; + +pub mod error; +mod common; +mod smtp_wrapper; +mod encode; +mod handle; +mod service; + +pub use self::stop_handle::StopServiceHandle; +pub use self::common::*; +pub use self::handle::*; +pub use self::service::*; + +#[cfg(test)] +mod test; \ No newline at end of file diff --git a/src/mpsc_ext.rs b/src/mpsc_ext.rs new file mode 100644 index 0000000..79e6fef --- /dev/null +++ b/src/mpsc_ext.rs @@ -0,0 +1,34 @@ +use super::stop_handle::StopServiceHandle; + +use futures::sync::mpsc; +use futures::{Poll, Async, Stream}; + +pub struct AutoClose { + inner: mpsc::Receiver, + stop_handle: StopServiceHandle, +} + +impl AutoClose + //FIXME[tokio >= 0.2]: use Never + where I: Stream +{ + pub fn new(inner: mpsc::Receiver, stop_handle: StopServiceHandle) -> Self { + AutoClose { inner, stop_handle } + } +} + +impl Stream for AutoClose + where I: Stream +{ + + type Item = I::Item; + //FIXME[tokio >= 0.2]: use Never + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + if stop_handle.should_stop() { + self.inner.close() + } + self.inner.poll() + } +} \ No newline at end of file diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..05387d8 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,413 @@ +use std::io::{Error as IoError}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::mem; + +use futures::sync::{mpsc, oneshot}; +use futures::{future, Future, Stream, Poll, Async}; +use futures::stream::Peekable; + +use new_tokio_smtp::Connection; + +use mail::prelude::{Encoder, Encodable, MailType}; +use mail::utils::SendBoxFuture; +use mail::context::BuilderContext; + +use super::smtp_wrapper::{send_mail, ConnectionState, CompletionState}; +use super::encode::{MailEncodingResult, stream_encode_mail}; +use super::common::{MailResponse, MailRequest, EnvelopData, MailSendResult}; +use super::handle::MailServiceHandle; +use super::error::MailSendError; +use super::stop_handle::StopServiceHandle; +use super::mpsc_ext::AutoClose; + + +pub trait SmtpSetup { + + /// The future returned which returns a Smtp connection, + /// + /// as this smtp mail bindings are writting for `tokio_smtp` + /// the Item is fixed to `ClientProxy`, (This might change + /// in future versions) + type ConnectFuture: 'static + Future< + Item=Connection, + Error=Self::NotConnectingError>; + + /// The error returned if it is not possible to connect, + /// this might represent a direct connection failure or + /// one involfing multiple retries or similar aspects. + type NotConnectingError; + + type BuilderContext: BuilderContext; + + // this future can contain all kind of retry connection handling etc. + /// This method is called to connect with an SMTP server. + /// + /// It is called whenever connecting to a SMTP server is necessary, + /// this includes the initial connection as well as reconnecting after + /// the connection might no longer be usable. + /// + /// As it returns a future with it's own error it can be used to + /// handle automatically retrying failed connections and limiting + /// the amount of retries or having a timeout before retrying to + /// connect. + /// + //TODO + /// Currently it is not implemented to retry sending failed mails, even + /// if it reconnects after e.g. an IO error + fn connect(&mut self) -> Self::ConnectFuture; + + fn context(&self) -> Self::BuilderContext; + + /// return how many mail should be encoded at the same time + /// + /// encoding a `Mail` includes transforming it into an `EncodableMail` which means + /// loading all resources associated with the `Mail` + fn mail_encoding_buffer_size(&self) -> usize { 16 } + + /// return the buffer size for the mpsc channel between the service and it's handles + /// + /// By default each handle has one and the loading buffer is directly connected to the + /// receiver, but the difference between the buffers is that sender can write into the + /// mpsc channels buffer _in their thread_ while moving the data buffered in the mpsc + /// channel to the `BufferUnordered` buffer is done _while polling the service driver_. + fn mail_enqueuing_buffer_size(&self) -> usize { 16 } +} + + + +pub struct MailService + where SUP: SmtpSetup +{ + setup: SUP, + //FIXME[future >= 0.2]: use Never + //FIXME[rust/impl Trait+abstract type]: use impl Trait/abstract type for rx + rx: Peekable), Error=()>>>, + connection: ConnectionState, + tx_of_pending: Option>, + stop_handle: StopServiceHandle +} + + +impl MailService + where SUP: SmtpSetup +{ + + pub fn new(setup: SUP) -> (Self, MailServiceHandle) { + let ctx = setup.context(); + let stop_handle = StopServiceHandle::new(); + + let (tx, raw_rx) = mpsc::channel(setup.mail_enqueuing_buffer_size()); + let auto_close_rx = AutoClose::new(raw_rx, stop_handle.clone()); + let enc_rx = stream_encode_mail(raw_rx, ctx, setup.mail_encoding_buffer_size()); + let rx = enc_rx.peekable(); + + let driver = MailService { + setup, rx, + connection: ConnectionState::Idle, + tx_of_pending: None, + stop_handle: StopServiceHandle::new() + }; + + let handle = MailServiceHandle::new(tx); + (driver, handle) + } + + pub fn stop_handle(&self) -> StopServiceHandle { + self.stop_handle.clone() + } + + /// # Error + /// + /// returns an error if the connection is "in use" + fn start_stopping_now(&mut self) -> Result<(), ()> { + self.stop_handle.stop(); + self.connection.terminate() + } + + fn poll_next_request(&mut self) -> Async<()> { + // 2. try to get a new request + let (enc_result, req_tx) = match self.rx.poll() { + Ok(Async::Ready(Some(item))) => { + item + }, + Ok(Async::Ready(None)) => { + //stop ourself, all senders where closed + self.start_stopping_now() + .expect("[BUG] we try_ready early return on \"in use\" connections"); + return Async::Ready(()); + }, + Ok(Async::NotReady) => { + return Ok(Async::NotReady) + }, + Err(_) => unreachable!("mpsc::Receiver.poll does not error") + }; + + // 3. start transmitting new request / send error back + match enc_result { + Ok((body, envelop)) => { + self.connection.send_mail(data, envelop) + .expect("[BUG] we can only reach here if the connection is \"Usable\"" ); + self.tx_of_pending = Some(req_tx); + }, + Err(err) => { + let _ = req_tx.send(err); + } + } + + Async::Ready(()) + } +} + +impl Future for MailService + where SUP: SmtpSetup +{ + type Item = (); + type Error = SUP::NotConnectingError; + + fn poll(&mut self) -> Poll { + //TODO[futures+tokio/new timout api]: use timeout and then close open connection + loop { + // 1. poll the connections state + match try_ready!(self.connection.poll_state_completion().map(|err| TODO)) { + CompletionState::Usable(opt_mail_send_result) => { + if let Some(mail_result) = opt_mail_send_result { + let tx = self.tx_of_pending.take().expect("[BUG] pending result but no tx"); + // we do not care if the receiver is now dropped + let _ = tx.send(mail_result); + } + + match self.poll_next_request() { + Async::Ready(()) => (), + Async::NotReady => return Ok(Async::NotReady) + } + + }, + CompletionState::Idle => { + let peek = try_ready!(self.rx.peek()); + // only open connection if a mail request is pending + if peek.is_some() { + self.connection.change_into_connecting(self.setup.connect()); + } else { + self.start_stopping_now() + .expect("[BUG] we try_ready early return on \"in use\" connections"); + } + } + CompletionState::Terminated => { + // be robust and make sure the flag is set anyway, through + // it should not be possible to end up here without stop_handle + // being set + self.stop_handle.stop(); + return Ok(Async::Ready(())); + } + } + } + } +} + + + + + + +#[cfg(test)] +mod test { + use std::io; + use futures::{Future, IntoFuture}; + use mail::prelude::*; + use chrono::{Utc, TimeZone}; + use super::super::test::*; + use super::*; + + + fn _test(setup: S, fail_connect: bool, func: F, other_driver: D) + where S: SmtpSetup, + F: FnOnce(MailServiceHandle) -> R, + R: IntoFuture, + D: Future + { + let (driver, handle) = MailService::new(setup); + let stop_handle = driver.stop_handle(); + + let test_fut = func(handle) + .into_future() + .then(|res| { + stop_handle.stop(); + res + }); + + let driver = driver.then(|res| match res { + Ok(_) if fail_connect => + Err(TestError("[test] did not fail to connect".to_owned())), + Err(_) if !fail_connect => + Err(TestError("[test] did unexpected fail to connect".to_owned())), + _ => Ok(()) + }); + + // we want all futures to complete independent of errors + // so there errors get "lifted" into their item + let driver = driver.then(|res| Ok(res)); + let test_fut = test_fut.then(|res| Ok(res)); + let other_driver = other_driver.then(|res| Ok(res)); + + let res: Result<_, ()> = driver.join3(test_fut, other_driver).wait(); + let (rd, rt, rod) = res.unwrap(); + match rd { Ok(_) => {}, Err(TestError(msg)) => panic!(msg) } + match rt { Ok(_) => {}, Err(TestError(msg)) => panic!(msg) } + match rod { Ok(_) => {}, Err(TestError(msg)) => panic!(msg) } + } + + fn example_io_error() -> IoError { + IoError::new(io::ErrorKind::Other, "it broke") + } + + fn example_mail() -> (MailRequest, &'static str) { + let headers = headers! { + From: ["djinns@are.magic"], + To: ["lord.of@the.bottle"], + Subject: "empty bottle, no djinn", + Date: Utc.ymd(2023, 1, 1).and_hms(1, 1, 1) + }.unwrap(); + + let mail = Builder + ::singlepart(text_resource("<--body-->")) + .headers(headers).unwrap() + .build().unwrap(); + + let req = MailRequest::new(mail); + + let expected_body = concat!( + "MIME-Version: 1.0\r\n", + "From: \r\n", + "To: \r\n", + "Subject: empty bottle, no djinn\r\n", + "Date: Sun, 1 Jan 2023 01:01:01 +0000\r\n", + "Content-Type: text/plain\r\n", + "Content-Transfer-Encoding: 7bit\r\n", + "\r\n", + "<--body-->\r\n" + ); + + (req, expected_body) + + } + #[test] + fn send_simple_mail() { + use self::RequestMock::*; + let (req, expected_body) = example_mail(); + let (setup, fake_server) = TestSetup::new(1, + vec![ + Normal(SmtpRequest::Mail { + from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }), + Normal(SmtpRequest::Rcpt { + to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }), + Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()), + Normal(SmtpRequest::Quit) + ], + vec![ + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + ] + ); + + _test(setup, false, |handle| { + handle.send_mail(req) + .and_then(|(_handle, resp_fut)| resp_fut) + .and_then(|_resp: MailResponse| { + //MailResponse is currently zero sized, so nothing to do here + Ok(()) + }) + .map_err(|mse| TestError(format!("unexpected error: {:?}", mse))) + }, fake_server) + } + + #[test] + fn reset_connection_on_io_error() { + use self::RequestMock::*; + let (req, expected_body) = example_mail(); + let (setup, fake_server) = TestSetup::new(2, + vec![ + Normal(SmtpRequest::Mail { + from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }), + // currently we only check for errs after sending all non Data parts + Normal(SmtpRequest::Rcpt { + to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }), + Normal(SmtpRequest::Mail { + from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }), + Normal(SmtpRequest::Rcpt { + to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }), + Body(SmtpRequest::Data, expected_body.to_owned().into_bytes()), + Normal(SmtpRequest::Quit) + ], + vec![ + Err(example_io_error()), + Err(example_io_error()), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + Ok(SmtpResponse::parse(b"250 Ok\r\n").unwrap().1), + ] + ); + + _test(setup, false, |handle| { + handle.send_mail(req) + .map_err(|err| TestError(format!("unexpected enque error {:?}", err))) + .and_then(|(handle, resp_fut)| resp_fut.then(|res| match res { + Ok(MailResponse) => Err(TestError("[test] unexpected no error".to_owned())), + Err(err) => { + if let MailSendError::Io(_) = err { + Ok(handle) + } else { + Err(TestError(format!("unexpected error kind {:?}", err))) + } + } + })) + .and_then(|handle| { + let (req, _) = example_mail(); + handle.send_mail(req) + .map_err(|err| TestError(format!("unexpected enque error {:?}", err))) + }) + .and_then(|(_handle, res_fut)| { + res_fut.map_err(|err| TestError(format!("unexpected error {:?}", err))) + }) + .map(|_| ()) + }, fake_server) + } + + #[test] + fn failed_reset_connection() { + use self::RequestMock::*; + let (req, _) = example_mail(); + let (setup, fake_server) = TestSetup::new(1, + vec![ + Normal(SmtpRequest::Mail { + from: "djinns@are.magic".parse().unwrap(), params: Vec::new() }), + // currently we only check for errs after sending all non Data parts + Normal(SmtpRequest::Rcpt { + to: "lord.of@the.bottle".parse().unwrap(), params: Vec::new() }), + ], + vec![ + Err(example_io_error()), + Err(example_io_error()), + ] + ); + + _test(setup, true, |handle| { + handle.send_mail(req) + .and_then(|(_handle, res_fut)| res_fut) + .then(|res| match res { + Ok(_) => Err(TestError("unexpected no error".to_owned())), + Err(err) => { + if let MailSendError::Io(_) = err { + Ok(()) + } else { + Err(TestError(format!("unexpected error kind: {:?}", err))) + } + } + }) + }, fake_server) + + } +} \ No newline at end of file diff --git a/src/smtp_wrapper.rs b/src/smtp_wrapper.rs new file mode 100644 index 0000000..dc91294 --- /dev/null +++ b/src/smtp_wrapper.rs @@ -0,0 +1,202 @@ +use std::io as std_io; +use std::mem; +use std::sync::Mutex; + +use futures::future::{self, Loop, Either}; +use futures::{Future, Poll, Async}; + +use new_tokio_smtp::{ + command, + Connection, +}; +use new_tokio_smtp::io::Socket; +use new_tokio_smtp::chain::{chain, OnError}; + +use super::error::MailSendError; +use super::common::{EnvelopData, MailResponse}; + +//FIXME[rust/impl Trait + abstract type]: use abstract type +type SmtpMailSendFuture = Box), + Error=MailSendError>>; + +//FIXME[rust/impl Trait]: use impl Trait +pub(crate) fn send_mail(con: Connection, body_bytes: Vec, envelop: EnvelopData) + -> SmtpMailSendFuture +{ + let (from, tos) = envelop.split(); + let mut cmds = vec![command::Mail::new(from).boxed()]; + + for to in tos.into_iter() { + cmds.push(command::Recipient::new(to).boxed()); + } + + let fut = chain(con, cmds, OnError::StopAndReset) + .map_err(|err| MailSendError::Io(err)) + .map(|(con, result)| match result { + Ok(_) => (con, Ok(MailResponse)), + Err((_, err)) => (con, Err(MailSendError::Smtp(err))) + }); + + Box::new(fut) +} + +pub enum ConnectionState { + Idle, + Connecting(F), + Connected(Connection), + ConnectionInUse(SmtpMailSendFuture), + Closing { + fut: Box>, + is_termination: bool + }, + Terminated, + Poison +} + +pub enum CompletionState { + Usable(Option>), + Idle, + Terminated +} + +impl ConnectionState + where F: Future +{ + + pub fn change_into_connecting(&mut self, con_fut: F) { + let old = mem::replace(self, ConnectionState::Connecting(con_fut)); + if let ConnectionState::Poison = old { + panic!("reuse of poisoned state in ConnectionState"); + } + } + + pub fn poll_state_completion(&mut self) + -> Poll> + { + use self::ConnectionState::*; + use self::CompletionState::Usable; + let state = mem::replace(self, Poison); + + let mut new_state = None; + let (new_state, result) = + match state { + Idle => { + (Idle, Ok(Async::Ready(CompletionState::Idle))) + }, + Connected(con) => { + (Connected(con), Ok(Async::Ready(Usable(None)))) + }, + Connecting(mut fut) => match fut.poll() { + Ok(Async::NotReady) => (Connecting(fut), Ok(Async::NotReady)), + Ok(Async::Ready(con)) => { + (Connected(con), Ok(Async::Ready(Usable(None)))) + }, + Err(err) => (Terminated, Err(Either::B(err))) + }, + ConnectionInUse(mut fut) => match fut.poll() { + Ok(Async::NotReady) => (ConnectionInUse(fut), Ok(Async::NotReady)), + Ok(Async::Ready((con, result))) => { + (Connected(con), Ok(Async::Ready(Usable(Some(result))))) + }, + Err(err) => (Terminated, Err(Either::A(err))) + }, + Closing{ mut fut, is_termination } => match fut.poll() { + Ok(Async::NotReady) => (Closing {fut, is_termination }, Ok(Async::NotReady)), + Ok(Async::Ready(())) => { + if is_termination { + (Terminated, Ok(Async::Ready(CompletionState::Terminated))) + } else { + (Idle, Ok(Async::Ready(CompletionState::Idle))) + } + } + }, + Terminated => (Terminated, Ok(Async::Ready(CompletionState::Terminated))), + Poison => panic!("polled ConnectionState after it was poisoned") + }; + + *self = new_state; + result + } + + pub fn send_mail(&mut self, body_bytes: Vec, envelop: EnvelopData) + -> Result<(), (Vec, EnvelopData)> + { + use self::ConnectionState::*; + + let state = mem::replace(self, Poison); + let (state, result) = + match state { + state @ Idle | Terminated | Connecting(_) | ConnectionInUse(_) | Closing(_) => + (state, Err((body_bytes, envelop))), + Poison => panic!("used ConnectionState after it was poisoned"), + Connected(con) => { + let in_use_fut = send_mail(con, body_bytes, envelop); + (ConnectionInUse(in_use_fut), Ok(())) + } + }; + + *self = state; + result + } + + /// # Panic + /// + /// panics if it's either poisoned or if the connections is "in use", i.e. + /// there is currently in the process of beeing send. + pub fn close_current(&mut self) -> Result<(), ()> { + self._close_con(false) + + } + + /// # Panic + /// + /// panics if it's either poisoned or if the connections is "in use", i.e. + /// there is currently in the process of beeing send. + pub fn terminate(&mut self) -> Result<(), ()> { + self._close_con(true) + } + + fn _close_con(&mut self, is_termination: bool) -> Result<(), ()> { + use self::ConnectionState::*; + + let mut result = Ok(()); + let force_termination = is_termination; + let state = mem::replace(self, Poison); + *self = + match state { + Idle => { + if is_termination { + Terminated + } else { + Idle + } + }, + Connecting(fut) => Closing { + fut: Box::new(fut.and_then(|con| con.quit())), + is_termination + }, + Connected(con) => Closing { + fut: Box::new(con.quit()), + is_termination + }, + ConnectionInUse(fut) => { + result = Err(()); + ConnectionInUse(fut) + }, + Closing { fut, is_termination } => { + // terminating overides quiting but not the other way around + if force_termination { + Closing{ fut, is_termination: true } + } else { + Closing{ fut, is_termination } + } + }, + Terminated => Terminated, + Poison => panic!("used ConnectionState after it was poisoned") + }; + + result + } +} + diff --git a/src/stop_handle.rs b/src/stop_handle.rs new file mode 100644 index 0000000..f1be50f --- /dev/null +++ b/src/stop_handle.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +#[derive(Debug, Clone)] +pub struct StopServiceHandle(Arc); + +impl StopServiceHandle { + pub fn new() -> Self { + StopServiceHandle(Arc::new(AtomicBool::new(false))) + } + pub fn should_stop(&self) -> bool { + self.0.load(Ordering::Acquire) + } + + pub fn stop(&self) { + self.0.store(true, Ordering::Release) + } +} \ No newline at end of file diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000..08fc5a2 --- /dev/null +++ b/src/test.rs @@ -0,0 +1,270 @@ +use std::io::{Error as IoError}; + +use futures::future; +use futures::sync::oneshot; +use futures::{Future, Stream, Poll, Async}; + +use tokio_proto::streaming::{Body, Message}; +use tokio_proto::util::client_proxy::{self, Receiver}; + +use tokio_smtp::response::{Response as SmtpResponse}; +use tokio_smtp::request::{Request as SmtpRequest}; + +use super::service::{TokioSmtpService, SmtpSetup, StopServiceHandle}; +use mail::default_impl::simple_context; +use mail::prelude::*; +use std::convert::From; + +pub(crate) struct FakeSmtpServer { + rx: Receiver< + Message, IoError>>, + Message>, + IoError>, + expected_requests: Vec, + use_responses: Vec, + stop_flag: StopServiceHandle +} + +pub(crate) type ResponseMock = Result; + +pub(crate) enum RequestMock { + Normal(SmtpRequest), + Body(SmtpRequest, Vec) +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct TestError(pub(crate) String); + +impl From for TestError { + fn from(inp: String) -> Self { + TestError(inp) + } +} + +impl<'a> From<&'a str> for TestError { + fn from(inp: &'a str) -> Self { + TestError(inp.to_owned()) + } +} + + +impl FakeSmtpServer { + + pub(crate) fn new( + expected_requests: Vec, + use_responses: Vec> + ) -> (TokioSmtpService, Self) + { + let (proxy, rx) = client_proxy::pair(); + let mut expected_requests = expected_requests; + expected_requests.reverse(); + let mut use_responses = use_responses; + use_responses.reverse(); + let _self = FakeSmtpServer { + rx, expected_requests, use_responses, + stop_flag: StopServiceHandle::new() + }; + (proxy, _self) + } + + pub(crate) fn get_stop_flag(&self) -> StopServiceHandle { + self.stop_flag.clone() + } + + fn get_next_expected(&mut self, got: &SmtpRequest) -> Result { + if let Some(expected) = self.expected_requests.pop() { + Ok(expected) + } else { + Err(TestError( + format!("[test] got request but expected no more requests, got: {:?}", got))) + } + } + fn check_next_request(&mut self, req: SmtpRequest) -> Result<(), TestError> { + let expected = self.get_next_expected(&req)?; + + let (expected, expected_body) = match expected { + RequestMock::Normal(req) => (req, false), + RequestMock::Body(req, _) => (req, true) + }; + + if req != expected { + return Err( + TestError(format!("[test] expected req and received req differ: {:?} != {:?}", + expected, req))); + } + + if expected_body { + Err(TestError(format!("[test] expected with body, got: {:?}", req))) + } else { + Ok(()) + } + } + + //NOTE: only call during a poll or it will panic + fn check_next_request_with_body( + &mut self, + req: SmtpRequest, mut body: Body, IoError> + ) -> Result<(), TestError> + { + let expected = self.get_next_expected(&req)?; + + let (expected, expected_body) = match expected { + RequestMock::Normal(_exp) => { + return Err(TestError(format!("[test] expected req without body, got: {:?}", req))); + }, + RequestMock::Body(exp, body) => { + (exp, body) + } + }; + + if req != expected { + return Err( + TestError(format!("[test] expected req and received req differ: {:?} != {:?}", + expected, req))); + } + + + let body_bytes = + match body.poll() { + Ok(Async::Ready(Some(data))) => data, + Ok(Async::Ready(None)) => + panic!("[TEST_BUG] Body::from(data) is used so data should be ready"), + Ok(Async::NotReady) => + panic!("[TEST_BUG] Body::from(data) is used so data should be ready"), + Err(_) => + panic!("[TEST_BUG] Body::from(data) is used so no error can occure") + }; + + if expected_body != body_bytes { + let readable_exp_body = String::from_utf8_lossy(&expected_body); + let readable_body = String::from_utf8_lossy(&body_bytes); + Err(TestError( + format!("unexpected body, got {:?} expected {:?}", + readable_body, readable_exp_body))) + } else { + Ok(()) + } + } + + fn send_next_response( + &mut self, + tx: oneshot::Sender>, IoError>> + ) -> Result<(), TestError> + { + let next_response = match self.use_responses.pop() { + Some(resp) => resp, + None => return Err(TestError("[test] run out of responses".to_owned())) + }; + + tx.send(next_response.map(|res| Message::WithoutBody(res))) + .map_err(|_| + TestError("[test] Smtp ClientProxy call response future dropped early".to_owned())) + } +} + +impl Future for FakeSmtpServer { + type Item = (); + type Error = TestError; + + fn poll(&mut self) -> Poll { + loop { + + if self.stop_flag.should_stop() { + return Ok(Async::Ready(())); + } + + let item = try_ready! { + self.rx.poll() + .map_err(|_| TestError::from("[test] smtp mock service channel closed in poll")) + }; + + let io_res = match item { + Some(item) => item, + None => { + if !self.expected_requests.is_empty() { + return Err(TestError( + "[test] receiver 'closed' but expected more requests".to_owned())); + } else { + return Ok(Async::Ready(())); + } + } + }; + + let (req, tx) = io_res + .expect("[TEST_BUG] unexpected io error send form ClinetProxy"); + + match req { + Message::WithoutBody(req) => { + self.check_next_request(req)?; + }, + Message::WithBody(req, body) => { + self.check_next_request_with_body(req, body)?; + } + } + self.send_next_response(tx)?; + } + } + +} + +pub(crate) struct TestSetup { + nr_of_ok_connection_attemps: usize, + client_proxy: TokioSmtpService, + context: simple_context::Context +} + +impl TestSetup { + + pub(crate) fn new( + nr_of_ok_connection_attemps: usize, + expected_requests: Vec, + use_responses: Vec> + ) -> (Self, FakeSmtpServer) + { + let (client_proxy, fake_driver) = FakeSmtpServer::new(expected_requests, use_responses); + + let _self = TestSetup { + nr_of_ok_connection_attemps, client_proxy, + context: simple_context::new().unwrap() + }; + (_self, fake_driver) + } + +} + + + + +#[derive(Debug, PartialEq)] +pub(crate) enum ConnectionError { + RunOutOfConnection +} + + +impl SmtpSetup for TestSetup { + + type ConnectFuture = future::FutureResult; + type NotConnectingError = ConnectionError; + type BuilderContext = simple_context::Context; + + fn connect(&mut self) -> Self::ConnectFuture { + let left_attemps = self.nr_of_ok_connection_attemps; + if left_attemps < 1 { + future::err(ConnectionError::RunOutOfConnection) + } else { + self.nr_of_ok_connection_attemps = left_attemps - 1; + future::ok(self.client_proxy.clone()) + } + } + + fn context(&self) -> Self::BuilderContext { + self.context.clone() + } + +} + + +pub(crate) fn text_resource>(text: I) -> Resource { + let fb = F