diff options
-rw-r--r-- | net/src/ipc.rs | 5 | ||||
-rw-r--r-- | store/Cargo.toml | 2 | ||||
-rw-r--r-- | store/src/backend.rs | 320 | ||||
-rw-r--r-- | store/src/lib.rs | 69 | ||||
-rw-r--r-- | store/src/macros.rs | 27 | ||||
-rw-r--r-- | store/src/store_protocol.capnp | 20 | ||||
-rw-r--r-- | tool/src/main.rs | 19 |
7 files changed, 401 insertions, 61 deletions
diff --git a/net/src/ipc.rs b/net/src/ipc.rs index fef222ad..7bbb529e 100644 --- a/net/src/ipc.rs +++ b/net/src/ipc.rs @@ -65,7 +65,8 @@ pub trait Handler { } /// A factory for handlers. -pub type HandlerFactory = fn(descriptor: Descriptor) -> Option<Box<Handler>>; +pub type HandlerFactory = fn(descriptor: Descriptor, + handle: tokio_core::reactor::Handle) -> Option<Box<Handler>>; /// A descriptor is used to connect to a service. #[derive(Clone)] @@ -231,7 +232,7 @@ impl Server { /* XXX: It'd be nice to recycle this connection. */ drop(i); - let handler = (self.descriptor.factory)(self.descriptor.clone()) + let handler = (self.descriptor.factory)(self.descriptor.clone(), self.core.handle()) .ok_or( io::Error::new(io::ErrorKind::BrokenPipe, "Failed to start server"))?; diff --git a/store/Cargo.toml b/store/Cargo.toml index d994f408..47fbd373 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -15,7 +15,9 @@ sequoia-net = { path = "../net" } capnp = "0.8" capnp-rpc = "0.8" futures = "0.1.17" +rand = "0.3" rusqlite = "0.12.0" +time = "0.1.38" tokio-core = "0.1.10" tokio-io = "0.1.4" diff --git a/store/src/backend.rs b/store/src/backend.rs index e405c059..48c192cb 100644 --- a/store/src/backend.rs +++ b/store/src/backend.rs @@ -1,6 +1,8 @@ //! Storage backend. +use std::cmp; use std::fmt; +use std::io; use std::ops::Add; use std::rc::Rc; use std::time::{SystemTime, Duration, UNIX_EPOCH}; @@ -9,22 +11,50 @@ use capnp::capability::Promise; use capnp; use capnp_rpc::rpc_twoparty_capnp::Side; use capnp_rpc::{self, RpcSystem, twoparty}; +use futures::Future; +use futures::future::{loop_fn, Loop}; +use rand::distributions::{IndependentSample, Range}; +use rand::thread_rng; use rusqlite::Connection; use rusqlite::types::{ToSql, ToSqlOutput}; use rusqlite; +use tokio_core::reactor::{Handle, Timeout}; use tokio_core; use tokio_io::io::ReadHalf; +use openpgp; use openpgp::tpk::{self, TPK}; use sequoia_core as core; +use sequoia_net as net; use sequoia_net::ipc; use store_protocol_capnp::node; +/* Configuration and policy. */ + +/// Minimum sleep time. +fn min_sleep_time() -> Duration { + Duration::new(60 * 5, 0) // 5 minutes. +} + +/// Interval after which all keys should be refreshed once. +fn refresh_interval() -> Duration { + Duration::new(60 * 60 * 24 * 7, 0) // A week. +} + +/// Returns a value from the uniform distribution over [0, 2*d). +/// +/// This function is used to randomize key refresh times. +fn random_duration(d: Duration) -> Duration { + Duration::new(Range::new(0, 2 * d.as_secs()).ind_sample(&mut thread_rng()), 0) +} + +/* Entry point. */ + /// Makes backends. #[doc(hidden)] -pub fn factory(descriptor: ipc::Descriptor) -> Option<Box<ipc::Handler>> { - match Backend::new(descriptor) { +pub fn factory(descriptor: ipc::Descriptor, handle: Handle) -> Option<Box<ipc::Handler>> { + match Backend::new(descriptor, handle) { Ok(backend) => Some(Box::new(backend)), Err(_) => None, } @@ -35,9 +65,9 @@ struct Backend { } impl Backend { - fn new(descriptor: ipc::Descriptor) -> Result<Self> { + fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> { Ok(Backend { - store: node::ToClient::new(NodeServer::new(descriptor)?) + store: node::ToClient::new(NodeServer::new(descriptor, handle)?) .from_server::<capnp_rpc::Server>(), }) } @@ -51,6 +81,7 @@ impl ipc::Handler for Backend { } } +/* Server implementation. */ struct NodeServer { _descriptor: ipc::Descriptor, @@ -58,7 +89,7 @@ struct NodeServer { } impl NodeServer { - fn new(descriptor: ipc::Descriptor) -> Result<Self> { + fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> { let mut db_path = descriptor.home.clone(); db_path.push("keystore.sqlite"); @@ -67,10 +98,12 @@ impl NodeServer { c.execute_batch("PRAGMA foreign_keys = true;")?; Self::init(&c)?; - Ok(NodeServer { + let server = NodeServer { _descriptor: descriptor, c: Rc::new(c), - }) + }; + KeyServer::start_housekeeping(server.c.clone(), handle)?; + Ok(server) } /// Initializes or migrates the database. @@ -433,7 +466,7 @@ impl KeyServer { Ok(x) } else { let r = c.execute( - "INSERT INTO keys (fingerprint, created) VALUES (?1, ?2)", + "INSERT INTO keys (fingerprint, created, update_at) VALUES (?1, ?2, ?2)", &[&fp, &Timestamp::now()]); // Some other mutator might race us to the insertion. @@ -452,6 +485,174 @@ impl KeyServer { }.map_err(|e| e.into()) } } + + /// Merges other into this key updating the database. + /// + /// Returnes the merged key as blob. + fn merge(&self, other: TPK) -> Result<Vec<u8>> { + let mut new = other; + + // Get the current key from the database. + let (fingerprint, key): (String, Option<Vec<u8>>) + = self.c.query_row( + "SELECT fingerprint, key FROM keys WHERE id = ?1", + &[&self.id], + |row| (row.get(0), row.get_checked(1).ok()))?; + + // If there was a key stored there, merge it. + if let Some(current) = key { + let current = TPK::from_bytes(¤t)?; + + if current.fingerprint().to_hex() != fingerprint { + // Inconsistent database. + return Err(node::Error::SystemError); + } + + if current.fingerprint() != new.fingerprint() { + return Err(node::Error::Conflict); + } + + new = current.merge(new)?; + } + + // Write key back to the database. + let mut blob = vec![]; + new.serialize(&mut blob)?; + + self.c.execute("UPDATE keys SET key = ?1 WHERE id = ?2", + &[&blob, &self.id])?; + + Ok(blob) + } + + /// Records a successful key update. + fn success(&self, message: &str, next: Duration) -> Result<()> { + let logid = log(&self.c, &self.slug(), message)?; + self.c.execute("UPDATE keys SET updated = ?2, log = ?3, update_at = ?4 + WHERE id = ?1", + &[&self.id, &Timestamp::now(), + &logid, + &(Timestamp::now() + next)])?; + Ok(()) + } + + /// Records an unsuccessful key update. + fn error(&self, message: &str, err: &str, next: Duration) -> Result<()> { + let logid = error(&self.c, &self.slug(), message, err)?; + self.c.execute("UPDATE keys SET log = ?2, update_at = ?3 + WHERE id = ?1", + &[&self.id, &logid, + &(Timestamp::now() + next)])?; + Ok(()) + } + + /// Returns when the next key using the given policy should be updated. + fn next_update_at(c: &Rc<Connection>, network_policy: core::NetworkPolicy) + -> Option<SystemTime> { + let network_policy_u8 = u8::from(&network_policy); + + // Select the key that was updated least recently. + let update_at: Option<i64> = c.query_row( + "SELECT keys.update_at FROM keys + JOIN bindings on keys.id = bindings.key + JOIN stores on stores.id = bindings.store + WHERE stores.network_policy = ?1 + ORDER BY keys.update_at LIMIT 1", + &[&network_policy_u8], |row| row.get(0)).ok(); + update_at.map(|secs| UNIX_EPOCH + Duration::new(secs as u64, 0)) + } + + /// Returns the number of keys using the given policy. + fn need_update(c: &Rc<Connection>, network_policy: core::NetworkPolicy) + -> Result<u32> { + let network_policy_u8 = u8::from(&network_policy); + + let count: i64 = c.query_row( + "SELECT COUNT(*) FROM keys + JOIN bindings on keys.id = bindings.key + JOIN stores on stores.id = bindings.store + WHERE stores.network_policy >= ?1", + &[&network_policy_u8], |row| row.get(0))?; + assert!(count >= 0); + Ok(count as u32) + } + + /// Updates the key that was least recently updated. + fn update(c: &Rc<Connection>, network_policy: core::NetworkPolicy) -> Result<()> { + assert!(network_policy != core::NetworkPolicy::Offline); + let network_policy_u8 = u8::from(&network_policy); + + // Select the key that was updated least recently. + let (id, fingerprint): (i64, String) = c.query_row( + "SELECT keys.id, keys.fingerprint FROM keys + JOIN bindings on keys.id = bindings.key + JOIN stores on stores.id = bindings.store + WHERE stores.network_policy >= ?1 + AND keys.update_at < ?2 + ORDER BY keys.update_at LIMIT 1", + &[&network_policy_u8, &Timestamp::now()], |row| (row.get(0), row.get(1)))?; + let fingerprint = openpgp::Fingerprint::from_hex(&fingerprint) + .ok_or(node::Error::SystemError)?; + + let key = KeyServer::new(c.clone(), id); + let doit = || -> Result<()> { + let ctx = core::Context::configure("org.sequoia-pgp.store") + .network_policy(network_policy).build()?; + let mut keyserver = net::KeyServer::sks_pool(&ctx)?; + + // Get key and merge it into the database. + let tpk = keyserver.get(&fingerprint.to_keyid())?; + key.merge(tpk)?; + Ok(()) + }; + let next = refresh_interval() / Self::need_update(c, network_policy)?; + if let Err(e) = doit() { + key.error("Update unsuccessful", &format!("{:?}", e), next / 2).unwrap_or(()); + } else { + key.success("Update successful", next).unwrap_or(()); + } + Ok(()) + } + + /// Starts the periodic housekeeping. + fn start_housekeeping(c: Rc<Connection>, handle: Handle) -> Result<()> { + let h = handle.clone(); + + let forever = loop_fn(0, move |_| { + // For now, we only update keys with this network policy. + let network_policy = core::NetworkPolicy::Encrypted; + + let now = SystemTime::now(); + let sleep_for = + if let Some(at) = Self::next_update_at(&c, network_policy) { + if at <= now { + if let Err(e) = Self::update(&c, network_policy) { + #[cfg(debug_assertions)] + eprintln!("Odd. Updating failed: {:?}", e); + } + min_sleep_time() + } else { + assert!(at > now); + cmp::max(min_sleep_time(), at.duration_since(now).unwrap()) + } + } else { + min_sleep_time() + }; + assert!(sleep_for > Duration::new(0, 0)); + + Timeout::new(random_duration(sleep_for), &h) + .unwrap() // XXX: May fail if the eventloop expired. + .then(move |timeout| { + if timeout.is_ok() { + Ok(Loop::Continue(0)) + } else { + Ok(Loop::Break(())) + } + }) + }); + handle.spawn(forever); + Ok(()) + } } impl Query for KeyServer { @@ -497,35 +698,8 @@ impl node::key::Server for KeyServer { mut results: node::key::ImportResults) -> Promise<(), capnp::Error> { bind_results!(results); - let mut new = sry!(TPK::from_bytes(&pry!(pry!(params.get()).get_key()))); - - let (fingerprint, key): (String, Option<Vec<u8>>) - = sry!(self.c.query_row( - "SELECT fingerprint, key FROM keys WHERE id = ?1", - &[&self.id], - |row| (row.get(0), row.get_checked(1).ok()))); - if let Some(current) = key { - let current = sry!(TPK::from_bytes(¤t)); - - if current.fingerprint().to_hex() != fingerprint { - // Inconsistent database. - fail!(node::Error::SystemError); - } - - if current.fingerprint() != new.fingerprint() { - fail!(node::Error::Conflict); - } - - new = sry!(current.merge(new)); - } - - // Write key back to the database. - let mut blob = vec![]; - sry!(new.serialize(&mut blob)); - - sry!(self.c.execute("UPDATE keys SET key = ?1 WHERE id = ?2", - &[&blob, &self.id])); - + let new = sry!(TPK::from_bytes(&pry!(pry!(params.get()).get_key()))); + let blob = sry!(self.merge(new)); pry!(pry!(results.get().get_result()).set_ok(&blob[..])); Promise::ok(()) } @@ -537,6 +711,10 @@ trait Query { fn id(&self) -> i64; fn connection(&self) -> Rc<Connection>; + fn slug(&self) -> String { + format!("{}::{}", Self::table_name(), self.id()) + } + fn query(&mut self, column: &str) -> Result<i64> { self.connection().query_row( &format!("SELECT {} FROM {} WHERE id = ?1", column, Self::table_name()), @@ -546,6 +724,19 @@ trait Query { fn query_stats(&mut self, mut stats: node::stats::Builder) -> Result<()> { let created = self.query("created")?; let updated = self.query("updated")?; + let (timestamp, item, message, error): (i64, String, String, String) + = self.connection().query_row( + &format!("SELECT log.timestamp, log.item, log.message, log.error FROM log + JOIN {0} on log.id = {0}.log + WHERE {0}.id = ?1", Self::table_name()), + &[&self.id()], |row| (row.get(0), row.get(1), row.get(2), + row.get_checked(1).unwrap_or("".into()))) + .or_else(|err| match err { + // No log messages. + rusqlite::Error::QueryReturnedNoRows => + Ok((0, "".into(), "".into(), "".into())), + _ => Err(err), + })?; let encryption_count = self.query("encryption_count")?; let encryption_first = self.query("encryption_first")?; let encryption_last = self.query("encryption_last")?; @@ -560,6 +751,11 @@ trait Query { stats.set_verification_count(verification_count); stats.set_verification_first(verification_first); stats.set_verification_last(verification_last); + let mut msg = stats.init_message(); + msg.set_timestamp(timestamp); + msg.set_item(&item); + msg.set_message(&message); + msg.set_error(&error); Ok(()) } } @@ -742,6 +938,24 @@ impl From<tpk::Error> for node::Error { } } +impl From<core::Error> for node::Error { + fn from(_: core::Error) -> Self { + node::Error::SystemError + } +} + +impl From<net::Error> for node::Error { + fn from(_: net::Error) -> Self { + node::Error::SystemError + } +} + +impl From<io::Error> for node::Error { + fn from(_: io::Error) -> Self { + node::Error::SystemError + } +} + /* Database schemata and migrations. */ @@ -760,6 +974,13 @@ CREATE TABLE stores ( name TEXT NOT NULL, UNIQUE (domain, name)); +CREATE TABLE log ( + id INTEGER PRIMARY KEY, + timestamp INTEGER NOT NULL, + item TEXT NOT NULL, + message TEXT NOT NULL, + error TEXT NULL); + CREATE TABLE bindings ( id INTEGER PRIMARY KEY, store INTEGER NOT NULL, @@ -768,6 +989,7 @@ CREATE TABLE bindings ( created INTEGER NOT NULL, updated DEFAULT 0, + log INTEGER NULL, encryption_count DEFAULT 0, encryption_first DEFAULT 0, @@ -778,7 +1000,8 @@ CREATE TABLE bindings ( UNIQUE(store, label), FOREIGN KEY (store) REFERENCES stores(id) ON DELETE CASCADE, - FOREIGN KEY (key) REFERENCES keys(id) ON DELETE CASCADE); + FOREIGN KEY (key) REFERENCES keys(id) ON DELETE CASCADE + FOREIGN KEY (log) REFERENCES log(id)); CREATE TABLE keys ( id INTEGER PRIMARY KEY, @@ -787,6 +1010,8 @@ CREATE TABLE keys ( created INTEGER NOT NULL, updated DEFAULT 0, + log INTEGER NULL, + update_at INTEGER NOT NULL, encryption_count DEFAULT 0, encryption_first DEFAULT 0, @@ -795,7 +1020,8 @@ CREATE TABLE keys ( verification_first DEFAULT 0, verification_last DEFAULT 0, - UNIQUE (fingerprint)); + UNIQUE (fingerprint), + FOREIGN KEY (log) REFERENCES log(id)); "; /* Timestamps. */ @@ -826,6 +1052,22 @@ impl Add<Duration> for Timestamp { } } +/* Logging. */ + +/// Writes a log message to the log. +fn log(c: &Rc<Connection>, item: &str, message: &str) -> Result<i64> { + c.execute("INSERT INTO log (timestamp, item, message) VALUES (?1, ?2, ?3)", + &[&Timestamp::now(), &item, &message])?; + Ok(c.last_insert_rowid()) +} + +/// Writes an error message to the log. +fn error(c: &Rc<Connection>, item: &str, message: &str, error: &str) -> Result<i64> { + c.execute("INSERT INTO log (timestamp, item, message, error) VALUES (?1, ?2, ?3, ?4)", + &[&Timestamp::now(), &item, &message, &error])?; + Ok(c.last_insert_rowid()) +} + /* Miscellaneous. */ impl<'a> From<&'a core::NetworkPolicy> for node::NetworkPolicy { diff --git a/store/src/lib.rs b/store/src/lib.rs index 8a3672ee..5064f6bd 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -50,7 +50,9 @@ extern crate capnp; #[macro_use] extern crate capnp_rpc; extern crate futures; +extern crate rand; extern crate rusqlite; +extern crate time; extern crate tokio_core; extern crate tokio_io; @@ -58,7 +60,7 @@ use std::cell::RefCell; use std::fmt; use std::io; use std::rc::Rc; -use std::time::{SystemTime, Duration, UNIX_EPOCH}; +use std::time::{SystemTime, SystemTimeError, Duration, UNIX_EPOCH}; use capnp::capability::Promise; use capnp_rpc::rpc_twoparty_capnp::Side; @@ -659,6 +661,9 @@ pub struct Stats { /// Records the time this item was last updated. pub updated: Option<SystemTime>, + /// Result of the latest update. + pub message: Option<Log>, + /// Records counters and timestamps of encryptions. pub encryption: Stamps, @@ -666,6 +671,52 @@ pub struct Stats { pub verification: Stamps, } +#[derive(Debug)] +pub struct Log { + pub timestamp: SystemTime, + pub item: String, + pub status: ::std::result::Result<String, (String, String)>, +} + +impl Log { + fn new(timestamp: i64, item: &str, message: &str, error: &str) -> Option<Self> { + let timestamp = from_unix(timestamp)?; + if message == "" { + None + } else { + if error == "" { + Some(Log{ + timestamp: timestamp, + item: item.into(), + status: Err((message.into(), error.into())), + }) + } else { + Some(Log{ + timestamp: timestamp, + item: item.into(), + status: Ok(message.into()), + }) + } + } + } + + /// Returns the message without context. + pub fn short(&self) -> String { + match self.status { + Ok(ref m) => m.clone(), + Err((ref m, ref e)) => format!("{}: {}", m, e), + } + } + + /// Returns the message without context. + pub fn string(&self) -> Result<String> { + Ok(match self.status { + Ok(ref m) => format!("{}: {}", format_system_time(&self.timestamp)?, m), + Err((ref m, ref e)) => format!("{}: {}: {}", format_system_time(&self.timestamp)?, m, e), + }) + } +} + /// Counter and timestamps. #[derive(Debug)] pub struct Stamps { @@ -798,6 +849,16 @@ impl Iterator for KeyIter { } } +/// XXX Use the correct time type. +/// +/// We should use time::Timespec and get rid of this function. +pub fn format_system_time(t: &SystemTime) -> Result<String> { + let tm = time::at(time::Timespec::new(t.duration_since(UNIX_EPOCH)?.as_secs() as i64, 0)); + Ok(time::strftime("%F %H:%M", &tm) + // Only parse errors can happen. + .unwrap()) +} + /* Error handling. */ /// Results for sequoia-store. @@ -879,6 +940,12 @@ impl From<capnp::NotInSchema> for Error { } } +impl From<SystemTimeError> for Error { + fn from(_: SystemTimeError) -> Self { + Error::ProtocolError + } +} + #[cfg(test)] mod store_test { use super::{core, Store, Error, TPK, Fingerprint}; diff --git a/store/src/macros.rs b/store/src/macros.rs index f460bcdc..0945af29 100644 --- a/store/src/macros.rs +++ b/store/src/macros.rs @@ -42,16 +42,23 @@ macro_rules! make_stats_request { let r = pry!(pry!(pry!(response.get()).get_result()).which()); let r = match r { /* The Result. */ - Which::Ok(Ok(s)) => Ok(Stats{ - created: from_unix(s.get_created()), - updated: from_unix(s.get_updated()), - encryption: Stamps::new(s.get_encryption_count(), - s.get_encryption_first(), - s.get_encryption_last()), - verification: Stamps::new(s.get_verification_count(), - s.get_verification_first(), - s.get_verification_last()), - }), + Which::Ok(Ok(s)) => { + let msg = pry!(s.get_message()); + Ok(Stats{ + created: from_unix(s.get_created()), + updated: from_unix(s.get_updated()), + message: Log::new(msg.get_timestamp(), + pry!(msg.get_item()), + pry!(msg.get_message()), + pry!(msg.get_error())), + encryption: Stamps::new(s.get_encryption_count(), + s.get_encryption_first(), + s.get_encryption_last()), + verification: Stamps::new(s.get_verification_count(), + s.get_verification_first(), + s.get_verification_last()), + }) + }, Which::Err(Ok(e)) => Err(e.into()), /* Protocol violations. */ Which::Ok(Err(e)) => Err(e.into()), diff --git a/store/src/store_protocol.capnp b/store/src/store_protocol.capnp index 6f11da0f..f895b856 100644 --- a/store/src/store_protocol.capnp +++ b/store/src/store_protocol.capnp @@ -67,12 +67,20 @@ interface Node { struct Stats { created @0 :Int64; updated @1 :Int64; - encryptionCount @2 :Int64; - encryptionFirst @3 :Int64; - encryptionLast @4 :Int64; - verificationCount @5 :Int64; - verificationFirst @6 :Int64; - verificationLast @7 :Int64; + message @2 :Log; + encryptionCount @3 :Int64; + encryptionFirst @4 :Int64; + encryptionLast @5 :Int64; + verificationCount @6 :Int64; + verificationFirst @7 :Int64; + verificationLast @8 :Int64; + } + + struct Log { + timestamp @0 :Int64; + item @1 :Text; + message @2 :Text; + error @3 :Text; } enum NetworkPolicy { diff --git a/tool/src/main.rs b/tool/src/main.rs index b7e91ca2..913139fa 100644 --- a/tool/src/main.rs +++ b/tool/src/main.rs @@ -387,14 +387,27 @@ fn real_main() -> Result<()> { ("keys", Some(_)) => { let mut table = Table::new(); table.set_format(*prettytable::format::consts::FORMAT_NO_LINESEP_WITH_TITLE); - table.set_titles(row!["fingerprint", "# of bindings"]); + table.set_titles(row!["fingerprint", "# of bindings", "updated", "status"]); for item in Store::list_keys(&ctx) .expect("Failed to iterate over keys") { + let stats = item.key.stats() + .expect("Failed to get stats"); table.add_row(Row::new(vec![ Cell::new(&item.fingerprint.to_string()), - Cell::new(&format!("{}", item.bindings))]) - ); + Cell::new(&format!("{}", item.bindings)), + if let Some(ref t) = stats.updated { + Cell::new(&sequoia_store::format_system_time(t) + .expect("Failed to format timestamp")) + } else { + Cell::new("") + }, + if let Some(m) = stats.message { + Cell::new(&m.short()) + } else { + Cell::new("") + }, + ])); } table.printstd(); |