From 969b94846ec223df24ebfb56b69b63a530b1c555 Mon Sep 17 00:00:00 2001 From: Justus Winter Date: Wed, 29 Nov 2017 14:51:59 +0100 Subject: net: Add a module for IPC support. --- net/Cargo.toml | 7 +- net/src/ipc.rs | 332 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ net/src/lib.rs | 5 + 3 files changed, 343 insertions(+), 1 deletion(-) create mode 100644 net/src/ipc.rs (limited to 'net') diff --git a/net/Cargo.toml b/net/Cargo.toml index 044171b4..91775697 100644 --- a/net/Cargo.toml +++ b/net/Cargo.toml @@ -6,6 +6,9 @@ authors = ["Justus Winter "] [dependencies] openpgp = { path = "../openpgp" } sequoia-core = { path = "../core" } + +fs2 = "0.4.2" +futures = "0.1" hyper = "0.11" hyper-tls = "0.1.2" libc = "0.2.33" @@ -14,5 +17,7 @@ nom = "3.2.0" num = "0.1.40" num-derive = "0.1.41" percent-encoding = "1.0.1" +rand = "0.3" tokio-core = "0.1" -futures = "0.1" +tokio-io = "0.1.4" +capnp-rpc = "0.8" diff --git a/net/src/ipc.rs b/net/src/ipc.rs new file mode 100644 index 00000000..c67b049b --- /dev/null +++ b/net/src/ipc.rs @@ -0,0 +1,332 @@ +//! Low-level IPC mechanism for Sequoia. +//! +//! # Rationale +//! +//! Sequoia makes use of background services e.g. for managing and +//! updating public keys. +//! +//! # Design +//! +//! We use the filesystem as namespace to discover services. Every +//! service has a file called rendezvous point. Access to this file +//! is serialized using file locking. This file contains a socket +//! address and a cookie that we use to connect to the server and +//! authenticate us. If the file does not exist, is malformed, or +//! does not point to a usable server, we start a new one on demand. +//! +//! This design mimics Unix sockets, but works on Windows too. +//! +//! # External vs internal servers +//! +//! These servers can be either in external processes, or co-located +//! within the current process. In either case, other processes may +//! connect to the server. +//! +//! # Note +//! +//! Windows support is currently not implemented, but should be +//! straight forward. + +extern crate fs2; +use self::fs2::FileExt; + +use std::fs; +use std::io::{self, Read, Write}; +use std::net::{SocketAddr, AddrParseError, TcpStream, TcpListener}; +use std::path::PathBuf; + +use futures::{Future, Stream}; + +use tokio_core::{self, net}; +use tokio_io; +use tokio_io::io::{ReadHalf, ReadExact}; +use tokio_io::AsyncRead; + +use capnp_rpc::{RpcSystem, twoparty}; +use capnp_rpc::rpc_twoparty_capnp::Side; + +/* Unix-specific options. */ +use std::os::unix::io::FromRawFd; +use std::os::unix::fs::OpenOptionsExt; + +/* XXX: Implement Windows support. */ + +use std::process::{Command, Stdio}; +use std::os::unix::io::AsRawFd; + +use std::thread; + +/// Servers need to implement this trait. +pub trait Handler { + /// Called on every connection. + fn handle(&self, + network: twoparty::VatNetwork>) + -> RpcSystem; +} + +/// A factory for handlers. +pub type HandlerFactory = fn(descriptor: Descriptor) -> Option>; + +/// A descriptor is used to connect to a service. +#[derive(Clone)] +pub struct Descriptor { + pub home: PathBuf, + pub rendezvous: PathBuf, + executable: PathBuf, + factory: HandlerFactory, +} + +const LOCALHOST: &str = "127.0.0.1"; + +impl Descriptor { + /// Create a descriptor given its rendezvous point, the path to + /// the servers executable file, and a handler factory. + pub fn new(home: PathBuf, rendezvous: PathBuf, executable: PathBuf, factory: HandlerFactory) + -> Self { + Descriptor { + home: home, + rendezvous: rendezvous, + executable: executable, + factory: factory, + } + } + + /// Connect to a descriptor, starting the server if necessary. + pub fn connect(&self, handle: &tokio_core::reactor::Handle) -> io::Result> { + let mut file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .mode(0o600) + .open(&self.rendezvous)?; + file.lock_exclusive()?; + + let mut c = vec![]; + file.read_to_end(&mut c)?; + + if let Some((cookie, a)) = Cookie::extract(c) { + let addr: Result = String::from_utf8_lossy(&a).parse(); + if addr.is_err() { + /* Malformed. Invalidate the cookie and try again. */ + file.set_len(0)?; + drop(file); + return self.connect(handle); + } + + let stream = TcpStream::connect(addr.unwrap()); + if let Ok(mut s) = stream { + cookie.send(&mut s)?; + + /* Tokioize. */ + let stream = net::TcpStream::from_stream(s, &handle)?; + stream.set_nodelay(true)?; + let (reader, writer) = stream.split(); + + let network = + Box::new(twoparty::VatNetwork::new(reader, writer, + Side::Client, + Default::default())); + let rpc_system = RpcSystem::new(network, None); + + Ok(rpc_system) + } else { + /* Failed to connect. Invalidate the cookie and try again. */ + file.set_len(0)?; + drop(file); + self.connect(handle) + } + } else { + let cookie = Cookie::new()?; + let addr = self.start(true).or(self.start(false))?; + let mut stream = TcpStream::connect(addr)?; + cookie.send(&mut stream)?; + + /* XXX: It'd be nice not to waste this connection. */ + drop(stream); + + /* Write connection information to file. */ + file.set_len(0)?; + cookie.send(&mut file)?; + write!(file, "{}:{}", LOCALHOST, addr.port())?; + drop(file); + + self.connect(handle) + } + } + + /// Try to create a TCP socket, bind it to a random port on + /// localhost. + fn listen(&self) -> io::Result { + let port = OsRng::new()?.next_u32() as u16; + TcpListener::bind((LOCALHOST, port)) + } + + /// Start the service, either as an external process or as a + /// thread. + fn start(&self, external: bool) -> io::Result { + /* Listen on a random port on localhost. */ + let mut listener = self.listen(); + while listener.is_err() { + listener = self.listen(); + } + let listener = listener.unwrap(); + let addr = listener.local_addr()?; + + /* Start the server, connect to it, and send the cookie. */ + if external { + self.fork(listener)?; + } else { + self.spawn(listener)?; + } + + Ok(addr) + } + + fn fork(&self, l: TcpListener) -> io::Result<()> { + Command::new(&self.executable.clone().into_os_string()) + .stdin(unsafe { Stdio::from_raw_fd(l.as_raw_fd()) }) + .spawn()?; + Ok(()) + } + + fn spawn(&self, l: TcpListener) -> io::Result<()> { + let descriptor = self.clone(); + thread::spawn(move || Server::new(descriptor)?.serve_listener(l)); + Ok(()) + } +} + +/// A server. +pub struct Server { + core: tokio_core::reactor::Core, + descriptor: Descriptor, +} + +impl Server { + /// Creates a new server for the descriptor. + pub fn new(descriptor: Descriptor) -> io::Result { + Ok(Server { + core: tokio_core::reactor::Core::new()?, + descriptor: descriptor, + }) + } + + /// Turn the current process into an descriptor. External servers + /// must call this early on. Expects 'stdin' to be a listening + /// TCP socket. + pub fn serve(&mut self) -> io::Result<()> { + self.serve_listener(unsafe { TcpListener::from_raw_fd(0) }) + } + + fn serve_listener(&mut self, l: TcpListener) -> io::Result<()> { + /* The first client tells us our cookie. */ + let mut i = l.accept()?; + let cookie = Cookie::receive(&mut i.0)?; + /* XXX: It'd be nice to recycle this connection. */ + drop(i); + + let handler = (self.descriptor.factory)(self.descriptor.clone()) + .ok_or( + io::Error::new(io::ErrorKind::BrokenPipe, "Failed to start server"))?; + + /* Tokioize. */ + let handle = self.core.handle(); + let a = l.local_addr()?; + let socket = tokio_core::net::TcpListener::from_listener(l, &a, &handle).unwrap(); + + let done = socket.incoming().and_then(|(socket, _addr)| { + let _ = socket.set_nodelay(true); + Cookie::receive_async(socket) + }).and_then(|(socket, buf)| { + if cookie.expect(Cookie::from(&buf)) { + Ok(socket) + } else { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie.")) + } + }).for_each(|socket| { + let (reader, writer) = socket.split(); + + let network = + twoparty::VatNetwork::new(reader, writer, + Side::Server, Default::default()); + + let rpc_system = handler.handle(network); + handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e))); + Ok(()) + }); + + self.core.run(done) + } +} + +/// Cookies are used to authenticate clients. +#[derive(PartialEq)] +struct Cookie(Vec); + +extern crate rand; +use self::rand::Rng; +use self::rand::os::OsRng; + +const COOKIE_SIZE: usize = 32; + +impl Cookie { + /// Make a new cookie. + fn new() -> io::Result { + let mut c = vec![0; COOKIE_SIZE]; + OsRng::new()?.fill_bytes(&mut c); + Ok(Cookie(c)) + } + + /// Make a new cookie from a slice. + fn from(buf: &Vec) -> Option { + if buf.len() == COOKIE_SIZE { + let mut c = Vec::::with_capacity(COOKIE_SIZE); + c.extend_from_slice(buf); + Some(Cookie(c)) + } else { + None + } + } + + /// Given a vector starting with a cookie, extract it and return + /// the rest. + fn extract(mut buf: Vec) -> Option<(Self, Vec)> { + if buf.len() >= COOKIE_SIZE { + let r = buf.split_off(COOKIE_SIZE); + Some((Cookie(buf), r)) + } else { + None + } + } + + /// Read a cookie from 'from'. + fn receive(from: &mut R) -> io::Result { + let mut buf = vec![0; COOKIE_SIZE]; + from.read_exact(&mut buf)?; + Ok(Cookie(buf)) + } + + /// Asynchronously read a cookie from 'socket'. + fn receive_async(socket: net::TcpStream) -> ReadExact> { + let buf = vec![0; COOKIE_SIZE]; + tokio_io::io::read_exact(socket, buf) + } + + + /// Check that an asynchronously received cookie matches this + /// cookie. + fn expect(&self, other: Option) -> bool { + if let Some(c) = other { + c.0 == self.0 + } else { + false + } + } + + /// Write a cookie to 'to'. + fn send(&self, to: &mut W) -> io::Result<()> { + to.write_all(&self.0)?; + Ok(()) + } +} diff --git a/net/src/lib.rs b/net/src/lib.rs index f93608fe..651ace16 100644 --- a/net/src/lib.rs +++ b/net/src/lib.rs @@ -38,9 +38,12 @@ extern crate hyper; extern crate hyper_tls; extern crate native_tls; extern crate tokio_core; +extern crate tokio_io; #[macro_use] extern crate percent_encoding; +extern crate capnp_rpc; + use percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; use self::futures::{Future, Stream}; use self::hyper::client::{FutureResponse, HttpConnector}; @@ -58,6 +61,8 @@ use openpgp::tpk::{self, TPK}; use openpgp::types::KeyId; use openpgp::{Message, armor}; +pub mod ipc; + define_encode_set! { /// Encoding used for submitting keys. /// -- cgit v1.2.3