summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-06 20:29:45 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-06 20:29:45 +0100
commite145fe4ad9d2160ffe646ae81f831c4e7bbcc718 (patch)
tree074216f082b011a5d8b499d7ccc8ece3cb9d90c3
parent30df43d2d3dc4b53e69bb1666d5d8f3c2550abfc (diff)
parentdc139e38690d30b4061ec065e2207ca6a63a662b (diff)
Merge branch 'profile-persistence'
-rw-r--r--Cargo.toml3
-rw-r--r--src/cli.rs44
-rw-r--r--src/client.rs9
-rw-r--r--src/commands/mod.rs2
-rw-r--r--src/commands/profile.rs67
-rw-r--r--src/config.rs1
-rw-r--r--src/main.rs11
-rw-r--r--src/profile.rs101
-rw-r--r--src/profile/mod.rs191
-rw-r--r--src/profile/state.rs129
10 files changed, 453 insertions, 105 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 553f189..6766fe6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,8 +35,9 @@ serde = "1"
serde_json = "1"
getset = "0.1"
xdg = "2.4"
-libp2p = "0.41"
+libp2p = "0.39.1"
tracing = "0.1"
+ctrlc = "3.2"
[dependencies.ipfs]
git = "https://github.com/rs-ipfs/rust-ipfs/"
diff --git a/src/cli.rs b/src/cli.rs
index 4e01cbb..3041e33 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,10 +1,54 @@
use clap::crate_authors;
use clap::crate_version;
use clap::App;
+use clap::Arg;
pub fn app<'a>() -> App<'a> {
App::new("distrox")
.author(crate_authors!())
.version(crate_version!())
.about("Distributed social network")
+
+
+ .subcommand(App::new("profile")
+ .author(crate_authors!())
+ .version(crate_version!())
+ .about("Profile actions")
+
+ .subcommand(App::new("create")
+ .author(crate_authors!())
+ .version(crate_version!())
+ .about("Create profile")
+
+ .arg(Arg::with_name("name")
+ .long("name")
+ .required(true)
+ .takes_value(true)
+ .value_name("NAME")
+ .about("Name of the profile")
+ )
+ )
+
+ .subcommand(App::new("serve")
+ .author(crate_authors!())
+ .version(crate_version!())
+ .about("Just serve the profile")
+
+ .arg(Arg::with_name("name")
+ .long("name")
+ .required(true)
+ .takes_value(true)
+ .value_name("NAME")
+ .about("Name of the profile")
+ )
+
+ .arg(Arg::with_name("connect")
+ .long("connect")
+ .required(false)
+ .takes_value(true)
+ .value_name("MULTIADDR")
+ .about("Connect to MULTIADDR as well")
+ )
+ )
+ )
}
diff --git a/src/client.rs b/src/client.rs
index 68aa4cd..4e50a8e 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -30,6 +30,15 @@ impl Client {
}
}
+ pub async fn exit(self) -> Result<()> {
+ self.ipfs.exit_daemon().await;
+ Ok(())
+ }
+
+ pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
+ self.ipfs.connect(peer).await
+ }
+
pub async fn post_text_blob(&self, text: String) -> Result<Cid> {
self.ipfs
.put_dag(text.into())
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
new file mode 100644
index 0000000..5569bcb
--- /dev/null
+++ b/src/commands/mod.rs
@@ -0,0 +1,2 @@
+mod profile;
+pub use profile::profile;
diff --git a/src/commands/profile.rs b/src/commands/profile.rs
new file mode 100644
index 0000000..4da70f1
--- /dev/null
+++ b/src/commands/profile.rs
@@ -0,0 +1,67 @@
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use anyhow::Context;
+use anyhow::Result;
+use clap::ArgMatches;
+
+use crate::config::Config;
+use crate::profile::Profile;
+
+pub async fn profile(matches: &ArgMatches) -> Result<()> {
+ match matches.subcommand() {
+ Some(("create", m)) => profile_create(m).await,
+ Some(("serve", m)) => profile_serve(m).await,
+ _ => unimplemented!(),
+ }
+}
+
+async fn profile_create(matches: &ArgMatches) -> Result<()> {
+ let name = matches.value_of("name").map(String::from).unwrap(); // required
+ let state_dir = Profile::state_dir_path(&name)?;
+ log::info!("Creating '{}' in {}", name, state_dir.display());
+
+ let profile = Profile::create(&state_dir, &name, Config::default()).await?;
+ log::info!("Saving...");
+ profile.save().await?;
+
+ log::info!("Shutting down...");
+ profile.exit().await
+}
+
+async fn profile_serve(matches: &ArgMatches) -> Result<()> {
+ use ipfs::MultiaddrWithPeerId;
+
+ let name = matches.value_of("name").map(String::from).unwrap(); // required
+ let connect_peer = matches.value_of("connect").map(|s| {
+ s.parse::<MultiaddrWithPeerId>()
+ .map_err(anyhow::Error::from)
+ }).transpose()?;
+
+ let state_dir = Profile::state_dir_path(&name)?;
+
+ log::info!("Loading '{}' from {}", name, state_dir.display());
+ let profile = Profile::load(Config::default(), &name).await?;
+ log::info!("Profile loaded");
+ log::info!("Profile HEAD = {}", profile.head());
+
+ if let Some(connect_to) = connect_peer {
+ log::info!("Connecting to {:?}", connect_to);
+ profile.connect(connect_to).await?;
+ }
+
+ let running = Arc::new(AtomicBool::new(true));
+ let r = running.clone();
+
+ ctrlc::set_handler(move || {
+ r.store(false, Ordering::SeqCst);
+ }).context("Error setting Ctrl-C handler")?;
+
+ log::info!("Serving...");
+ while running.load(Ordering::SeqCst) {
+ tokio::time::sleep(std::time::Duration::from_millis(500)).await // sleep not so busy
+ }
+ log::info!("Shutting down...");
+ profile.exit().await
+}
diff --git a/src/config.rs b/src/config.rs
index d2f0aba..0d25e78 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -2,7 +2,6 @@
pub struct Config {
}
-#[cfg(test)]
impl Default for Config {
fn default() -> Self {
Config { }
diff --git a/src/main.rs b/src/main.rs
index ef413c6..257847c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@ use anyhow::Result;
pub mod cli;
pub mod client;
+mod commands;
pub mod config;
pub mod consts;
pub mod ipfs_client;
@@ -11,7 +12,13 @@ pub mod types;
#[tokio::main]
async fn main() -> Result<()> {
let _ = env_logger::try_init()?;
- let _ = crate::cli::app();
- Ok(())
+ let matches = crate::cli::app().get_matches();
+
+ match matches.subcommand() {
+ Some(("profile", matches)) => crate::commands::profile(matches).await,
+ _ => unimplemented!()
+ }
+
+
}
diff --git a/src/profile.rs b/src/profile.rs
deleted file mode 100644
index d6f5b4b..0000000
--- a/src/profile.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-use std::path::Path;
-use std::path::PathBuf;
-
-use anyhow::Result;
-use tokio::io::AsyncReadExt;
-
-use crate::client::Client;
-use crate::config::Config;
-use crate::ipfs_client::IpfsClient;
-
-#[derive(Debug)]
-pub struct Profile {
- client: Client,
-}
-
-impl Profile {
- pub async fn create(state_dir: &Path, name: &str, config: Config) -> Result<Self> {
- let bootstrap = vec![]; // TODO
- let mdns = false; // TODO
- let keypair = ipfs::Keypair::generate_ed25519();
- Self::write_to_statedir(state_dir, name, &keypair).await?;
-
- let options = ipfs::IpfsOptions {
- ipfs_path: Self::ipfs_path(state_dir, name).await?,
- keypair,
- bootstrap,
- mdns,
- kad_protocol: None,
- listening_addrs: vec![],
- span: Some(tracing::trace_span!("distrox-ipfs")),
- };
-
- let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(options)
- .start()
- .await?;
- tokio::task::spawn(fut);
- Ok(Self::new(ipfs, config))
- }
-
- async fn new_inmemory(config: Config) -> Result<Self> {
- let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
- opts.mdns = false;
- let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(opts).start().await.unwrap();
- tokio::task::spawn(fut);
- Ok(Self::new(ipfs, config))
- }
-
- fn new(ipfs: IpfsClient, config: Config) -> Self {
- Profile { client: Client::new(ipfs, config) }
- }
-
- async fn write_to_statedir(_state_dir: &Path, _name: &str, _keypair: &ipfs::Keypair) -> Result<()> {
- unimplemented!()
- }
-
- async fn ipfs_path(state_dir: &Path, name: &str) -> Result<PathBuf> {
- let path = state_dir.join(name).join("ipfs");
- tokio::fs::create_dir_all(&path).await?;
- Ok(path)
- }
-
- pub fn config_path(name: &str) -> String {
- format!("distrox-{}", name)
- }
-
- pub fn config_file_path(name: &str) -> Result<PathBuf> {
- xdg::BaseDirectories::with_prefix("distrox")
- .map_err(anyhow::Error::from)
- .and_then(|dirs| {
- let name = Self::config_path(name);
- dirs.place_config_file(name)
- .map_err(anyhow::Error::from)
- })
- }
-
- /// Load the Profile from disk and ensure the keys exist in IPFS
- pub async fn load_from_filesystem(_name: &str, _client: &Client) -> Result<Option<Self>> {
- unimplemented!()
- }
-
- async fn load_from_reader<R: AsyncReadExt + std::marker::Unpin>(_r: R, _name: &str, _client: &Client) -> Result<Option<Self>> {
- unimplemented!()
- }
-
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::client::Client;
- use crate::config::Config;
- use crate::ipfs_client::IpfsClient;
-
- #[tokio::test]
- async fn test_create_profile() {
- let _ = env_logger::try_init();
- let profile = Profile::new_inmemory(Config::default()).await;
- assert!(profile.is_ok());
- }
-
-}
diff --git a/src/profile/mod.rs b/src/profile/mod.rs
new file mode 100644
index 0000000..8849e34
--- /dev/null
+++ b/src/profile/mod.rs
@@ -0,0 +1,191 @@
+use std::path::PathBuf;
+use std::convert::TryInto;
+
+use anyhow::Context;
+use anyhow::Result;
+
+use crate::client::Client;
+use crate::config::Config;
+use crate::ipfs_client::IpfsClient;
+
+mod state;
+use state::*;
+
+#[derive(Debug)]
+pub struct Profile {
+ state: ProfileState,
+ client: Client,
+}
+
+impl Profile {
+ pub async fn create(state_dir: &StateDir, name: &str, config: Config) -> Result<Self> {
+ let bootstrap = vec![]; // TODO
+ let mdns = true; // TODO
+ let keypair = ipfs::Keypair::generate_ed25519();
+
+ let options = ipfs::IpfsOptions {
+ ipfs_path: Self::ipfs_path(state_dir, name).await?,
+ keypair,
+ bootstrap,
+ mdns,
+ kad_protocol: None,
+ listening_addrs: vec![],
+ span: Some(tracing::trace_span!("distrox-ipfs")),
+ };
+
+ let keypair = options.keypair.clone();
+ let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(options)
+ .start()
+ .await?;
+ tokio::task::spawn(fut);
+ Self::new(ipfs, config, name.to_string(), keypair).await
+ }
+
+ async fn new_inmemory(config: Config, name: &str) -> Result<Self> {
+ let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
+ opts.mdns = true;
+ let keypair = opts.keypair.clone();
+ let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(opts).start().await.unwrap();
+ tokio::task::spawn(fut);
+ Self::new(ipfs, config, format!("inmemory-{}", name), keypair).await
+ }
+
+ async fn new(ipfs: IpfsClient, config: Config, profile_name: String, keypair: libp2p::identity::Keypair) -> Result<Self> {
+ let client = Client::new(ipfs, config);
+ let profile_head = Self::post_hello_world(&client, &profile_name).await?;
+ let state = ProfileState::new(profile_head, profile_name, keypair);
+ Ok(Profile { state, client })
+ }
+
+ pub fn head(&self) -> &cid::Cid {
+ self.state.profile_head()
+ }
+
+ pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
+ self.client.connect(peer).await
+ }
+
+ async fn post_hello_world(client: &Client, name: &str) -> Result<cid::Cid> {
+ let text = format!("Hello world, I am {}", name);
+ client.post_text_node(vec![], text).await
+ }
+
+ async fn ipfs_path(state_dir: &StateDir, name: &str) -> Result<PathBuf> {
+ let path = state_dir.ipfs();
+ tokio::fs::create_dir_all(&path).await?;
+ Ok(path)
+ }
+
+ pub fn config_path(name: &str) -> String {
+ format!("distrox-{}", name)
+ }
+
+ pub fn config_file_path(name: &str) -> Result<PathBuf> {
+ xdg::BaseDirectories::with_prefix("distrox")
+ .map_err(anyhow::Error::from)
+ .and_then(|dirs| {
+ let name = Self::config_path(name);
+ dirs.place_config_file(name)
+ .map_err(anyhow::Error::from)
+ })
+ }
+
+ pub fn state_dir_path(name: &str) -> Result<StateDir> {
+ log::debug!("Getting state directory path");
+ xdg::BaseDirectories::with_prefix("distrox")
+ .context("Fetching 'distrox' XDG base directory")
+ .map_err(anyhow::Error::from)
+ .and_then(|dirs| {
+ dirs.create_state_directory(name)
+ .map(StateDir::from)
+ .with_context(|| format!("Creating 'distrox' XDG state directory for '{}'", name))
+ .map_err(anyhow::Error::from)
+ })
+ }
+
+ pub async fn save(&self) -> Result<()> {
+ let state_dir_path = Self::state_dir_path(self.state.profile_name())?;
+ log::trace!("Saving to {:?}", state_dir_path.display());
+ ProfileStateSaveable::new(&self.state)
+ .context("Serializing profile state")?
+ .save_to_disk(&state_dir_path)
+ .await
+ .context("Saving state to disk")
+ .map_err(anyhow::Error::from)
+ }
+
+ pub async fn load(config: Config, name: &str) -> Result<Self> {
+ let state_dir_path = Self::state_dir_path(name)?;
+ log::trace!("state_dir_path = {:?}", state_dir_path.display());
+ let state: ProfileState = ProfileStateSaveable::load_from_disk(&state_dir_path)
+ .await?
+ .try_into()
+ .context("Parsing profile state")?;
+ log::debug!("Loading state finished");
+
+ let bootstrap = vec![]; // TODO
+ let mdns = true; // TODO
+ let keypair = state.keypair().clone();
+
+ log::debug!("Configuring IPFS backend");
+ let options = ipfs::IpfsOptions {
+ ipfs_path: Self::ipfs_path(&state_dir_path, name).await?,
+ keypair,
+ bootstrap,
+ mdns,
+ kad_protocol: None,
+ listening_addrs: vec![],
+ span: Some(tracing::trace_span!("distrox-ipfs")),
+ };
+
+ log::debug!("Starting IPFS backend");
+ let (ipfs, fut): (ipfs::Ipfs<_>, _) = ipfs::UninitializedIpfs::<_>::new(options)
+ .start()
+ .await?;
+ tokio::task::spawn(fut);
+
+ log::debug!("Profile loading finished");
+ Ok(Profile {
+ state,
+ client: Client::new(ipfs, config),
+ })
+ }
+
+ pub async fn exit(self) -> Result<()> {
+ self.client.exit().await
+ }
+
+}
+
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::convert::TryFrom;
+ use crate::client::Client;
+ use crate::config::Config;
+ use crate::ipfs_client::IpfsClient;
+
+ #[tokio::test]
+ async fn test_create_profile() {
+ let _ = env_logger::try_init();
+ let profile = Profile::new_inmemory(Config::default(), "test-create-profile").await;
+ assert!(profile.is_ok());
+ let exit = profile.unwrap().exit().await;
+ assert!(exit.is_ok(), "Not cleanly exited: {:?}", exit);
+ }
+
+ #[tokio::test]
+ async fn test_create_profile_and_helloworld() {
+ let _ = env_logger::try_init();
+ let profile = Profile::new_inmemory(Config::default(), "test-create-profile-and-helloworld").await;
+ assert!(profile.is_ok());
+ let profile = profile.unwrap();
+ let head = profile.head();
+ let exp_cid = cid::Cid::try_from("bafyreie4haukbqj7u6vogjfvaxbwg73b7bzi7nqxbnkvv77dvwcqg5wtpe").unwrap();
+ assert_eq!(*head, exp_cid, "{} != {}", *head, exp_cid);
+ let exit = profile.exit().await;
+ assert!(exit.is_ok(), "Not cleanly exited: {:?}", exit);
+ }
+
+}
diff --git a/src/profile/state.rs b/src/profile/state.rs
new file mode 100644
index 0000000..a84ff23
--- /dev/null
+++ b/src/profile/state.rs
@@ -0,0 +1,129 @@
+use std::path::PathBuf;
+use std::convert::TryFrom;
+use std::convert::TryInto;
+
+use anyhow::Context;
+use anyhow::Result;
+use tokio::io::AsyncWriteExt;
+
+#[derive(Debug)]
+pub struct StateDir(PathBuf);
+
+impl StateDir {
+ pub fn ipfs(&self) -> PathBuf {
+ self.0.join("ipfs")
+ }
+
+ pub fn profile_state(&self) -> PathBuf {
+ self.0.join("profile_state")
+ }
+
+ pub fn display(&self) -> std::path::Display {
+ self.0.display()
+ }
+}
+
+impl From<PathBuf> for StateDir {
+ fn from(p: PathBuf) -> Self {
+ Self(p)
+ }
+}
+
+#[derive(getset::Getters)]
+pub struct ProfileState {
+ #[getset(get = "pub")]
+ profile_head: cid::Cid,
+
+ #[getset(get = "pub")]
+ profile_name: String,
+
+ #[getset(get = "pub")]
+ keypair: libp2p::identity::Keypair,
+}
+
+impl ProfileState {
+ pub(super) fn new(profile_head: cid::Cid, profile_name: String, keypair: libp2p::identity::Keypair) -> Self {
+ Self {
+ profile_head,
+ profile_name,
+ keypair
+ }
+ }
+}
+
+impl std::fmt::Debug for ProfileState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "ProfileState {{ name = {}, head = {:?} }}", self.profile_name, self.profile_head)
+ }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, getset::Getters)]
+pub(super) struct ProfileStateSaveable {
+ profile_head: Vec<u8>,
+ profile_name: String,
+ keypair: Vec<u8>,
+}
+
+impl ProfileStateSaveable {
+ pub(super) fn new(s: &ProfileState) -> Result<Self> {
+ Ok(Self {
+ profile_head: s.profile_head.to_bytes(),
+ profile_name: s.profile_name.clone(),
+ keypair: match s.keypair {
+ libp2p::identity::Keypair::Ed25519(ref kp) => Vec::from(kp.encode()),
+ _ => anyhow::bail!("Only keypair type ed25519 supported"),
+ }
+ })
+ }
+
+ pub async fn save_to_disk(&self, state_dir_path: &StateDir) -> Result<()> {
+ let state_s = serde_json::to_string(&self).context("Serializing state")?;
+ tokio::fs::OpenOptions::new()
+ .create_new(false) // do not _always_ create a new file
+ .create(true)
+ .truncate(true)
+ .write(true)
+ .open(&state_dir_path.profile_state())
+ .await
+ .with_context(|| format!("Opening {}", state_dir_path.profile_state().display()))?
+ .write_all(state_s.as_bytes())
+ .await
+ .map(|_| ())
+ .with_context(|| format!("Writing to {}", state_dir_path.profile_state().display()))
+ .map_err(anyhow::Error::from)
+ }
+
+ pub async fn load_from_disk(state_dir_path: &StateDir) -> Result<Self> {
+ log::trace!("Loading from disk: {:?}", state_dir_path.profile_state().display());
+ let reader = tokio::fs::OpenOptions::new()
+ .read(true)
+ .open(&state_dir_path.profile_state())
+ .await
+ .context("Opening state file")?
+ .into_std()
+ .await;
+
+ log::trace!("Parsing state file");
+ serde_json::from_reader(reader)
+ .context("Parsing state file")
+ .map_err(anyhow::Error::from)
+ }
+
+}
+
+impl TryInto<ProfileState> for ProfileStateSaveable {
+ type Error = anyhow::Error;
+
+ fn try_into(mut self) -> Result<ProfileState> {
+ Ok(ProfileState {
+ profile_head: cid::Cid::try_from(self.profile_head)?,
+ profile_name: self.profile_name,
+ keypair: {
+ let kp = libp2p::identity::ed25519::Keypair::decode(&mut self.keypair)?;
+ libp2p::identity::Keypair::Ed25519(kp)
+ },
+ })
+ }
+}
+
+