author     Justus Winter <justus@sequoia-pgp.org>   2019-01-11 15:54:10 +0100
committer  Justus Winter <justus@sequoia-pgp.org>   2019-05-03 15:20:37 +0200
commit     90782208aa073c21fdf6d7e37575f58677c969b5 (patch)
tree       0374ccb5b3b0e65f54ef4508fa903270014b3173 /net
parent     4ef67bfa4208f6944edadbf0171958cd3172ea22 (diff)
ipc: New crate.
Diffstat (limited to 'net')
-rw-r--r--   net/Cargo.toml |   7
-rw-r--r--   net/src/ipc.rs | 461
-rw-r--r--   net/src/lib.rs |   4
3 files changed, 3 insertions(+), 469 deletions(-)
diff --git a/net/Cargo.toml b/net/Cargo.toml
index 1974c308..ef837b8c 100644
--- a/net/Cargo.toml
+++ b/net/Cargo.toml
@@ -23,18 +23,17 @@ maintenance = { status = "actively-developed" }
sequoia-openpgp = { path = "../openpgp", version = "0.6" }
sequoia-core = { path = "../core", version = "0.6" }
-capnp-rpc = "0.9"
failure = "0.1.2"
-fs2 = "0.4.2"
futures = "0.1"
http = "0.1.5"
hyper = "0.12"
hyper-tls = "0.3"
libc = "0.2.33"
-memsec = "0.5.4"
native-tls = "0.2.0"
percent-encoding = "1.0.1"
-rand = "0.6"
tokio-core = "0.1"
tokio-io = "0.1.4"
url = "1.6.0"
+
+[dev-dependencies]
+rand = "0.6"
diff --git a/net/src/ipc.rs b/net/src/ipc.rs
deleted file mode 100644
index 17ddacf0..00000000
--- a/net/src/ipc.rs
+++ /dev/null
@@ -1,461 +0,0 @@
-//! Low-level IPC mechanism for Sequoia.
-//!
-//! # Rationale
-//!
-//! Sequoia makes use of background services, e.g., for managing and
-//! updating public keys.
-//!
-//! # Design
-//!
-//! We use the filesystem as a namespace to discover services. Every
-//! service has a file called the rendezvous point. Access to this
-//! file is serialized using file locking. The file contains a socket
-//! address and a cookie that we use to connect to the server and
-//! authenticate ourselves. If the file does not exist, is malformed,
-//! or does not point to a usable server, we start a new one on demand.
-//!
-//! This design mimics Unix sockets, but works on Windows too.
-//!
-//! # External vs internal servers
-//!
-//! These servers can either run in external processes, or be
-//! co-located within the current process. We first try to start an
-//! external process, and fall back to spawning a thread if that fails.
-//!
-//! Using an external process is the preferred option. It allows us
-//! to continuously update the keys in the keystore, for example. It
-//! also means that we do not spawn a thread in your process, which is
-//! frowned upon for various reasons.
-//!
-//! Please see [IPCPolicy] for more information.
-//!
-//! [IPCPolicy]: ../../sequoia_core/enum.IPCPolicy.html
-//!
-//! # Note
-//!
-//! Windows support is currently not implemented, but should be
-//! straightforward.
-
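For illustration: the rendezvous file described above is a fixed-size cookie followed by a textual socket address, as `Cookie::extract` and the `write!(file, "{}:{}", LOCALHOST, addr.port())` call further down show. A minimal sketch of parsing such a file, assuming the 32-byte cookie used here; the helper name `parse_rendezvous` is hypothetical and not part of the crate:

```rust
use std::net::SocketAddr;

const COOKIE_SIZE: usize = 32;

/// Sketch: split a rendezvous file's contents into the cookie and the
/// "ip:port" address that follows it.  Returns None for stale or
/// malformed files, which a caller would treat as "start a new server".
fn parse_rendezvous(contents: &[u8]) -> Option<(Vec<u8>, SocketAddr)> {
    if contents.len() < COOKIE_SIZE {
        return None;
    }
    let (cookie, rest) = contents.split_at(COOKIE_SIZE);
    let addr = std::str::from_utf8(rest).ok()?.trim().parse().ok()?;
    Some((cookie.to_vec(), addr))
}
```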
-extern crate fs2;
-use self::fs2::FileExt;
-
-use std::fs;
-use std::io::{self, Read, Write};
-use std::net::{SocketAddr, AddrParseError, TcpStream, TcpListener};
-use std::path::PathBuf;
-
-use futures::{Future, Stream};
-
-use tokio_core::{self, net};
-use tokio_io;
-use tokio_io::io::{ReadHalf, ReadExact};
-use tokio_io::AsyncRead;
-
-use capnp_rpc::{RpcSystem, twoparty};
-use capnp_rpc::rpc_twoparty_capnp::Side;
-
-/* Unix-specific options. */
-use std::os::unix::io::FromRawFd;
-use std::os::unix::fs::OpenOptionsExt;
-
-/* XXX: Implement Windows support. */
-
-use std::process::{Command, Stdio};
-use std::os::unix::io::AsRawFd;
-
-use std::thread;
-
-use sequoia_core as core;
-use super::Result;
-
-/// Servers need to implement this trait.
-pub trait Handler {
- /// Called on every connection.
- fn handle(&self,
- network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>)
- -> RpcSystem<Side>;
-}
-
-/// A factory for handlers.
-pub type HandlerFactory = fn(descriptor: Descriptor,
- handle: tokio_core::reactor::Handle)
- -> Result<Box<Handler>>;
-
-/// A descriptor is used to connect to a service.
-#[derive(Clone)]
-pub struct Descriptor {
- ctx: core::Context,
- rendezvous: PathBuf,
- executable: PathBuf,
- factory: HandlerFactory,
-}
-
-const LOCALHOST: &str = "127.0.0.1";
-
-impl Descriptor {
- /// Create a descriptor given its rendezvous point, the path to
- /// the server's executable file, and a handler factory.
- pub fn new(ctx: &core::Context, rendezvous: PathBuf,
- executable: PathBuf, factory: HandlerFactory)
- -> Self {
- Descriptor {
- ctx: ctx.clone(),
- rendezvous: rendezvous,
- executable: executable,
- factory: factory,
- }
- }
-
- /// Returns the context.
- pub fn context(&self) -> &core::Context {
- &self.ctx
- }
-
- /// Connects to a descriptor, starting the server if necessary.
- pub fn connect(&self, handle: &tokio_core::reactor::Handle)
- -> Result<RpcSystem<Side>> {
- self.connect_with_policy(handle, *self.ctx.ipc_policy())
- }
-
- /// Connects to a descriptor, starting the server if necessary.
- ///
- /// This function does not use the context's IPC policy, but uses
- /// the given one.
- pub fn connect_with_policy(&self, handle: &tokio_core::reactor::Handle,
- policy: core::IPCPolicy)
- -> Result<RpcSystem<Side>> {
- let do_connect =
- move |cookie: Cookie, mut s: TcpStream| -> Result<RpcSystem<Side>> {
- cookie.send(&mut s)?;
-
- /* Tokioize. */
- let stream = net::TcpStream::from_stream(s, &handle)?;
- stream.set_nodelay(true)?;
- let (reader, writer) = stream.split();
-
- let network =
- Box::new(twoparty::VatNetwork::new(reader, writer,
- Side::Client,
- Default::default()));
- let rpc_system = RpcSystem::new(network, None);
-
- Ok(rpc_system)
- };
-
- let mut file = fs::OpenOptions::new()
- .read(true)
- .write(true)
- .create(true)
- .mode(0o600)
- .open(&self.rendezvous)?;
- file.lock_exclusive()?;
-
- let mut c = vec![];
- file.read_to_end(&mut c)?;
-
- if let Some((cookie, a)) = Cookie::extract(c) {
- let addr: ::std::result::Result<SocketAddr, AddrParseError> =
- String::from_utf8_lossy(&a).parse();
- if addr.is_err() {
- /* Malformed. Invalidate the cookie and try again. */
- file.set_len(0)?;
- drop(file);
- return self.connect(handle);
- }
-
- let stream = TcpStream::connect(addr.unwrap());
- if let Ok(s) = stream {
- do_connect(cookie, s)
- } else {
- /* Failed to connect. Invalidate the cookie and try again. */
- file.set_len(0)?;
- drop(file);
- self.connect(handle)
- }
- } else {
- let cookie = Cookie::new()?;
- for external in [true, false].iter() {
- // Implement the IPC policy.
- if policy == core::IPCPolicy::Internal && *external {
- // Do not try to fork.
- continue;
- }
-
- let addr = match self.start(*external) {
- Ok(a) => a,
- Err(e) => if *external {
- if policy == core::IPCPolicy::External {
- // Fail!
- return Err(e);
- }
-
- // Try to spawn a thread next.
- continue;
- } else {
- // Failed to spawn a thread.
- return Err(e);
- }
- };
-
- let mut stream = TcpStream::connect(addr)?;
- cookie.send(&mut stream)?;
-
- /* XXX: It'd be nice not to waste this connection. */
- drop(stream);
-
- if *external {
- /* Write connection information to file. */
- file.set_len(0)?;
- cookie.send(&mut file)?;
- write!(file, "{}:{}", LOCALHOST, addr.port())?;
- }
- drop(file);
-
- return do_connect(cookie, TcpStream::connect(addr)?);
- }
- unreachable!();
- }
- }
-
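To summarize the fallback logic of `connect_with_policy` above: which start-up attempts are made depends on the policy. A sketch of that decision, assuming the `External`, `Internal`, and `Robust` variants of `sequoia_core::IPCPolicy`; the helper `attempts` is illustrative only:

```rust
use sequoia_core::IPCPolicy;

/// Sketch: the sequence of attempts made for each policy, where `true`
/// means "fork an external server" and `false` means "spawn a thread".
fn attempts(policy: IPCPolicy) -> &'static [bool] {
    match policy {
        // Only an external process is acceptable; a failure to start
        // one is returned to the caller as an error.
        IPCPolicy::External => &[true],
        // Never fork; serve from a thread in this process.
        IPCPolicy::Internal => &[false],
        // Prefer an external process, but fall back to a thread.
        IPCPolicy::Robust => &[true, false],
    }
}
```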
- /// Try to create a TCP socket and bind it to a random port on
- /// localhost.
- fn listen(&self) -> Result<TcpListener> {
- let port = OsRng::new()?.next_u32() as u16;
- Ok(TcpListener::bind((LOCALHOST, port))?)
- }
-
- /// Start the service, either as an external process or as a
- /// thread.
- fn start(&self, external: bool) -> Result<SocketAddr> {
- /* Listen on a random port on localhost. */
- let mut listener = self.listen();
- while listener.is_err() {
- listener = self.listen();
- }
- let listener = listener.unwrap();
- let addr = listener.local_addr()?;
-
- /* Start the server, connect to it, and send the cookie. */
- if external {
- self.fork(listener)?;
- } else {
- self.spawn(listener)?;
- }
-
- Ok(addr)
- }
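As an aside, `listen` and `start` above pick a random port and retry until binding succeeds. An alternative, not what this code does, is to bind port 0 and let the operating system choose a free port; a sketch, with the hypothetical helper `listen_any`:

```rust
use std::net::TcpListener;

/// Sketch: binding port 0 asks the OS for any free port on localhost.
fn listen_any() -> std::io::Result<TcpListener> {
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    // local_addr() reports the port the OS actually chose.
    println!("listening on {}", listener.local_addr()?);
    Ok(listener)
}
```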
-
- fn fork(&self, l: TcpListener) -> Result<()> {
- // Convert to raw fd, then forget l so that it will not be
- // closed when it is dropped.
- let fd = l.as_raw_fd();
- ::std::mem::forget(l);
-
- Command::new(&self.executable.clone().into_os_string())
- .arg("--home")
- .arg(self.ctx.home().to_string_lossy().into_owned())
- .arg("--lib")
- .arg(self.ctx.home().to_string_lossy().into_owned())
- .arg("--ephemeral")
- .arg(format!("{}", self.ctx.ephemeral()))
- // l will be closed here if the exec fails.
- .stdin(unsafe { Stdio::from_raw_fd(fd) })
- .spawn()?;
- Ok(())
- }
-
- fn spawn(&self, l: TcpListener) -> Result<()> {
- let descriptor = self.clone();
- thread::spawn(move || -> Result<()> {
- Ok(Server::new(descriptor)
- .expect("Failed to spawn server") // XXX
- .serve_listener(l)
- .expect("Failed to spawn server")) // XXX
- });
- Ok(())
- }
-}
-
-/// A server.
-pub struct Server {
- core: tokio_core::reactor::Core,
- descriptor: Descriptor,
-}
-
-impl Server {
- /// Creates a new server for the descriptor.
- pub fn new(descriptor: Descriptor) -> Result<Self> {
- Ok(Server {
- core: tokio_core::reactor::Core::new()?,
- descriptor: descriptor,
- })
- }
-
- /// Creates a Context from `env::args()`.
- pub fn context() -> Result<core::Context> {
- use std::env::args;
- let args: Vec<String> = args().collect();
-
- if args.len() != 7 || args[1] != "--home"
- || args[3] != "--lib" || args[5] != "--ephemeral" {
- return Err(format_err!(
- "Usage: {} --home <HOMEDIR> --lib <LIBDIR> \
- --ephemeral true|false", args[0]));
- }
-
- let mut cfg = core::Context::configure("org.sequoia.api.server")
- .home(&args[2]).lib(&args[4]);
-
- if let Ok(ephemeral) = args[6].parse() {
- if ephemeral {
- cfg.set_ephemeral();
- }
- } else {
- return Err(format_err!(
- "Expected 'true' or 'false' for --ephemeral, got: {}",
- args[6]));
- }
-
- cfg.build()
- }
-
- /// Turns this process into a server.
- ///
- /// External servers must call this early on. Expects 'stdin' to
- /// be a listening TCP socket.
- ///
- /// # Example
- ///
- /// ```compile_fail
- /// // We cannot run this because sequoia-store is not built yet.
- /// extern crate sequoia_core;
- /// extern crate sequoia_net;
- /// extern crate sequoia_store;
- ///
- /// use sequoia_net::ipc::Server;
- ///
- /// fn main() {
- /// let ctx = Server::context()
- /// .expect("Failed to create context");
- /// Server::new(sequoia_store::descriptor(&ctx))
- /// .expect("Failed to create server")
- /// .serve()
- /// .expect("Failed to start server");
- /// }
- /// ```
- pub fn serve(&mut self) -> Result<()> {
- self.serve_listener(unsafe { TcpListener::from_raw_fd(0) })
- }
-
- fn serve_listener(&mut self, l: TcpListener) -> Result<()> {
- /* The first client tells us our cookie. */
- let mut i = l.accept()?;
- let cookie = Cookie::receive(&mut i.0)?;
- /* XXX: It'd be nice to recycle this connection. */
- drop(i);
-
- let handler = (self.descriptor.factory)(self.descriptor.clone(), self.core.handle())?;
-
- /* Tokioize. */
- let handle = self.core.handle();
- let a = l.local_addr()?;
- let socket = tokio_core::net::TcpListener::from_listener(l, &a, &handle).unwrap();
-
- let done = socket.incoming().and_then(|(socket, _addr)| {
- let _ = socket.set_nodelay(true);
- Cookie::receive_async(socket)
- }).and_then(|(socket, buf)| {
- if Cookie::from(&buf).map(|c| c == cookie).unwrap_or(false) {
- Ok(socket)
- } else {
- Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie."))
- }
- }).for_each(|socket| {
- let (reader, writer) = socket.split();
-
- let network =
- twoparty::VatNetwork::new(reader, writer,
- Side::Server, Default::default());
-
- let rpc_system = handler.handle(network);
- handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
- Ok(())
- });
-
- Ok(self.core.run(done)?)
- }
-}
-
-/// Cookies are used to authenticate clients.
-struct Cookie(Vec<u8>);
-
-extern crate rand;
-use self::rand::RngCore;
-use self::rand::rngs::OsRng;
-
-const COOKIE_SIZE: usize = 32;
-
-impl Cookie {
- /// Make a new cookie.
- fn new() -> Result<Self> {
- let mut c = vec![0; COOKIE_SIZE];
- OsRng::new()?.fill_bytes(&mut c);
- Ok(Cookie(c))
- }
-
- /// Make a new cookie from a slice.
- fn from(buf: &Vec<u8>) -> Option<Self> {
- if buf.len() == COOKIE_SIZE {
- let mut c = Vec::<u8>::with_capacity(COOKIE_SIZE);
- c.extend_from_slice(buf);
- Some(Cookie(c))
- } else {
- None
- }
- }
-
- /// Given a vector starting with a cookie, extract it and return
- /// the rest.
- fn extract(mut buf: Vec<u8>) -> Option<(Self, Vec<u8>)> {
- if buf.len() >= COOKIE_SIZE {
- let r = buf.split_off(COOKIE_SIZE);
- Some((Cookie(buf), r))
- } else {
- None
- }
- }
-
- /// Read a cookie from 'from'.
- fn receive<R: Read>(from: &mut R) -> Result<Self> {
- let mut buf = vec![0; COOKIE_SIZE];
- from.read_exact(&mut buf)?;
- Ok(Cookie(buf))
- }
-
- /// Asynchronously read a cookie from 'socket'.
- fn receive_async(socket: net::TcpStream) -> ReadExact<net::TcpStream,
- Vec<u8>> {
- let buf = vec![0; COOKIE_SIZE];
- tokio_io::io::read_exact(socket, buf)
- }
-
-
- /// Write a cookie to 'to'.
- fn send<W: Write>(&self, to: &mut W) -> io::Result<()> {
- to.write_all(&self.0)?;
- Ok(())
- }
-}
-
-impl PartialEq for Cookie {
- fn eq(&self, other: &Cookie) -> bool {
- // First, compare the length.
- self.0.len() == other.0.len()
- // The length is not a secret, hence we can use && here.
- && unsafe {
- ::memsec::memeq(self.0.as_ptr(),
- other.0.as_ptr(),
- self.0.len())
- }
- }
-}
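The `PartialEq` implementation above uses `memsec::memeq` so that the comparison time does not depend on where two cookies differ. For illustration, the same property can be obtained in plain Rust by accumulating differences instead of returning early; a sketch, not what this crate ships:

```rust
/// Sketch: constant-time equality for two byte strings.  The length
/// check may short-circuit because the length is not a secret.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Fold the XOR of every byte pair into one accumulator so that the
    // running time is independent of where the inputs differ.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}
```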
diff --git a/net/src/lib.rs b/net/src/lib.rs
index fc674df5..db52333d 100644
--- a/net/src/lib.rs
+++ b/net/src/lib.rs
@@ -41,7 +41,6 @@ extern crate futures;
extern crate http;
extern crate hyper;
extern crate hyper_tls;
-extern crate memsec;
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_io;
@@ -49,8 +48,6 @@ extern crate tokio_io;
extern crate percent_encoding;
extern crate url;
-extern crate capnp_rpc;
-
use hyper::client::{ResponseFuture, HttpConnector};
use hyper::{Client, Request, Body};
use hyper_tls::HttpsConnector;
@@ -65,7 +62,6 @@ use sequoia_core::Context;
pub mod async;
use async::url2uri;
-pub mod ipc;
/// For accessing keyservers using HKP.
pub struct KeyServer {