diff options
Diffstat (limited to 'ipc/src/lib.rs')
-rw-r--r-- | ipc/src/lib.rs | 114 |
1 files changed, 64 insertions, 50 deletions
diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index 1a5a58b7..9f6bf339 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -43,11 +43,8 @@ use std::path::PathBuf; use anyhow::{anyhow, Result}; use fs2::FileExt; -use futures::{Future, Stream}; -use tokio_core::net; -use tokio_io::io::ReadHalf; -use tokio_io::AsyncRead; +use tokio_util::compat::Compat; use capnp_rpc::{RpcSystem, twoparty}; use capnp_rpc::rpc_twoparty_capnp::Side; @@ -116,14 +113,15 @@ macro_rules! platform { pub trait Handler { /// Called on every connection. fn handle(&self, - network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>) + network: twoparty::VatNetwork<Compat<tokio::net::tcp::OwnedReadHalf>>) -> RpcSystem<Side>; } /// A factory for handlers. -pub type HandlerFactory = fn(descriptor: Descriptor, - handle: tokio_core::reactor::Handle) - -> Result<Box<dyn Handler>>; +pub type HandlerFactory = fn( + descriptor: Descriptor, + local: &tokio::task::LocalSet +) -> Result<Box<dyn Handler>>; /// A descriptor is used to connect to a service. #[derive(Clone)] @@ -154,25 +152,39 @@ impl Descriptor { } /// Connects to a descriptor, starting the server if necessary. - pub fn connect(&self, handle: &tokio_core::reactor::Handle) - -> Result<RpcSystem<Side>> { - self.connect_with_policy(handle, *self.ctx.ipc_policy()) + /// + /// # Panic + /// This will panic if called outside of the Tokio runtime context. See + /// See [`Handle::enter`] for more details. + /// + /// [`Handle::enter`]: https://docs.rs/tokio/0.2.22/tokio/runtime/struct.Handle.html#method.enter + pub fn connect(&self) -> Result<RpcSystem<Side>> { + self.connect_with_policy(*self.ctx.ipc_policy()) } /// Connects to a descriptor, starting the server if necessary. /// /// This function does not use the context's IPC policy, but uses /// the given one. - pub fn connect_with_policy(&self, handle: &tokio_core::reactor::Handle, - policy: core::IPCPolicy) + /// + /// # Panic + /// This will panic if called outside of the Tokio runtime context. See + /// See [`Handle::enter`] for more details. + /// + /// [`Handle::enter`]: https://docs.rs/tokio/0.2.22/tokio/runtime/struct.Handle.html#method.enter + pub fn connect_with_policy(&self, policy: core::IPCPolicy) -> Result<RpcSystem<Side>> { let do_connect = |cookie: Cookie, mut s: TcpStream| { cookie.send(&mut s)?; /* Tokioize. */ - let stream = net::TcpStream::from_stream(s, &handle)?; + let stream = tokio::net::TcpStream::from_std(s)?; stream.set_nodelay(true)?; - let (reader, writer) = stream.split(); + + let (reader, writer) = stream.into_split(); + use tokio_util::compat::Tokio02AsyncReadCompatExt; + use tokio_util::compat::Tokio02AsyncWriteCompatExt; + let (reader, writer) = (reader.compat(), writer.compat_write()); let network = Box::new(twoparty::VatNetwork::new(reader, writer, @@ -207,7 +219,7 @@ impl Descriptor { /* Failed to connect. Invalidate the cookie and try again. */ file.set_len(0)?; drop(file); - self.connect(handle) + self.connect() } } else { let cookie = Cookie::new(); @@ -306,7 +318,7 @@ impl Descriptor { /// A server. pub struct Server { - core: tokio_core::reactor::Core, + runtime: tokio::runtime::Runtime, descriptor: Descriptor, } @@ -314,7 +326,7 @@ impl Server { /// Creates a new server for the descriptor. pub fn new(descriptor: Descriptor) -> Result<Self> { Ok(Server { - core: tokio_core::reactor::Core::new()?, + runtime: tokio::runtime::Runtime::new()?, descriptor, }) } @@ -393,35 +405,38 @@ impl Server { Cookie::receive(&mut i.0)? }; - let handler = (self.descriptor.factory)(self.descriptor.clone(), self.core.handle())?; - /* Tokioize. */ - let handle = self.core.handle(); - let a = l.local_addr()?; - let socket = tokio_core::net::TcpListener::from_listener(l, &a, &handle).unwrap(); - - let done = socket.incoming().and_then(|(socket, _addr)| { - let _ = socket.set_nodelay(true); - Cookie::receive_async(socket) - }).and_then(|(socket, received_cookie)| { - if received_cookie == cookie { - Ok(socket) - } else { - Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie.")) - } - }).for_each(|socket| { - let (reader, writer) = socket.split(); + let local = tokio::task::LocalSet::new(); + let handler = (self.descriptor.factory)(self.descriptor.clone(), &local)?; - let network = - twoparty::VatNetwork::new(reader, writer, - Side::Server, Default::default()); + let server = async move { + let mut socket = tokio::net::TcpListener::from_std(l).unwrap(); - let rpc_system = handler.handle(network); - handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e))); - Ok(()) - }); + loop { + let (mut socket, _) = socket.accept().await?; + + let _ = socket.set_nodelay(true); + let received_cookie = Cookie::receive_async(&mut socket).await?; + if received_cookie != cookie { + return Err(anyhow::anyhow!("Bad cookie")); + } + + let (reader, writer) = socket.into_split(); + + use tokio_util::compat::Tokio02AsyncReadCompatExt; + use tokio_util::compat::Tokio02AsyncWriteCompatExt; + let (reader, writer) = (reader.compat(), writer.compat_write()); + + let network = + twoparty::VatNetwork::new(reader, writer, + Side::Server, Default::default()); - Ok(self.core.run(done)?) + let rpc_system = handler.handle(network); + let _ = tokio::task::spawn_local(rpc_system).await; + } + }; + + local.block_on(&mut self.runtime, server) } } @@ -471,13 +486,12 @@ impl Cookie { } /// Asynchronously read a cookie from 'socket'. - fn receive_async(socket: net::TcpStream) - -> impl Future<Item = (net::TcpStream, Cookie), Error = io::Error> { - let buf = vec![0; Cookie::SIZE]; - tokio_io::io::read_exact(socket, buf) - .and_then(|(socket, buf)| { - Ok((socket, Cookie::from(&buf).expect("enough bytes read"))) - }) + async fn receive_async(socket: &mut tokio::net::TcpStream) -> io::Result<Cookie> { + use tokio::io::AsyncReadExt; + + let mut buf = vec![0; Cookie::SIZE]; + socket.read_exact(&mut buf).await?; + Ok(Cookie::from(&buf).expect("enough bytes read")) } |