summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorJustus Winter <justus@sequoia-pgp.org>2018-01-19 13:40:55 +0100
committerJustus Winter <justus@sequoia-pgp.org>2018-01-19 13:40:55 +0100
commitef83072663900981d802b6950a4de6b0060343c9 (patch)
tree8e648c41be21eff83caa91c78864e4e5f289b6f8 /net
parent05e48401af13c0271fc9d32065375397331027ed (diff)
net: Do not share co-located servers.
- Previously, servers running in threads advertised their cookie in the synchronization file, making it possible to connect to them. However, the lifetime of these servers is bound to the lifetime of the host process. This leads to spurious errors for other processes when servers go down unexpectedly. A scheme to restart servers magically seems not worth the trouble. This means that servers need to rely on some other synchronization mechanism to control access to shared resources.
Diffstat (limited to 'net')
-rw-r--r--net/src/ipc.rs80
1 files changed, 50 insertions, 30 deletions
diff --git a/net/src/ipc.rs b/net/src/ipc.rs
index c88c2666..883b4b3a 100644
--- a/net/src/ipc.rs
+++ b/net/src/ipc.rs
@@ -19,8 +19,8 @@
//! # External vs internal servers
//!
//! These servers can be either in external processes, or co-located
-//! within the current process. In either case, other processes may
-//! connect to the server.
+//! within the current process. We will first start an external
+//! process, and fall back to starting a thread instead.
//!
//! # Note
//!
@@ -94,6 +94,24 @@ impl Descriptor {
/// Connect to a descriptor, starting the server if necessary.
pub fn connect(&self, handle: &tokio_core::reactor::Handle) -> io::Result<RpcSystem<Side>> {
+ let do_connect =
+ move |cookie: Cookie, mut s: TcpStream| -> io::Result<RpcSystem<Side>> {
+ cookie.send(&mut s)?;
+
+ /* Tokioize. */
+ let stream = net::TcpStream::from_stream(s, &handle)?;
+ stream.set_nodelay(true)?;
+ let (reader, writer) = stream.split();
+
+ let network =
+ Box::new(twoparty::VatNetwork::new(reader, writer,
+ Side::Client,
+ Default::default()));
+ let rpc_system = RpcSystem::new(network, None);
+
+ Ok(rpc_system)
+ };
+
let mut file = fs::OpenOptions::new()
.read(true)
.write(true)
@@ -115,21 +133,8 @@ impl Descriptor {
}
let stream = TcpStream::connect(addr.unwrap());
- if let Ok(mut s) = stream {
- cookie.send(&mut s)?;
-
- /* Tokioize. */
- let stream = net::TcpStream::from_stream(s, &handle)?;
- stream.set_nodelay(true)?;
- let (reader, writer) = stream.split();
-
- let network =
- Box::new(twoparty::VatNetwork::new(reader, writer,
- Side::Client,
- Default::default()));
- let rpc_system = RpcSystem::new(network, None);
-
- Ok(rpc_system)
+ if let Ok(s) = stream {
+ do_connect(cookie, s)
} else {
/* Failed to connect. Invalidate the cookie and try again. */
file.set_len(0)?;
@@ -138,20 +143,35 @@ impl Descriptor {
}
} else {
let cookie = Cookie::new()?;
- let addr = self.start(true).or(self.start(false))?;
- let mut stream = TcpStream::connect(addr)?;
- cookie.send(&mut stream)?;
-
- /* XXX: It'd be nice not to waste this connection. */
- drop(stream);
-
- /* Write connection information to file. */
- file.set_len(0)?;
- cookie.send(&mut file)?;
- write!(file, "{}:{}", LOCALHOST, addr.port())?;
- drop(file);
+ for external in [true, false].iter() {
+ let addr = match self.start(*external) {
+ Ok(a) => a,
+ Err(e) => if *external {
+ // Try to spawn a thread next.
+ continue;
+ } else {
+ // Failed to spawn a thread.
+ return Err(e);
+ }
+ };
+
+ let mut stream = TcpStream::connect(addr)?;
+ cookie.send(&mut stream)?;
+
+ /* XXX: It'd be nice not to waste this connection. */
+ drop(stream);
+
+ if *external {
+ /* Write connection information to file. */
+ file.set_len(0)?;
+ cookie.send(&mut file)?;
+ write!(file, "{}:{}", LOCALHOST, addr.port())?;
+ }
+ drop(file);
- self.connect(handle)
+ return do_connect(cookie, TcpStream::connect(addr)?);
+ }
+ unreachable!();
}
}