author     Justus Winter <justus@sequoia-pgp.org>  2019-01-11 15:54:10 +0100
committer  Justus Winter <justus@sequoia-pgp.org>  2019-05-03 15:20:37 +0200
commit     90782208aa073c21fdf6d7e37575f58677c969b5 (patch)
tree       0374ccb5b3b0e65f54ef4508fa903270014b3173 /ipc
parent     4ef67bfa4208f6944edadbf0171958cd3172ea22 (diff)
ipc: New crate.
Diffstat (limited to 'ipc')
-rw-r--r--  ipc/Cargo.toml   32
-rw-r--r--  ipc/README.md     2
-rw-r--r--  ipc/src/lib.rs  468
3 files changed, 502 insertions, 0 deletions
diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml
new file mode 100644
index 00000000..27b9aaa0
--- /dev/null
+++ b/ipc/Cargo.toml
@@ -0,0 +1,32 @@
+[package]
+name = "sequoia-ipc"
+description = "Interprocess communication infrastructure for Sequoia"
+version = "0.6.0"
+authors = [
+ "Justus Winter <justus@sequoia-pgp.org>",
+ "Kai Michaelis <kai@sequoia-pgp.org>",
+ "Neal H. Walfield <neal@sequoia-pgp.org>",
+]
+documentation = "https://docs.sequoia-pgp.org/sequoia_ipc"
+homepage = "https://sequoia-pgp.org/"
+repository = "https://gitlab.com/sequoia-pgp/sequoia"
+readme = "README.md"
+license = "GPL-3.0"
+
+[badges]
+gitlab = { repository = "sequoia-pgp/sequoia" }
+maintenance = { status = "actively-developed" }
+
+[dependencies]
+sequoia-openpgp = { path = "../openpgp" }
+sequoia-core = { path = "../core" }
+
+capnp-rpc = "0.9"
+failure = "0.1.2"
+fs2 = "0.4.2"
+futures = "0.1"
+libc = "0.2.33"
+memsec = "0.5.4"
+rand = "0.6"
+tokio-core = "0.1"
+tokio-io = "0.1.4"
diff --git a/ipc/README.md b/ipc/README.md
new file mode 100644
index 00000000..d1820a87
--- /dev/null
+++ b/ipc/README.md
@@ -0,0 +1,2 @@
+Sequoia uses this to communicate with backend servers. This is an
+implementation detail. Do not use this crate.
diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs
new file mode 100644
index 00000000..70ef3cf1
--- /dev/null
+++ b/ipc/src/lib.rs
@@ -0,0 +1,468 @@
+//! Low-level IPC mechanism for Sequoia.
+//!
+//! # Rationale
+//!
+//! Sequoia makes use of background services, e.g. for managing and
+//! updating public keys.
+//!
+//! # Design
+//!
+//! We use the filesystem as a namespace to discover services. Every
+//! service has a file called the rendezvous point. Access to this
+//! file is serialized using file locking. The file contains a socket
+//! address and a cookie that we use to connect to the server and
+//! authenticate ourselves. If the file does not exist, is malformed,
+//! or does not point to a usable server, we start a new one on
+//! demand.
+//!
+//! This design mimics Unix sockets, but works on Windows too.
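+//!
+//! As an illustration, a rendezvous file written by this crate has
+//! the following layout (the port shown is arbitrary):
+//!
+//! ```text
+//! [ 32 byte cookie ][ "127.0.0.1:12345" ]
+//! ```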
+//!
+//! # External vs internal servers
+//!
+//! These servers can run either in external processes, or co-located
+//! within the current process. We first try to start an external
+//! process, and fall back to starting a thread if that fails.
+//!
+//! Using an external process is the preferred option. It allows us
+//! to continuously update the keys in the keystore, for example. It
+//! also means that we do not spawn a thread in your process, which is
+//! frowned upon for various reasons.
+//!
+//! Please see [IPCPolicy] for more information.
+//!
+//! [IPCPolicy]: ../../sequoia_core/enum.IPCPolicy.html
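+//!
+//! For example, to insist on an in-process server (a sketch;
+//! `descriptor` and `handle` have to come from elsewhere):
+//!
+//! ```ignore
+//! let rpc_system = descriptor.connect_with_policy(
+//!     &handle, sequoia_core::IPCPolicy::Internal)?;
+//! ```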
+//!
+//! # Note
+//!
+//! Windows support is currently not implemented, but should be
+//! straightforward.
+
+use std::fs;
+use std::io::{self, Read, Write};
+use std::net::{SocketAddr, AddrParseError, TcpStream, TcpListener};
+use std::path::PathBuf;
+
+extern crate capnp_rpc;
+#[macro_use] extern crate failure;
+extern crate fs2;
+extern crate futures;
+extern crate memsec;
+extern crate tokio_core;
+extern crate tokio_io;
+
+use failure::Fallible as Result;
+use fs2::FileExt;
+use futures::{Future, Stream};
+
+use tokio_core::net;
+use tokio_io::io::{ReadHalf, ReadExact};
+use tokio_io::AsyncRead;
+
+use capnp_rpc::{RpcSystem, twoparty};
+use capnp_rpc::rpc_twoparty_capnp::Side;
+
+/* Unix-specific options. */
+use std::os::unix::io::FromRawFd;
+use std::os::unix::fs::OpenOptionsExt;
+
+/* XXX: Implement Windows support. */
+
+use std::process::{Command, Stdio};
+use std::os::unix::io::AsRawFd;
+
+use std::thread;
+
+extern crate sequoia_core;
+
+use sequoia_core as core;
+
+/// Servers need to implement this trait.
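+///
+/// # Example
+///
+/// A minimal sketch (`MyHandler` is hypothetical); it serves the RPC
+/// system without a bootstrap capability:
+///
+/// ```ignore
+/// struct MyHandler;
+///
+/// impl Handler for MyHandler {
+///     fn handle(&self,
+///               network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>)
+///               -> RpcSystem<Side> {
+///         // Box the network and hand it to a new RPC system.
+///         RpcSystem::new(Box::new(network), None)
+///     }
+/// }
+/// ```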
+pub trait Handler {
+ /// Called on every connection.
+ fn handle(&self,
+ network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>)
+ -> RpcSystem<Side>;
+}
+
+/// A factory for handlers.
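+///
+/// For example, using the hypothetical `MyHandler` from above:
+///
+/// ```ignore
+/// fn factory(descriptor: Descriptor, handle: tokio_core::reactor::Handle)
+///            -> Result<Box<Handler>> {
+///     let _ = (descriptor, handle); // This handler needs neither.
+///     Ok(Box::new(MyHandler))
+/// }
+/// ```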
+pub type HandlerFactory = fn(descriptor: Descriptor,
+ handle: tokio_core::reactor::Handle)
+ -> Result<Box<Handler>>;
+
+/// A descriptor is used to connect to a service.
+#[derive(Clone)]
+pub struct Descriptor {
+ ctx: core::Context,
+ rendezvous: PathBuf,
+ executable: PathBuf,
+ factory: HandlerFactory,
+}
+
+const LOCALHOST: &str = "127.0.0.1";
+
+impl Descriptor {
+    /// Creates a descriptor given its rendezvous point, the path to
+    /// the server's executable file, and a handler factory.
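+    ///
+    /// For example (all paths and names are hypothetical):
+    ///
+    /// ```ignore
+    /// let descriptor = Descriptor::new(
+    ///     &ctx,
+    ///     ctx.home().join("S.myservice"),             // rendezvous point
+    ///     PathBuf::from("/usr/lib/myservice-server"), // server executable
+    ///     factory);
+    /// ```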
+ pub fn new(ctx: &core::Context, rendezvous: PathBuf,
+ executable: PathBuf, factory: HandlerFactory)
+ -> Self {
+ Descriptor {
+ ctx: ctx.clone(),
+            rendezvous,
+            executable,
+            factory,
+ }
+ }
+
+ /// Returns the context.
+ pub fn context(&self) -> &core::Context {
+ &self.ctx
+ }
+
+ /// Connects to a descriptor, starting the server if necessary.
+ pub fn connect(&self, handle: &tokio_core::reactor::Handle)
+ -> Result<RpcSystem<Side>> {
+ self.connect_with_policy(handle, *self.ctx.ipc_policy())
+ }
+
+ /// Connects to a descriptor, starting the server if necessary.
+ ///
+    /// This function does not use the context's IPC policy, but the
+    /// given one.
+ pub fn connect_with_policy(&self, handle: &tokio_core::reactor::Handle,
+ policy: core::IPCPolicy)
+ -> Result<RpcSystem<Side>> {
+ let do_connect =
+ move |cookie: Cookie, mut s: TcpStream| -> Result<RpcSystem<Side>> {
+ cookie.send(&mut s)?;
+
+ /* Tokioize. */
+ let stream = net::TcpStream::from_stream(s, &handle)?;
+ stream.set_nodelay(true)?;
+ let (reader, writer) = stream.split();
+
+ let network =
+ Box::new(twoparty::VatNetwork::new(reader, writer,
+ Side::Client,
+ Default::default()));
+ let rpc_system = RpcSystem::new(network, None);
+
+ Ok(rpc_system)
+ };
+
+ let mut file = fs::OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create(true)
+ .mode(0o600)
+ .open(&self.rendezvous)?;
+ file.lock_exclusive()?;
+
+ let mut c = vec![];
+ file.read_to_end(&mut c)?;
+
+ if let Some((cookie, a)) = Cookie::extract(c) {
+ let addr: ::std::result::Result<SocketAddr, AddrParseError> =
+ String::from_utf8_lossy(&a).parse();
+ if addr.is_err() {
+ /* Malformed. Invalidate the cookie and try again. */
+ file.set_len(0)?;
+ drop(file);
+ return self.connect(handle);
+ }
+
+ let stream = TcpStream::connect(addr.unwrap());
+ if let Ok(s) = stream {
+ do_connect(cookie, s)
+ } else {
+ /* Failed to connect. Invalidate the cookie and try again. */
+ file.set_len(0)?;
+ drop(file);
+ self.connect(handle)
+ }
+ } else {
+ let cookie = Cookie::new()?;
+ for external in [true, false].iter() {
+                // Implement the IPC policy.
+ if policy == core::IPCPolicy::Internal && *external {
+ // Do not try to fork.
+ continue;
+ }
+
+ let addr = match self.start(*external) {
+ Ok(a) => a,
+ Err(e) => if *external {
+ if policy == core::IPCPolicy::External {
+ // Fail!
+ return Err(e);
+ }
+
+ // Try to spawn a thread next.
+ continue;
+ } else {
+ // Failed to spawn a thread.
+ return Err(e);
+ }
+ };
+
+ let mut stream = TcpStream::connect(addr)?;
+ cookie.send(&mut stream)?;
+
+ /* XXX: It'd be nice not to waste this connection. */
+ drop(stream);
+
+ if *external {
+ /* Write connection information to file. */
+ file.set_len(0)?;
+ cookie.send(&mut file)?;
+ write!(file, "{}:{}", LOCALHOST, addr.port())?;
+ }
+ drop(file);
+
+ return do_connect(cookie, TcpStream::connect(addr)?);
+ }
+ unreachable!();
+ }
+ }
+
+    /// Creates a TCP socket and binds it to a random port on
+    /// localhost.
+ fn listen(&self) -> Result<TcpListener> {
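+        // The random port may be privileged or already taken; in
+        // that case binding fails and `start` simply tries again.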
+ let port = OsRng::new()?.next_u32() as u16;
+ Ok(TcpListener::bind((LOCALHOST, port))?)
+ }
+
+    /// Starts the service, either as an external process or as a
+ /// thread.
+ fn start(&self, external: bool) -> Result<SocketAddr> {
+ /* Listen on a random port on localhost. */
+ let mut listener = self.listen();
+ while listener.is_err() {
+ listener = self.listen();
+ }
+ let listener = listener.unwrap();
+ let addr = listener.local_addr()?;
+
+ /* Start the server, connect to it, and send the cookie. */
+ if external {
+ self.fork(listener)?;
+ } else {
+ self.spawn(listener)?;
+ }
+
+ Ok(addr)
+ }
+
+ fn fork(&self, l: TcpListener) -> Result<()> {
+ // Convert to raw fd, then forget l so that it will not be
+ // closed when it is dropped.
+ let fd = l.as_raw_fd();
+ ::std::mem::forget(l);
+
+        Command::new(&self.executable)
+ .arg("--home")
+ .arg(self.ctx.home().to_string_lossy().into_owned())
+ .arg("--lib")
+            .arg(self.ctx.lib().to_string_lossy().into_owned())
+ .arg("--ephemeral")
+ .arg(format!("{}", self.ctx.ephemeral()))
+ // l will be closed here if the exec fails.
+ .stdin(unsafe { Stdio::from_raw_fd(fd) })
+ .spawn()?;
+ Ok(())
+ }
+
+ fn spawn(&self, l: TcpListener) -> Result<()> {
+ let descriptor = self.clone();
+ thread::spawn(move || -> Result<()> {
+            Ok(Server::new(descriptor)
+               .expect("Failed to create server") // XXX
+               .serve_listener(l)
+               .expect("Failed to serve")) // XXX
+ });
+ Ok(())
+ }
+}
+
+/// A server.
+pub struct Server {
+ core: tokio_core::reactor::Core,
+ descriptor: Descriptor,
+}
+
+impl Server {
+ /// Creates a new server for the descriptor.
+ pub fn new(descriptor: Descriptor) -> Result<Self> {
+ Ok(Server {
+ core: tokio_core::reactor::Core::new()?,
+            descriptor,
+ })
+ }
+
+ /// Creates a Context from `env::args()`.
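+    ///
+    /// Expects a command line of the form `<SERVER> --home <HOMEDIR>
+    /// --lib <LIBDIR> --ephemeral true|false`, as passed by
+    /// `Descriptor::fork`.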
+ pub fn context() -> Result<core::Context> {
+ use std::env::args;
+ let args: Vec<String> = args().collect();
+
+ if args.len() != 7 || args[1] != "--home"
+ || args[3] != "--lib" || args[5] != "--ephemeral" {
+ return Err(format_err!(
+ "Usage: {} --home <HOMEDIR> --lib <LIBDIR> \
+ --ephemeral true|false", args[0]));
+ }
+
+ let mut cfg = core::Context::configure("org.sequoia.api.server")
+ .home(&args[2]).lib(&args[4]);
+
+ if let Ok(ephemeral) = args[6].parse() {
+ if ephemeral {
+ cfg.set_ephemeral();
+ }
+ } else {
+ return Err(format_err!(
+ "Expected 'true' or 'false' for --ephemeral, got: {}",
+ args[6]));
+ }
+
+ cfg.build()
+ }
+
+ /// Turns this process into a server.
+ ///
+    /// External servers must call this early on. Expects `stdin` to
+    /// be a listening TCP socket.
+ ///
+ /// # Example
+ ///
+ /// ```compile_fail
+ /// // We cannot run this because sequoia-store is not built yet.
+ /// extern crate sequoia_core;
+ /// extern crate sequoia_net;
+ /// extern crate sequoia_store;
+ ///
+ /// use sequoia_ipc::Server;
+ ///
+ /// fn main() {
+ /// let ctx = Server::context()
+ /// .expect("Failed to create context");
+ /// Server::new(sequoia_store::descriptor(&ctx))
+ /// .expect("Failed to create server")
+ /// .serve()
+ /// .expect("Failed to start server");
+ /// }
+ /// ```
+ pub fn serve(&mut self) -> Result<()> {
+ self.serve_listener(unsafe { TcpListener::from_raw_fd(0) })
+ }
+
+ fn serve_listener(&mut self, l: TcpListener) -> Result<()> {
+ /* The first client tells us our cookie. */
+ let mut i = l.accept()?;
+ let cookie = Cookie::receive(&mut i.0)?;
+ /* XXX: It'd be nice to recycle this connection. */
+ drop(i);
+
+        let handler =
+            (self.descriptor.factory)(self.descriptor.clone(),
+                                      self.core.handle())?;
+
+ /* Tokioize. */
+ let handle = self.core.handle();
+ let a = l.local_addr()?;
+        let socket =
+            tokio_core::net::TcpListener::from_listener(l, &a, &handle)?;
+
+ let done = socket.incoming().and_then(|(socket, _addr)| {
+ let _ = socket.set_nodelay(true);
+ Cookie::receive_async(socket)
+ }).and_then(|(socket, buf)| {
+ if Cookie::from(&buf).map(|c| c == cookie).unwrap_or(false) {
+ Ok(socket)
+ } else {
+ Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie."))
+ }
+ }).for_each(|socket| {
+ let (reader, writer) = socket.split();
+
+ let network =
+ twoparty::VatNetwork::new(reader, writer,
+ Side::Server, Default::default());
+
+ let rpc_system = handler.handle(network);
+ handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
+ Ok(())
+ });
+
+ Ok(self.core.run(done)?)
+ }
+}
+
+/// Cookies are used to authenticate clients.
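+///
+/// A cookie is `COOKIE_SIZE` bytes of OS-supplied randomness. A
+/// client authenticates itself by sending the raw cookie as the
+/// first message on a new connection; the server compares it to its
+/// own cookie in constant time.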
+struct Cookie(Vec<u8>);
+
+extern crate rand;
+use self::rand::RngCore;
+use self::rand::rngs::OsRng;
+
+const COOKIE_SIZE: usize = 32;
+
+impl Cookie {
+    /// Makes a new cookie.
+ fn new() -> Result<Self> {
+ let mut c = vec![0; COOKIE_SIZE];
+ OsRng::new()?.fill_bytes(&mut c);
+ Ok(Cookie(c))
+ }
+
+    /// Makes a new cookie from a slice.
+    fn from(buf: &[u8]) -> Option<Self> {
+        if buf.len() == COOKIE_SIZE {
+            Some(Cookie(buf.to_vec()))
+        } else {
+            None
+        }
+    }
+
+    /// Given a vector starting with a cookie, extracts it and
+    /// returns the rest.
+ fn extract(mut buf: Vec<u8>) -> Option<(Self, Vec<u8>)> {
+ if buf.len() >= COOKIE_SIZE {
+ let r = buf.split_off(COOKIE_SIZE);
+ Some((Cookie(buf), r))
+ } else {
+ None
+ }
+ }
+
+    /// Reads a cookie from `from`.
+ fn receive<R: Read>(from: &mut R) -> Result<Self> {
+ let mut buf = vec![0; COOKIE_SIZE];
+ from.read_exact(&mut buf)?;
+ Ok(Cookie(buf))
+ }
+
+    /// Asynchronously reads a cookie from `socket`.
+    fn receive_async(socket: net::TcpStream)
+                     -> ReadExact<net::TcpStream, Vec<u8>> {
+ let buf = vec![0; COOKIE_SIZE];
+ tokio_io::io::read_exact(socket, buf)
+ }
+
+    /// Writes a cookie to `to`.
+ fn send<W: Write>(&self, to: &mut W) -> io::Result<()> {
+ to.write_all(&self.0)?;
+ Ok(())
+ }
+}
+
+impl PartialEq for Cookie {
+ fn eq(&self, other: &Cookie) -> bool {
+ // First, compare the length.
+ self.0.len() == other.0.len()
+ // The length is not a secret, hence we can use && here.
+ && unsafe {
+ ::memsec::memeq(self.0.as_ptr(),
+ other.0.as_ptr(),
+ self.0.len())
+ }
+ }
+}