diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-06 20:29:45 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-06 20:29:45 +0100 |
commit | e145fe4ad9d2160ffe646ae81f831c4e7bbcc718 (patch) | |
tree | 074216f082b011a5d8b499d7ccc8ece3cb9d90c3 | |
parent | 30df43d2d3dc4b53e69bb1666d5d8f3c2550abfc (diff) | |
parent | dc139e38690d30b4061ec065e2207ca6a63a662b (diff) |
Merge branch 'profile-persistence'
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | src/cli.rs | 44 | ||||
-rw-r--r-- | src/client.rs | 9 | ||||
-rw-r--r-- | src/commands/mod.rs | 2 | ||||
-rw-r--r-- | src/commands/profile.rs | 67 | ||||
-rw-r--r-- | src/config.rs | 1 | ||||
-rw-r--r-- | src/main.rs | 11 | ||||
-rw-r--r-- | src/profile.rs | 101 | ||||
-rw-r--r-- | src/profile/mod.rs | 191 | ||||
-rw-r--r-- | src/profile/state.rs | 129 |
10 files changed, 453 insertions, 105 deletions
@@ -35,8 +35,9 @@ serde = "1" serde_json = "1" getset = "0.1" xdg = "2.4" -libp2p = "0.41" +libp2p = "0.39.1" tracing = "0.1" +ctrlc = "3.2" [dependencies.ipfs] git = "https://github.com/rs-ipfs/rust-ipfs/" @@ -1,10 +1,54 @@ use clap::crate_authors; use clap::crate_version; use clap::App; +use clap::Arg; pub fn app<'a>() -> App<'a> { App::new("distrox") .author(crate_authors!()) .version(crate_version!()) .about("Distributed social network") + + + .subcommand(App::new("profile") + .author(crate_authors!()) + .version(crate_version!()) + .about("Profile actions") + + .subcommand(App::new("create") + .author(crate_authors!()) + .version(crate_version!()) + .about("Create profile") + + .arg(Arg::with_name("name") + .long("name") + .required(true) + .takes_value(true) + .value_name("NAME") + .about("Name of the profile") + ) + ) + + .subcommand(App::new("serve") + .author(crate_authors!()) + .version(crate_version!()) + .about("Just serve the profile") + + .arg(Arg::with_name("name") + .long("name") + .required(true) + .takes_value(true) + .value_name("NAME") + .about("Name of the profile") + ) + + .arg(Arg::with_name("connect") + .long("connect") + .required(false) + .takes_value(true) + .value_name("MULTIADDR") + .about("Connect to MULTIADDR as well") + ) + ) + ) } diff --git a/src/client.rs b/src/client.rs index 68aa4cd..4e50a8e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,6 +30,15 @@ impl Client { } } + pub async fn exit(self) -> Result<()> { + self.ipfs.exit_daemon().await; + Ok(()) + } + + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { + self.ipfs.connect(peer).await + } + pub async fn post_text_blob(&self, text: String) -> Result<Cid> { self.ipfs .put_dag(text.into()) diff --git a/src/commands/mod.rs b/src/commands/mod.rs new file mode 100644 index 0000000..5569bcb --- /dev/null +++ b/src/commands/mod.rs @@ -0,0 +1,2 @@ +mod profile; +pub use profile::profile; diff --git a/src/commands/profile.rs b/src/commands/profile.rs new file mode 100644 index 0000000..4da70f1 --- /dev/null +++ b/src/commands/profile.rs @@ -0,0 +1,67 @@ +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use anyhow::Context; +use anyhow::Result; +use clap::ArgMatches; + +use crate::config::Config; +use crate::profile::Profile; + +pub async fn profile(matches: &ArgMatches) -> Result<()> { + match matches.subcommand() { + Some(("create", m)) => profile_create(m).await, + Some(("serve", m)) => profile_serve(m).await, + _ => unimplemented!(), + } +} + +async fn profile_create(matches: &ArgMatches) -> Result<()> { + let name = matches.value_of("name").map(String::from).unwrap(); // required + let state_dir = Profile::state_dir_path(&name)?; + log::info!("Creating '{}' in {}", name, state_dir.display()); + + let profile = Profile::create(&state_dir, &name, Config::default()).await?; + log::info!("Saving..."); + profile.save().await?; + + log::info!("Shutting down..."); + profile.exit().await +} + +async fn profile_serve(matches: &ArgMatches) -> Result<()> { + use ipfs::MultiaddrWithPeerId; + + let name = matches.value_of("name").map(String::from).unwrap(); // required + let connect_peer = matches.value_of("connect").map(|s| { + s.parse::<MultiaddrWithPeerId>() + .map_err(anyhow::Error::from) + }).transpose()?; + + let state_dir = Profile::state_dir_path(&name)?; + + log::info!("Loading '{}' from {}", name, state_dir.display()); + let profile = Profile::load(Config::default(), &name).await?; + log::info!("Profile loaded"); + log::info!("Profile HEAD = {}", profile.head()); + + if let Some(connect_to) = connect_peer { + log::info!("Connecting to {:?}", connect_to); + profile.connect(connect_to).await?; + } + + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + + ctrlc::set_handler(move || { + r.store(false, Ordering::SeqCst); + }).context("Error setting Ctrl-C handler")?; + + log::info!("Serving..."); + while running.load(Ordering::SeqCst) { + tokio::time::sleep(std::time::Duration::from_millis(500)).await // sleep not so busy + } + log::info!("Shutting down..."); + profile.exit().await +} diff --git a/src/config.rs b/src/config.rs index d2f0aba..0d25e78 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,6 @@ pub struct Config { } -#[cfg(test)] impl Default for Config { fn default() -> Self { Config { } diff --git a/src/main.rs b/src/main.rs index ef413c6..257847c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use anyhow::Result; pub mod cli; pub mod client; +mod commands; pub mod config; pub mod consts; pub mod ipfs_client; @@ -11,7 +12,13 @@ pub mod types; #[tokio::main] async fn main() -> Result<()> { let _ = env_logger::try_init()?; - let _ = crate::cli::app(); - Ok(()) + let matches = crate::cli::app().get_matches(); + + match matches.subcommand() { + Some(("profile", matches)) => crate::commands::profile(matches).await, + _ => unimplemented!() + } + + } diff --git a/src/profile.rs b/src/profile.rs deleted file mode 100644 index d6f5b4b..0000000 --- a/src/profile.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::path::Path; -use std::path::PathBuf; - -use anyhow::Result; -use tokio::io::AsyncReadExt; - -use crate::client::Client; -use crate::config::Config; -use crate::ipfs_client::IpfsClient; - -#[derive(Debug)] -pub struct Profile { - client: Client, -} - -impl Profile { - pub async fn create(state_dir: &Path, name: &str, config: Config) -> Result<Self> { - let bootstrap = vec![]; // TODO - let mdns = false; // TODO - let keypair = ipfs::Keypair::generate_ed25519(); - Self::write_to_statedir(state_dir, name, &keypair).await?; - - let options = ipfs::IpfsOptions { - ipfs_path: Self::ipfs_path(state_dir, name).await?, - keypair, - bootstrap, - mdns, - kad_protocol: None, - listening_addrs: vec![], - span: Some(tracing::trace_span!("distrox-ipfs")), - }; - - let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(options) - .start() - .await?; - tokio::task::spawn(fut); - Ok(Self::new(ipfs, config)) - } - - async fn new_inmemory(config: Config) -> Result<Self> { - let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); - opts.mdns = false; - let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(opts).start().await.unwrap(); - tokio::task::spawn(fut); - Ok(Self::new(ipfs, config)) - } - - fn new(ipfs: IpfsClient, config: Config) -> Self { - Profile { client: Client::new(ipfs, config) } - } - - async fn write_to_statedir(_state_dir: &Path, _name: &str, _keypair: &ipfs::Keypair) -> Result<()> { - unimplemented!() - } - - async fn ipfs_path(state_dir: &Path, name: &str) -> Result<PathBuf> { - let path = state_dir.join(name).join("ipfs"); - tokio::fs::create_dir_all(&path).await?; - Ok(path) - } - - pub fn config_path(name: &str) -> String { - format!("distrox-{}", name) - } - - pub fn config_file_path(name: &str) -> Result<PathBuf> { - xdg::BaseDirectories::with_prefix("distrox") - .map_err(anyhow::Error::from) - .and_then(|dirs| { - let name = Self::config_path(name); - dirs.place_config_file(name) - .map_err(anyhow::Error::from) - }) - } - - /// Load the Profile from disk and ensure the keys exist in IPFS - pub async fn load_from_filesystem(_name: &str, _client: &Client) -> Result<Option<Self>> { - unimplemented!() - } - - async fn load_from_reader<R: AsyncReadExt + std::marker::Unpin>(_r: R, _name: &str, _client: &Client) -> Result<Option<Self>> { - unimplemented!() - } - -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::client::Client; - use crate::config::Config; - use crate::ipfs_client::IpfsClient; - - #[tokio::test] - async fn test_create_profile() { - let _ = env_logger::try_init(); - let profile = Profile::new_inmemory(Config::default()).await; - assert!(profile.is_ok()); - } - -} diff --git a/src/profile/mod.rs b/src/profile/mod.rs new file mode 100644 index 0000000..8849e34 --- /dev/null +++ b/src/profile/mod.rs @@ -0,0 +1,191 @@ +use std::path::PathBuf; +use std::convert::TryInto; + +use anyhow::Context; +use anyhow::Result; + +use crate::client::Client; +use crate::config::Config; +use crate::ipfs_client::IpfsClient; + +mod state; +use state::*; + +#[derive(Debug)] +pub struct Profile { + state: ProfileState, + client: Client, +} + +impl Profile { + pub async fn create(state_dir: &StateDir, name: &str, config: Config) -> Result<Self> { + let bootstrap = vec![]; // TODO + let mdns = true; // TODO + let keypair = ipfs::Keypair::generate_ed25519(); + + let options = ipfs::IpfsOptions { + ipfs_path: Self::ipfs_path(state_dir, name).await?, + keypair, + bootstrap, + mdns, + kad_protocol: None, + listening_addrs: vec![], + span: Some(tracing::trace_span!("distrox-ipfs")), + }; + + let keypair = options.keypair.clone(); + let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(options) + .start() + .await?; + tokio::task::spawn(fut); + Self::new(ipfs, config, name.to_string(), keypair).await + } + + async fn new_inmemory(config: Config, name: &str) -> Result<Self> { + let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); + opts.mdns = true; + let keypair = opts.keypair.clone(); + let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(opts).start().await.unwrap(); + tokio::task::spawn(fut); + Self::new(ipfs, config, format!("inmemory-{}", name), keypair).await + } + + async fn new(ipfs: IpfsClient, config: Config, profile_name: String, keypair: libp2p::identity::Keypair) -> Result<Self> { + let client = Client::new(ipfs, config); + let profile_head = Self::post_hello_world(&client, &profile_name).await?; + let state = ProfileState::new(profile_head, profile_name, keypair); + Ok(Profile { state, client }) + } + + pub fn head(&self) -> &cid::Cid { + self.state.profile_head() + } + + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { + self.client.connect(peer).await + } + + async fn post_hello_world(client: &Client, name: &str) -> Result<cid::Cid> { + let text = format!("Hello world, I am {}", name); + client.post_text_node(vec![], text).await + } + + async fn ipfs_path(state_dir: &StateDir, name: &str) -> Result<PathBuf> { + let path = state_dir.ipfs(); + tokio::fs::create_dir_all(&path).await?; + Ok(path) + } + + pub fn config_path(name: &str) -> String { + format!("distrox-{}", name) + } + + pub fn config_file_path(name: &str) -> Result<PathBuf> { + xdg::BaseDirectories::with_prefix("distrox") + .map_err(anyhow::Error::from) + .and_then(|dirs| { + let name = Self::config_path(name); + dirs.place_config_file(name) + .map_err(anyhow::Error::from) + }) + } + + pub fn state_dir_path(name: &str) -> Result<StateDir> { + log::debug!("Getting state directory path"); + xdg::BaseDirectories::with_prefix("distrox") + .context("Fetching 'distrox' XDG base directory") + .map_err(anyhow::Error::from) + .and_then(|dirs| { + dirs.create_state_directory(name) + .map(StateDir::from) + .with_context(|| format!("Creating 'distrox' XDG state directory for '{}'", name)) + .map_err(anyhow::Error::from) + }) + } + + pub async fn save(&self) -> Result<()> { + let state_dir_path = Self::state_dir_path(self.state.profile_name())?; + log::trace!("Saving to {:?}", state_dir_path.display()); + ProfileStateSaveable::new(&self.state) + .context("Serializing profile state")? + .save_to_disk(&state_dir_path) + .await + .context("Saving state to disk") + .map_err(anyhow::Error::from) + } + + pub async fn load(config: Config, name: &str) -> Result<Self> { + let state_dir_path = Self::state_dir_path(name)?; + log::trace!("state_dir_path = {:?}", state_dir_path.display()); + let state: ProfileState = ProfileStateSaveable::load_from_disk(&state_dir_path) + .await? + .try_into() + .context("Parsing profile state")?; + log::debug!("Loading state finished"); + + let bootstrap = vec![]; // TODO + let mdns = true; // TODO + let keypair = state.keypair().clone(); + + log::debug!("Configuring IPFS backend"); + let options = ipfs::IpfsOptions { + ipfs_path: Self::ipfs_path(&state_dir_path, name).await?, + keypair, + bootstrap, + mdns, + kad_protocol: None, + listening_addrs: vec![], + span: Some(tracing::trace_span!("distrox-ipfs")), + }; + + log::debug!("Starting IPFS backend"); + let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(options) + .start() + .await?; + tokio::task::spawn(fut); + + log::debug!("Profile loading finished"); + Ok(Profile { + state, + client: Client::new(ipfs, config), + }) + } + + pub async fn exit(self) -> Result<()> { + self.client.exit().await + } + +} + + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::TryFrom; + use crate::client::Client; + use crate::config::Config; + use crate::ipfs_client::IpfsClient; + + #[tokio::test] + async fn test_create_profile() { + let _ = env_logger::try_init(); + let profile = Profile::new_inmemory(Config::default(), "test-create-profile").await; + assert!(profile.is_ok()); + let exit = profile.unwrap().exit().await; + assert!(exit.is_ok(), "Not cleanly exited: {:?}", exit); + } + + #[tokio::test] + async fn test_create_profile_and_helloworld() { + let _ = env_logger::try_init(); + let profile = Profile::new_inmemory(Config::default(), "test-create-profile-and-helloworld").await; + assert!(profile.is_ok()); + let profile = profile.unwrap(); + let head = profile.head(); + let exp_cid = cid::Cid::try_from("bafyreie4haukbqj7u6vogjfvaxbwg73b7bzi7nqxbnkvv77dvwcqg5wtpe").unwrap(); + assert_eq!(*head, exp_cid, "{} != {}", *head, exp_cid); + let exit = profile.exit().await; + assert!(exit.is_ok(), "Not cleanly exited: {:?}", exit); + } + +} diff --git a/src/profile/state.rs b/src/profile/state.rs new file mode 100644 index 0000000..a84ff23 --- /dev/null +++ b/src/profile/state.rs @@ -0,0 +1,129 @@ +use std::path::PathBuf; +use std::convert::TryFrom; +use std::convert::TryInto; + +use anyhow::Context; +use anyhow::Result; +use tokio::io::AsyncWriteExt; + +#[derive(Debug)] +pub struct StateDir(PathBuf); + +impl StateDir { + pub fn ipfs(&self) -> PathBuf { + self.0.join("ipfs") + } + + pub fn profile_state(&self) -> PathBuf { + self.0.join("profile_state") + } + + pub fn display(&self) -> std::path::Display { + self.0.display() + } +} + +impl From<PathBuf> for StateDir { + fn from(p: PathBuf) -> Self { + Self(p) + } +} + +#[derive(getset::Getters)] +pub struct ProfileState { + #[getset(get = "pub")] + profile_head: cid::Cid, + + #[getset(get = "pub")] + profile_name: String, + + #[getset(get = "pub")] + keypair: libp2p::identity::Keypair, +} + +impl ProfileState { + pub(super) fn new(profile_head: cid::Cid, profile_name: String, keypair: libp2p::identity::Keypair) -> Self { + Self { + profile_head, + profile_name, + keypair + } + } +} + +impl std::fmt::Debug for ProfileState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ProfileState {{ name = {}, head = {:?} }}", self.profile_name, self.profile_head) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, getset::Getters)] +pub(super) struct ProfileStateSaveable { + profile_head: Vec<u8>, + profile_name: String, + keypair: Vec<u8>, +} + +impl ProfileStateSaveable { + pub(super) fn new(s: &ProfileState) -> Result<Self> { + Ok(Self { + profile_head: s.profile_head.to_bytes(), + profile_name: s.profile_name.clone(), + keypair: match s.keypair { + libp2p::identity::Keypair::Ed25519(ref kp) => Vec::from(kp.encode()), + _ => anyhow::bail!("Only keypair type ed25519 supported"), + } + }) + } + + pub async fn save_to_disk(&self, state_dir_path: &StateDir) -> Result<()> { + let state_s = serde_json::to_string(&self).context("Serializing state")?; + tokio::fs::OpenOptions::new() + .create_new(false) // do not _always_ create a new file + .create(true) + .truncate(true) + .write(true) + .open(&state_dir_path.profile_state()) + .await + .with_context(|| format!("Opening {}", state_dir_path.profile_state().display()))? + .write_all(state_s.as_bytes()) + .await + .map(|_| ()) + .with_context(|| format!("Writing to {}", state_dir_path.profile_state().display())) + .map_err(anyhow::Error::from) + } + + pub async fn load_from_disk(state_dir_path: &StateDir) -> Result<Self> { + log::trace!("Loading from disk: {:?}", state_dir_path.profile_state().display()); + let reader = tokio::fs::OpenOptions::new() + .read(true) + .open(&state_dir_path.profile_state()) + .await + .context("Opening state file")? + .into_std() + .await; + + log::trace!("Parsing state file"); + serde_json::from_reader(reader) + .context("Parsing state file") + .map_err(anyhow::Error::from) + } + +} + +impl TryInto<ProfileState> for ProfileStateSaveable { + type Error = anyhow::Error; + + fn try_into(mut self) -> Result<ProfileState> { + Ok(ProfileState { + profile_head: cid::Cid::try_from(self.profile_head)?, + profile_name: self.profile_name, + keypair: { + let kp = libp2p::identity::ed25519::Keypair::decode(&mut self.keypair)?; + libp2p::identity::Keypair::Ed25519(kp) + }, + }) + } +} + + |