diff options
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | Cargo.toml | 15 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 25 | ||||
-rw-r--r-- | src/common.rs | 112 | ||||
-rw-r--r-- | src/encode.rs | 98 | ||||
-rw-r--r-- | src/error.rs | 24 | ||||
-rw-r--r-- | src/handle.rs | 256 | ||||
-rw-r--r-- | src/lib.rs | 19 | ||||
-rw-r--r-- | src/mpsc_ext.rs | 34 | ||||
-rw-r--r-- | src/service.rs | 413 | ||||
-rw-r--r-- | src/smtp_wrapper.rs | 202 | ||||
-rw-r--r-- | src/stop_handle.rs | 18 | ||||
-rw-r--r-- | src/test.rs | 270 |
14 files changed, 1692 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a10770d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/target +Cargo.lock +.idea/ +.vscode/ +*.iml diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1cfdb3a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +authors = ["Philipp Korber <p.korber@1aim.com>"] +name = "mail-smtp" +version = "0.1.0" +categories = [] +description = "combines mail-codec with new-tokio-smtp" +documentation = "https://docs.rs/mail-smtp" +keywords = ["mail", "smtp", "send" ] +license = "MIT OR Apache-2.0" +readme = "./README.md" +repository = "https://github.com/1aim/mail-smtp" + +[dependencies] +mail = { git="https://github.com/1aim/mail" } +new-tokio-smtp = { git="https://github.com/1aim/new-tokio-smtp" } diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..56434b8 --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright <year> <name> + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..b25ff75 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) <year> <name> + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..bccf869 --- /dev/null +++ b/src/common.rs @@ -0,0 +1,112 @@ +//! This modules contains some of the data types used, like e.g. Response, Request, Envelop etc. + +use vec1::Vec1; + +use futures::sync::oneshot; +use new_tokio_smtp::{ForwardPath, ReversePath}; + +use mail::headers::{Sender, From as _From, To}; +use mail::headers::components::Mailbox; +use mail::Mail; +use super::error::{MailSendError, EnvelopFromMailError}; + +#[derive(Debug)] +pub struct EnvelopData { + from: ReversePath, + to: Vec1<ForwardPath> +} + +impl EnvelopData { + pub fn new(from: SmtpMailbox, to: Vec1<SmtpMailbox>) -> Self { + EnvelopData { from, to } + } + + pub fn split(self) -> (SmtpMailbox, Vec1<SmtpMailbox>) { + let EnvelopData { from, to } = self; + (from, to) + } + + pub fn from_mail(mail: &Mail) -> Result<Self, EnvelopFromMailError> { + + let headers = mail.headers(); + let smtp_from = + if let Some(sender) = headers.get_single(Sender) { + let sender = sender.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; + //TODO double check with from field + mailbox2smtp_mailbox(sender) + } else { + let from = headers.get_single(_From) + .ok_or(EnvelopFromMailError::NeitherSenderNorFrom)? + .map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; + + if from.len() > 1 { + return Err(EnvelopFromMailError::NoSenderAndMoreThanOneFrom); + } + + mailbox2smtp_mailbox(from.first()) + }; + + let smtp_to = + if let Some(to) = headers.get_single(To) { + let to = to.map_err(|tpr| EnvelopFromMailError::TypeError(tpr))?; + to.mapped_ref(mailbox2smtp_mailbox) + } else { + return Err(EnvelopFromMailError::NoToHeaderField); + }; + + //TODO Cc, Bcc + + Ok(EnvelopData { + from: smtp_from, + to: smtp_to + }) + } +} + +pub type MailSendResult = Result<MailResponse, MailSendError>; +pub(crate) type Handle2ServiceMsg = (MailRequest, oneshot::Sender<MailSendResult>); + +#[derive(Debug, Clone)] +pub struct MailResponse; + +//TODO derive(Clone): requires clone for Box<EncodableMail+'static> +#[derive(Debug)] +pub struct MailRequest { + mail: Mail, + envelop_data: Option<EnvelopData> +} + +impl From<Mail> for MailRequest { + fn from(mail: Mail) -> Self { + MailRequest::new(mail) + } +} + + + +impl MailRequest { + + pub fn new(mail: Mail) -> Self { + MailRequest { mail, envelop_data: None } + } + + pub fn new_with_envelop(mail: Mail, envelop: EnvelopData) -> Self { + MailRequest { mail, envelop_data: Some(envelop) } + } + + pub fn into_mail_with_envelop(self) -> Result<(Mail, EnvelopData), EnvelopFromMailError> { + let envelop = + if let Some(envelop) = self.envelop_data { envelop } + else { EnvelopData::from_mail(&self.mail)? }; + + Ok((self.mail, envelop)) + } +} + +fn mailbox2smtp_mailbox(mailbox: &Mailbox) -> SmtpMailbox { + use emailaddress::EmailAddress; + SmtpMailbox(Some(EmailAddress { + local: mailbox.email.local_part.as_str().to_owned(), + domain: mailbox.email.domain.as_str().to_owned(), + })) +}
\ No newline at end of file diff --git a/src/encode.rs b/src/encode.rs new file mode 100644 index 0000000..c5274bd --- /dev/null +++ b/src/encode.rs @@ -0,0 +1,98 @@ +use std::io::{Error as IoError}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::mem; + +use futures::sync::{mpsc, oneshot}; +use futures::{future, Future, Stream, Poll, Async}; +use futures::stream::BufferUnordered; + +use new_tokio_smtp::Connection; + +use mail::prelude::{Encoder, Encodable, MailType}; +use mail::utils::SendBoxFuture; +use mail::context::BuilderContext; + +use super::smtp_wrapper::{send_mail, close_smtp_conn}; +use super::common::{ + MailSendResult, MailResponse, MailRequest, EnvelopData, + Handle2ServiceMsg +}; +use super::handle::MailServiceHandle; +use super::error::MailSendError; + +pub(crate) type MailEncodingResult = Result<(Vec<u8>, EnvelopData), MailSendError>; + +/// encodes the mails in the input stream in a thread poll returning the results out of order +/// +/// The `max_concurrent` parameter determines how many mails are encoded concurrently. +/// +/// Note that it uses `ctx.offload` to offload work, i.e. send it to a thread pool, +/// if `ctx.offload` is not implemented in terms of a thread pool this will not encode +/// mail in a thread poll but with whatever mechanism `ctx.offload` uses to resolve +/// futures. +/// +pub(crate) fn stream_encode_mail<S, CTX>(steam: S, ctx: CTX, max_concurrent: usize) + //FIXME[rust/impl Trait]: use impl Trait instead of boxing + -> Box<Stream< + //FIXME[futures >= 0.2]: replace () with Never + Item=SendBoxFuture<(MailEncodingResult, oneshot::Sender<MailSendResult>), ()>, + //FIXME[futures >= 0.2]: replace () with Never + Error=()>> + //FIXME[futures >= 0.2]: replace () with Never + where S: Stream<Item=Handle2ServiceMsg, Error=()>, CTX: BuilderContext +{ + let _ctx = ctx; + let fut_stream = stream.map(move |(req, tx)| { + //clone ctx to move it into the operation chain + let ctx = _ctx.clone(); + let operation = future + //use lazy to make sure it's run in the thread pool + ::lazy(move || mail_request.into_mail_with_envelop()) + .then(move |result| match result { + Ok((mail, envelop)) => Ok((mail, envelop, tx)), + Err(err) => Err((MailSendError::CreatingEnvelop(err), tx)) + }) + .and_then(move |(mail, envelop, tx)| { + mail.into_encodeable_mail(&ctx) + .then(move |result| match result { + Ok(enc_mail) => Ok((enc_mail, envelop, tx)), + Err(err) => Err((MailSendError::Encoding(err), tx)) + }) + }) + .and_then(move |(encodable_mail, envelop, tx)| { + //TODO we need to feed in the MailType (and get it from tokio smtp) + let mut encoder = Encoder::new( MailType::Ascii ); + match encodable_mail.encode(&mut encoder) { + Ok(()) => {}, + Err(err) => return Err((MailSendError::Encoding(err), tx)) + } + + let bytes = match encoder.to_vec() { + Ok(bytes) => bytes, + Err(err) => return Err((MailSendError::Encoding(err), tx)) + }; + + //TODO we also need to return SmtpEnvelop<Vec<u8>> + let enc_result = Ok((bytes, envelop)); + Ok((enc_result, tx)) + }) + .or_else(move |(err, tx)| { + let enc_result = Err(err); + Ok((enc_result, tx)) + }); + + // offload work to thread pool + let fut = _ctx.offload(operation); + + + // return future as new item + fut + }); + + // buffer + let buffered = fut_stream.buffer_unordered(max_concurrent); + + Box::new(buffered) +} + diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..48d4690 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,24 @@ +use std::io::{Error as IoError}; +use ::error::Error; +use new_tokio_smtp::error::LogicError; + +#[derive(Debug)] +pub enum MailSendError { + CreatingEnvelop(EnvelopFromMailError), + Composition(Error), + Encoding(Error), + //Note with pipelining this will change to Vec<LogicError> + Smtp(LogicError), + Io(IoError), + DriverDropped, + CanceledByDriver +} + + +#[derive(Debug)] +pub enum EnvelopFromMailError { + NeitherSenderNorFrom, + TypeError(Error), + NoSenderAndMoreThanOneFrom, + NoToHeaderField +} diff --git a/src/handle.rs b/src/handle.rs new file mode 100644 index 0000000..5f351e8 --- /dev/null +++ b/src/handle.rs @@ -0,0 +1,256 @@ +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::{sink, Sink, Future, Poll, Async}; + +use super::common::{MailRequest, MailResponse}; +use super::error::MailSendError; + +type InnerChannel = mpsc::Sender<(MailRequest, oneshot::Sender<Result<MailResponse, MailSendError>>)>; + +#[derive(Debug, Clone)] +pub struct MailServiceHandle { + channel: InnerChannel +} + + +impl MailServiceHandle { + + pub(crate) fn new(sender: InnerChannel) -> Self { + MailServiceHandle { channel: sender } + } + + pub fn send_mail(self, mail_request: MailRequest) -> MailEnqueueFuture { + let (sender, rx) = oneshot::channel(); + + let send = self.channel.send((mail_request, sender)); + + MailEnqueueFuture { send, rx: Some(rx) } + } + +// pub fn map_request_stream<S>(self, stream: S, max_buffer: Option<usize>) -> SmtpMailStream<RQ, S> +// where S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +// { +// SmtpMailStream::new(self.channel, stream, max_buffer) +// } + + pub fn into_inner(self) -> InnerChannel { + self.channel + } + +} + +pub struct MailEnqueueFuture { + send: sink::Send<InnerChannel>, + rx: Option<oneshot::Receiver<Result<MailResponse, MailSendError>>> +} + +impl Future for MailEnqueueFuture { + + type Item = (MailServiceHandle, MailResponseFuture); + type Error = MailSendError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let channel = match self.send.poll() { + Ok(Async::Ready(channel)) => channel, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::DriverDropped), + }; + + let rx = self.rx.take().expect("called poll after polling completed"); + Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx }))) + } +} + + +pub struct MailResponseFuture { + rx: oneshot::Receiver<Result<MailResponse, MailSendError>> +} + +impl Future for MailResponseFuture { + type Item = MailResponse; + type Error = MailSendError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let res = match self.rx.poll() { + Ok(Async::Ready(res)) => res, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::CanceledByDriver), + }; + + match res { + Ok(resp) => Ok(Async::Ready(resp)), + Err(err) => Err(err) + } + } +} + + +// use a closure to setup smtp Clone + FnMut() -> R, R: Future<Item=...,> or similar +//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) { +// let (send, resc) = mpcs::channel(XX); +// let shared_composition_base = smtp_setup.composition_base; +// let driver = smtp_connect().and_then(|service| { +// let service = shared_composition_base.wrap_smtp_service(service); +// +// let pipe_in = resc;; +// +// pipe_in.for_each(|(cmd, pipe_out)| { +// service.call(cmd).and_then(|res| { +// let _ = pipe_out.send(res); +// Ok(()) +// }) +// }) +// }); +// (send, driver) +//} +// +//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> { +// let driver = setup_smtp(setup.clone()); +// +// future::loop_fn(driver, move |driver| { +// driver.or_else(|err| { +// if err.was_canceled() { +// Ok(Loop::Break(())) +// } else if err.can_recover() { +// let new_driver = setup_smtp(setup.clone()); +// Ok(Loop::Continue(new_driver)) +// } else { +// Err(err.into()) +// } +// }) +// }) +//} +// + + + +// TODO: This links into a stream mapping a stream of requests to a stream of responses, +// but it needs more though into it, i.e. it decouples the sending of a mail, with +// the response of getting one. So we need to at last add some mail Id to Req (we +// might want to see if we can generally do so using MessageId). +// +//pub struct SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +//{ +// channel: Option<InnerChannel<RQ>>, +// stream: Option<S>, +// res_buffer: FuturesUnordered<oneshot::Receiver<Result<MailResponse, MailSendError>>>, +// send_buffer: Option<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>, +// max_buffer: Option<usize> +//} +// +// +//impl<RQ, S> SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +//{ +// pub(crate) fn new(channel: InnerChannel<RQ>, stream: S, max_buffer: Option<usize>) -> Self { +// SmtpMailStream { +// channel, stream, max_buffer, +// send_buffer: None, +// res_buffer: FuturesUnordered::new(), +// } +// } +// +// fn channel_mut(&mut self) -> &mut InnerChannel<RQ> { +// self.channel.as_mut().unwrap() +// } +// +// fn stream_mut(&mut self) -> &mut S { +// self.stream.as_mut().unwrap() +// } +// +// fn try_enqueue_mail( +// &mut self, +// msg: (RQ, oneshot::Sender<Result<MailResponse, MailSendError>>) +// ) -> Poll<(), DriverDropped<S>> +// { +// debug_assert!(self.send_buffer.is_none()); +// debug_assert!(self.stream.is_some() && self.channel.is_some()); +// match self.channel_mut().start_send(msg) { +// Ok(AsyncSink::Ready) => Ok(Async::Ready(())), +// Ok(AsyncSink::NotReady(msg)) => { +// self.send_buffer = Some(msg); +// Ok(Async::NotReady) +// }, +// Err(_) => { +// mem::drop(self.channel.take()); +// Err(DriverDropped::Stream(self.stream.take())) +// } +// } +// } +// +// fn close_channel(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// try_ready!(self.channel_mut().close()); +// return Ok(Async::Ready(None)); +// } +// +// fn poll_stream(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// match self.stream_mut().poll()? { +// Async::Ready(Some(item)) => Async::Ready(Some(item)), +// Async::Ready(None) => { +// self.stream.take(); +// return self.close_channel(); +// } +// Async::NotReady => { +// try_ready!(self.channel_mut().poll_complete()); +// return Ok(Async::NotRead); +// } +// } +// } +//} +// +//pub enum DriverDropped<S> { +// Stream(S), +// StreamTakenOnPreviousError +//} +// +//impl<RQ, S> Stream for SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// //FIXME[tokio 0.2] use error Never?? +// S::Error: Into<MailSendError> +//{ +// type Item = Result<MailResponse, MailSendError>; +// type Error = DriverDropped<S>; +// +// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// if self.channel.is_none() { +// return Err(DriverDropped::StreamTakenOnPreviousError); +// } +// +// if self.stream.is_none() { +// return self.close_channel(); +// } +// +// if let Some(msg) = self.send_buffer { +// try_ready!(self.try_enqueue_mail(msg)); +// } +// +// loop { +// +// match self.res_buffer.poll() { +// Ok(Async::NotReady) => {}, +// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))), +// Err(err) => return Ok(AsyncReady(Err(e))) +// } +// +// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() { +// return Ok(Async::NotReady); +// } +// +// +// let item = try_some_ready!(self.poll_stream()); +// +// let (tx, rx) = oneshot::channel(); +// +// self.res_buffer.push(rx); +// +// try_ready!(self.try_enqueue_mail((item, tx))); +// } +// } +//}
\ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..1ae530d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,19 @@ +extern crate new_tokio_smtp; + +mod stop_handle; +mod mpsc_ext; + +pub mod error; +mod common; +mod smtp_wrapper; +mod encode; +mod handle; +mod service; + +pub use self::stop_handle::StopServiceHandle; +pub use self::common::*; +pub use self::handle::*; +pub use self::service::*; + +#[cfg(test)] +mod test;
\ No newline at end of file diff --git a/src/mpsc_ext.rs b/src/mpsc_ext.rs new file mode 100644 index 0000000..79e6fef --- /dev/null +++ b/src/mpsc_ext.rs @@ -0,0 +1,34 @@ +use super::stop_handle::StopServiceHandle; + +use futures::sync::mpsc; +use futures::{Poll, Async, Stream}; + +pub struct AutoClose<I> { + inner: mpsc::Receiver<I>, + stop_handle: StopServiceHandle, +} + +impl<I> AutoClose<I> + //FIXME[tokio >= 0.2]: use Never + where I: Stream<Error=()> +{ + pub fn new(inner: mpsc::Receiver<I>, stop_handle: StopServiceHandle) -> Self { + AutoClose { inner, stop_handle } + } +} + +impl<I> Stream for AutoClose<I> + where I: Stream<Error=()> +{ + + type Item = I::Item; + //FIXME[tokio >= 0.2]: use Never + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if stop_handle.should_stop() { + self.inner.close() + } + self.inner.poll() + } +}
\ No newline at end of file diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..05387d8 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,413 @@ +use std::io::{Error as IoError}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::mem; + +use futures::sync::{mpsc, oneshot}; +use futures::{future, Future, Stream, Poll, Async}; +use futures::stream::Peekable; + +use new_tokio_smtp::Connection; + +use mail::prelude::{Encoder, Encodable, MailType}; +use mail::utils::SendBoxFuture; +use mail::context::BuilderContext; + +use super::smtp_wrapper::{send_mail, ConnectionState, CompletionState}; +use super::encode::{MailEncodingResult, stream_encode_mail}; +use super::common::{MailResponse, MailRequest, EnvelopData, MailSendResult}; +use super::handle::MailServiceHandle; +use super::error::MailSendError; +use super::stop_handle::StopServiceHandle; +use super::mpsc_ext::AutoClose; + + +pub trait SmtpSetup { + + /// The future returned which returns a Smtp connection, + /// + /// as this smtp mail bindings are writting for `tokio_smtp` + /// the Item is fixed to `ClientProxy`, (This might change + /// in future versions) + type ConnectFuture: 'static + Future< + Item=Connection, + Error=Self::NotConnectingError>; + + /// The error returned if it is not possible to connect, + /// this might represent a direct connection failure or + /// one involfing multiple retries or similar aspects. + type NotConnectingError; + + type BuilderContext: BuilderContext; + + // this future can contain all kind of retry connection handling etc. + /// This method is called to connect with an SMTP server. + /// + /// It is called whenever connecting to a SMTP server is necessary, + /// this includes the initial connection as well as reconnecting after + /// the connection might no longer be usable. + /// + /// As it returns a future with it's own error it can be used to + /// handle automatically retrying failed connections and limiting + /// the amount of retries or having a timeout before retrying to + /// connect. + /// + //TODO + /// Currently it is not implemented to retry sending failed mails, even + /// if it reconnects after e.g. an IO error + fn connect(&mut self) -> Self::ConnectFuture; + + fn context(&self) -> Self::BuilderContext; + + /// return how many mail should be encoded at the same time + /// + /// encoding a `Mail` includes transforming it into an `EncodableMail` which means + /// loading all resources associated with the `Mail` + fn mail_encoding_buffer_size(&self) -> usize { 16 } + + /// return the buffer size for the mpsc channel between the service and it's handles + /// + /// By default each handle has one and the loading buffer is dir |