diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-04-17 19:20:53 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-04-17 19:20:53 +0200 |
commit | 8dfd21ed430e9407880a5fc640b3351ccf34db5c (patch) | |
tree | 993ffecf69680768496ed405af1f4e3665d5eba9 | |
parent | 056a0728c775e37460ed00791ad503e03a88f3d6 (diff) |
Refactor into server-client architecture using protobuf
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | Cargo.toml | 16 | ||||
-rw-r--r-- | build.rs | 4 | ||||
-rw-r--r-- | proto/fss.proto | 36 | ||||
-rw-r--r-- | shell.nix | 29 | ||||
-rw-r--r-- | src/cli.rs | 37 | ||||
-rw-r--r-- | src/client.rs | 85 | ||||
-rw-r--r-- | src/config.rs | 8 | ||||
-rw-r--r-- | src/main.rs | 107 | ||||
-rw-r--r-- | src/server.rs | 179 |
9 files changed, 389 insertions, 112 deletions
@@ -5,7 +5,13 @@ authors = ["Matthias Beyer <mail@beyermatthias.de>"] edition = "2018" license = "GPL-2.0" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] +name = "fss-server" +path = "src/server.rs" + +[[bin]] +name = "fss" +path = "src/client.rs" [dependencies] anyhow = "1.0.40" @@ -15,7 +21,15 @@ env_logger = "0.8.3" getset = "0.1.1" itertools = "0.10.0" log = "0.4.14" +prost = "0.7" serde = "1.0.125" tantivy = "0.14" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tonic = "0.4" xdg = "2.2.0" +version = "3.0.0" +resiter = "0.4" + +[build-dependencies] +tonic-build = "0.4" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..36587c2 --- /dev/null +++ b/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box<dyn std::error::Error>> { + tonic_build::compile_protos("proto/fss.proto")?; + Ok(()) +} diff --git a/proto/fss.proto b/proto/fss.proto new file mode 100644 index 0000000..12fda7a --- /dev/null +++ b/proto/fss.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; +package fss; + +service FSS { + rpc GetVersion (VersionRequest) returns (VersionReply); + + rpc IndexFile (IndexFileRequest) returns (IndexFileReply); + rpc SearchQuery (SearchQueryRequest) returns (SearchResponse); +} + + +message VersionRequest { +} + +message VersionReply { + string version = 1; // Reply contains only the version string +} + + +message IndexFileRequest { + string path = 1; +} + +message IndexFileReply { + bool error = 1; +} + + +message SearchQueryRequest { + string query = 1; +} + +message SearchResponse { + bool success = 1; + repeated string pathes = 2; +} diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..3379a4c --- /dev/null +++ b/shell.nix @@ -0,0 +1,29 @@ +{ ... }: + +let + moz_overlay = import ( + builtins.fetchTarball https://github.com/mozilla/nixpkgs-mozilla/archive/master.tar.gz + ); + + pkgs = import <nixpkgs> { overlays = [ moz_overlay ]; }; +in +pkgs.mkShell { + buildInputs = with pkgs; [ + rustChannels.stable.rust-std + rustChannels.stable.rust + rustChannels.stable.rustc + rustChannels.stable.cargo + + cmake + curl + gcc + openssl + pkgconfig + + protobuf + ]; + + LIBCLANG_PATH = "${pkgs.llvmPackages.libclang}/lib"; + PROTOC = "${pkgs.protobuf}/bin/protoc"; +} + @@ -3,21 +3,34 @@ use clap::Arg; use clap::crate_authors; use clap::crate_version; -pub fn app<'a>() -> App<'a, 'a> { +pub fn server_app<'a>() -> App<'a, 'a> { + App::new("fss") + .author(crate_authors!()) + .version(crate_version!()) + .about("Filesystemsearch") + + .arg(self::common::arg_server_addr()) + .arg(self::common::arg_server_port()) +} + +pub fn client_app<'a>() -> App<'a, 'a> { App::new("fss") .author(crate_authors!()) .version(crate_version!()) .about("Filesystemsearch") + .arg(self::common::arg_server_addr()) + .arg(self::common::arg_server_port()) + .subcommand(App::new("index") .version(crate_version!()) .about("Index a file") .arg(Arg::with_name("file") .required(true) - .multiple(true) + .multiple(false) .value_name("FILE") - .help("Index these files") + .help("Index this file") ) ) @@ -33,4 +46,22 @@ pub fn app<'a>() -> App<'a, 'a> { ) } +mod common { + use clap::Arg; + pub fn arg_server_addr<'a, 'b>() -> Arg<'a, 'b> { + Arg::with_name("server_addr") + .required(false) + .multiple(false) + .value_name("ADDR") + .help("The address of the server") + } + + pub fn arg_server_port<'a, 'b>() -> Arg<'a, 'b> { + Arg::with_name("server_port") + .required(false) + .multiple(false) + .value_name("PORT") + .help("The port of the server") + } +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..7249098 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,85 @@ +use std::io::Write; +use std::str::FromStr; + +use anyhow::Context; +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; +use itertools::Itertools; + +mod config; +mod cli; +mod schema; + +use fss::fss_client::FssClient; +use fss::VersionRequest; +use fss::IndexFileRequest; +use fss::SearchQueryRequest; + +mod fss { + tonic::include_proto!("fss"); +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = crate::cli::client_app(); + let matches = cli.get_matches(); + let _ = env_logger::try_init()?; + let mut config = ::config::Config::default(); + { + let xdg = xdg::BaseDirectories::with_prefix("fss")?; + let xdg_config = xdg.find_config_file("config.toml") + .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?; + + log::debug!("Configuration file found with XDG: {}", xdg_config.display()); + config.merge(::config::File::from(xdg_config).required(false)) + .context("Failed to load config.toml from XDG configuration directory")?; + } + let config = config.try_into::<crate::config::Config>()?; + let server_addr = matches.value_of("server_addr").unwrap_or_else(|| config.server_addr()); + let server_port = matches.value_of("server_port").map(usize::from_str).unwrap_or_else(|| Ok(*config.server_port()))?; + let addr = format!("http://{}:{}", server_addr, server_port); + + let mut client = FssClient::connect(addr).await?; + let request = tonic::Request::new(VersionRequest { }); + let _response = client.get_version(request).await?; + + match matches.subcommand() { + ("index", Some(mtch)) => { + let path = mtch.value_of("file").map(String::from).unwrap(); // safe by clap + let request = tonic::Request::new(IndexFileRequest { path }); + let response = client.index_file(request).await?; + + if response.get_ref().error { + Err(anyhow!("Error indexing file")) + } else { + Ok(()) + } + }, + + ("search", Some(mtch)) => { + let query = mtch.values_of("term") + .unwrap() // safe by clap + .join(" "); + + let request = tonic::Request::new(SearchQueryRequest { query }); + let response = client.search_query(request).await?; + let mut output = std::io::stdout(); + + if response.get_ref().success { + response.get_ref() + .pathes + .iter() + .map(|pb| writeln!(output, "{}", pb).map_err(Error::from)) + .collect::<Result<Vec<_>>>() + .map(|_| ()) + } else { + Err(anyhow!("Failed to search")) + } + }, + + (_other, _) => { + unimplemented!() + }, + } +} diff --git a/src/config.rs b/src/config.rs index 27dddb8..6bb83a1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,6 +3,12 @@ use std::path::PathBuf; #[derive(Debug, serde::Deserialize, getset::Getters)] pub struct Config { #[getset(get = "pub")] - database_path: PathBuf + database_path: PathBuf, + + #[getset(get = "pub")] + server_addr: String, + + #[getset(get = "pub")] + server_port: usize, } diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 8fd5fb2..0000000 --- a/src/main.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::io::Write; -use std::path::PathBuf; - -use anyhow::Context; -use anyhow::Error; -use anyhow::Result; -use anyhow::anyhow; -use itertools::Itertools; - -mod config; -mod cli; -mod schema; - -fn main() -> Result<()> { - let cli = crate::cli::app(); - let _ = env_logger::try_init()?; - let mut config = ::config::Config::default(); - { - let xdg = xdg::BaseDirectories::with_prefix("fss")?; - let xdg_config = xdg.find_config_file("config.toml") - .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?; - - log::debug!("Configuration file found with XDG: {}", xdg_config.display()); - config.merge(::config::File::from(xdg_config).required(false)) - .context("Failed to load config.toml from XDG configuration directory")?; - } - let config = config.try_into::<crate::config::Config>()?; - - let index_path = tantivy::directory::MmapDirectory::open(config.database_path())?; - let schema = crate::schema::schema(); - - let index = tantivy::Index::open_or_create(index_path, schema.clone())?; - - let field_path = schema.get_field("path") - .ok_or_else(|| anyhow!("BUG"))?; - let field_ft = schema.get_field("ft") - .ok_or_else(|| anyhow!("BUG"))?; - let field_body = schema.get_field("body") - .ok_or_else(|| anyhow!("BUG"))?; - - match cli.get_matches().subcommand() { - ("index", Some(mtch)) => { - let mut index_writer = index.writer(50_000_000)?; - mtch.values_of("file") - .unwrap() // safe by clap - .map(|filepath| { - let path_str = String::from(filepath); - let path = PathBuf::from(&path_str); - - let filetype = path.extension() - .map(ToOwned::to_owned) - .and_then(|osstr| osstr.to_str().map(|s| s.to_string())) - .ok_or_else(|| anyhow!("Path {} is not UTF8", filepath))?; - - let mut doc = tantivy::Document::default(); - doc.add_text(field_path, &path_str); - doc.add_text(field_ft, &filetype); - - doc.add_text(field_body, std::fs::read_to_string(path)?); - - index_writer.add_document(doc); - Ok(()) - }) - .collect::<Result<Vec<_>>>()?; - - index_writer.commit()?; - Ok(()) - }, - - ("search", Some(mtch)) => { - let query_str = mtch.values_of("term") - .unwrap() // safe by clap - .join(" "); - - let reader = index - .reader_builder() - .reload_policy(tantivy::ReloadPolicy::OnCommit) - .try_into()?; - - let searcher = reader.searcher(); - let query_parser = tantivy::query::QueryParser::for_index(&index, vec![field_path.clone(), field_ft, field_body]); - let query = query_parser.parse_query(&query_str)?; - - let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?; - let mut output = std::io::stdout(); - - top_docs.into_iter() - .map(|(_score, adr)| { - let retrieved_doc = searcher.doc(adr)?; - retrieved_doc.get_all(field_path) - .map(|value| { - value.text().ok_or_else(|| anyhow!("Not a text value..")) - }) - .map_ok(|txt| { - writeln!(output, "{}", txt).map_err(Error::from) - }) - .collect::<Result<Vec<_>>>() - }) - .collect::<Result<Vec<Vec<_>>>>() - .map(|_| ()) - }, - - (_other, _) => { - unimplemented!() - }, - } -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..bb7a550 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,179 @@ +use std::str::FromStr; +use std::path::PathBuf; + +use anyhow::anyhow; +use anyhow::Error; +use anyhow::Context; +use anyhow::Result; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use resiter::AndThen; +use resiter::Map; + +use fss::VersionReply; +use fss::VersionRequest; +use fss::IndexFileRequest; +use fss::IndexFileReply; +use fss::SearchQueryRequest; +use fss::SearchResponse; + +mod config; +mod cli; +mod schema; + +mod fss { + tonic::include_proto!("fss"); // The string specified here must match the proto package name +} + +pub struct Server { + index: tantivy::Index, + + field_path: tantivy::schema::Field, + field_ft: tantivy::schema::Field, + field_body: tantivy::schema::Field, +} + +impl Server { + fn write_file_to_index(&self, filepath: &str) -> Result<()> { + let mut index_writer = self.index.writer(50_000_000)?; + let path = PathBuf::from(filepath); + + let filetype = path.extension() + .map(ToOwned::to_owned) + .and_then(|osstr| osstr.to_str().map(|s| s.to_string())) + .ok_or_else(|| anyhow!("Path {} is not UTF8", filepath))?; + + let mut doc = tantivy::Document::default(); + doc.add_text(self.field_path, filepath); + doc.add_text(self.field_ft, &filetype); + + doc.add_text(self.field_body, std::fs::read_to_string(path)?); + + index_writer.add_document(doc); + index_writer.commit()?; + Ok(()) + } + + fn search(&self, query_str: &str) -> Result<Vec<PathBuf>> { + let reader = self.index + .reader_builder() + .reload_policy(tantivy::ReloadPolicy::OnCommit) + .try_into()?; + + let searcher = reader.searcher(); + let query_parser = tantivy::query::QueryParser::for_index(&self.index, vec![self.field_path, self.field_ft, self.field_body]); + let query = query_parser.parse_query(query_str)?; + + let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?; + + let results = top_docs + .into_iter() + .map(|(_score, adr)| searcher.doc(adr).map_err(Error::from)) + .and_then_ok(|retr| { + retr.get_all(self.field_path) + .map(|value| { + value.text().ok_or_else(|| anyhow!("Not a text value..")) + }) + .map_ok(PathBuf::from) + .collect::<Result<Vec<_>>>() + }) + .collect::<Result<Vec<Vec<PathBuf>>>>()? + .into_iter() + .map(Vec::into_iter) + .flatten() + .collect(); + Ok(results) + } +} + + +#[tonic::async_trait] +impl fss::fss_server::Fss for Server { + async fn get_version(&self, _request: Request<VersionRequest>) -> Result<Response<VersionReply>, Status> { + let reply = fss::VersionReply { + version: version::version!().to_string(), + }; + + Ok(Response::new(reply)) + } + + async fn index_file(&self, request: Request<IndexFileRequest>) -> Result<Response<IndexFileReply>, Status> { + let error = match self.write_file_to_index(&request.get_ref().path) { + Ok(()) => false, + Err(e) => { + log::error!("Error writing to index: {} -> {:?}", request.get_ref().path, e); + true + } + }; + + let reply = fss::IndexFileReply { + error + }; + Ok(Response::new(reply)) + } + + async fn search_query(&self, request: Request<SearchQueryRequest>) -> Result<Response<SearchResponse>, Status> { + let (success, pathes) = match self.search(&request.get_ref().query) { + Ok(pathes) => (true, pathes), + Err(e) => { + log::error!("Error searching with '{}' -> {:?}", request.get_ref().query, e); + (false, vec![]) + } + }; + + let reply = fss::SearchResponse { + success, + pathes: pathes.into_iter().map(|pb| pb.display().to_string()).collect(), + }; + Ok(Response::new(reply)) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = crate::cli::server_app(); + let matches = cli.get_matches(); + let _ = env_logger::try_init()?; + let mut config = ::config::Config::default(); + { + let xdg = xdg::BaseDirectories::with_prefix("fss")?; + let xdg_config = xdg.find_config_file("config.toml") + .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?; + + log::debug!("Configuration file found with XDG: {}", xdg_config.display()); + config.merge(::config::File::from(xdg_config).required(false)) + .context("Failed to load config.toml from XDG configuration directory")?; + } + let config = config.try_into::<crate::config::Config>()?; + let server_addr = matches.value_of("server_addr").unwrap_or_else(|| config.server_addr()); + let server_port = matches.value_of("server_port").map(usize::from_str).unwrap_or_else(|| Ok(*config.server_port()))?; + + let addr = format!("{}:{}", server_addr, server_port).parse()?; + + let server = { + let index_path = tantivy::directory::MmapDirectory::open(config.database_path())?; + let schema = crate::schema::schema(); + + let index = tantivy::Index::open_or_create(index_path, schema.clone())?; + let field_path = schema.get_field("path").ok_or_else(|| anyhow!("BUG"))?; + let field_ft = schema.get_field("ft").ok_or_else(|| anyhow!("BUG"))?; + let field_body = schema.get_field("body").ok_or_else(|| anyhow!("BUG"))?; + + Server { + index, + + field_path, + field_ft, + field_body, + } + }; + + tonic::transport::Server::builder() + .add_service(fss::fss_server::FssServer::new(server)) + .serve(addr) + .await + .map_err(Error::from) + .map(|_| ()) +} + |