diff options
Diffstat (limited to 'src/handle.rs')
-rw-r--r-- | src/handle.rs | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/src/handle.rs b/src/handle.rs new file mode 100644 index 0000000..5f351e8 --- /dev/null +++ b/src/handle.rs @@ -0,0 +1,256 @@ +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::{sink, Sink, Future, Poll, Async}; + +use super::common::{MailRequest, MailResponse}; +use super::error::MailSendError; + +type InnerChannel = mpsc::Sender<(MailRequest, oneshot::Sender<Result<MailResponse, MailSendError>>)>; + +#[derive(Debug, Clone)] +pub struct MailServiceHandle { + channel: InnerChannel +} + + +impl MailServiceHandle { + + pub(crate) fn new(sender: InnerChannel) -> Self { + MailServiceHandle { channel: sender } + } + + pub fn send_mail(self, mail_request: MailRequest) -> MailEnqueueFuture { + let (sender, rx) = oneshot::channel(); + + let send = self.channel.send((mail_request, sender)); + + MailEnqueueFuture { send, rx: Some(rx) } + } + +// pub fn map_request_stream<S>(self, stream: S, max_buffer: Option<usize>) -> SmtpMailStream<RQ, S> +// where S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +// { +// SmtpMailStream::new(self.channel, stream, max_buffer) +// } + + pub fn into_inner(self) -> InnerChannel { + self.channel + } + +} + +pub struct MailEnqueueFuture { + send: sink::Send<InnerChannel>, + rx: Option<oneshot::Receiver<Result<MailResponse, MailSendError>>> +} + +impl Future for MailEnqueueFuture { + + type Item = (MailServiceHandle, MailResponseFuture); + type Error = MailSendError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let channel = match self.send.poll() { + Ok(Async::Ready(channel)) => channel, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::DriverDropped), + }; + + let rx = self.rx.take().expect("called poll after polling completed"); + Ok(Async::Ready((MailServiceHandle { channel }, MailResponseFuture { rx }))) + } +} + + +pub struct MailResponseFuture { + rx: oneshot::Receiver<Result<MailResponse, MailSendError>> +} + +impl Future for MailResponseFuture { + type Item = MailResponse; + type Error = MailSendError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let res = match self.rx.poll() { + Ok(Async::Ready(res)) => res, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_cancel_err) => return Err(MailSendError::CanceledByDriver), + }; + + match res { + Ok(resp) => Ok(Async::Ready(resp)), + Err(err) => Err(err) + } + } +} + + +// use a closure to setup smtp Clone + FnMut() -> R, R: Future<Item=...,> or similar +//fn setup_smtp(smtp_connect SmtpSetup) -> (Sender, impl Future<(), SendError>) { +// let (send, resc) = mpcs::channel(XX); +// let shared_composition_base = smtp_setup.composition_base; +// let driver = smtp_connect().and_then(|service| { +// let service = shared_composition_base.wrap_smtp_service(service); +// +// let pipe_in = resc;; +// +// pipe_in.for_each(|(cmd, pipe_out)| { +// service.call(cmd).and_then(|res| { +// let _ = pipe_out.send(res); +// Ok(()) +// }) +// }) +// }); +// (send, driver) +//} +// +//fn setup_smtp_with_retry(setup: SmtpSetup) -> impl Future<(),RetryFail> { +// let driver = setup_smtp(setup.clone()); +// +// future::loop_fn(driver, move |driver| { +// driver.or_else(|err| { +// if err.was_canceled() { +// Ok(Loop::Break(())) +// } else if err.can_recover() { +// let new_driver = setup_smtp(setup.clone()); +// Ok(Loop::Continue(new_driver)) +// } else { +// Err(err.into()) +// } +// }) +// }) +//} +// + + + +// TODO: This links into a stream mapping a stream of requests to a stream of responses, +// but it needs more though into it, i.e. it decouples the sending of a mail, with +// the response of getting one. So we need to at last add some mail Id to Req (we +// might want to see if we can generally do so using MessageId). +// +//pub struct SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +//{ +// channel: Option<InnerChannel<RQ>>, +// stream: Option<S>, +// res_buffer: FuturesUnordered<oneshot::Receiver<Result<MailResponse, MailSendError>>>, +// send_buffer: Option<(RQ, oneshot::Sender<Result<MailResponse, MailSendError>>)>, +// max_buffer: Option<usize> +//} +// +// +//impl<RQ, S> SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// S::Error: Into<MailSendError> +//{ +// pub(crate) fn new(channel: InnerChannel<RQ>, stream: S, max_buffer: Option<usize>) -> Self { +// SmtpMailStream { +// channel, stream, max_buffer, +// send_buffer: None, +// res_buffer: FuturesUnordered::new(), +// } +// } +// +// fn channel_mut(&mut self) -> &mut InnerChannel<RQ> { +// self.channel.as_mut().unwrap() +// } +// +// fn stream_mut(&mut self) -> &mut S { +// self.stream.as_mut().unwrap() +// } +// +// fn try_enqueue_mail( +// &mut self, +// msg: (RQ, oneshot::Sender<Result<MailResponse, MailSendError>>) +// ) -> Poll<(), DriverDropped<S>> +// { +// debug_assert!(self.send_buffer.is_none()); +// debug_assert!(self.stream.is_some() && self.channel.is_some()); +// match self.channel_mut().start_send(msg) { +// Ok(AsyncSink::Ready) => Ok(Async::Ready(())), +// Ok(AsyncSink::NotReady(msg)) => { +// self.send_buffer = Some(msg); +// Ok(Async::NotReady) +// }, +// Err(_) => { +// mem::drop(self.channel.take()); +// Err(DriverDropped::Stream(self.stream.take())) +// } +// } +// } +// +// fn close_channel(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// try_ready!(self.channel_mut().close()); +// return Ok(Async::Ready(None)); +// } +// +// fn poll_stream(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// match self.stream_mut().poll()? { +// Async::Ready(Some(item)) => Async::Ready(Some(item)), +// Async::Ready(None) => { +// self.stream.take(); +// return self.close_channel(); +// } +// Async::NotReady => { +// try_ready!(self.channel_mut().poll_complete()); +// return Ok(Async::NotRead); +// } +// } +// } +//} +// +//pub enum DriverDropped<S> { +// Stream(S), +// StreamTakenOnPreviousError +//} +// +//impl<RQ, S> Stream for SmtpMailStream<RQ, S> +// where RQ: MailSendRequest, +// S: Stream<Item = RQ>, +// //FIXME[tokio 0.2] use error Never?? +// S::Error: Into<MailSendError> +//{ +// type Item = Result<MailResponse, MailSendError>; +// type Error = DriverDropped<S>; +// +// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// if self.channel.is_none() { +// return Err(DriverDropped::StreamTakenOnPreviousError); +// } +// +// if self.stream.is_none() { +// return self.close_channel(); +// } +// +// if let Some(msg) = self.send_buffer { +// try_ready!(self.try_enqueue_mail(msg)); +// } +// +// loop { +// +// match self.res_buffer.poll() { +// Ok(Async::NotReady) => {}, +// Ok(Async::Ready(res)) => return Ok(AsyncReady(Ok(res))), +// Err(err) => return Ok(AsyncReady(Err(e))) +// } +// +// if self.max_buffer.map(|max| self.res_buffer.len() >= max).unwrap_or_false() { +// return Ok(Async::NotReady); +// } +// +// +// let item = try_some_ready!(self.poll_stream()); +// +// let (tx, rx) = oneshot::channel(); +// +// self.res_buffer.push(rx); +// +// try_ready!(self.try_enqueue_mail((item, tx))); +// } +// } +//}
\ No newline at end of file |