diff options
author | Justus Winter <justus@sequoia-pgp.org> | 2018-01-19 13:40:55 +0100 |
---|---|---|
committer | Justus Winter <justus@sequoia-pgp.org> | 2018-01-19 13:40:55 +0100 |
commit | ef83072663900981d802b6950a4de6b0060343c9 (patch) | |
tree | 8e648c41be21eff83caa91c78864e4e5f289b6f8 /net | |
parent | 05e48401af13c0271fc9d32065375397331027ed (diff) |
net: Do not share co-located servers.
- Previously, servers running in threads advertised their cookie in
the synchronization file, making it possible to connect to them.
However, the lifetime of these servers is bound to the lifetime of
the host process. This leads to spurious errors for other
processes when servers go down unexpectedly. A scheme to restart
servers magically seems not worth the trouble. This means that
servers need to rely on some other synchronization mechanism to
control access to shared resources.
Diffstat (limited to 'net')
-rw-r--r-- | net/src/ipc.rs | 80 |
1 files changed, 50 insertions, 30 deletions
diff --git a/net/src/ipc.rs b/net/src/ipc.rs index c88c2666..883b4b3a 100644 --- a/net/src/ipc.rs +++ b/net/src/ipc.rs @@ -19,8 +19,8 @@ //! # External vs internal servers //! //! These servers can be either in external processes, or co-located -//! within the current process. In either case, other processes may -//! connect to the server. +//! within the current process. We will first start an external +//! process, and fall back to starting a thread instead. //! //! # Note //! @@ -94,6 +94,24 @@ impl Descriptor { /// Connect to a descriptor, starting the server if necessary. pub fn connect(&self, handle: &tokio_core::reactor::Handle) -> io::Result<RpcSystem<Side>> { + let do_connect = + move |cookie: Cookie, mut s: TcpStream| -> io::Result<RpcSystem<Side>> { + cookie.send(&mut s)?; + + /* Tokioize. */ + let stream = net::TcpStream::from_stream(s, &handle)?; + stream.set_nodelay(true)?; + let (reader, writer) = stream.split(); + + let network = + Box::new(twoparty::VatNetwork::new(reader, writer, + Side::Client, + Default::default())); + let rpc_system = RpcSystem::new(network, None); + + Ok(rpc_system) + }; + let mut file = fs::OpenOptions::new() .read(true) .write(true) @@ -115,21 +133,8 @@ impl Descriptor { } let stream = TcpStream::connect(addr.unwrap()); - if let Ok(mut s) = stream { - cookie.send(&mut s)?; - - /* Tokioize. */ - let stream = net::TcpStream::from_stream(s, &handle)?; - stream.set_nodelay(true)?; - let (reader, writer) = stream.split(); - - let network = - Box::new(twoparty::VatNetwork::new(reader, writer, - Side::Client, - Default::default())); - let rpc_system = RpcSystem::new(network, None); - - Ok(rpc_system) + if let Ok(s) = stream { + do_connect(cookie, s) } else { /* Failed to connect. Invalidate the cookie and try again. */ file.set_len(0)?; @@ -138,20 +143,35 @@ impl Descriptor { } } else { let cookie = Cookie::new()?; - let addr = self.start(true).or(self.start(false))?; - let mut stream = TcpStream::connect(addr)?; - cookie.send(&mut stream)?; - - /* XXX: It'd be nice not to waste this connection. */ - drop(stream); - - /* Write connection information to file. */ - file.set_len(0)?; - cookie.send(&mut file)?; - write!(file, "{}:{}", LOCALHOST, addr.port())?; - drop(file); + for external in [true, false].iter() { + let addr = match self.start(*external) { + Ok(a) => a, + Err(e) => if *external { + // Try to spawn a thread next. + continue; + } else { + // Failed to spawn a thread. + return Err(e); + } + }; + + let mut stream = TcpStream::connect(addr)?; + cookie.send(&mut stream)?; + + /* XXX: It'd be nice not to waste this connection. */ + drop(stream); + + if *external { + /* Write connection information to file. */ + file.set_len(0)?; + cookie.send(&mut file)?; + write!(file, "{}:{}", LOCALHOST, addr.port())?; + } + drop(file); - self.connect(handle) + return do_connect(cookie, TcpStream::connect(addr)?); + } + unreachable!(); } } |