summaryrefslogtreecommitdiffstats
path: root/ipc/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/src/lib.rs')
-rw-r--r--ipc/src/lib.rs114
1 files changed, 64 insertions, 50 deletions
diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs
index 1a5a58b7..9f6bf339 100644
--- a/ipc/src/lib.rs
+++ b/ipc/src/lib.rs
@@ -43,11 +43,8 @@ use std::path::PathBuf;
use anyhow::{anyhow, Result};
use fs2::FileExt;
-use futures::{Future, Stream};
-use tokio_core::net;
-use tokio_io::io::ReadHalf;
-use tokio_io::AsyncRead;
+use tokio_util::compat::Compat;
use capnp_rpc::{RpcSystem, twoparty};
use capnp_rpc::rpc_twoparty_capnp::Side;
@@ -116,14 +113,15 @@ macro_rules! platform {
pub trait Handler {
/// Called on every connection.
fn handle(&self,
- network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>)
+ network: twoparty::VatNetwork<Compat<tokio::net::tcp::OwnedReadHalf>>)
-> RpcSystem<Side>;
}
/// A factory for handlers.
-pub type HandlerFactory = fn(descriptor: Descriptor,
- handle: tokio_core::reactor::Handle)
- -> Result<Box<dyn Handler>>;
+pub type HandlerFactory = fn(
+ descriptor: Descriptor,
+ local: &tokio::task::LocalSet
+) -> Result<Box<dyn Handler>>;
/// A descriptor is used to connect to a service.
#[derive(Clone)]
@@ -154,25 +152,39 @@ impl Descriptor {
}
/// Connects to a descriptor, starting the server if necessary.
- pub fn connect(&self, handle: &tokio_core::reactor::Handle)
- -> Result<RpcSystem<Side>> {
- self.connect_with_policy(handle, *self.ctx.ipc_policy())
+ ///
+ /// # Panic
+ /// This will panic if called outside of the Tokio runtime context. See
+ /// See [`Handle::enter`] for more details.
+ ///
+ /// [`Handle::enter`]: https://docs.rs/tokio/0.2.22/tokio/runtime/struct.Handle.html#method.enter
+ pub fn connect(&self) -> Result<RpcSystem<Side>> {
+ self.connect_with_policy(*self.ctx.ipc_policy())
}
/// Connects to a descriptor, starting the server if necessary.
///
/// This function does not use the context's IPC policy, but uses
/// the given one.
- pub fn connect_with_policy(&self, handle: &tokio_core::reactor::Handle,
- policy: core::IPCPolicy)
+ ///
+ /// # Panic
+ /// This will panic if called outside of the Tokio runtime context. See
+ /// See [`Handle::enter`] for more details.
+ ///
+ /// [`Handle::enter`]: https://docs.rs/tokio/0.2.22/tokio/runtime/struct.Handle.html#method.enter
+ pub fn connect_with_policy(&self, policy: core::IPCPolicy)
-> Result<RpcSystem<Side>> {
let do_connect = |cookie: Cookie, mut s: TcpStream| {
cookie.send(&mut s)?;
/* Tokioize. */
- let stream = net::TcpStream::from_stream(s, &handle)?;
+ let stream = tokio::net::TcpStream::from_std(s)?;
stream.set_nodelay(true)?;
- let (reader, writer) = stream.split();
+
+ let (reader, writer) = stream.into_split();
+ use tokio_util::compat::Tokio02AsyncReadCompatExt;
+ use tokio_util::compat::Tokio02AsyncWriteCompatExt;
+ let (reader, writer) = (reader.compat(), writer.compat_write());
let network =
Box::new(twoparty::VatNetwork::new(reader, writer,
@@ -207,7 +219,7 @@ impl Descriptor {
/* Failed to connect. Invalidate the cookie and try again. */
file.set_len(0)?;
drop(file);
- self.connect(handle)
+ self.connect()
}
} else {
let cookie = Cookie::new();
@@ -306,7 +318,7 @@ impl Descriptor {
/// A server.
pub struct Server {
- core: tokio_core::reactor::Core,
+ runtime: tokio::runtime::Runtime,
descriptor: Descriptor,
}
@@ -314,7 +326,7 @@ impl Server {
/// Creates a new server for the descriptor.
pub fn new(descriptor: Descriptor) -> Result<Self> {
Ok(Server {
- core: tokio_core::reactor::Core::new()?,
+ runtime: tokio::runtime::Runtime::new()?,
descriptor,
})
}
@@ -393,35 +405,38 @@ impl Server {
Cookie::receive(&mut i.0)?
};
- let handler = (self.descriptor.factory)(self.descriptor.clone(), self.core.handle())?;
-
/* Tokioize. */
- let handle = self.core.handle();
- let a = l.local_addr()?;
- let socket = tokio_core::net::TcpListener::from_listener(l, &a, &handle).unwrap();
-
- let done = socket.incoming().and_then(|(socket, _addr)| {
- let _ = socket.set_nodelay(true);
- Cookie::receive_async(socket)
- }).and_then(|(socket, received_cookie)| {
- if received_cookie == cookie {
- Ok(socket)
- } else {
- Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie."))
- }
- }).for_each(|socket| {
- let (reader, writer) = socket.split();
+ let local = tokio::task::LocalSet::new();
+ let handler = (self.descriptor.factory)(self.descriptor.clone(), &local)?;
- let network =
- twoparty::VatNetwork::new(reader, writer,
- Side::Server, Default::default());
+ let server = async move {
+ let mut socket = tokio::net::TcpListener::from_std(l).unwrap();
- let rpc_system = handler.handle(network);
- handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
- Ok(())
- });
+ loop {
+ let (mut socket, _) = socket.accept().await?;
+
+ let _ = socket.set_nodelay(true);
+ let received_cookie = Cookie::receive_async(&mut socket).await?;
+ if received_cookie != cookie {
+ return Err(anyhow::anyhow!("Bad cookie"));
+ }
+
+ let (reader, writer) = socket.into_split();
+
+ use tokio_util::compat::Tokio02AsyncReadCompatExt;
+ use tokio_util::compat::Tokio02AsyncWriteCompatExt;
+ let (reader, writer) = (reader.compat(), writer.compat_write());
+
+ let network =
+ twoparty::VatNetwork::new(reader, writer,
+ Side::Server, Default::default());
- Ok(self.core.run(done)?)
+ let rpc_system = handler.handle(network);
+ let _ = tokio::task::spawn_local(rpc_system).await;
+ }
+ };
+
+ local.block_on(&mut self.runtime, server)
}
}
@@ -471,13 +486,12 @@ impl Cookie {
}
/// Asynchronously read a cookie from 'socket'.
- fn receive_async(socket: net::TcpStream)
- -> impl Future<Item = (net::TcpStream, Cookie), Error = io::Error> {
- let buf = vec![0; Cookie::SIZE];
- tokio_io::io::read_exact(socket, buf)
- .and_then(|(socket, buf)| {
- Ok((socket, Cookie::from(&buf).expect("enough bytes read")))
- })
+ async fn receive_async(socket: &mut tokio::net::TcpStream) -> io::Result<Cookie> {
+ use tokio::io::AsyncReadExt;
+
+ let mut buf = vec![0; Cookie::SIZE];
+ socket.read_exact(&mut buf).await?;
+ Ok(Cookie::from(&buf).expect("enough bytes read"))
}