diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-04-17 19:20:53 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-04-17 19:20:53 +0200 |
commit | 8dfd21ed430e9407880a5fc640b3351ccf34db5c (patch) | |
tree | 993ffecf69680768496ed405af1f4e3665d5eba9 /src | |
parent | 056a0728c775e37460ed00791ad503e03a88f3d6 (diff) |
Refactor into server-client architecture using protobuf
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r-- | src/cli.rs | 37 | ||||
-rw-r--r-- | src/client.rs | 85 | ||||
-rw-r--r-- | src/config.rs | 8 | ||||
-rw-r--r-- | src/main.rs | 107 | ||||
-rw-r--r-- | src/server.rs | 179 |
5 files changed, 305 insertions, 111 deletions
@@ -3,21 +3,34 @@ use clap::Arg; use clap::crate_authors; use clap::crate_version; -pub fn app<'a>() -> App<'a, 'a> { +pub fn server_app<'a>() -> App<'a, 'a> { + App::new("fss") + .author(crate_authors!()) + .version(crate_version!()) + .about("Filesystemsearch") + + .arg(self::common::arg_server_addr()) + .arg(self::common::arg_server_port()) +} + +pub fn client_app<'a>() -> App<'a, 'a> { App::new("fss") .author(crate_authors!()) .version(crate_version!()) .about("Filesystemsearch") + .arg(self::common::arg_server_addr()) + .arg(self::common::arg_server_port()) + .subcommand(App::new("index") .version(crate_version!()) .about("Index a file") .arg(Arg::with_name("file") .required(true) - .multiple(true) + .multiple(false) .value_name("FILE") - .help("Index these files") + .help("Index this file") ) ) @@ -33,4 +46,22 @@ pub fn app<'a>() -> App<'a, 'a> { ) } +mod common { + use clap::Arg; + pub fn arg_server_addr<'a, 'b>() -> Arg<'a, 'b> { + Arg::with_name("server_addr") + .required(false) + .multiple(false) + .value_name("ADDR") + .help("The address of the server") + } + + pub fn arg_server_port<'a, 'b>() -> Arg<'a, 'b> { + Arg::with_name("server_port") + .required(false) + .multiple(false) + .value_name("PORT") + .help("The port of the server") + } +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..7249098 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,85 @@ +use std::io::Write; +use std::str::FromStr; + +use anyhow::Context; +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; +use itertools::Itertools; + +mod config; +mod cli; +mod schema; + +use fss::fss_client::FssClient; +use fss::VersionRequest; +use fss::IndexFileRequest; +use fss::SearchQueryRequest; + +mod fss { + tonic::include_proto!("fss"); +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = crate::cli::client_app(); + let matches = cli.get_matches(); + let _ = env_logger::try_init()?; + let mut config = ::config::Config::default(); + { + let xdg = xdg::BaseDirectories::with_prefix("fss")?; + let xdg_config = xdg.find_config_file("config.toml") + .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?; + + log::debug!("Configuration file found with XDG: {}", xdg_config.display()); + config.merge(::config::File::from(xdg_config).required(false)) + .context("Failed to load config.toml from XDG configuration directory")?; + } + let config = config.try_into::<crate::config::Config>()?; + let server_addr = matches.value_of("server_addr").unwrap_or_else(|| config.server_addr()); + let server_port = matches.value_of("server_port").map(usize::from_str).unwrap_or_else(|| Ok(*config.server_port()))?; + let addr = format!("http://{}:{}", server_addr, server_port); + + let mut client = FssClient::connect(addr).await?; + let request = tonic::Request::new(VersionRequest { }); + let _response = client.get_version(request).await?; + + match matches.subcommand() { + ("index", Some(mtch)) => { + let path = mtch.value_of("file").map(String::from).unwrap(); // safe by clap + let request = tonic::Request::new(IndexFileRequest { path }); + let response = client.index_file(request).await?; + + if response.get_ref().error { + Err(anyhow!("Error indexing file")) + } else { + Ok(()) + } + }, + + ("search", Some(mtch)) => { + let query = mtch.values_of("term") + .unwrap() // safe by clap + .join(" "); + + let request = tonic::Request::new(SearchQueryRequest { query }); + let response = client.search_query(request).await?; + let mut output = std::io::stdout(); + + if response.get_ref().success { + response.get_ref() + .pathes + .iter() + .map(|pb| writeln!(output, "{}", pb).map_err(Error::from)) + .collect::<Result<Vec<_>>>() + .map(|_| ()) + } else { + Err(anyhow!("Failed to search")) + } + }, + + (_other, _) => { + unimplemented!() + }, + } +} diff --git a/src/config.rs b/src/config.rs index 27dddb8..6bb83a1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,6 +3,12 @@ use std::path::PathBuf; #[derive(Debug, serde::Deserialize, getset::Getters)] pub struct Config { #[getset(get = "pub")] - database_path: PathBuf + database_path: PathBuf, + + #[getset(get = "pub")] + server_addr: String, + + #[getset(get = "pub")] + server_port: usize, } diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 8fd5fb2..0000000 --- a/src/main.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::io::Write; -use std::path::PathBuf; - -use anyhow::Context; -use anyhow::Error; -use anyhow::Result; -use anyhow::anyhow; -use itertools::Itertools; - -mod config; -mod cli; -mod schema; - -fn main() -> Result<()> { - let cli = crate::cli::app(); - let _ = env_logger::try_init()?; - let mut config = ::config::Config::default(); - { - let xdg = xdg::BaseDirectories::with_prefix("fss")?; - let xdg_config = xdg.find_config_file("config.toml") - .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?; - - log::debug!("Configuration file found with XDG: {}", xdg_config.display()); - config.merge(::config::File::from(xdg_config).required(false)) - .context("Failed to load config.toml from XDG configuration directory")?; - } - let config = config.try_into::<crate::config::Config>()?; - - let index_path = tantivy::directory::MmapDirectory::open(config.database_path())?; - let schema = crate::schema::schema(); - - let index = tantivy::Index::open_or_create(index_path, schema.clone())?; - - let field_path = schema.get_field("path") - .ok_or_else(|| anyhow!("BUG"))?; - let field_ft = schema.get_field("ft") - .ok_or_else(|| anyhow!("BUG"))?; - let field_body = schema.get_field("body") - .ok_or_else(|| anyhow!("BUG"))?; - - match cli.get_matches().subcommand() { - ("index", Some(mtch)) => { - let mut index_writer = index.writer(50_000_000)?; - mtch.values_of("file") - .unwrap() // safe by clap - .map(|filepath| { - let path_str = String::from(filepath); - let path = PathBuf::from(&path_str); - - let filetype = path.extension() - .map(ToOwned::to_owned) - .and_then(|osstr| osstr.to_str().map(|s| s.to_string())) - .ok_or_else(|| anyhow!("Path {} is not UTF8", filepath))?; - - let mut doc = tantivy::Document::default(); - doc.add_text(field_path, &path_str); - doc.add_text(field_ft, &filetype); - - doc.add_text(field_body, std::fs::read_to_string(path)?); - - index_writer.add_document(doc); - Ok(()) - }) - .collect::<Result<Vec<_>>>()?; - - index_writer.commit()?; - Ok(()) - }, - - ("search", Some(mtch)) => { - let query_str = mtch.values_of("term") - .unwrap() // safe by clap - .join(" "); - - let reader = index - .reader_builder() - .reload_policy(tantivy::ReloadPolicy::OnCommit) - .try_into()?; - - let searcher = reader.searcher(); - let query_parser = tantivy::query::QueryParser::for_index(&index, vec![field_path.clone(), field_ft, field_body]); - let query = query_parser.parse_query(&query_str)?; - - let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?; - let mut output = std::io::stdout(); - - top_docs.into_iter() - .map(|(_score, adr)| { - let retrieved_doc = searcher.doc(adr)?; - retrieved_doc.get_all(field_path) - .map(|value| { - value.text().ok_or_else(|| anyhow!("Not a text value..")) - }) - .map_ok(|txt| { - writeln!(output, "{}", txt).map_err(Error::from) - }) - .collect::<Result<Vec<_>>>() - }) - .collect::<Result<Vec<Vec<_>>>>() - .map(|_| ()) - }, - - (_other, _) => { - unimplemented!() - }, - } -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..bb7a550 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,179 @@ +use std::str::FromStr; +use std::path::PathBuf; + +use anyhow::anyhow; +use anyhow::Error; +use anyhow::Context; +use anyhow::Result; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use resiter::AndThen; +use resiter::Map; + +use fss::VersionReply; +use fss::VersionRequest; +use fss::IndexFileRequest; +use fss::IndexFileReply; +use fss::SearchQueryRequest; +use fss::SearchResponse; + +mod config; +mod cli; +mod schema; + +mod fss { + tonic::include_proto!("fss"); // The string specified here must match the proto package name +} + +pub struct Server { + index: tantivy::Index, + + field_path: tantivy::schema::Field, + field_ft: tantivy::schema::Field, + field_body: tantivy::schema::Field, +} + +impl Server { + fn write_file_to_index(&self, filepath: &str) -> Result<()> { + let mut index_writer = self.index.writer(50_000_000)?; + let path = PathBuf::from(filepath); + + let filetype = path.extension() + .map(ToOwned::to_owned) + .and_then(|osstr| osstr.to_str().map(|s| s.to_string())) + .ok_or_else(|| anyhow!("Path {} is not UTF8", filepath))?; + + let mut doc = tantivy::Document::default(); + doc.add_text(self.field_path, filepath); + doc.add_text(self.field_ft, &filetype); + + doc.add_text(self.field_body, std::fs::read_to_string(path)?); + + index_writer.add_document(doc); + index_writer.commit()?; + Ok(()) + } + + fn search(&self, query_str: &str) -> Result<Vec<PathBuf>> { + let reader = self.index + .reader_builder() + .reload_policy(tantivy::ReloadPolicy::OnCommit) + .try_into()?; + + let searcher = reader.searcher(); + let query_parser = tantivy::query::QueryParser::for_index(&self.index, vec![self.field_path, self.field_ft, self.field_body]); + let query = query_parser.parse_query(query_str)?; + + let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?; + + let results = top_docs + .into_iter() + .map(|(_score, adr)| searcher.doc(adr).map_err(Error::from)) + .and_then_ok(|retr| { + retr.get_all(self.field_path) + .map(|value| { + value.text().ok_or_else(|| anyhow!("Not a text value..")) + }) + .map_ok(PathBuf::from) + .collect::<Result<Vec<_>>>() + }) + .collect::<Result<Vec<Vec<PathBuf>>>>()? + .into_iter() + .map(Vec::into_iter) + .flatten() + .collect(); + Ok(results) + } +} + + +#[tonic::async_trait] +impl fss::fss_server::Fss for Server { + async fn get_version(&self, _request: Request<VersionRequest>) -> Result<Response<VersionReply>, Status> { + let reply = fss::VersionReply { + version: version::version!().to_string(), + }; + + Ok(Response::new(reply)) + } + + async fn index_file(&self, request: Request<IndexFileRequest>) -> Result<Response<IndexFileReply>, Status> { + let error = match self.write_file_to_index(&request.get_ref().path) { + Ok(()) => false, + Err(e) => { + log::error!("Error writing to index: {} -> {:?}", request.get_ref().path, e); + true + } + }; + + let reply = fss::IndexFileReply { + error + }; + Ok(Response::new(reply)) + } + + async fn search_query(&self, request: Request<SearchQueryRequest>) -> Result<Response<SearchResponse>, Status> { + let (success, pathes) = match self.search(&request.get_ref().query) { + Ok(pathes) => (true, pathes), + Err(e) => { + log::error!("Error searching with '{}' -> {:?}", request.get_ref().query, e); + (false, vec![]) + } + }; + + let reply = fss::SearchResponse { + success, + pathes: pathes.into_iter().map(|pb| pb.display().to_string()).collect(), + }; + Ok(Response::new(reply)) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = crate::cli::server_app(); + let matches = cli.get_matches(); + let _ = env_logger::try_init()?; + let mut config = ::config::Config::default(); + { + let xdg = xdg::BaseDirectories::with_prefix("fss")?; + let xdg_config = xdg.find_config_file("config.toml") + .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?; + + log::debug!("Configuration file found with XDG: {}", xdg_config.display()); + config.merge(::config::File::from(xdg_config).required(false)) + .context("Failed to load config.toml from XDG configuration directory")?; + } + let config = config.try_into::<crate::config::Config>()?; + let server_addr = matches.value_of("server_addr").unwrap_or_else(|| config.server_addr()); + let server_port = matches.value_of("server_port").map(usize::from_str).unwrap_or_else(|| Ok(*config.server_port()))?; + + let addr = format!("{}:{}", server_addr, server_port).parse()?; + + let server = { + let index_path = tantivy::directory::MmapDirectory::open(config.database_path())?; + let schema = crate::schema::schema(); + + let index = tantivy::Index::open_or_create(index_path, schema.clone())?; + let field_path = schema.get_field("path").ok_or_else(|| anyhow!("BUG"))?; + let field_ft = schema.get_field("ft").ok_or_else(|| anyhow!("BUG"))?; + let field_body = schema.get_field("body").ok_or_else(|| anyhow!("BUG"))?; + + Server { + index, + + field_path, + field_ft, + field_body, + } + }; + + tonic::transport::Server::builder() + .add_service(fss::fss_server::FssServer::new(server)) + .serve(addr) + .await + .map_err(Error::from) + .map(|_| ()) +} + |