diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-03-26 17:55:58 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-03-26 17:55:58 +0100 |
commit | f03d34b9b359ac137015b0529a651fd7c4beb69f (patch) | |
tree | 94adf643d0261666f7eaded171144b93d9a4ffb2 | |
parent | a37efdbcf33aa8e0b4270b6c299725d3ae1addec (diff) | |
parent | 149aa37a2eb571ee62e16de64559cdebad483435 (diff) |
Merge branch 'asyncawait'
-rw-r--r-- | Cargo.toml | 5 | ||||
-rw-r--r-- | src/main.rs | 757 | ||||
-rw-r--r-- | src/repository/client.rs | 29 | ||||
-rw-r--r-- | src/repository/profile.rs | 41 | ||||
-rw-r--r-- | src/repository/repository.rs | 20 | ||||
-rw-r--r-- | src/typeext/block.rs | 30 |
6 files changed, 459 insertions, 423 deletions
@@ -18,8 +18,8 @@ edition = "2018" [dependencies] is-match = "0.1" failure = "0.1" -futures = "0.1" -ipfs-api = "0.5.0-alpha2" +futures = "0.3" +ipfs-api = "0.7" serde = "1" serde_derive = "1" serde_json = "1" @@ -33,3 +33,4 @@ config = "0.9" toml = "0.4" hyper = "0.12" itertools = "0.7" +tokio = { version = "0.2", features = ["full"] } diff --git a/src/main.rs b/src/main.rs index faf5d6d..ba0eda5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ extern crate ipfs_api; extern crate chrono; extern crate mime; -extern crate failure; extern crate futures; extern crate serde; extern crate serde_json; @@ -15,9 +14,11 @@ extern crate hyper; extern crate env_logger; extern crate itertools; +#[macro_use] extern crate failure; #[macro_use] extern crate is_match; #[macro_use] extern crate serde_derive; #[macro_use] extern crate log; +#[macro_use] extern crate tokio; mod cli_ui; mod configuration; @@ -34,8 +35,11 @@ use std::sync::Arc; use chrono::NaiveDateTime; use futures::future::Future; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use serde_json::to_string_pretty as serde_json_to_string_pretty; use serde_json::from_str as serde_json_from_str; +use failure::Fallible as Result; use crate::configuration::Configuration; use crate::repository::Repository; @@ -50,7 +54,8 @@ use crate::types::util::Version; use std::process::exit; -fn main() { +#[tokio::main] +async fn main() -> Result<()> { let _ = env_logger::init(); debug!("Logger initialized"); @@ -91,8 +96,7 @@ fn main() { { ("gui", _mtch) => { - error!("Not yet supported"); - exit(1) + Err(format_err!("Not yet supported")) } ("is-block", Some(mtch)) => { @@ -101,19 +105,19 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_block(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + repo.get_block(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } - exit(1) - }) - .map(|_| ())); + exit(1) + }) + .map(|_| ()) } ("is-content", Some(mtch)) => { @@ -122,19 +126,19 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } - exit(1) - }) - .map(|_| ())); + exit(1) + }) + .map(|_| ()) } ("is-post", Some(mtch)) => { @@ -144,19 +148,19 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| tx.send(content.payload().is_post()).unwrap())); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| tx.send(content.payload().is_post()).unwrap()); exit(if rx.recv().unwrap() { 0 } else { 1 }); } @@ -167,30 +171,31 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - let is_reply = if !content.payload().is_post() { - false - } else { - match content.payload() { - Payload::Post { reply_to, .. } => reply_to.is_some(), - _ => false, - } - }; - - tx.send(is_reply).unwrap() - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + let is_reply = if !content.payload().is_post() { + false + } else { + match content.payload() { + Payload::Post { reply_to, .. } => reply_to.is_some(), + _ => false, + } + }; + + tx.send(is_reply).unwrap() + }); + exit(if rx.recv().unwrap() { 0 } else { 1 }); } @@ -201,21 +206,21 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - tx.send(content.payload().is_profile()).unwrap() - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + tx.send(content.payload().is_profile()).unwrap() + }); exit(if rx.recv().unwrap() { 0 } else { 1 }); } @@ -225,24 +230,26 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_block(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + let (tx, rx) = ::std::sync::mpsc::channel(); + repo.get_block(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } - exit(1) - }) - .map(move |block| tx.send(block.parents().clone()).unwrap())); + exit(1) + }) + .map(move |block| tx.send(block.parents().clone()).unwrap()); for parent in rx.recv().unwrap() { println!("{}", parent); } + + Ok(()) } ("get-devices", Some(mtch)) => { @@ -252,23 +259,25 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } - exit(1) - }) - .map(move |c| tx.send(c.devices().clone()).unwrap())); + exit(1) + }) + .map(move |c| tx.send(c.devices().clone()).unwrap()); for device in rx.recv().unwrap() { println!("{}", device); } + + Ok(()) } ("get-payload-type", Some(mtch)) => { @@ -278,26 +287,27 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |c| tx.send(match c.payload() { - Payload::None => "None", - Payload::Post { .. } => "Post", - Payload::AttachedPostComments { .. } => "AttachedPostComments", - Payload::Profile { .. } => "Profile", - }).unwrap())); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |c| tx.send(match c.payload() { + Payload::None => "None", + Payload::Post { .. } => "Post", + Payload::AttachedPostComments { .. } => "AttachedPostComments", + Payload::Profile { .. } => "Profile", + }).unwrap()); println!("{}", rx.recv().unwrap()); + Ok(()) } ("get-payload", Some(mtch)) => { @@ -307,25 +317,26 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .and_then(|c| serde_json_to_string_pretty(c.payload())) - .map_err(|e| { - error!("Error building JSON: {:?}", e); - exit(1) - }) - .map(move |j| tx.send(j).unwrap())); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .and_then(|c| serde_json_to_string_pretty(c.payload())) + .map_err(|e| { + error!("Error building JSON: {:?}", e); + exit(1) + }) + .map(move |j| tx.send(j).unwrap()); println!("{}", rx.recv().unwrap()); + Ok(()) } ("get-post-content", Some(mtch)) => { @@ -335,53 +346,55 @@ fn main() { debug!("Working with hash: {}", hash); let (tx, rx) = ::std::sync::mpsc::channel(); - hyper::rt::run(repo - .get_content(hash.clone()) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |c| { - match c.payload() { - Payload::Post { content, content_format, .. } => { - match (content_format.type_(), content_format.subtype()) { - (mime::TEXT, _) => { // plain text will be printed - content.clone() - }, - - (_, _) => { - error!("Cannot show mimetype of Post {hash} which lives at {content} ({mime})", - hash = hash, - content = content, - mime = content_format); - exit(1) - } - } - }, - _ => { - error!("Not a Post"); - exit(1) - } - } - }) - .and_then(move |content_hash| { - repo.get_raw_bytes(content_hash) - .and_then(|blob| String::from_utf8(blob).map_err(Into::into)) - .map_err(|e| { - error!("Content is not UTF-8: {:?}", e); - exit(1) - }) - }) - .map(move |blob| tx.send(blob).unwrap()) - ); + let content_hash = repo + .get_content(hash.clone()) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |c| { + match c.payload() { + Payload::Post { content, content_format, .. } => { + match (content_format.type_(), content_format.subtype()) { + (mime::TEXT, _) => { // plain text will be printed + content.clone() + }, + + (_, _) => { + error!("Cannot show mimetype of Post {hash} which lives at {content} ({mime})", + hash = hash, + content = content, + mime = content_format); + exit(1) + } + } + }, + _ => { + error!("Not a Post"); + exit(1) + } + } + }) + .unwrap(); + + repo.get_raw_bytes(content_hash) + .await + .and_then(|blob| String::from_utf8(blob).map_err(Into::into)) + .map_err(|e| { + error!("Content is not UTF-8: {:?}", e); + exit(1) + }) + .map(move |blob| tx.send(blob).unwrap()); println!("{}", rx.recv().unwrap()); + Ok(()) } ("get-post-content-format", Some(mtch)) => { @@ -390,27 +403,27 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - match content.payload() { - Payload::Post { content_format, .. } => println!("{}", content_format), - _ => { - error!("Not a Post"); - exit(1) - } - } - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + match content.payload() { + Payload::Post { content_format, .. } => println!("{}", content_format), + _ => { + error!("Not a Post"); + exit(1) + } + } + }) } ("get-post-reply-to", Some(mtch)) => { @@ -419,29 +432,29 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - match content.payload() { - Payload::Post { reply_to, .. } => { - reply_to.as_ref().map(|r| println!("{}", r)); - }, - _ => { - error!("Not a Post"); - exit(1) - } - } - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + match content.payload() { + Payload::Post { reply_to, .. } => { + reply_to.as_ref().map(|r| println!("{}", r)); + }, + _ => { + error!("Not a Post"); + exit(1) + } + } + }) } ("get-post-metadata", Some(mtch)) => { @@ -450,38 +463,38 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - match content.payload() { - Payload::Post { - comments_will_be_propagated, - comments_propagated_until, - .. - } => { - comments_will_be_propagated.as_ref().map(|b| { - println!("comments-will-be-propagated: {}", b); - }); - comments_propagated_until.as_ref().map(|b| { - println!("comments-will-be-propagated-until: {}", b); - }); - }, - _ => { - error!("Not a Post"); - exit(1) - } - } - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + match content.payload() { + Payload::Post { + comments_will_be_propagated, + comments_propagated_until, + .. + } => { + comments_will_be_propagated.as_ref().map(|b| { + println!("comments-will-be-propagated: {}", b); + }); + comments_propagated_until.as_ref().map(|b| { + println!("comments-will-be-propagated-until: {}", b); + }); + }, + _ => { + error!("Not a Post"); + exit(1) + } + } + }) } ("get-profile-names", Some(mtch)) => { @@ -490,30 +503,29 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - match content.payload() { - Payload::Profile { names, .. } => { - names.iter().for_each(|n| println!("{}", n)); - } - _ => { - error!("Not a Profile"); - exit(1) - } - } - })); - + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + match content.payload() { + Payload::Profile { names, .. } => { + names.iter().for_each(|n| println!("{}", n)); + } + _ => { + error!("Not a Profile"); + exit(1) + } + } + }) } ("get-profile-picture", Some(mtch)) => { @@ -522,33 +534,33 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(move |content| { - match content.payload() { - Payload::Profile { picture, .. } => { - picture.as_ref().map(|hash| { - warn!("Showing picture on commandline not supported"); - warn!("Printing ipfs hash of picture"); - println!("{}", hash); - }); - } - _ => { - error!("Not a Profile"); - exit(1) - } - } - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(move |content| { + match content.payload() { + Payload::Profile { picture, .. } => { + picture.as_ref().map(|hash| { + warn!("Showing picture on commandline not supported"); + warn!("Printing ipfs hash of picture"); + println!("{}", hash); + }); + } + _ => { + error!("Not a Profile"); + exit(1) + } + } + }) } ("get-profile-more", Some(mtch)) => { @@ -557,37 +569,37 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); - hyper::rt::run(repo - .get_content(hash) - .map_err(|e| { - let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); - - if !ignore_err { - error!("Error running: {:?}", e); - print_error_details(e); - } - - exit(1) - }) - .map(|content| { - match content.payload() { - Payload::Profile { more, .. } => { - match serde_json_to_string_pretty(&more) { - Err(e) => { - error!("Error building JSON: {:?}", e); - exit(1) - }, - Ok(s) => { - println!("{}", s); - }, - } - } - _ => { - error!("Not a Profile"); - exit(1) - } - } - })); + repo.get_content(hash) + .await + .map_err(|e| { + let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); + + if !ignore_err { + error!("Error running: {:?}", e); + print_error_details(e); + } + + exit(1) + }) + .map(|content| { + match content.payload() { + Payload::Profile { more, .. } => { + match serde_json_to_string_pretty(&more) { + Err(e) => { + error!("Error building JSON: {:?}", e); + exit(1) + }, + Ok(s) => { + println!("{}", s); + }, + } + } + _ => { + error!("Not a Profile"); + exit(1) + } + } + }) } ("create-post-blob", Some(mtch)) => { @@ -641,16 +653,17 @@ fn main() { Content::new(devices, timestamp, payload) }; - hyper::rt::run({ + { repo .put_content(content) |